エンジニアのソフトウェア的愛情

または私は如何にして心配するのを止めてプログラムを・愛する・ようになったか

Phoenix.PubSub を Phoenix 以外で利用するための素振り

檄を飛ばす

檄を方々に急いで出し,決起を促す。

スーパー大辞林」より

Phoenx.PubSub は Phoenix名前空間にありますが Phoenix のプロジェクト以外でも利用できます。

バージョンが 2 になってシンプルに扱いやすくなったということで素振りをしてみました。

やること

  • Phoenix.PubSub を使って publisher から subscribers にメッセージを送る
  • アプリケーション起動時に subscribers のプロセスを起動するようにしてみる

準備

プロジェクトを作る

$ mix new geki --sub
$ cd geki

依存するパッケージに Phoenix.PubSub を追加する

Phoenix.PubSub のバージョンを確認。

$ mix hex.info phoenix_pubsub
Distributed PubSub and Presence platform

Config: {:phoenix_pubsub, "~> 2.0"}
...

mix.exs に追加。

defmodule Geki.MixProject do
  use Mix.Project

  # ...

  defp deps do
    [
      {:phoenix_pubsub, "~> 2.0"}
    ]
  end
end

Phoenix.PubSub を子プロセスに追加する

lib/geki/application.ex を編集してアプリケーションの子プロセスに Phoenix.PubSub を追加します。 ここではプロセスに Geki.PubSub という名前をつけています。

defmodule Geki.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    children = [
      {Phoenix.PubSub, name: Geki.PubSub}
    ]

    opts = [strategy: :one_for_one, name: Geki.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Subscriber モジュールを用意する

Subscriber を記述するファイル lib/geki/subscriber.ex を追加します。 ここではGenServer で実装します。

任意のメッセージを受信したら、それを単純にログに出力します。

defmodule Geki.Subscriber do
  use GenServer

  require Logger

  alias Phoenix.PubSub

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  def init(opts) do
    name = opts[:name]
    pubsub = opts[:pubsub]
    topic = opts[:topic]

    PubSub.subscribe(pubsub, topic)

    {:ok, %{name: name}}
  end

  def handle_info(event, state) do
    Logger.info("Subscriber '#{state.name}' received #{inspect(event)}")

    {:noreply, state}
  end
end

プロセスを起動しメッセージを送る

iex 上でふるまいを確認します。

$ iex -S mix

Subscriber のプロセスを起動する

Geki.Subscriber のプロセスを起動します。 ここでは foo, bar, baz と名前をつけた 3 つのプロセスを起動しています。

iex> Geki.Subscriber.start_link(name: :foo, pubsub: Geki.PubSub, topic: "geki")
{:ok, #PID<0.204.0>}
iex> Geki.Subscriber.start_link(name: :bar, pubsub: Geki.PubSub, topic: "geki")
{:ok, #PID<0.206.0>}
iex> Geki.Subscriber.start_link(name: :baz, pubsub: Geki.PubSub, topic: "geki")
{:ok, #PID<0.208.0>}

メッセージを送る

Phoenix.PubSub.broadcast/4 を使ってメッセージをブロードキャストします。

iex> Phoenix.PubSub.broadcast(Geki.PubSub, "geki", "Hello")
:ok

12:12:23.140 [info]  Subscriber 'bar' received "Hello"

12:12:23.140 [info]  Subscriber 'foo' received "Hello"

12:12:23.140 [info]  Subscriber 'baz' received "Hello"

すこし使いやすく

Publisher モジュールを追加する

特定の topic 向けの Publisher を用意してみます。 状態を記憶するために GenServer で実装します。

Publisher を記述するファイル lib/geki/publisher.ex を追加します。

defmodule Geki.Publisher do
  use GenServer

  def start_link(opts) do
    name = opts[:name]

    GenServer.start_link(__MODULE__, opts, name: name)
  end

  def publish(name, event) do
    GenServer.cast(name, {:publish, event})
  end

  def init(opts) do
    pubsub = opts[:pubsub]
    topic = opts[:topic]

    {:ok, %{pubsub: pubsub, topic: topic}}
  end

  def handle_cast({:publish, event}, state) do
    Phoenix.PubSub.broadcast(state.pubsub, state.topic, event)

    {:noreply, state}
  end
end

Publisher と Subscriber を子プロセスに追加する

アプリケーションの起動時に Publisher と Subscriber のプロセスも起動するようにします。

lib/geki/application.ex を編集してアプリケーションの子プロセスに Publisher と Subscriber を追加します。

今回の Subscriber の実装では GenServer.start/3 でプロセスの名前を指定していないので、複数の Subscriber の起動を指定するとプロセスの識別ができないため起動のときにエラーになってしまいます。

そのため Supervisor.child_spec/2 でプロセスに ID をしています。

defmodule Geki.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    children = [
      {Phoenix.PubSub, name: Geki.PubSub},
      {Geki.Publisher, name: :geki, pubsub: Geki.PubSub, topic: "geki"},
      Supervisor.child_spec({Geki.Subscriber, name: :foo, pubsub: Geki.PubSub, topic: "geki"}, id: :foo),
      Supervisor.child_spec({Geki.Subscriber, name: :bar, pubsub: Geki.PubSub, topic: "geki"}, id: :bar),
      Supervisor.child_spec({Geki.Subscriber, name: :baz, pubsub: Geki.PubSub, topic: "geki"}, id: :baz),
    ]

    opts = [strategy: :one_for_one, name: Geki.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

起動してブロードキャストする

$ iex -S mix
iex> Geki.Publisher.publish(:geki, "Hello")                
:ok

12:40:33.437 [info]  Subscriber 'foo' received "Hello"
 
12:40:33.437 [info]  Subscriber 'baz' received "Hello"

12:40:33.437 [info]  Subscriber 'bar' received "Hello"

いつか読むはずっと読まない:FUTURES

この三週間ばかり部屋の大掃除をしているのですが。 なぜかこの画集が二冊見つかりました。

FUTURE

FUTURE

  • 作者:鶴田謙二
  • 発売日: 2011/11/30
  • メディア: 大型本