檄を飛ばす
檄を方々に急いで出し,決起を促す。
「スーパー大辞林」より
Phoenx.PubSub は Phoenix の名前空間にありますが Phoenix のプロジェクト以外でも利用できます。
バージョンが 2 になってシンプルに扱いやすくなったということで素振りをしてみました。
やること
- Phoenix.PubSub を使って publisher から subscribers にメッセージを送る
- アプリケーション起動時に subscribers のプロセスを起動するようにしてみる
準備
プロジェクトを作る
$ mix new geki --sub $ cd geki
依存するパッケージに Phoenix.PubSub を追加する
Phoenix.PubSub のバージョンを確認。
$ mix hex.info phoenix_pubsub Distributed PubSub and Presence platform Config: {:phoenix_pubsub, "~> 2.0"} ...
mix.exs
に追加。
defmodule Geki.MixProject do use Mix.Project # ... defp deps do [ {:phoenix_pubsub, "~> 2.0"} ] end end
Phoenix.PubSub を子プロセスに追加する
lib/geki/application.ex
を編集してアプリケーションの子プロセスに Phoenix.PubSub を追加します。
ここではプロセスに Geki.PubSub
という名前をつけています。
defmodule Geki.Application do @moduledoc false use Application def start(_type, _args) do children = [ {Phoenix.PubSub, name: Geki.PubSub} ] opts = [strategy: :one_for_one, name: Geki.Supervisor] Supervisor.start_link(children, opts) end end
Subscriber モジュールを用意する
Subscriber を記述するファイル lib/geki/subscriber.ex
を追加します。
ここではGenServer
で実装します。
任意のメッセージを受信したら、それを単純にログに出力します。
defmodule Geki.Subscriber do use GenServer require Logger alias Phoenix.PubSub def start_link(opts) do GenServer.start_link(__MODULE__, opts) end def init(opts) do name = opts[:name] pubsub = opts[:pubsub] topic = opts[:topic] PubSub.subscribe(pubsub, topic) {:ok, %{name: name}} end def handle_info(event, state) do Logger.info("Subscriber '#{state.name}' received #{inspect(event)}") {:noreply, state} end end
プロセスを起動しメッセージを送る
iex
上でふるまいを確認します。
$ iex -S mix
Subscriber のプロセスを起動する
Geki.Subscriber
のプロセスを起動します。
ここでは foo
, bar
, baz
と名前をつけた 3 つのプロセスを起動しています。
iex> Geki.Subscriber.start_link(name: :foo, pubsub: Geki.PubSub, topic: "geki") {:ok, #PID<0.204.0>} iex> Geki.Subscriber.start_link(name: :bar, pubsub: Geki.PubSub, topic: "geki") {:ok, #PID<0.206.0>} iex> Geki.Subscriber.start_link(name: :baz, pubsub: Geki.PubSub, topic: "geki") {:ok, #PID<0.208.0>}
メッセージを送る
Phoenix.PubSub.broadcast/4
を使ってメッセージをブロードキャストします。
iex> Phoenix.PubSub.broadcast(Geki.PubSub, "geki", "Hello") :ok 12:12:23.140 [info] Subscriber 'bar' received "Hello" 12:12:23.140 [info] Subscriber 'foo' received "Hello" 12:12:23.140 [info] Subscriber 'baz' received "Hello"
すこし使いやすく
Publisher モジュールを追加する
特定の topic 向けの Publisher を用意してみます。
状態を記憶するために GenServer
で実装します。
Publisher を記述するファイル lib/geki/publisher.ex
を追加します。
defmodule Geki.Publisher do use GenServer def start_link(opts) do name = opts[:name] GenServer.start_link(__MODULE__, opts, name: name) end def publish(name, event) do GenServer.cast(name, {:publish, event}) end def init(opts) do pubsub = opts[:pubsub] topic = opts[:topic] {:ok, %{pubsub: pubsub, topic: topic}} end def handle_cast({:publish, event}, state) do Phoenix.PubSub.broadcast(state.pubsub, state.topic, event) {:noreply, state} end end
Publisher と Subscriber を子プロセスに追加する
アプリケーションの起動時に Publisher と Subscriber のプロセスも起動するようにします。
lib/geki/application.ex
を編集してアプリケーションの子プロセスに Publisher と Subscriber を追加します。
今回の Subscriber の実装では GenServer.start/3
でプロセスの名前を指定していないので、複数の Subscriber の起動を指定するとプロセスの識別ができないため起動のときにエラーになってしまいます。
そのため Supervisor.child_spec/2
でプロセスに ID をしています。
defmodule Geki.Application do @moduledoc false use Application def start(_type, _args) do children = [ {Phoenix.PubSub, name: Geki.PubSub}, {Geki.Publisher, name: :geki, pubsub: Geki.PubSub, topic: "geki"}, Supervisor.child_spec({Geki.Subscriber, name: :foo, pubsub: Geki.PubSub, topic: "geki"}, id: :foo), Supervisor.child_spec({Geki.Subscriber, name: :bar, pubsub: Geki.PubSub, topic: "geki"}, id: :bar), Supervisor.child_spec({Geki.Subscriber, name: :baz, pubsub: Geki.PubSub, topic: "geki"}, id: :baz), ] opts = [strategy: :one_for_one, name: Geki.Supervisor] Supervisor.start_link(children, opts) end end
起動してブロードキャストする
$ iex -S mix
iex> Geki.Publisher.publish(:geki, "Hello") :ok 12:40:33.437 [info] Subscriber 'foo' received "Hello" 12:40:33.437 [info] Subscriber 'baz' received "Hello" 12:40:33.437 [info] Subscriber 'bar' received "Hello"
いつか読むはずっと読まない:FUTURES
この三週間ばかり部屋の大掃除をしているのですが。 なぜかこの画集が二冊見つかりました。