エンジニアのソフトウェア的愛情

または私は如何にして心配するのを止めてプログラムを・愛する・ようになったか

ElixirのGenServerでTaskを使うための補遺

前回、GenServer の渋滞を解消するために Task を利用する方法を紹介しました。

blog.emattsan.org

これは Task のプロセスが異常終了しないことを前提にしていて、異常終了が予想されるばあいには、その対策を施しておく必要があります。

結論から言うと Task.Supervisor.async_nolink/3 を利用するとよいようです。

詳細は TaskTask.Supervisor に記載されていますので、そちらを参照してみてください。

以下は、GenServer の中で Task プロセスが異常終了したときのふるまいを検証した記録です。

対策しない

まず、対策しなかったときのふるまいを確認します。

コード

do_something/0 を呼ぶと、handle_call/3 の中で Task のプロセスを起動します。

Task のプロセスは起動するとすぐに例外を送出します。

defmodule MyApp.Worker1 do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_something do
    GenServer.call(__MODULE__, :do_something)
  end

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_call(:do_something, _from, state) do
    Task.async(fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    dbg msg
    {:noreply, state}
  end
end

実行

iex -S mix で起動し、実行します。

iex(1)> {:ok, pid} = MyApp.Worker1.start_link()
{:ok, #PID<0.167.0>}

iex(2)> Process.info(pid)
[
  registered_name: MyApp.Worker1,
  ...略...
]

iex(3)> MyApp.Worker1.do_something()

11:32:48.607 [error] Task #PID<0.168.0> started from MyApp.Worker1 terminating
** (RuntimeError) Boom!
    (my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3
    (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
    (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.95600963/0 in MyApp.Worker1.handle_call/3>
    Args: []
** (EXIT from #PID<0.166.0>) shell process exited with reason: an exception was raised:
    ** (RuntimeError) Boom!
        (my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3
        (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
        (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4

Interactive Elixir (1.15.6) - press Ctrl+C to exit (type h() ENTER for help)

タスクとリンクしている MyApp.Worker1 のプロセスが終了し、そのプロセスにリンクしている iex のプロセスも終了して iex が再起動していることがわかります。

EXIT をトラップする

trap_exit フラグを true にして EXIT をトラップする方法があります。

しかし一律でトラップしてしまうため、予想できない影響が出ることも考えられます。 Task.async/1 のドキュメントにも注意を促す但書がついています。

  • Setting :trap_exit to true - trapping exits should be used only in special circumstances as it would make your process immune to not only exits from the task but from any other processes.

コード

init/1Process.flag(:trap_exit, true) を実行し EXIT のトラップを指定しています。

defmodule MyApp.Worker2 do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_something do
    GenServer.call(__MODULE__, :do_something)
  end

  @impl true
  def init(_opts) do
    Process.flag(:trap_exit, true)
    {:ok, %{}}
  end

  @impl true
  def handle_call(:do_something, _from, state) do
    Task.async(fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    dbg msg
    {:noreply, state}
  end
end

実行

タスクのプロセスで例外が送出されたあと、GenServer のプロセスが :DOWN 以外に :EXIT のメッセージを受け取っているのがわかります。

iex(1)> {:ok, pid} = MyApp.Worker2.start_link()
{:ok, #PID<0.143.0>}

iex(2)> Process.info(pid)
[
  registered_name: MyApp.Worker2,
  ...略...
]

iex(3)> MyApp.Worker2.do_something()

11:34:22.007 [error] Task #PID<0.144.0> started from MyApp.Worker2 terminating
** (RuntimeError) Boom!
    (my_app 0.1.0) lib/my_app/worker_2.ex:21: anonymous fn/0 in MyApp.Worker2.handle_call/3
    (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
    (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.111817085/0 in MyApp.Worker2.handle_call/3>
    Args: []
[lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2]
msg #=> {:EXIT, #PID<0.144.0>,
 {%RuntimeError{message: "Boom!"},
  [
    {MyApp.Worker2, :"-handle_call/3-fun-0-", 0,
     [
       file: ~c"lib/my_app/worker_2.ex",
       line: 21,
       error_info: %{module: Exception}
     ]},
    {Task.Supervised, :invoke_mfa, 2,
     [file: ~c"lib/task/supervised.ex", line: 101]},
    {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
  ]}}

[lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2]
msg #=> {:DOWN, #Reference<0.0.18307.2349639668.3769958402.28027>, :process,
 #PID<0.144.0>,
 {%RuntimeError{message: "Boom!"},
  [
    {MyApp.Worker2, :"-handle_call/3-fun-0-", 0,
     [
       file: ~c"lib/my_app/worker_2.ex",
       line: 21,
       error_info: %{module: Exception}
     ]},
    {Task.Supervised, :invoke_mfa, 2,
     [file: ~c"lib/task/supervised.ex", line: 101]},
    {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
  ]}}

なおこのコードでは call/3 に対して reply を返していないために 5 秒ごにタイムアウトの例外が発生しますが、その点は割愛します。 対処として、タスクのプロセス終了時に GenServer.reply/2 を使って応答を返す方法を前回のブログで説明していますので、そちらも参照してみてください。

リンクしない

EXIT とトラップしないばあいに、タスクのプロセスの異常終了につられて GenServer のプロセスも異常終了するのはリンクしているためです。

Task.Supervisor.async_nolink/3 を利用すると、タスクのプロセスの管理は Task.Supervisor にまかせ、プロセスをリンクせずにタスクを利用することが可能になります。

コード

defmodule MyApp.Worker3 do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_something do
    GenServer.call(__MODULE__, :do_something)
  end

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_call(:do_something, _from, state) do
    Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    dbg msg
    {:noreply, state}
  end
end

先に Task.Supervisor のプロセスを起動しておく必要があるので、MyApp.Application を追加しプロセスを起動する設定を記述しておきます。

defmodule MyApp.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: MyApp.TaskSupervisor}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

--sup オプションなしでプロジェクトを作成したばあいは、mix.exsapplication/0mod: {MyApp.Application, []} を追加することを忘れないでください。

defmodule MyApp.MixProject do
  use Mix.Project

  ......

  def application do
    [
      extra_applications: [:logger],
      mod: {MyApp.Application, []}
    ]
  end

  ......
end

実行

タスクのプロセスで例外が送出されたあと、:DOWN のみ受け取っていることがわかります。

iex(1)> {:ok, pid} = MyApp.Worker3.start_link()
{:ok, #PID<0.143.0>}

iex(2)> Process.info(pid)
[
  registered_name: MyApp.Worker3,
  ...略...
]

iex(3)> MyApp.Worker3.do_something()

11:36:07.211 [error] Task #PID<0.144.0> started from MyApp.Worker3 terminating
** (RuntimeError) Boom!
    (my_app 0.1.0) lib/my_app/worker_3.ex:20: anonymous fn/0 in MyApp.Worker3.handle_call/3
    (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
    (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.16324509/0 in MyApp.Worker3.handle_call/3>
    Args: []
[lib/my_app/worker_3.ex:27: MyApp.Worker3.handle_info/2]
msg #=> {:DOWN, #Reference<0.0.18307.1968646231.2159083530.717>, :process,
 #PID<0.144.0>,
 {%RuntimeError{message: "Boom!"},
  [
    {MyApp.Worker3, :"-handle_call/3-fun-0-", 0,
     [
       file: ~c"lib/my_app/worker_3.ex",
       line: 20,
       error_info: %{module: Exception}
     ]},
    {Task.Supervised, :invoke_mfa, 2,
     [file: ~c"lib/task/supervised.ex", line: 101]},
    {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
  ]}}

Scalability and partitioning

なおドキュメントにあるように、Task.Supervisor はシングルプロセスのため、それがボトルネックになる可能性があるとのこと。

それを対処するために PartitionSupervisor を利用する例が記載されています。 実装してみます。

MyApp.Application で PartitionSupervisor のプロセスを起動するように変更します。

defmodule MyApp.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {PartitionSupervisor, child_spec: Task.Supervisor, name: MyApp.TaskSupervisors}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

handle_call/3 でタスクのプロセスを起動しているコードも、PartitionSupervisor を指定して起動するように変更します。

  def handle_call(:do_something, _from, state) do
    Task.Supervisor.async_nolink({:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}}, fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

書くのも読むのも大変になってきたので、モジュールを追加してコードを移動します。

MyApp.TaskSupervisor を追加して、タスクのプロセスを起動する関数を用意します。

defmodule MyApp.TaskSupervisor do
  def async_nolink(fun, options \\ []) do
    Task.Supervisor.async_nolink(
      {:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}},
      fun,
      options
    )
  end
end

この関数を使って handle_call/3 を書き換えます。

  def handle_call(:do_something, _from, state) do
    MyApp.TaskSupervisor.async_nolink(fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

これで見やすくなりました。