前回、GenServer の渋滞を解消するために Task を利用する方法を紹介しました。
これは Task のプロセスが異常終了しないことを前提にしていて、異常終了が予想されるばあいには、その対策を施しておく必要があります。
結論から言うと Task.Supervisor.async_nolink/3 を利用するとよいようです。
詳細は Task や Task.Supervisor に記載されていますので、そちらを参照してみてください。
以下は、GenServer の中で Task プロセスが異常終了したときのふるまいを検証した記録です。
対策しない
まず、対策しなかったときのふるまいを確認します。
コード
do_something/0 を呼ぶと、handle_call/3 の中で Task のプロセスを起動します。
Task のプロセスは起動するとすぐに例外を送出します。
defmodule MyApp.Worker1 do use GenServer def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end def do_something do GenServer.call(__MODULE__, :do_something) end @impl true def init(_opts) do {:ok, %{}} end @impl true def handle_call(:do_something, _from, state) do Task.async(fn -> raise "Boom!" end) {:noreply, state} end @impl true def handle_info(msg, state) do dbg msg {:noreply, state} end end
実行
iex -S mix で起動し、実行します。
iex(1)> {:ok, pid} = MyApp.Worker1.start_link()
{:ok, #PID<0.167.0>}
iex(2)> Process.info(pid)
[
registered_name: MyApp.Worker1,
...略...
]
iex(3)> MyApp.Worker1.do_something()
11:32:48.607 [error] Task #PID<0.168.0> started from MyApp.Worker1 terminating
** (RuntimeError) Boom!
(my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3
(elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
(elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.95600963/0 in MyApp.Worker1.handle_call/3>
Args: []
** (EXIT from #PID<0.166.0>) shell process exited with reason: an exception was raised:
** (RuntimeError) Boom!
(my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3
(elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
(elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Interactive Elixir (1.15.6) - press Ctrl+C to exit (type h() ENTER for help)
タスクとリンクしている MyApp.Worker1 のプロセスが終了し、そのプロセスにリンクしている iex のプロセスも終了して iex が再起動していることがわかります。
EXIT をトラップする
trap_exit フラグを true にして EXIT をトラップする方法があります。
しかし一律でトラップしてしまうため、予想できない影響が出ることも考えられます。
Task.async/1 のドキュメントにも注意を促す但書がついています。
- Setting
:trap_exittotrue- trapping exits should be used only in special circumstances as it would make your process immune to not only exits from the task but from any other processes.
コード
init/1 で Process.flag(:trap_exit, true) を実行し EXIT のトラップを指定しています。
defmodule MyApp.Worker2 do use GenServer def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end def do_something do GenServer.call(__MODULE__, :do_something) end @impl true def init(_opts) do Process.flag(:trap_exit, true) {:ok, %{}} end @impl true def handle_call(:do_something, _from, state) do Task.async(fn -> raise "Boom!" end) {:noreply, state} end @impl true def handle_info(msg, state) do dbg msg {:noreply, state} end end
実行
タスクのプロセスで例外が送出されたあと、GenServer のプロセスが :DOWN 以外に :EXIT のメッセージを受け取っているのがわかります。
iex(1)> {:ok, pid} = MyApp.Worker2.start_link()
{:ok, #PID<0.143.0>}
iex(2)> Process.info(pid)
[
registered_name: MyApp.Worker2,
...略...
]
iex(3)> MyApp.Worker2.do_something()
11:34:22.007 [error] Task #PID<0.144.0> started from MyApp.Worker2 terminating
** (RuntimeError) Boom!
(my_app 0.1.0) lib/my_app/worker_2.ex:21: anonymous fn/0 in MyApp.Worker2.handle_call/3
(elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
(elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.111817085/0 in MyApp.Worker2.handle_call/3>
Args: []
[lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2]
msg #=> {:EXIT, #PID<0.144.0>,
{%RuntimeError{message: "Boom!"},
[
{MyApp.Worker2, :"-handle_call/3-fun-0-", 0,
[
file: ~c"lib/my_app/worker_2.ex",
line: 21,
error_info: %{module: Exception}
]},
{Task.Supervised, :invoke_mfa, 2,
[file: ~c"lib/task/supervised.ex", line: 101]},
{Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
]}}
[lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2]
msg #=> {:DOWN, #Reference<0.0.18307.2349639668.3769958402.28027>, :process,
#PID<0.144.0>,
{%RuntimeError{message: "Boom!"},
[
{MyApp.Worker2, :"-handle_call/3-fun-0-", 0,
[
file: ~c"lib/my_app/worker_2.ex",
line: 21,
error_info: %{module: Exception}
]},
{Task.Supervised, :invoke_mfa, 2,
[file: ~c"lib/task/supervised.ex", line: 101]},
{Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
]}}
なおこのコードでは call/3 に対して reply を返していないために 5 秒ごにタイムアウトの例外が発生しますが、その点は割愛します。
対処として、タスクのプロセス終了時に GenServer.reply/2 を使って応答を返す方法を前回のブログで説明していますので、そちらも参照してみてください。
リンクしない
EXIT とトラップしないばあいに、タスクのプロセスの異常終了につられて GenServer のプロセスも異常終了するのはリンクしているためです。
Task.Supervisor.async_nolink/3 を利用すると、タスクのプロセスの管理は Task.Supervisor にまかせ、プロセスをリンクせずにタスクを利用することが可能になります。
コード
defmodule MyApp.Worker3 do use GenServer def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end def do_something do GenServer.call(__MODULE__, :do_something) end @impl true def init(_opts) do {:ok, %{}} end @impl true def handle_call(:do_something, _from, state) do Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> raise "Boom!" end) {:noreply, state} end @impl true def handle_info(msg, state) do dbg msg {:noreply, state} end end
先に Task.Supervisor のプロセスを起動しておく必要があるので、MyApp.Application を追加しプロセスを起動する設定を記述しておきます。
defmodule MyApp.Application do @moduledoc false use Application @impl true def start(_type, _args) do children = [ {Task.Supervisor, name: MyApp.TaskSupervisor} ] opts = [strategy: :one_for_one, name: MyApp.Supervisor] Supervisor.start_link(children, opts) end end
--sup オプションなしでプロジェクトを作成したばあいは、mix.exs の application/0 に mod: {MyApp.Application, []} を追加することを忘れないでください。
defmodule MyApp.MixProject do use Mix.Project ...略... def application do [ extra_applications: [:logger], mod: {MyApp.Application, []} ] end ...略... end
実行
タスクのプロセスで例外が送出されたあと、:DOWN のみ受け取っていることがわかります。
iex(1)> {:ok, pid} = MyApp.Worker3.start_link()
{:ok, #PID<0.143.0>}
iex(2)> Process.info(pid)
[
registered_name: MyApp.Worker3,
...略...
]
iex(3)> MyApp.Worker3.do_something()
11:36:07.211 [error] Task #PID<0.144.0> started from MyApp.Worker3 terminating
** (RuntimeError) Boom!
(my_app 0.1.0) lib/my_app/worker_3.ex:20: anonymous fn/0 in MyApp.Worker3.handle_call/3
(elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
(elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.16324509/0 in MyApp.Worker3.handle_call/3>
Args: []
[lib/my_app/worker_3.ex:27: MyApp.Worker3.handle_info/2]
msg #=> {:DOWN, #Reference<0.0.18307.1968646231.2159083530.717>, :process,
#PID<0.144.0>,
{%RuntimeError{message: "Boom!"},
[
{MyApp.Worker3, :"-handle_call/3-fun-0-", 0,
[
file: ~c"lib/my_app/worker_3.ex",
line: 20,
error_info: %{module: Exception}
]},
{Task.Supervised, :invoke_mfa, 2,
[file: ~c"lib/task/supervised.ex", line: 101]},
{Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
]}}
Scalability and partitioning
なおドキュメントにあるように、Task.Supervisor はシングルプロセスのため、それがボトルネックになる可能性があるとのこと。
それを対処するために PartitionSupervisor を利用する例が記載されています。
実装してみます。
MyApp.Application で PartitionSupervisor のプロセスを起動するように変更します。
defmodule MyApp.Application do @moduledoc false use Application @impl true def start(_type, _args) do children = [ {PartitionSupervisor, child_spec: Task.Supervisor, name: MyApp.TaskSupervisors} ] opts = [strategy: :one_for_one, name: MyApp.Supervisor] Supervisor.start_link(children, opts) end end
handle_call/3 でタスクのプロセスを起動しているコードも、PartitionSupervisor を指定して起動するように変更します。
def handle_call(:do_something, _from, state) do Task.Supervisor.async_nolink({:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}}, fn -> raise "Boom!" end) {:noreply, state} end
書くのも読むのも大変になってきたので、モジュールを追加してコードを移動します。
MyApp.TaskSupervisor を追加して、タスクのプロセスを起動する関数を用意します。
defmodule MyApp.TaskSupervisor do def async_nolink(fun, options \\ []) do Task.Supervisor.async_nolink( {:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}}, fun, options ) end end
この関数を使って handle_call/3 を書き換えます。
def handle_call(:do_something, _from, state) do MyApp.TaskSupervisor.async_nolink(fn -> raise "Boom!" end) {:noreply, state} end
これで見やすくなりました。