時間のかかる処理を複数実行したいとき、各々の処理にプロセスを起動して並行に実行することがあります。
しかし、同じ GenServer プロセスに対して GenServer.call/3
で呼び出してしまうと、GenServer.handle_call/3
で処理が順番待ちになってしまい並行した恩恵を受けることができません。
例として GenServer を実装したモジュール MyApp.Worker
の、処理に 1 秒かかる関数 do_heavy/1
を呼び出すことを考えます。
きちんと時間を計測するには Benchee などを利用した方がよいのですが、今回は簡易的に。
# benchmark.exs start_time = Time.utc_now() 1..3 |> Enum.map(fn n -> Task.async(fn -> n |> MyApp.Worker.do_heavy() |> IO.inspect() end) end) |> Task.await_many() end_time = Time.utc_now() Time.diff(end_time, start_time, :millisecond) |> IO.puts()
MyApp.Worker
を実装します。
呼び出されると 1 秒待ってから引数で渡された値を返すだけです。
defmodule MyApp.Worker do use GenServer def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end def do_heavy(n) do GenServer.call(__MODULE__, {:do_heavy, n}) end def init(_) do {:ok, %{}} end def handle_call({:do_heavy, n}, _from, state) do Process.sleep(1_000) {:reply, {:ok, n}, state} end end
MyApp.Worker
のプロセスを起動する MyApp.Application
も実装します。
defmodule MyApp.Application do use Application def start(_type, _args) do children = [MyApp.Worker] opts = [strategy: :one_for_one, name: MyApp.Supervisor] Supervisor.start_link(children, opts) end end
冒頭に書いたスクリプトを実行します。
Task.async/1
で別プロセスで実行しているにもかかわらず、全体で 3 秒かかりました。
$ mix run benchmark.exs {:ok, 1} {:ok, 2} {:ok, 3} 3005
そこで GenServer 内部の処理を非同期化します。
まず Task.async/1
を使って処理を別プロセスで実行します。
タスクのリファレンス値(タスクを識別するために利用する値)をキーにして呼び出し元を示す from
の値を state
に記録します。
Task.async/1
の処理が終了すると、タスクのリファレンス値と結果をタプルにしたメッセージが送られるので GenServer.handle_info/2
で値を受け取ります。
受け取ったリファレンス値で state
に記録した呼び出し元の情報を取り出します。
最後に GenServer.reply/2
を使って、MyApp.Worker.do_heavy/1
を呼び出したプロセスに処理結果を返します。
注意点として。
Task.async/1
の終了時には結果のメッセージ以外に :DOWN
から始まる 5 要素のタプルもメッセージとして送信するので、これもハンドルする必要があります。
メッセージの詳細やきちんとしたハンドルのしかたについては、Elixir のドキュメントの Compatibility with OTP behaviours を参照してみてください。
なお、一般的にタスクも Task.Supervisor
で管理した方がよいのですが、話を簡単にするためにここでは Task
を直接使っています。
defmodule MyApp.Worker do use GenServer def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end def do_heavy(n) do GenServer.call(__MODULE__, {:do_heavy, n}) end def init(_) do {:ok, %{}} end def handle_call({:do_heavy, n}, from, state) do task = Task.async(fn -> Process.sleep(1_000) {:ok, n} end) {:noreply, Map.put(state, task.ref, from)} end def handle_info({ref, {:ok, n}}, state) do case Map.get(state, ref) do nil -> {:noreply, state} from -> GenServer.reply(from, {:ok, n}) {:noreply, Map.delete(state, ref)} end end def handle_info({:DOWN, _, _, _, _}, state) do {:noreply, state} end end
書き換えたものを実行してみます。
$ mix run benchmark.exs {:ok, 1} {:ok, 2} {:ok, 3} 1010
今度は 3 つの処理が並行実行されるので、全体でも 1 秒で完了しました。