エンジニアのソフトウェア的愛情

または私は如何にして心配するのを止めてプログラムを・愛する・ようになったか

ElixirのGenServerでTaskを使うための補遺

前回、GenServer の渋滞を解消するために Task を利用する方法を紹介しました。

blog.emattsan.org

これは Task のプロセスが異常終了しないことを前提にしていて、異常終了が予想されるばあいには、その対策を施しておく必要があります。

結論から言うと Task.Supervisor.async_nolink/3 を利用するとよいようです。

詳細は TaskTask.Supervisor に記載されていますので、そちらを参照してみてください。

以下は、GenServer の中で Task プロセスが異常終了したときのふるまいを検証した記録です。

対策しない

まず、対策しなかったときのふるまいを確認します。

コード

do_something/0 を呼ぶと、handle_call/3 の中で Task のプロセスを起動します。

Task のプロセスは起動するとすぐに例外を送出します。

defmodule MyApp.Worker1 do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_something do
    GenServer.call(__MODULE__, :do_something)
  end

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_call(:do_something, _from, state) do
    Task.async(fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    dbg msg
    {:noreply, state}
  end
end

実行

iex -S mix で起動し、実行します。

iex(1)> {:ok, pid} = MyApp.Worker1.start_link()
{:ok, #PID<0.167.0>}

iex(2)> Process.info(pid)
[
  registered_name: MyApp.Worker1,
  ...略...
]

iex(3)> MyApp.Worker1.do_something()

11:32:48.607 [error] Task #PID<0.168.0> started from MyApp.Worker1 terminating
** (RuntimeError) Boom!
    (my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3
    (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
    (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.95600963/0 in MyApp.Worker1.handle_call/3>
    Args: []
** (EXIT from #PID<0.166.0>) shell process exited with reason: an exception was raised:
    ** (RuntimeError) Boom!
        (my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3
        (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
        (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4

Interactive Elixir (1.15.6) - press Ctrl+C to exit (type h() ENTER for help)

タスクとリンクしている MyApp.Worker1 のプロセスが終了し、そのプロセスにリンクしている iex のプロセスも終了して iex が再起動していることがわかります。

EXIT をトラップする

trap_exit フラグを true にして EXIT をトラップする方法があります。

しかし一律でトラップしてしまうため、予想できない影響が出ることも考えられます。 Task.async/1 のドキュメントにも注意を促す但書がついています。

  • Setting :trap_exit to true - trapping exits should be used only in special circumstances as it would make your process immune to not only exits from the task but from any other processes.

コード

init/1Process.flag(:trap_exit, true) を実行し EXIT のトラップを指定しています。

defmodule MyApp.Worker2 do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_something do
    GenServer.call(__MODULE__, :do_something)
  end

  @impl true
  def init(_opts) do
    Process.flag(:trap_exit, true)
    {:ok, %{}}
  end

  @impl true
  def handle_call(:do_something, _from, state) do
    Task.async(fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    dbg msg
    {:noreply, state}
  end
end

実行

タスクのプロセスで例外が送出されたあと、GenServer のプロセスが :DOWN 以外に :EXIT のメッセージを受け取っているのがわかります。

iex(1)> {:ok, pid} = MyApp.Worker2.start_link()
{:ok, #PID<0.143.0>}

iex(2)> Process.info(pid)
[
  registered_name: MyApp.Worker2,
  ...略...
]

iex(3)> MyApp.Worker2.do_something()

11:34:22.007 [error] Task #PID<0.144.0> started from MyApp.Worker2 terminating
** (RuntimeError) Boom!
    (my_app 0.1.0) lib/my_app/worker_2.ex:21: anonymous fn/0 in MyApp.Worker2.handle_call/3
    (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
    (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.111817085/0 in MyApp.Worker2.handle_call/3>
    Args: []
[lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2]
msg #=> {:EXIT, #PID<0.144.0>,
 {%RuntimeError{message: "Boom!"},
  [
    {MyApp.Worker2, :"-handle_call/3-fun-0-", 0,
     [
       file: ~c"lib/my_app/worker_2.ex",
       line: 21,
       error_info: %{module: Exception}
     ]},
    {Task.Supervised, :invoke_mfa, 2,
     [file: ~c"lib/task/supervised.ex", line: 101]},
    {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
  ]}}

[lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2]
msg #=> {:DOWN, #Reference<0.0.18307.2349639668.3769958402.28027>, :process,
 #PID<0.144.0>,
 {%RuntimeError{message: "Boom!"},
  [
    {MyApp.Worker2, :"-handle_call/3-fun-0-", 0,
     [
       file: ~c"lib/my_app/worker_2.ex",
       line: 21,
       error_info: %{module: Exception}
     ]},
    {Task.Supervised, :invoke_mfa, 2,
     [file: ~c"lib/task/supervised.ex", line: 101]},
    {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
  ]}}

なおこのコードでは call/3 に対して reply を返していないために 5 秒ごにタイムアウトの例外が発生しますが、その点は割愛します。 対処として、タスクのプロセス終了時に GenServer.reply/2 を使って応答を返す方法を前回のブログで説明していますので、そちらも参照してみてください。

リンクしない

EXIT とトラップしないばあいに、タスクのプロセスの異常終了につられて GenServer のプロセスも異常終了するのはリンクしているためです。

Task.Supervisor.async_nolink/3 を利用すると、タスクのプロセスの管理は Task.Supervisor にまかせ、プロセスをリンクせずにタスクを利用することが可能になります。

コード

defmodule MyApp.Worker3 do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_something do
    GenServer.call(__MODULE__, :do_something)
  end

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_call(:do_something, _from, state) do
    Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    dbg msg
    {:noreply, state}
  end
end

先に Task.Supervisor のプロセスを起動しておく必要があるので、MyApp.Application を追加しプロセスを起動する設定を記述しておきます。

defmodule MyApp.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: MyApp.TaskSupervisor}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

--sup オプションなしでプロジェクトを作成したばあいは、mix.exsapplication/0mod: {MyApp.Application, []} を追加することを忘れないでください。

defmodule MyApp.MixProject do
  use Mix.Project

  ......

  def application do
    [
      extra_applications: [:logger],
      mod: {MyApp.Application, []}
    ]
  end

  ......
end

実行

タスクのプロセスで例外が送出されたあと、:DOWN のみ受け取っていることがわかります。

iex(1)> {:ok, pid} = MyApp.Worker3.start_link()
{:ok, #PID<0.143.0>}

iex(2)> Process.info(pid)
[
  registered_name: MyApp.Worker3,
  ...略...
]

iex(3)> MyApp.Worker3.do_something()

11:36:07.211 [error] Task #PID<0.144.0> started from MyApp.Worker3 terminating
** (RuntimeError) Boom!
    (my_app 0.1.0) lib/my_app/worker_3.ex:20: anonymous fn/0 in MyApp.Worker3.handle_call/3
    (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
    (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.16324509/0 in MyApp.Worker3.handle_call/3>
    Args: []
[lib/my_app/worker_3.ex:27: MyApp.Worker3.handle_info/2]
msg #=> {:DOWN, #Reference<0.0.18307.1968646231.2159083530.717>, :process,
 #PID<0.144.0>,
 {%RuntimeError{message: "Boom!"},
  [
    {MyApp.Worker3, :"-handle_call/3-fun-0-", 0,
     [
       file: ~c"lib/my_app/worker_3.ex",
       line: 20,
       error_info: %{module: Exception}
     ]},
    {Task.Supervised, :invoke_mfa, 2,
     [file: ~c"lib/task/supervised.ex", line: 101]},
    {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
  ]}}

Scalability and partitioning

なおドキュメントにあるように、Task.Supervisor はシングルプロセスのため、それがボトルネックになる可能性があるとのこと。

それを対処するために PartitionSupervisor を利用する例が記載されています。 実装してみます。

MyApp.Application で PartitionSupervisor のプロセスを起動するように変更します。

defmodule MyApp.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {PartitionSupervisor, child_spec: Task.Supervisor, name: MyApp.TaskSupervisors}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

handle_call/3 でタスクのプロセスを起動しているコードも、PartitionSupervisor を指定して起動するように変更します。

  def handle_call(:do_something, _from, state) do
    Task.Supervisor.async_nolink({:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}}, fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

書くのも読むのも大変になってきたので、モジュールを追加してコードを移動します。

MyApp.TaskSupervisor を追加して、タスクのプロセスを起動する関数を用意します。

defmodule MyApp.TaskSupervisor do
  def async_nolink(fun, options \\ []) do
    Task.Supervisor.async_nolink(
      {:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}},
      fun,
      options
    )
  end
end

この関数を使って handle_call/3 を書き換えます。

  def handle_call(:do_something, _from, state) do
    MyApp.TaskSupervisor.async_nolink(fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

これで見やすくなりました。

ElixirのGenServer.callの渋滞を解消するための覚書

時間のかかる処理を複数実行したいとき、各々の処理にプロセスを起動して並行に実行することがあります。 しかし、同じ GenServer プロセスに対して GenServer.call/3 で呼び出してしまうと、GenServer.handle_call/3 で処理が順番待ちになってしまい並行した恩恵を受けることができません。

例として GenServer を実装したモジュール MyApp.Worker の、処理に 1 秒かかる関数 do_heavy/1 を呼び出すことを考えます。

きちんと時間を計測するには Benchee などを利用した方がよいのですが、今回は簡易的に。

# benchmark.exs

start_time = Time.utc_now()

1..3
|> Enum.map(fn n ->
  Task.async(fn ->
    n
    |> MyApp.Worker.do_heavy()
    |> IO.inspect()
  end)
end)
|> Task.await_many()

end_time = Time.utc_now()

Time.diff(end_time, start_time, :millisecond) |> IO.puts()

MyApp.Worker を実装します。 呼び出されると 1 秒待ってから引数で渡された値を返すだけです。

defmodule MyApp.Worker do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_heavy(n) do
    GenServer.call(__MODULE__, {:do_heavy, n})
  end

  def init(_) do
    {:ok, %{}}
  end

  def handle_call({:do_heavy, n}, _from, state) do
    Process.sleep(1_000)
    {:reply, {:ok, n}, state}
  end
end

MyApp.Worker のプロセスを起動する MyApp.Application も実装します。

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [MyApp.Worker]
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

冒頭に書いたスクリプトを実行します。 Task.async/1 で別プロセスで実行しているにもかかわらず、全体で 3 秒かかりました。

$ mix run benchmark.exs
{:ok, 1}
{:ok, 2}
{:ok, 3}
3005

そこで GenServer 内部の処理を非同期化します。

まず Task.async/1を使って処理を別プロセスで実行します。 タスクのリファレンス値(タスクを識別するために利用する値)をキーにして呼び出し元を示す from の値を state に記録します。

Task.async/1 の処理が終了すると、タスクのリファレンス値と結果をタプルにしたメッセージが送られるので GenServer.handle_info/2 で値を受け取ります。 受け取ったリファレンス値で state に記録した呼び出し元の情報を取り出します。

最後に GenServer.reply/2 を使って、MyApp.Worker.do_heavy/1 を呼び出したプロセスに処理結果を返します。

注意点として。 Task.async/1 の終了時には結果のメッセージ以外に :DOWN から始まる 5 要素のタプルもメッセージとして送信するので、これもハンドルする必要があります。

メッセージの詳細やきちんとしたハンドルのしかたについては、Elixir のドキュメントの Compatibility with OTP behaviours を参照してみてください。

なお、一般的にタスクも Task.Supervisor で管理した方がよいのですが、話を簡単にするためにここでは Task を直接使っています。

defmodule MyApp.Worker do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_heavy(n) do
    GenServer.call(__MODULE__, {:do_heavy, n})
  end

  def init(_) do
    {:ok, %{}}
  end

  def handle_call({:do_heavy, n}, from, state) do
    task =
      Task.async(fn ->
        Process.sleep(1_000)
        {:ok, n}
      end)

    {:noreply, Map.put(state, task.ref, from)}
  end

  def handle_info({ref, {:ok, n}}, state) do
    case Map.get(state, ref) do
      nil ->
        {:noreply, state}

      from ->
        GenServer.reply(from, {:ok, n})
        {:noreply, Map.delete(state, ref)}
    end
  end

  def handle_info({:DOWN, _, _, _, _}, state) do
    {:noreply, state}
  end
end

書き換えたものを実行してみます。

$ mix run benchmark.exs
{:ok, 1}
{:ok, 2}
{:ok, 3}
1010

今度は 3 つの処理が並行実行されるので、全体でも 1 秒で完了しました。

ElixirでPNGをつくる

ウェブアプリケーションを作っていると、表示する画像を動的に生成したいケースに遭遇します。 探してみると画像ファイルを生成するツールやライブラリは見つかるのですが、ウェブアプリケーションでは画像データを HTTP のレスポンスとして送信できればよいので、ファイルを介さずに扱えると便利です。

そんなことを考えつつ画像フォーマットを調べていたら、PNG の構造が思っていたよりもずっと単純ということに気がつきました。 そんなわけで、今回は PNG の生成を Elixir で書いてみることにします。

PNG の構造

実装するにあたって、最初に最低限の PNG の情報を整理します。

ここでは設定値の細かな説明は省略しますので、詳しくは PNG の仕様や PNG - Wikipedia などを参照してみてください。

最小限の構成

PNG は、ファイルヘッダといくつかのチャンクと呼ばれる構造で構成されています。 チャンクはいくつか種類がありますが、ヘッダ情報を格納する IHDR チャンク、画像データを格納する IDAT チャンク、終端を表す IEND チャンクの 3 つが必須で、これらを一つずつ持つ構造が最小の構成になります(パレットカラーを利用するばあいは PLTE チャンクも必要ですが、今回は割愛)。

ファイルヘッダ

ファイルヘッダは、その内容が PNG であることをあらわす固定のデータです。 ファイルはこのデータを先頭に配置しなければなりません。

チャンク

チャンクの構造

ファイルヘッダ以外のデータは、チャンクという形式で格納されています。 チャンクは、データの長さ、チャンクタイプ、チャンクデータ、CRC からなるデータです。

長さ、タイプ、CRC はそれぞれ 4 バイトで、ビッグエンディアンで格納されます。 データは可変長で 0 のばあいもあります。

CRC はタイプとデータを連結した CRC-32 の値です。

IHDR チャンク

画像の大きさや使用する色の情報を格納する固定長のチャンクです。

width と height は画像の幅と高さをあらわす 4 バイトの数値でビッグエンディアンで格納されます。

bit depth は画素あたりのビットを表し、color type はフルカラーやグレースケール、アルファチャネルの有無などの情報を表します。

残りのデータで圧縮、フィルタリング、インタレースといった制御ができますが、この記事では省略します。

IDAT チャンク

画像イメージを格納するチャンクです。 構造は単純で、各ラインごとに適用するフィルタの種類をあらわす 1 バイトのデータを先頭に追加して、全体を Deflate で圧縮(いわゆる ZIP 圧縮)したものです。

一つのラインあたりのデータは、画像幅と画素あたりのバイト数(例えば 8 ビットフルカラーであれば、画素あたり 3 バイト)をかけた値になります。 画素あたりのビット数が 8 未満で、バイト列にしたときに端数が出るばあいは、パディングを追加してバイト区切りにそろえる必要があります。

IEND チャンク

終端をあらわすチャンクです。 チャンクデータを持たないため事実上固定データです。

実装に必要な知識

次に。 Elixir で実装するにあたり、必要な情報を整理しておきます。

画像データの圧縮

Elixir そのもにはデータ圧縮のライブラリは提供されていませんが、Erlangzlib が用意されているので、これを利用します。

www.erlang.org

CRC

CRC-32 の関数も Erlangerlang:crc32/1 が用意されているので、これを利用します。

www.erlang.org

IO data

最後に。 今回は IO data という、普段はあまり意識しないデータ構造を利用するので、その説明を補足しておきます。

IO data は Elixir のデータ構造の一つですが、ドキュメントにあるように複雑なものではありません。 ドキュメントでは次のように記載されています。

hexdocs.pm

IO data is a data type that can be used as a more efficient alternative to binaries in certain situations.

(IO data は、特定の状況でバイナリのより効率的な代替手段として使用できるデータ型です。)

A term of type IO data is a binary or a list containing bytes (integers within the 0..255 range) or nested IO data. The type is recursive.

(IO data の項目の型は、バイナリまたはバイト(0..255範囲内の整数)またはネストされた IO data を含むリストです。型は再帰的です。)

たとえば、次のデータは IO.puts/1 で出力すると ABCD と表示される IO data の例です。

  • [0x41, 0x42, 0x43, 0x44]
  • [0x41, [0x42, [0x43, [0x44]]]]
  • [[0x41, 0x42], [0x43, 0x44]]
  • [[0x41, 0x42], "CD"]
  • <<0x041, 0x42, 0x43, 0x44>>
  • [<<0x041, 0x42>>, <<0x43, 0x44>>]

このように柔軟な構造をしているので、「バイナリか、0 から 255 までの整数か、IO data そのものをリストで連結すればなんとかなる」便利なデータ構造になっています。 この柔軟さのおかげで、リストのネストやバイナリとの混在を、処理の途中でほとんど気にすることなく扱うことが可能になります。

先に紹介した zlib の関数に入力するデータや出力されるデータも IO data の形式になっています。

これらを踏まえて。 PNG 画像の生成を Elixir で実装してみます。

実装

チャンクを構築する

まず、チャンクを構築する関数を書きます。

引数の type がチャンクタイプ、data がデータ本体です。 どちらも IO data 形式です。

チャンクの仕様にのっとり、長さ、タイプ、データ、CRC を連結したものを返します。 長さと CRC はビッグエンディアンの 32 ビット(4 バイト)のデータにしたいため、::big-32 を指定してバイナリデータに変換しています。

返却する値も、バイナリや IO data をリストにしたものなので、これも IO data 形式です。

  defp chunk(type, data) do
    length = IO.iodata_length(data)
    crc = :erlang.crc32([type, data])

    [<<length::big-32>>, type, data, <<crc::big-32>>]
  end

圧縮する

Erlangzlib を利用してデータを圧縮します。 一連の手続きを一つの関数にまとめただけでライブラリの使い方そのままです。

ここで引数の data の値も戻り値も IO data 形式です。

  defp zip(data) do
    z = :zlib.open()
    :ok = :zlib.deflateInit(z)
    compressed = :zlib.deflate(z, data, :finish)
    :ok = :zlib.deflateEnd(z)
    :zlib.close(z)
    compressed
  end

画像を作成する

サイズが、幅width 、高さ height の 8 ビットフルカラーの画像データ bitmap から PNG のデータを作成する関数です。 bit_depthcolor_type の値を変更すれば、他の画像フォーマットの PNG データを作成できます。

関数の処理を簡単にするために、ここでは bitmap は、RGB の 3 バイトごとにリストかバイナリにしたものを要素としたリストで受け取ることを前提にしています。 具体的には [[r, g, b], [r, g, b], ...] もしくは [<<r, g, b>>, <<r, g, b>>, ...] という構造になっている必要があります。

data の値を作成する部分で、bitmap をラインごとに分割して、その先頭に 1 バイトの filter type を挿入しています。 その後 data を圧縮したものが idat の値になります

chunk/2zip/1 は先に説明した関数と同じものです。

ここでも戻り値は IO data 形式です。

defmodule Png do
  @file_header <<0x89, "PNG", 0x0D, 0x0A, 0x1A, 0x0A>>

  def generate(width, height, bitmap) do
    bit_depth = 8          # 8 bits per pixel
    color_type = 2         # 2 = true color
    compression_method = 0 # compression: none
    filter_method = 0      # filter: none
    interlace_method = 0   # no interlace

    ihdr = <<
      width::big-32,
      height::big-32,
      bit_depth,
      color_type,
      compression_method,
      filter_method,
      interlace_method
    >>

    data =
      bitmap
      |> Enum.chunk_every(width)
      |> Enum.map(fn row ->
        [filter_method, row]
      end)

    idat = zip(data)

    [
      @file_header,
      chunk("IHDR", ihdr),
      chunk("IDAT", idat),
      chunk("IEND", "")
    ]
  end

  defp chunk(type, data) do
    length = IO.iodata_length(data)
    crc = :erlang.crc32([type, data])

    [<<length::big-32>>, type, data, <<crc::big-32>>]
  end

  defp zip(data) do
    z = :zlib.open()
    :ok = :zlib.deflateInit(z)
    compressed = :zlib.deflate(z, data, :finish)
    :ok = :zlib.deflateEnd(z)
    :zlib.close(z)
    compressed
  end
end

実行

PNG ファイルを作成する

作成したモジュールを利用して実際に PNG ファイルを作成します。

ここでは 512x512 のグラデーションパタンのファイルを作成しています。

データは IO data 形式で生成されるため、File.write/2 を使ってそのままファイルに書き出すことが可能です。

width = 512
height = 512

bitmap =
  for row <- 0..(height - 1), col <- 0..(width - 1) do
    [
      min(row, 255),
      min(col, 255),
      min(min(511 - row, 511 - col), 255)
    ]
  end

png = Png.generate(width, height, bitmap)

File.write("grad.png", png)

Livebook で使う

Livebook で Kino をインストールすれば、生成した PNG データを Livebook 上で確認することができます。

hexdocs.pm

Livebook で新しいノートブックを開いて、Setup で Kino をインストールします。

Mix.install([:kino])

次に PNG ファイルを作成するために記述したコードを Livebook に貼り付けます。

ここで最後に File.write/2 でファイルに保存しているコードを IO.iodata_to_binary/1 に書き換えます。

IO.iodata_to_binary/1 は名前のとおり IO data をバイナリに変換するもので、バイナリ形式になった PNG データは Kino が自動的に画像として表示してくれます。

png = Png.generate(width, height, bitmap)

IO.iodata_to_binary(png)

宣伝

最後にちょっと宣伝です。 ここまで書いた内容をパッケージにまとめたものを hex.pm で公開しています。 グレースケールやパレットカラーにも対応しています。 よろしければどうぞ。

hex.pm

いつか読むはずっと読まない:ひとのあいだと書いて人間

ソフトウェアは、人そのものから作られるプロダクトなわけですが。 人それだけではなく、人と人との関係もまたソフトウェアの源泉の一つなのだなと思う今日この頃。

まさに、「ソフトウェアは人が人のためにつくるもの」、なのだと。

OTP標準装備のハンドラの利用手段がElixirのLoggerに標準装備されたので、その覚書

Elixir Logger に Erlang/OTP logger がやってきた

およそ半年前、1 月にこのような記事を書きました。

blog.emattsan.org

ログローテーションなどを装備したファイル出力を可能にするハンドラが OTP 21.0 で装備されていて、それを Elixir から利用しようという話でした。

先ごろ公開された Elixir 1.15.0 では、これらの仕組みが Elixir の Logger に統合され、もっと自然な形で利用できるようになりました。

Integration with Erlang/OTP logger

This release provides additional features such as global logger metadata and file logging (with rotation and compaction) out-of-the-box!

This release also soft-deprecates Elixir's Logger Backends in favor of Erlang's Logger handlers. Elixir will automatically convert your :console backend configuration into the new configuration. Previously, you would set:

https://github.com/elixir-lang/elixir/blob/v1.15/CHANGELOG.md#integration-with-erlangotp-logger

今回はその覚書です。

使ってみる

設定

実のところ。 ブログの記事にするまでもなく、使い方は Logger のドキュメントの Configuration に例として書かれています。

設定ファイルに次のような設定を書くだけで、ログのファイル出力とローテーションが手に入ります。

import Config

config :logger, :default_handler,
  config: [
    file: ~c"system.log",
    filesync_repeat_interval: 5000,
    file_check: 5000,
    max_no_bytes: 10_000_000,
    max_no_files: 5,
    compress_on_rotate: true
  ]

https://hexdocs.pm/logger/Logger.html#module-configuration

気をつけるところとしては、これは OTP の設定を記述しているので、ファイル名は文字リストで指定する必要があるという点です。

Elixir の文字列(バイナリ)を指定してしまうと次のようなメッセージが表示され、どこにもログが出力されない状態になるのでご注意を。

Could not attach default Logger handler: {:handler_not_added, {:invalid_config, :logger_std_h, %{file: "/Users/matsumotoeiji/Documents/my_app/log/system.log"}}}

個々の設定の説明は OTP のドキュメントに記載されているので参照してください。

www.erlang.org

今回は、ログローテーションを確認したいのでファイルサイズの上限を小さくし、log/ ディレクトリに出力されるようにファイルパスを変更して利用することにします。

config :logger, :default_handler,
  config: [
    file: ~c"log/system.log",
    ...
    max_no_bytes: 1_000,
    ...
  ]

実行

単純なログを出力するサンプルを書いて、実行してみます。

defmodule MyApp do
  require Logger

  def hello do
    1..1_000_000
    |> Enum.each(fn n -> Logger.info("Hello #{n}")
    end)
  end
end
my_app $ iex -S mix
Erlang/OTP 25 [erts-13.2.2.1] [source] [64-bit] [smp:10:10] [ds:10:10:10] [async-threads:1] [jit] [dtrace]

Interactive Elixir (1.15.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> MyApp.hello()
:ok

log/ ディレクトリを確認すると、記録途中の log/system.log のほかに 5 つの圧縮ファイルを確認できます。

-rw-r--r--  1 matsumotoeiji  staff  817  6 24 08:35 log/system.log
-rw-r--r--  1 matsumotoeiji  staff  139  6 24 08:35 log/system.log.0.gz
-rw-r--r--  1 matsumotoeiji  staff  144  6 24 08:35 log/system.log.1.gz
-rw-r--r--  1 matsumotoeiji  staff  128  6 24 08:35 log/system.log.2.gz
-rw-r--r--  1 matsumotoeiji  staff  148  6 24 08:35 log/system.log.3.gz
-rw-r--r--  1 matsumotoeiji  staff  139  6 24 08:35 log/system.log.4.gz

ファイルが 5 つなのは設定で max_no_files: 5 を、圧縮されているのは ompress_on_rotate: true を指定したためです。 また事前にディレクトリを作成しなくても、ログファイル作成時に自動的にディレクトリを作成してくれているのがわかります。

ちなみに。 最初のログファイルは、ハンドラが Logger に割り当てられたタイミングで作成されるようで、今回のように設定ファイルに記述したばあいは起動時に作成されていました。

my_app $ rm -rf log
my_app $ iex -S mix
Erlang/OTP 25 [erts-13.2.2.1] [source] [64-bit] [smp:10:10] [ds:10:10:10] [async-threads:1] [jit] [dtrace]

Interactive Elixir (1.15.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> File.ls("log")
{:ok, ["system.log"]}

装飾を解く

ここで log/system.log をエディタなどで開いてみると ANSI エスケープシーケンス が含まれているのがわかります(表現の都合で、"エスケープコード + [" を "^[[" と書いています)。

^[[22m
08:46:33.127 [info] Hello 794990
^[[0m^[[22m
08:46:33.127 [info] Hello 794991
...

エスケープシーケンスによる着色はコンソールで見る分には便利ですが、ファイルで扱うときは邪魔なことが多いのでこれをオフにします。

出力形式の指定も設定ファイルでおこないますが、今回ここにも変更が入り handler の設定と formatter の設定が分離されました。

ドキュメントにありますが、従来の次のような記述は、

config :logger, :console,
  level: :error,
  format: "$time $message $metadata"

このように解釈されるようになっています。

config :logger, :default_handler,
  level: :error

config :logger, :default_formatter,
  format: "$time $message $metadata"

https://hexdocs.pm/logger/Logger.html#module-backends-and-backwards-compatibility

今後は :default_handler, :default_formatter を使った記述が求められるようです。

さて。 着色をオフにするために、 colors: [enabled: false] を指定します。

import Config

config :logger, :default_handler,
  config: [
    ...
  ]

config :logger, :default_formatter,
  colors: [enabled: false]

設定の詳細は Logger.Formatter のドキュメント を参照してください。

再度ログを出力してみると、こんどはエスケープシーケンスが出力されていないことが確認できると思います。

09:07:22.784 [info] Hello 813833

09:07:22.784 [info] Hello 813834

...

いつか読むはずっと読まない:自然言語の中の構造

自然言語にも当然のように構造があるわけですが、意味でなく形式でことが多々あるというのが興味深い話。 例えば「特定の語が現れたら後続のどこかに特定の助詞が現れないと文章として違和感を感じる」とのことで、たしかに意味はわかるのだけれど読んでいて座りのよくない文章がいくつも例示されています。

よい文章が書けるかはともかく、こういった点を気をつけて、多少なりとも読みやすい文章が書けたらよいな、と思ったり思わなかったり。

ちなみに。 先に「白と黒のとびら: オートマトンと形式言語をめぐる冒険」などを読んでいたので気づかなかったのですが、著者はもともと言語学を学びそこから情報科学の分野に移られたとのこと。 そのことにご本人は思うところがあるようですが、一読者としてはふだん触れる機会の少ない分野とをつないでくれる著者のような存在は、とてもありがたく感じます。

Elixirで書く極々シンプルなWebサーバの覚え書き

ちょっとした Web サーバが欲しくなったときに使えるコードの覚え書きです。

基本の形は Plug のドキュメントに書かれている通りです。

hexdocs.pm

任意の Content-Type のレスポンスを返せるように :mimerl を使って Content-Type を設定するようにしたのが唯一の工夫です。

hex.pm

Mix.install([:plug, :plug_cowboy, :mimerl])

defmodule MyPlug do
  import Plug.Conn

  def init(options) do
    # initialize options
    options
  end

  def call(conn, _opts) do
    path = Path.expand("./" <> conn.request_path, File.cwd!())

    context_type = :mimerl.filename(path)

    conn
    |> put_resp_content_type(context_type)
    |> send_resp(200, File.read!(path))
  end
end

webserver = {Plug.Cowboy, plug: MyPlug, scheme: :http, options: [port: 4000]}
{:ok, _} = Supervisor.start_link([webserver], strategy: :one_for_one)

Process.sleep(:infinity)

あとはファイルに保存して Elixir で実行するだけです。

# 上記のコードを web_server.exs というファイル名で保存し、elixir コマンドで実行する
$ elixir web_server.exs

Phoenix Framework を利用したことがあれば Plug のしくみは馴染みのあるものなので、手を加えるのも比較的容易かと思います。

Phoenix LiveView で無限スクロール

背景という名の釈明

先日から仕事で無限スクロールの処理の修正に関わっていて、「これこそ Phoenix LiveView を使えば簡単なのにな」などと感じたので、LiveView の無限スクロールの記事を書くことにしました。

記事にするコードを書き終え、あとは文章を書くだけ、というところで Elixir Forum を開いたら、Phoenix LiveView で書く無限スクロールの話題が Fly.io の「The Phoenix Files」に掲載されているのを見つけました。

fly.io

正しくは。 無限スクロールを含むチャットアプリケーションの話題なので、わたしが書こうとしていたものよりずっと充実した内容になっています。

さてどうしたものか、と三日三晩ほど思案しましたが、自分のための備忘録ぐらいにはなるだろうと観念して記事にすることにしました。 そもそもこのブログ記事は、大体は自分のための備忘録ですし。

そんなわけで無限スクロールです。

アイテムを一覧表示する、まずは手動で

まずは前段階として、アイテムを一覧表示し、続きは手動で読み込む LiveView を準備します。

lib/my_app_web/live/item_live.ex

MyApp.Item.list_items/2 でアイテムの一覧を取得し表示する単純な LiveView です。 ただし一覧の取得に時間がかかることを前提として、一覧取得の処理を Task.async/1 で実行して、実行中は「Loading...」のメッセージを表示するようにしました。

defmodule MyAppWeb.ItemLive do
  use MyAppWeb, :live_view

  alias MyApp.Item

  @impl true
  def render(assigns) do
    ~H"""
    <.header>Items</.header>

    <div id="items" class="border-t border-b divide-y" phx-update="stream">
      <div :for={{dom_id, item} <- @streams.items} id={dom_id} class="py-4">
        <%= item.content %>
      </div>
    </div>

    <div class="flex justify-center mt-4">
      <.button :if={!@loading} type="button" phx-click="load">load</.button>
      <span :if={@loading}>Loading...</span>
    </div>
    """
  end

  @impl true
  def mount(_params, _session, socket) do
    task =
      if connected?(socket) do
        start_loading_task(0)
      else
        nil
      end

    {
      :ok,
      socket
      |> stream(:items, [])
      |> assign(
        next_page: 1,
        loading: true, # ページを開いたときに読み込みを開始するので最初を true にしています
        task: task
      )
    }
  end

  @impl true
  def handle_event("load", _params, socket) do
    if socket.assigns.loading do
      # 読み込み中に load イベントを受け取ったばあいは何もしないようにします
      {:noreply, socket}
    else
      page = socket.assigns.next_page

      task = start_loading_task(page)

      {:noreply, assign(socket, next_page: page + 1, loading: true, task: task)}
    end
  end

  # タスクの結果を受け取りストリームに追加します
  @impl true
  def handle_info({ref, items}, socket) when socket.assigns.task.ref == ref do
    socket =
      items
      |> Enum.reduce(socket, &stream_insert(&2, :items, &1))
      |> assign(loading: false, task: nil)

    {:noreply, socket}
  end

  # `DOWN` などの、タスクの結果以外のメッセージをハンドルします
  def handle_info(_, socket) do
    {:noreply, socket}
  end

  defp start_loading_task(page) do
    Task.async(fn ->
      Item.list_items(page * 10, 10)
    end)
  end
end

lib/my_app/item.ex

データを取得するモジュールと関数です。 MyApp.Item.list_items/2 は、ちゃんとデータベースのインタフェースとして実装するのもよいのですが、簡単のため今回も擬似的なふるまいをするコードにしています。

defmodule MyApp.Item do
  defstruct [:id, :content]

  def new(id, content) do
    %__MODULE__{id: id, content: content}
  end

  def list_items(offset, size) do
    offset..(offset + size - 1)
    |> Enum.map(fn i ->
      Process.sleep(100) # 1 アイテムあたり取得に 100ms かかる状況を擬似的に表現
      new(i, "Item-#{i}")
    end)
  end
end

lib/my_app_web/router.ex

最後にルーティングを追加します。

     pipe_through :browser
 
     get "/", PageController, :home
+    live "/items", ItemLive
   end
 
   # Other scopes may use custom stacks.

実行

サーバを起動して、 http://localhost:4000/items にアクセスします。 次のような表示になると思います。

mix phx.server

「Load」ボタンを押すと表示が「Loading...」というメッセージに替わり、およそ1秒後にアイテムが10件追加で表示されます。

ここで、ボタンを押す代わりに、ボタンの位置がページ上に表示されるころのタイミングで自動的に読み込むようにすれば、無限スクロールを実現できるはずです。

アイテムを一覧表示する、今度は自動で

assets/js/app.js

ここで非常に残念で悲しいお知らせです。 「ボタンの位置がページ上に表示されるころのタイミングで自動的に読み込むように」するためには、今はまだ JavaScript でフックを記述しなければなりません。

残念でなりませんが、LiveView のおかげで記述しないければならない JavaScript の量を最小限にできるので、それで気を取り直して先に進ことにします。

今回、ある特定の要素がページ上に表示されたら読み込みを開始することにし、そのためのイベントの生成には「交差オブザーバー API」を利用します。

developer.mozilla.org

まず assets/js/app.js に次のコードを追加します。

const Hooks = {}
Hooks.AutoLoad = {
  mounted() {
    this.intersectionObserver = new IntersectionObserver((entries) => {
      if (entries[0].isIntersecting) {
        this.pushEvent('load')
      }
    })
    this.intersectionObserver.observe(this.el)
  },

  disconnected() {
    this.intersectionObserver.unobserve(this.el)
  }
}

次に既存のコードを修正して LiveSocket ソケットのオブジェクトを生成するときのオプションに、上で書いたフックを設定します。

 import topbar from "../vendor/topbar"
 
 let csrfToken = document.querySelector("meta[name='csrf-token']").getAttribute("content")
-let liveSocket = new LiveSocket("/live", Socket, {params: {_csrf_token: csrfToken}})
+let liveSocket = new LiveSocket("/live", Socket, {params: {_csrf_token: csrfToken}, hooks: Hooks})
 
 // Show progress bar on live navigation and form submits
 topbar.config({barColors: {0: "#29d"}, shadowColor: "rgba(0, 0, 0, .3)"})

これにより LiveView で AutoLoad のフックを設定した要素が画面上に現れると load イベントが発生するようになりました。

lib/my_app_web/live/item_live.ex

最後に LiveView を修正します。

まず、ボタンとメッセージを格納している divphx-hook="AutoLoad" を追加します。 フックを設定する要素は ID も設定されている必要があります。 今回はフックのために設定する以外では ID を利用していませんので、ここでは適当に "auto-load" という値を設定しています。

そして、イベントを発生させるボタンは不要になるので、単純なメッセージ表示に変更します。

       </div>
     </div>
 
-    <div class="flex justify-center mt-4">
+    <div id="auto-load" class="flex justify-center mt-4" phx-hook="AutoLoad">
-      <.button :if={!@loading} type="button" phx-click="load">load</.button>
+      <span :if={!@loading}>Load</span>
       <span :if={@loading}>Loading...</span>
     </div>
     """

これで無限スクロールへの書き換えは完了です。 ボタンをクリックしたときと同じイベントを利用しているので、読み込みの処理は変更する必要がありません。

実行

Phoenix は、開発環境であればコードの変更時のリロードが機能しているので、サーバを起動したまま上の2つのコードを変更したのでしたら、すでに表示が切り替わっていると思います。

一覧表示をスクロールして「Load」のメッセージが表示されると、すぐに「Loading...」に切り替わり、読み込みが始まることが確認できます。

実は弱点(欠点)が一つ

すでに気がつかれている方もいらっしゃると思いますが。

この実装では、一覧表示の下にあるメッセージがスクロールして現れるのではなく、最初からページ中に表示されているばあいは読み込みが発生しません。 最初に一覧するアイテムの数をページサイズに合わせて調節できるとよいのですが、書かなければならない JavaScript のコードが増えそうなので、今回は考えないことにしました。

JavaScript のコードを駆逐しつつページサイズに合わせた読み込みをする方法は、宿題とさせてください。

いつか読むはずっと読まない:単弓類、雌伏のとき

「かはく」こと国立科学博物館恐竜博2023 が始まりました。 かはくにはもう3年くらい足を運べていないので、この機に休暇をとって丸一日堪能してこようかと思います。

Phoenix 1.7 とともに LiveView Streams がやってきた

Elixir の Web フレームワークである Phoenix Framework に、待望の 1.7 がやってきました。

phoenixframework.org

やはり今回のバージョンの最大のポイントは、Controller を用いた静的な View と LiveView のテンプレートが、コンポーネントという形で統合されたところだと思います。 これで同じ部品を Controller でも LiveView でも利用できるようになりました。

これについては、いろいろな人がいろいろな記事を書いてくださると思うので、わたしは別のポイントに注目したいと思います。

リリース記事のタイトルにもある「LiveView Streams」です。

これまでの LiveView は要素の削除が苦手だった

苦手というよりも、フレームワークとして専用の機能が提供されていなかった、が正確かもしれません。 フレームワークが持つ機能を組み合わせ、たりない部分をコードで補うことで実現しなければなりませんでした。

このことについては 2020年12月に記事にも書いています。 実に2年以上前。

blog.emattsan.org

コレクションを扱う

上記のような削除の問題は、ある集まり、コレクションを扱うときに重大になってきます。 データベースを扱うケースは、まさにそのようなケースです。 Web アプリケーションのフレームワークとして「ちょっと遅れをとっている」と、個人的に感じていた部分でした。

それも、今回のバージョンアップで、それも解消されたのです。

内部的には、コレクションを抱え込むモジュールが定義され、挿入や削除の操作が標準装備されました。 フロントエンド側にも対応する操作が実装されているため、最小限のデータのやり取りで、コレクションの要素の挿入や一部の更新や削除ができるようになっています。

さっそくその恩恵を、簡単なコードで見てゆきます。

サンプルアプリケーション MyAPP

まずは mix phx.new コマンドを最新の 1.7 に更新して、サンプルのプロジェクトを作成してください。

$ mix archive.install hex phx_new
$ mix phx.new my_app
$ cd my_app

データを扱うモジュール

本格的にデータベースを利用してもよいのですが、簡単に済ませたいのと、データベースなどに影響されない本質だけを見てゆきたいので、データを扱うモジュールを一から書いてゆきます。

ここで MyApp.Items.Item はデータを格納する構造体、 MyApp.Items はデータを扱うモジュールです。 MyApp.Items.Item のリストを LiveView Stream のコレクションとするわけですが、コレクションの要素になるデータには :id という名前のキーが必要になるようです(要素自体に ID を持たせずに済ませる方法もありますが今回は省略)。

また LiveView Streams の効果がわかりやすいように、 MyApp.Items を使ったデータの更新は非同期で、結果はメッセージで受け取るようにしました。 結果を受け取りたいプロセスは、MyApp.Items.subscribe_items/0 で購読登録をし、更新時にブロードキャストされるメッセージを処理するようにします。

defmodule MyApp.Items.Item do
  defstruct [:id, :name]
end
defmodule MyApp.Items do
  use GenServer

  alias MyApp.Items.Item

  @name __MODULE__

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: @name)
  end

  def subscribe_items do
    Phoenix.PubSub.subscribe(MyApp.PubSub, "items")
  end

  def list_items do
    GenServer.call(@name, :list_items)
  end

  def create_item do
    GenServer.cast(@name, :create_item)
  end

  def delete_item(id) when is_binary(id) do
    delete_item(String.to_integer(id))
  end

  def delete_item(id) do
    GenServer.cast(@name, {:delete_item, id})
  end

  def init(opts) do
    size = opts[:size] || 10

    items =
      1..size
      |> Enum.map(fn id ->
        %Item{id: id, name: "item-#{id}"}
      end)

    {:ok, %{items: items, next_id: size + 1}}
  end

  def handle_call(:list_items, _from, %{items: items} = state) do
    {:reply, items, state}
  end

  def handle_cast(:create_item, %{items: items, next_id: next_id} = state) do
    created_item = %Item{id: next_id, name: "item-#{next_id}"}

    Phoenix.PubSub.broadcast(MyApp.PubSub, "items", {:item_created, created_item})

    {:noreply, %{state | items: items ++ [created_item], next_id: next_id + 1}}
  end

  def handle_cast({:delete_item, id}, %{items: items} = state) do
    case Enum.split_with(items, &(&1.id == id)) do
      {[deleted_item], rest} ->
        Phoenix.PubSub.broadcast(MyApp.PubSub, "items", {:item_deleted, deleted_item})
        {:noreply, put_in(state.items, rest)}

      _ ->
        {:noreply, state}
    end
  end
end

最後に、アプリケーションの起動時にプロセスを起動するため MyApp.Application の children に MyApp.Items を追加します。

defmodule MyApp.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      MyApp.Items, # 追加
      # ...
    ]

    # ...
  end
end

LiveView 本体

次にデータを表示する LiveView のモジュールを追加します。 コレクションの要素を一覧するだけのシンプルなものです。

最上部に表示されている「create」ボタンで要素を追加し、各要素欄にある「delete」ボタンでその要素を削除します。 データのところで説明したように処理を非同期にしているので、ボタンを押した時の処理は MyApp.Items.Item の関数を呼び出すだけで、表示の更新はブロードキャストされたメッセージを受け取った時に行うようにしています。

ここでポイントは 4 つ。

  1. コレクションの割り当てに Phoenix.IiveView.stream/4 を使う
  2. 要素の構築は :for={{dom_id, item} <- @streams.items} id={dom_id} のように、@streams.items から ID とデータのペアを取り出し、ID を設定する
  3. 要素の追加に Phoenix.IiveView.stream_insert/4 を使う
  4. 要素の削除に Phoenix.IiveView.stream_delete/3 を使う

ちなみに、要素の更新は Phoenix.IiveView.stream_insert/4 で行います。 ID が同じ要素があるばあいは更新し、ないばあいは挿入するという動きをします。 そのために ID の指定が必須になっています。

2年あまり前に苦戦した要素の削除が、わずか一行ですんでしまいました。 またコレクションの更新を非同期の PubSub で実現したので、すべての LiveView で同時に要素の追加や削除が行われます。

defmodule MyAppWeb.ItemLive do
  use MyAppWeb, :live_view

  alias MyApp.Items
  alias MyApp.Items.Item

  def mount(_params, _session, socket) do
    items =
      if connected?(socket) do
        Items.subscribe_items()
        Items.list_items()
      else
        []
      end

    socket = stream(socket, :items, items)

    {:ok, socket}
  end

  def render(assigns) do
    ~H"""
    <h1 class="text-4xl font-bold mb-2">Items</h1>
    <div class="my-2"><.button phx-click="create-item">create</.button></div>
    <div class="border rounded-lg grid grid-cols-1 divide-y">
      <div :for={{dom_id, item} <- @streams.items} id={dom_id} class="h-12 p-2 flex items-center">
        <span class="flex-1"><%= item.name %></span>
        <.button phx-click="delete-item" phx-value-id={item.id}>delete</.button>
      </div>
    </div>
    """
  end

  def handle_event("create-item", _params, socket) do
    Items.create_item()

    {:noreply, socket}
  end

  def handle_event("delete-item", %{"id" => id}, socket) do
    Items.delete_item(id)

    {:noreply, socket}
  end

  def handle_info({:item_created, created_item}, socket) do
    {:noreply, stream_insert(socket, :items, created_item)}
  end

  def handle_info({:item_deleted, deleted_item}, socket) do
    {:noreply, stream_delete(socket, :items, deleted_item)}
  end
end

ルーティング

最後にルーティングの追加です。

defmodule MyAppWeb.Router do
  use MyAppWeb, :router

  # ...

  scope "/", MyAppWeb do
    pipe_through :browser

    # ...

    live "/items", ItemLive # 追加
  end

  # ...
end

実行

mix phx.server コマンドでアプリケーションを起動し、ブラウザで http://localhost:4000/items を開くと、次のような一覧が表示されると思います。 「create」ボタンを押すと要素が一つ追加され、「delete」ボタンを押すとその要素が削除されます。 ブラウザの複数のウィンドウでこのページを開いて追加したり削除したりすると、連動するのが見て取れると思います。

動作としては「ただそれだけ」のことなのですが、ただそれだけのことを、ただこれでだけで実現できるインパクトは決して小さくありません。

LiveView Streams にかぎらず、今回の 1.7 に搭載された機能は Phoenix をもう一段高いレベルに押し上げた感じが、個人的にはしています。

余談

このサンプルコードを書き上げたあと、Elixir Forum の Phoenix 1.7 のリリースを告げるスレッドの中に、次の投稿を見つけて「やはり」と思ったのでした。

josevalim
The win for streams will come when pushing updates to LV via PubSub. It is not in the generated code but streams get us closer to that and adding the remaining PubSub bits should be easy.

elixirforum.com