前回、GenServer の渋滞を解消するために Task を利用する方法を紹介しました。
blog.emattsan.org
これは Task のプロセスが異常終了しないことを前提にしていて、異常終了が予想されるばあいには、その対策を施しておく必要があります。
結論から言うと Task.Supervisor.async_nolink/3
を利用するとよいようです。
詳細は Task や Task.Supervisor に記載されていますので、そちらを参照してみてください。
以下は、GenServer の中で Task プロセスが異常終了したときのふるまいを検証した記録です。
対策しない
まず、対策しなかったときのふるまいを確認します。
コード
do_something/0
を呼ぶと、handle_call/3
の中で Task のプロセスを起動します。
Task のプロセスは起動するとすぐに例外を送出します。
defmodule MyApp.Worker1 do
use GenServer
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def do_something do
GenServer.call(__MODULE__, :do_something)
end
@impl true
def init() do
{:ok, %{}}
end
@impl true
def handle_call(:do_something, , state) do
Task.async(fn ->
raise "Boom!"
end)
{:noreply, state}
end
@impl true
def handle_info(msg, state) do
dbg msg
{:noreply, state}
end
end
実行
iex -S mix
で起動し、実行します。
iex(1)> {:ok, pid} = MyApp.Worker1.start_link()
{:ok, #PID<0.167.0>}
iex(2)> Process.info(pid)
[
registered_name: MyApp.Worker1,
...略...
]
iex(3)> MyApp.Worker1.do_something()
11:32:48.607 [error] Task #PID<0.168.0> started from MyApp.Worker1 terminating
** (RuntimeError) Boom!
(my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3
(elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
(elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.95600963/0 in MyApp.Worker1.handle_call/3>
Args: []
** (EXIT from #PID<0.166.0>) shell process exited with reason: an exception was raised:
** (RuntimeError) Boom!
(my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3
(elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
(elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Interactive Elixir (1.15.6) - press Ctrl+C to exit (type h() ENTER for help)
タスクとリンクしている MyApp.Worker1
のプロセスが終了し、そのプロセスにリンクしている iex のプロセスも終了して iex が再起動していることがわかります。
EXIT をトラップする
trap_exit
フラグを true
にして EXIT
をトラップする方法があります。
しかし一律でトラップしてしまうため、予想できない影響が出ることも考えられます。
Task.async/1
のドキュメントにも注意を促す但書がついています。
- Setting
:trap_exit
to true
- trapping exits should be used only in special circumstances as it would make your process immune to not only exits from the task but from any other processes.
コード
init/1
で Process.flag(:trap_exit, true)
を実行し EXIT のトラップを指定しています。
defmodule MyApp.Worker2 do
use GenServer
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def do_something do
GenServer.call(__MODULE__, :do_something)
end
@impl true
def init() do
Process.flag(:trap_exit, true)
{:ok, %{}}
end
@impl true
def handle_call(:do_something, , state) do
Task.async(fn ->
raise "Boom!"
end)
{:noreply, state}
end
@impl true
def handle_info(msg, state) do
dbg msg
{:noreply, state}
end
end
実行
タスクのプロセスで例外が送出されたあと、GenServer のプロセスが :DOWN
以外に :EXIT
のメッセージを受け取っているのがわかります。
iex(1)> {:ok, pid} = MyApp.Worker2.start_link()
{:ok, #PID<0.143.0>}
iex(2)> Process.info(pid)
[
registered_name: MyApp.Worker2,
...略...
]
iex(3)> MyApp.Worker2.do_something()
11:34:22.007 [error] Task #PID<0.144.0> started from MyApp.Worker2 terminating
** (RuntimeError) Boom!
(my_app 0.1.0) lib/my_app/worker_2.ex:21: anonymous fn/0 in MyApp.Worker2.handle_call/3
(elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
(elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.111817085/0 in MyApp.Worker2.handle_call/3>
Args: []
[lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2]
msg #=> {:EXIT, #PID<0.144.0>,
{%RuntimeError{message: "Boom!"},
[
{MyApp.Worker2, :"-handle_call/3-fun-0-", 0,
[
file: ~c"lib/my_app/worker_2.ex",
line: 21,
error_info: %{module: Exception}
]},
{Task.Supervised, :invoke_mfa, 2,
[file: ~c"lib/task/supervised.ex", line: 101]},
{Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
]}}
[lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2]
msg #=> {:DOWN, #Reference<0.0.18307.2349639668.3769958402.28027>, :process,
#PID<0.144.0>,
{%RuntimeError{message: "Boom!"},
[
{MyApp.Worker2, :"-handle_call/3-fun-0-", 0,
[
file: ~c"lib/my_app/worker_2.ex",
line: 21,
error_info: %{module: Exception}
]},
{Task.Supervised, :invoke_mfa, 2,
[file: ~c"lib/task/supervised.ex", line: 101]},
{Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
]}}
なおこのコードでは call/3
に対して reply を返していないために 5 秒ごにタイムアウトの例外が発生しますが、その点は割愛します。
対処として、タスクのプロセス終了時に GenServer.reply/2
を使って応答を返す方法を前回のブログで説明していますので、そちらも参照してみてください。
リンクしない
EXIT とトラップしないばあいに、タスクのプロセスの異常終了につられて GenServer のプロセスも異常終了するのはリンクしているためです。
Task.Supervisor.async_nolink/3
を利用すると、タスクのプロセスの管理は Task.Supervisor にまかせ、プロセスをリンクせずにタスクを利用することが可能になります。
コード
defmodule MyApp.Worker3 do
use GenServer
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def do_something do
GenServer.call(__MODULE__, :do_something)
end
@impl true
def init() do
{:ok, %{}}
end
@impl true
def handle_call(:do_something, , state) do
Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
raise "Boom!"
end)
{:noreply, state}
end
@impl true
def handle_info(msg, state) do
dbg msg
{:noreply, state}
end
end
先に Task.Supervisor のプロセスを起動しておく必要があるので、MyApp.Application を追加しプロセスを起動する設定を記述しておきます。
defmodule MyApp.Application do
@moduledoc false
use Application
@impl true
def start(, ) do
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
--sup
オプションなしでプロジェクトを作成したばあいは、mix.exs
の application/0
に mod: {MyApp.Application, []}
を追加することを忘れないでください。
defmodule MyApp.MixProject do
use Mix.Project
...略...
def application do
[
extra_applications: [:logger],
mod: {MyApp.Application, []}
]
end
...略...
end
実行
タスクのプロセスで例外が送出されたあと、:DOWN
のみ受け取っていることがわかります。
iex(1)> {:ok, pid} = MyApp.Worker3.start_link()
{:ok, #PID<0.143.0>}
iex(2)> Process.info(pid)
[
registered_name: MyApp.Worker3,
...略...
]
iex(3)> MyApp.Worker3.do_something()
11:36:07.211 [error] Task #PID<0.144.0> started from MyApp.Worker3 terminating
** (RuntimeError) Boom!
(my_app 0.1.0) lib/my_app/worker_3.ex:20: anonymous fn/0 in MyApp.Worker3.handle_call/3
(elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
(elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.16324509/0 in MyApp.Worker3.handle_call/3>
Args: []
[lib/my_app/worker_3.ex:27: MyApp.Worker3.handle_info/2]
msg #=> {:DOWN, #Reference<0.0.18307.1968646231.2159083530.717>, :process,
#PID<0.144.0>,
{%RuntimeError{message: "Boom!"},
[
{MyApp.Worker3, :"-handle_call/3-fun-0-", 0,
[
file: ~c"lib/my_app/worker_3.ex",
line: 20,
error_info: %{module: Exception}
]},
{Task.Supervised, :invoke_mfa, 2,
[file: ~c"lib/task/supervised.ex", line: 101]},
{Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
]}}
Scalability and partitioning
なおドキュメントにあるように、Task.Supervisor
はシングルプロセスのため、それがボトルネックになる可能性があるとのこと。
それを対処するために PartitionSupervisor
を利用する例が記載されています。
実装してみます。
MyApp.Application で PartitionSupervisor
のプロセスを起動するように変更します。
defmodule MyApp.Application do
@moduledoc false
use Application
@impl true
def start(, ) do
children = [
{PartitionSupervisor, child_spec: Task.Supervisor, name: MyApp.TaskSupervisors}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
handle_call/3
でタスクのプロセスを起動しているコードも、PartitionSupervisor
を指定して起動するように変更します。
def handle_call(:do_something, , state) do
Task.Supervisor.async_nolink({:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}}, fn ->
raise "Boom!"
end)
{:noreply, state}
end
書くのも読むのも大変になってきたので、モジュールを追加してコードを移動します。
MyApp.TaskSupervisor
を追加して、タスクのプロセスを起動する関数を用意します。
defmodule MyApp.TaskSupervisor do
def async_nolink(fun, options \\ []) do
Task.Supervisor.async_nolink(
{:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}},
fun,
options
)
end
end
この関数を使って handle_call/3
を書き換えます。
def handle_call(:do_something, , state) do
MyApp.TaskSupervisor.async_nolink(fn ->
raise "Boom!"
end)
{:noreply, state}
end
これで見やすくなりました。