前回、GenServer の渋滞を解消するために Task を利用する方法を紹介しました。
これは Task のプロセスが異常終了しないことを前提にしていて、異常終了が予想されるばあいには、その対策を施しておく必要があります。
結論から言うと Task.Supervisor.async_nolink/3
を利用するとよいようです。
詳細は Task や Task.Supervisor に記載されていますので、そちらを参照してみてください。
以下は、GenServer の中で Task プロセスが異常終了したときのふるまいを検証した記録です。
対策しない
まず、対策しなかったときのふるまいを確認します。
コード
do_something/0
を呼ぶと、handle_call/3
の中で Task のプロセスを起動します。
Task のプロセスは起動するとすぐに例外を送出します。
defmodule MyApp.Worker1 do use GenServer def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end def do_something do GenServer.call(__MODULE__, :do_something) end @impl true def init(_opts) do {:ok, %{}} end @impl true def handle_call(:do_something, _from, state) do Task.async(fn -> raise "Boom!" end) {:noreply, state} end @impl true def handle_info(msg, state) do dbg msg {:noreply, state} end end
実行
iex -S mix
で起動し、実行します。
iex(1)> {:ok, pid} = MyApp.Worker1.start_link() {:ok, #PID<0.167.0>} iex(2)> Process.info(pid) [ registered_name: MyApp.Worker1, ...略... ] iex(3)> MyApp.Worker1.do_something() 11:32:48.607 [error] Task #PID<0.168.0> started from MyApp.Worker1 terminating ** (RuntimeError) Boom! (my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3 (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2 (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4 Function: #Function<0.95600963/0 in MyApp.Worker1.handle_call/3> Args: [] ** (EXIT from #PID<0.166.0>) shell process exited with reason: an exception was raised: ** (RuntimeError) Boom! (my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3 (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2 (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4 Interactive Elixir (1.15.6) - press Ctrl+C to exit (type h() ENTER for help)
タスクとリンクしている MyApp.Worker1
のプロセスが終了し、そのプロセスにリンクしている iex のプロセスも終了して iex が再起動していることがわかります。
EXIT をトラップする
trap_exit
フラグを true
にして EXIT
をトラップする方法があります。
しかし一律でトラップしてしまうため、予想できない影響が出ることも考えられます。
Task.async/1
のドキュメントにも注意を促す但書がついています。
- Setting
:trap_exit
totrue
- trapping exits should be used only in special circumstances as it would make your process immune to not only exits from the task but from any other processes.
コード
init/1
で Process.flag(:trap_exit, true)
を実行し EXIT のトラップを指定しています。
defmodule MyApp.Worker2 do use GenServer def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end def do_something do GenServer.call(__MODULE__, :do_something) end @impl true def init(_opts) do Process.flag(:trap_exit, true) {:ok, %{}} end @impl true def handle_call(:do_something, _from, state) do Task.async(fn -> raise "Boom!" end) {:noreply, state} end @impl true def handle_info(msg, state) do dbg msg {:noreply, state} end end
実行
タスクのプロセスで例外が送出されたあと、GenServer のプロセスが :DOWN
以外に :EXIT
のメッセージを受け取っているのがわかります。
iex(1)> {:ok, pid} = MyApp.Worker2.start_link() {:ok, #PID<0.143.0>} iex(2)> Process.info(pid) [ registered_name: MyApp.Worker2, ...略... ] iex(3)> MyApp.Worker2.do_something() 11:34:22.007 [error] Task #PID<0.144.0> started from MyApp.Worker2 terminating ** (RuntimeError) Boom! (my_app 0.1.0) lib/my_app/worker_2.ex:21: anonymous fn/0 in MyApp.Worker2.handle_call/3 (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2 (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4 Function: #Function<0.111817085/0 in MyApp.Worker2.handle_call/3> Args: [] [lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2] msg #=> {:EXIT, #PID<0.144.0>, {%RuntimeError{message: "Boom!"}, [ {MyApp.Worker2, :"-handle_call/3-fun-0-", 0, [ file: ~c"lib/my_app/worker_2.ex", line: 21, error_info: %{module: Exception} ]}, {Task.Supervised, :invoke_mfa, 2, [file: ~c"lib/task/supervised.ex", line: 101]}, {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]} ]}} [lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2] msg #=> {:DOWN, #Reference<0.0.18307.2349639668.3769958402.28027>, :process, #PID<0.144.0>, {%RuntimeError{message: "Boom!"}, [ {MyApp.Worker2, :"-handle_call/3-fun-0-", 0, [ file: ~c"lib/my_app/worker_2.ex", line: 21, error_info: %{module: Exception} ]}, {Task.Supervised, :invoke_mfa, 2, [file: ~c"lib/task/supervised.ex", line: 101]}, {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]} ]}}
なおこのコードでは call/3
に対して reply を返していないために 5 秒ごにタイムアウトの例外が発生しますが、その点は割愛します。
対処として、タスクのプロセス終了時に GenServer.reply/2
を使って応答を返す方法を前回のブログで説明していますので、そちらも参照してみてください。
リンクしない
EXIT とトラップしないばあいに、タスクのプロセスの異常終了につられて GenServer のプロセスも異常終了するのはリンクしているためです。
Task.Supervisor.async_nolink/3
を利用すると、タスクのプロセスの管理は Task.Supervisor にまかせ、プロセスをリンクせずにタスクを利用することが可能になります。
コード
defmodule MyApp.Worker3 do use GenServer def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end def do_something do GenServer.call(__MODULE__, :do_something) end @impl true def init(_opts) do {:ok, %{}} end @impl true def handle_call(:do_something, _from, state) do Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> raise "Boom!" end) {:noreply, state} end @impl true def handle_info(msg, state) do dbg msg {:noreply, state} end end
先に Task.Supervisor のプロセスを起動しておく必要があるので、MyApp.Application を追加しプロセスを起動する設定を記述しておきます。
defmodule MyApp.Application do @moduledoc false use Application @impl true def start(_type, _args) do children = [ {Task.Supervisor, name: MyApp.TaskSupervisor} ] opts = [strategy: :one_for_one, name: MyApp.Supervisor] Supervisor.start_link(children, opts) end end
--sup
オプションなしでプロジェクトを作成したばあいは、mix.exs
の application/0
に mod: {MyApp.Application, []}
を追加することを忘れないでください。
defmodule MyApp.MixProject do use Mix.Project ...略... def application do [ extra_applications: [:logger], mod: {MyApp.Application, []} ] end ...略... end
実行
タスクのプロセスで例外が送出されたあと、:DOWN
のみ受け取っていることがわかります。
iex(1)> {:ok, pid} = MyApp.Worker3.start_link() {:ok, #PID<0.143.0>} iex(2)> Process.info(pid) [ registered_name: MyApp.Worker3, ...略... ] iex(3)> MyApp.Worker3.do_something() 11:36:07.211 [error] Task #PID<0.144.0> started from MyApp.Worker3 terminating ** (RuntimeError) Boom! (my_app 0.1.0) lib/my_app/worker_3.ex:20: anonymous fn/0 in MyApp.Worker3.handle_call/3 (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2 (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4 Function: #Function<0.16324509/0 in MyApp.Worker3.handle_call/3> Args: [] [lib/my_app/worker_3.ex:27: MyApp.Worker3.handle_info/2] msg #=> {:DOWN, #Reference<0.0.18307.1968646231.2159083530.717>, :process, #PID<0.144.0>, {%RuntimeError{message: "Boom!"}, [ {MyApp.Worker3, :"-handle_call/3-fun-0-", 0, [ file: ~c"lib/my_app/worker_3.ex", line: 20, error_info: %{module: Exception} ]}, {Task.Supervised, :invoke_mfa, 2, [file: ~c"lib/task/supervised.ex", line: 101]}, {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]} ]}}
Scalability and partitioning
なおドキュメントにあるように、Task.Supervisor
はシングルプロセスのため、それがボトルネックになる可能性があるとのこと。
それを対処するために PartitionSupervisor
を利用する例が記載されています。
実装してみます。
MyApp.Application で PartitionSupervisor
のプロセスを起動するように変更します。
defmodule MyApp.Application do @moduledoc false use Application @impl true def start(_type, _args) do children = [ {PartitionSupervisor, child_spec: Task.Supervisor, name: MyApp.TaskSupervisors} ] opts = [strategy: :one_for_one, name: MyApp.Supervisor] Supervisor.start_link(children, opts) end end
handle_call/3
でタスクのプロセスを起動しているコードも、PartitionSupervisor
を指定して起動するように変更します。
def handle_call(:do_something, _from, state) do Task.Supervisor.async_nolink({:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}}, fn -> raise "Boom!" end) {:noreply, state} end
書くのも読むのも大変になってきたので、モジュールを追加してコードを移動します。
MyApp.TaskSupervisor
を追加して、タスクのプロセスを起動する関数を用意します。
defmodule MyApp.TaskSupervisor do def async_nolink(fun, options \\ []) do Task.Supervisor.async_nolink( {:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}}, fun, options ) end end
この関数を使って handle_call/3
を書き換えます。
def handle_call(:do_something, _from, state) do MyApp.TaskSupervisor.async_nolink(fn -> raise "Boom!" end) {:noreply, state} end
これで見やすくなりました。