エンジニアのソフトウェア的愛情

または私は如何にして心配するのを止めてプログラムを・愛する・ようになったか

ElixirのGenServer.callの渋滞を解消するための覚書

時間のかかる処理を複数実行したいとき、各々の処理にプロセスを起動して並行に実行することがあります。 しかし、同じ GenServer プロセスに対して GenServer.call/3 で呼び出してしまうと、GenServer.handle_call/3 で処理が順番待ちになってしまい並行した恩恵を受けることができません。

例として GenServer を実装したモジュール MyApp.Worker の、処理に 1 秒かかる関数 do_heavy/1 を呼び出すことを考えます。

きちんと時間を計測するには Benchee などを利用した方がよいのですが、今回は簡易的に。

# benchmark.exs

start_time = Time.utc_now()

1..3
|> Enum.map(fn n ->
  Task.async(fn ->
    n
    |> MyApp.Worker.do_heavy()
    |> IO.inspect()
  end)
end)
|> Task.await_many()

end_time = Time.utc_now()

Time.diff(end_time, start_time, :millisecond) |> IO.puts()

MyApp.Worker を実装します。 呼び出されると 1 秒待ってから引数で渡された値を返すだけです。

defmodule MyApp.Worker do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_heavy(n) do
    GenServer.call(__MODULE__, {:do_heavy, n})
  end

  def init(_) do
    {:ok, %{}}
  end

  def handle_call({:do_heavy, n}, _from, state) do
    Process.sleep(1_000)
    {:reply, {:ok, n}, state}
  end
end

MyApp.Worker のプロセスを起動する MyApp.Application も実装します。

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [MyApp.Worker]
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

冒頭に書いたスクリプトを実行します。 Task.async/1 で別プロセスで実行しているにもかかわらず、全体で 3 秒かかりました。

$ mix run benchmark.exs
{:ok, 1}
{:ok, 2}
{:ok, 3}
3005

そこで GenServer 内部の処理を非同期化します。

まず Task.async/1を使って処理を別プロセスで実行します。 タスクのリファレンス値(タスクを識別するために利用する値)をキーにして呼び出し元を示す from の値を state に記録します。

Task.async/1 の処理が終了すると、タスクのリファレンス値と結果をタプルにしたメッセージが送られるので GenServer.handle_info/2 で値を受け取ります。 受け取ったリファレンス値で state に記録した呼び出し元の情報を取り出します。

最後に GenServer.reply/2 を使って、MyApp.Worker.do_heavy/1 を呼び出したプロセスに処理結果を返します。

注意点として。 Task.async/1 の終了時には結果のメッセージ以外に :DOWN から始まる 5 要素のタプルもメッセージとして送信するので、これもハンドルする必要があります。

メッセージの詳細やきちんとしたハンドルのしかたについては、Elixir のドキュメントの Compatibility with OTP behaviours を参照してみてください。

なお、一般的にタスクも Task.Supervisor で管理した方がよいのですが、話を簡単にするためにここでは Task を直接使っています。

defmodule MyApp.Worker do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_heavy(n) do
    GenServer.call(__MODULE__, {:do_heavy, n})
  end

  def init(_) do
    {:ok, %{}}
  end

  def handle_call({:do_heavy, n}, from, state) do
    task =
      Task.async(fn ->
        Process.sleep(1_000)
        {:ok, n}
      end)

    {:noreply, Map.put(state, task.ref, from)}
  end

  def handle_info({ref, {:ok, n}}, state) do
    case Map.get(state, ref) do
      nil ->
        {:noreply, state}

      from ->
        GenServer.reply(from, {:ok, n})
        {:noreply, Map.delete(state, ref)}
    end
  end

  def handle_info({:DOWN, _, _, _, _}, state) do
    {:noreply, state}
  end
end

書き換えたものを実行してみます。

$ mix run benchmark.exs
{:ok, 1}
{:ok, 2}
{:ok, 3}
1010

今度は 3 つの処理が並行実行されるので、全体でも 1 秒で完了しました。