エンジニアのソフトウェア的愛情

または私は如何にして心配するのを止めてプログラムを・愛する・ようになったか

Glyph Bitmap Distribution Format (BDF) を Elixir で読み込む

Glyph Bitmap Distribution Format (BDF) というフォントフォーマットがあります。

en.wikipedia.org

記事の最後の方に書いたような理由があって、 BDF を読み込むパッケージを書いています。

道半ばなのですが、お試しで使えるくらいにはまとまったので、一旦出力しておこうと思った次第。

github.com

今回書いた BDF をパースする方法は、仕様上は安全でない可能性があるのですが、それに関しては記事を改めて考察することにしたいと思います。

BDF をコンソールに表示してみる

これを使って実際に文字を表示してみます。 まずは iex でお手軽に試します。

パッケージをインストールする

まだ hex.pm に公開していないので、GitHubリポジトリを指定してインストールしてください。

mix.exs でインストールする場合:

  defp deps do
    [
      {:bdf, github: "mattsan/bdf"}
    ]
  end

iex やスクリプトファイルでインストールする場合:

Mix.install([{:bdf, github: "mattsan/bdf"}])

フォントファイルを入手する

BDF ファイルを用意して読み込みます。

今回はパブリックドメインで公開されている東雲フォントを利用させていただきました。

openlab.ring.gr.jp

表示したいフォントデータを取得する

東雲フォントは文字コードに JIS を利用しています。 必要に応じて JIS X 0213のコード対応表 などを利用して表示したい文字の文字コードを確認し、フォントデータを取得します。

今回はサンプルということで、特に効率などを考慮せず Enum.find/2 で線形検索しています。

{:ok, fonts} = BDF.load("path/to/shnmk16.bdf") # 入手した BDF ファイルのパスを指定します

font = Enum.find(fonts, & &1.encoding == 0x4E6E)

表示する

取得したフォントデータは、点の一つ一つが 1 ビットで表現されているので、それらのビットを見える形に展開して表示します。

Enum.each(font.bitmap, fn row ->
  for <<dot::1 <- <<row::size(font.bbx.bbw)>> >> do
    case dot do
      0 -> " ."
      1 -> "@@"
    end
  end
  |> Enum.join()
  |> IO.puts()
end)

結果。

 . . . . . . . . . . . . . . . .
 . .@@@@@@@@@@@@@@@@@@@@@@@@ . .
 . . . . . . .@@ . . . . . . . .
 .@@@@@@@@@@@@@@@@@@@@@@@@@@@@ .
 .@@ . . . . .@@ . . . . . .@@ .
 .@@@@@@@@@@ .@@ .@@@@@@@@ .@@ .
 .@@ . .@@@@@@@@ . .@@@@@@@@@@ .
 . . . . . . . . . . . . . . . .
 . . .@@@@@@@@@@@@@@@@@@@@ . . .
 . . . . . . . . . . . . . . . .
 .@@@@@@@@@@@@@@@@@@@@@@@@@@@@ .
 . . . . . .@@ . .@@ . .@@ . . .
 . . .@@ . .@@ . .@@ . .@@ . . .
 . . . .@@ .@@ . .@@ .@@ . . . .
 .@@@@@@@@@@@@@@@@@@@@@@@@@@@@ .
 . . . . . . . . . . . . . . . .

BDF を Livebook で表示してみる

Livebook を使えば簡単に画像で確認することもができるので、これもやってみます。

パッケージをインストールする

mattsan/bdf の他に、画像を表示すために kino と、PNG データを生成するために拙作の pngex も合わせてインストールします。

Mix.install([
  {:kino, "~> 0.12.3"},
  {:pngex, "~> 0.1.2"},
  {:bdf, github: "mattsan/bdf"}
])

BDF ファイルを読み込む

読み込みは iex で試したときと変わりません。 ダウンロードした BDF ファイルを読み込みます。

{:ok, fonts} = BDF.load("path/to/shnmk16.bdf")

フォントデータを画像に変換する

pngex を使って画像に変換します。 ここではパレットカラーを使い、 1 ビット / ドットの画像を生成しています。

palette = [
  {255, 255, 255}, # 背景: 白
  {0, 0, 0}        # 文字: 黒
]

font1 = Enum.find(fonts, & &1.encoding == 0x4E6E)
font2 = Enum.find(fonts, & &1.encoding == 0x4C74)

bitmap =
  for row <- font1.bitmap ++ font2.bitmap, into: <<>> do
    <<row::size(font1.bbx.bbw)>>
  end

Pngex.new()
|> Pngex.set_type(:indexed)
|> Pngex.set_depth(:depth1)
|> Pngex.set_size(font1.bbx.bbw, font1.bbx.bbh + font2.bbx.bbh)
|> Pngex.set_palette(palette)
|> Pngex.generate(bitmap)
|> IO.iodata_to_binary()

Livebook での表示の様子。

表示が小さいので縦横 4 倍のサイズにしてみます。

palette = [
  {255, 255, 255}, # 背景: 白
  {0, 0, 0}        # 文字: 黒
]

font1 = Enum.find(fonts, & &1.encoding == 0x4E6E)
font2 = Enum.find(fonts, & &1.encoding == 0x4C74)

bitmap =
  for row <- font1.bitmap ++ font2.bitmap, into: <<>> do
    line =
      for <<dot::1 <- <<row::size(font1.bbx.bbw)>> >>, into: <<>> do
        case dot do
          0 -> <<0::4>>
          1 -> <<0xF::4>>
        end
      end
    String.duplicate(line, 4)
  end

Pngex.new()
|> Pngex.set_type(:indexed)
|> Pngex.set_depth(:depth1)
|> Pngex.set_size(font1.bbx.bbw * 4, (font1.bbx.bbh + font2.bbx.bbh) * 4)
|> Pngex.set_palette(palette)
|> Pngex.generate(bitmap)
|> IO.iodata_to_binary()

解像度の低いディスプレイに似合いそうな表示が得られました。

そんなわけで、Nerves 再起動

開発が進んでいる様子のコメントがされつつも、なかなか公開に至っていなかった Circuits.GPIO ですが、満を持してバージョン 2.0 が今月公開されました。

hex.pm

それに刺激を受けて、5 年ほど放置していた Nerves プログラミングを再開。

nerves-project.org

(デバイスRPI-ZERO-WH Raspberry Pi Zero WH【ピンヘッダ実装済】 + WaveShare 13891 1.44インチ 128×128 LCDディスプレイHAT for RaspberryPi

そのうち、何かおもしろいものを出力できるといいな。 そのうち。

いつか読むはずっと読まない:神経網計画

nextpublishing.jp pragprog.com pragprog.com

40年のソフトウェア的愛情〜または私は如何にして心配するのを止めてプログラマであったか

以前、プログラミングを始めて 30 年が経ちましたという記事を書きました。

blog.emattsan.org

それから 10 年が過ぎました。 気づくと 40 年。

職業プログラマに転向して 10 年。 今もプログラマを続けています。

好きでプログラミングを続けているとはいえ、仕事をしていると疲労もストレスも感じます。 そんなときは好きなプログラミングで疲れを癒す。

そんな日々を送っています。 たぶんこれからもそんな日々を続けるのだろう、と思いをはせる年の瀬。

41 年目もよろしくお願いします。

Phoenix LiveView の assign_async と async_result

Phoenix LiveView で値を socket に assign するとき、その値が例えば Web API などで取得しなければならないとき、一旦待ち状態を設定して、それから Task.async などを利用して、結果が得られたら取得できた値を設定し直すという非同期の処理を行うわけですが。

そういった使い方が多かったためなのか、 LiveView 0.20.0 では非同期で assign をおこなう Phoenix.LiveView.assign_async/3 と、その状態を表示する Phoenix.Component.async_result/1 が追加されていました。 9 月末に追加されていたのですが、すっかり見落としていました。

関数の追加に合わせて、ドキュメントにも一節が追加されています。

今後、繰り返し利用することになりそうなので、これらの使い方を確認してみました。

assign_async & async_result

基本的な使い方はそれほど難しくありません。

まず、値の設定には assign/3 の代わりに assign_async/3 を利用します。

第 2 引数は assign/3 と同じようにキーを指定しますが、第 3 引数には非同期で実行する関数を指定します。 その関数は、戻り値として {:ok, result}{:error, reason} の形の値を返す必要があります。 また result の値は assign_async/3 の第 2 引数に渡したキーを持つマップでなければなりません。

assign_async(socket, :foo, fn ->
  # 非同期で実行したい処理

  {:ok, %{foo: 42}} # 処理に成功したばあい、第 2 引数に渡したキーと同じキーを持つマップを返す
end)

assign_async/3 の第 3 引数に渡した関数の結果は async_result/1 で受けることができます。

async_result/1assign で指定したキーを指定し、:let で値を受け取ります。 内部のブロックは関数が成功して値を返するまで表示されません。

また async_result/1:loading:failed の 2 つのスロットを持っています。

:loading は関数が完了するまでのあいだ表示さるものです。

:failed は関数がエラーを返したときに表示されます。 エラーの内容は :let で受けることができます。 内容は関数の戻り値そのままになっています。

これらのスロットは省略可能です。

defmodule MyAppWeb.PageLive do
  use MyAppWeb, :live_view

  def render(assigns) do
    ~H"""
    <.async_result :let={result} assign={@result}>
      result = <%= result %>
      <:loading>waiting...</:loading>
      <:failed :let={ {:error, reason} }><%= reason %></:failed>
    </.async_result>
    """
  end

  def mount(_params, _session, socket) do
    socket =
      socket
      |> assign_async(:result, fn ->
        Process.sleep(2_000)     # 時間のかかる処理の代わり
        case :rand.uniform(2) do # 2 分の 1 で成功/失敗を返す
          1 -> {:ok, %{result: 42}}
          2 -> {:error, "no result"}
        end
      end)

    {:ok, socket}
  end
end

ちなみに。 関数の戻り値をマップにしなければならない理由は、一回の結果で複数の値を返せるようにするためのようです。 次の例ように assign_async/3 の第 2 引数はリストで複数のキーを指定することができ、そのキーごとに async_result/1 を記述することができます。

  def render(assigns) do
    ~H"""
    <.async_result :let={foo} assign={@foo}><%= foo %></.async_result>
    <.async_result :let={bar} assign={@bar}><%= bar %></.async_result>
    <.async_result :let={baz} assign={@baz}><%= baz %></.async_result>
    """
  end

  def mount(_params, _session, socket) do
    socket =
      socket
      |> assign_async([:foo, :bar, :baz], fn ->
        Process.sleep(2_000)
        {:ok, %{foo: "Foo", bar: "Bar", baz: "Baz"}}
      end)
    {:ok, socket}
  end

ページを表示したあとに、ページ上のイベントで処理を実行したいばあいも assign_async/3 が利用できます。

defmodule MyAppWeb.PageLive do
  use MyAppWeb, :live_view

  def render(assigns) do
    ~H"""
    <.async_result :let={result} assign={@result}>
      result = <%= result %>
      <:loading>waiting...</:loading>
      <:failed :let={{:error, reason}}><%= reason %></:failed>
    </.async_result>

    <div>
      <.button phx-click="rerun">再実行</.button>
    </div>
    """
  end

  def mount(_params, _session, socket) do
    socket =
      socket
      |> assign_async(:result, &run/0)

    {:ok, socket}
  end

  # 再実行イベントのハンドラ
  def handle_event("rerun", _params, socket) do
    socket =
      socket
      |> assign_async(:result, &run/0)

    {:noreply, socket}
  end

  # 再実行イベントでも利用するので、独立した関数に分離した
  defp run do
    Process.sleep(2_000)

    case :rand.uniform(2) do
      1 -> {:ok, %{result: 42}}
      2 -> {:error, "no result"}
    end
  end
end

Phoenix.LiveView.AsyncResult

ちなみに assign_async/3 で割り当てられた値はどのようになっているかというと。

rener/1 の中に <%= inspect(@result) %> を挿入するなどしてその値を覗いてみます。

  • 実行中
%Phoenix.LiveView.AsyncResult{ok?: false, loading: [:result], failed: nil, result: nil}
  • 成功
%Phoenix.LiveView.AsyncResult{ok?: true, loading: nil, failed: nil, result: 42}
  • 失敗
%Phoenix.LiveView.AsyncResult{ok?: false, loading: nil, failed: {:error, "no result"}, result: nil}

構造体 Phoenix.LiveView.AsyncResult に値が格納されていることがわかります。

このことを理解しておくことが、もう一つの関数を利用するときに重要になります。

start_async & handle_async

今回のバージョンアップで、 assign_async/3async_result/1 に加えてもう一つ、関数 start_async/3 が追加されています。

start_async/3 を使うと assign_async/3 と比べべてより細かな制御ができるようになっています。

その代わり、構造体 Phoenix.LiveView.AsyncResult のキーへの割り当てと、結果を反映するためにハンドラ handle_async/3 は自分で記述しなければなりません。

start_async/3

まず、 Phoenix.LiveView.AsyncResult.loading/0-1 を使ってキーに割り当てる値を作成します。 ここで引数に任意の値を渡すことができます。

assign(socket, :result, Phoenix.LiveView.AsyncResult.loading())

# もしくは、引数に任意の値を指定する
assign(socket, :result, Phoenix.LiveView.AsyncResult.loading("実行中"))

引数で渡した値は、スロット :loading:let を使って受け取ることができます。

<:loading :let={loading}><%= loading %></:loading>

キーへの割り当てができたら、 start_async/3 で非同期処理を実行します。

    socket =
      socket
      |> assign(:result, Phoenix.LiveView.AsyncResult.loading("実行中"))
      |> start_async(:result, &run/0)

なお、今回は使用しませんが、構造体の値を更新する loading/2 も用意されています。 非同期処理を多段階で実行するときに活用できそうです。

socket
|> assign(:result, Phoenix.LiveView.AsyncResult.loading(1))

...

socket
|> update(:result, &Phoenix.LiveView.AsyncResult.loading(&1, &1.loading + 1))
<:loading :let={step}>ステップ <%= step %> を実行中</:loading>

handle_async/3

結果は、ハンドラ handle_async/3 で受け取ります。 第 1 引数には assign したキーを指定します。

タスク成功時

注意が必要なのは第 2 引数で、ここには非同期処理を実行したタスクの結果が渡されてきます。 {:ok, task_result} の形で受け取る値はタスクの実行結果であり、処理の実行結果ではありません。 処理の実行結果は task_result の内容を調べる必要があります。

結果を表示に反映するには、最初に割り当てた値を ok/2 または failed/2 を使って更新します。

ハンドラの全体は次のようになります。 成功時、失敗時、それぞれ扱う値がやや込み入っているので注意が必要です。

  def handle_async(:result, {:ok, task_result}, socket) do
    socket =
      case task_result do
        {:ok, %{result: result}} ->
          # 関数の成功時の処理
          socket
          |> update(:result, &Phoenix.LiveView.AsyncResult.ok(&1, result))

        {:error, _reason} = error ->
          # 関数の失敗時の処理
          socket
          |> update(:result, &Phoenix.LiveView.AsyncResult.failed(&1, error))
      end

    {:noreply, socket}
  end

タスク失敗時

handle_async/3 の第 2 引数はタスクの実行結果なので、処理が完了したときの結果は :ok のタプルで渡されてきますが、例外などで処理が完了しなかったばあいには :exit のタプルが渡されてきます。

そのときの第 2 引数は {:exit, reason} という形になり、原因が例外だったばあいには result{exception, stacktrace} の形になります。

ハンドリングした例外の内容は failed/2 で設定して表示することができます。

  def handle_async(:result, {:exit, {exception, _stacktrace}}, socket) do
    socket =
      socket
      |> update(:result, &Phoenix.LiveView.AsyncResult.failed(&1, {:error, Exception.message(exception)}))

    {:noreply, socket}
  end

まとめ

コードの全体はこのようになりました。

defmodule MyAppWeb.PageLive do
  use MyAppWeb, :live_view

  def render(assigns) do
    ~H"""
    <.async_result :let={result} assign={@result}>
      result = <%= result %>
      <:loading :let={loading}><%= loading %></:loading>
      <:failed :let={{:error, reason}}><%= reason %></:failed>
    </.async_result>

    <div>
      <.button phx-click="rerun">再実行</.button>
    </div>
    """
  end

  def mount(_params, _session, socket) do
    socket =
      socket
      |> assign(:result, Phoenix.LiveView.AsyncResult.loading("実行中"))
      |> start_async(:result, &run/0)

    {:ok, socket}
  end

  # 再実行イベントのハンドラ
  def handle_event("rerun", _params, socket) do
    socket =
      socket
      |> assign(:result, Phoenix.LiveView.AsyncResult.loading("再実行中"))
      |> start_async(:result, &run/0)

    {:noreply, socket}
  end

  # 関数完了イベントのハンドラ
  def handle_async(:result, {:ok, task_result}, socket) do
    socket =
      case task_result do
        {:ok, %{result: result}} ->
          socket
          |> update(:result, &Phoenix.LiveView.AsyncResult.ok(&1, result))

        {:error, _reason} = error ->
          socket
          |> update(:result, &Phoenix.LiveView.AsyncResult.failed(&1, error))
      end

    {:noreply, socket}
  end

  # 例外発生時のハンドラ
  def handle_async(:result, {:exit, {exception, _stacktrace}}, socket) do
    socket =
      socket
      |> update(:result, &Phoenix.LiveView.AsyncResult.failed(&1, {:error, Exception.message(exception)}))

    {:noreply, socket}
  end

  defp run do
    Process.sleep(2_000)

    case :rand.uniform(3) do # 3 分の 1 で成功/失敗/例外を返す
      1 -> {:ok, %{result: 42}}
      2 -> {:error, "no result"}
      3 -> raise "Boom"
    end
  end
end

いつか読むはずっと読まない:恐竜前、恐竜後

恐竜前の単弓類の時代と、恐竜後の単弓類の時代。

ElixirでCompositeパタン風な構造を扱う時の覚書

en.wikipedia.org

Compositeパタンは、再起的な構造を表現するときにしばしば顔を出すパタンで、node と leaf に同じイタンフェースを与えることで、それらを同一視して再起的に扱えるようにするしくみです。

Elixir は型付けが動的なので、特別な工夫をしなくても再起的な構造を作れますが、同一視して扱えるようにするには少し工夫必要です。

例えば。 一つの値をもつ leaf と、leaf もしくは node を二つ持つ node からtree を構築し、traverse ですべての値を渡り歩きたいばあい。

leaf1 = Tree.new_leaf(1)
leaf2 = Tree.new_leaf(2)
leaf3 = Tree.new_leaf(3)
leaf4 = Tree.new_leaf(4)
leaf5 = Tree.new_leaf(5)
leaf6 = Tree.new_leaf(6)

tree =
  Tree.new_node(
    Tree.new_node(
      Tree.new_node(leaf1, leaf2),
      leaf3
    ),
    Tree.new_node(
      leaf4,
      Tree.new_node(leaf5, leaf6)
    )
  )

Tree.Component.traverse(tree, fn a -> IO.inspect(a) end)
$ mix run sample.exs 
1
2
3
4
5
6

このとき Node が持つ二つの要素が Leaf なのか Node なのかを区別せずに関数を適用したいわけですが、その関数が LeafNode をパタンマッチで識別するようでは実装が硬直してしまいます。

このようなばあい、traverse/2プロトコルで定義し、それぞれの構造体で定義することで実現します。

具体的には、次のような実装が考えられます。

defmodule Tree do
  alias Tree.{Leaf, Node}

  def new_leaf(value) do
    Leaf.new(value)
  end

  def new_node(left, right) do
    Node.new(left, right)
  end
end
defprotocol Tree.Component do
  def traverse(component, fun)
end
defmodule Tree.Leaf do
  defstruct [:value]

  def new(value) do
    %__MODULE__{value: value}
  end

  defimpl Tree.Component do
    def traverse(%Tree.Leaf{value: value}, fun) do
      fun.(value)
    end
  end
end
defmodule Tree.Node do
  defstruct [:left, :right]

  def new(left, right) do
    %__MODULE__{left: left, right: right}
  end

  defimpl Tree.Component do
    def traverse(%Tree.Node{left: left, right: right}, fun) do
      Tree.Component.traverse(left, fun)
      Tree.Component.traverse(right, fun)
    end
  end
end

ここで問題になるのが、プロトコルを実装していない値が含まれてしまったばあいです。

Node 向けの traverse/2 の実装は、leftrighttraverse/2 を定義したプロトコルを実装した構造体の値であることを前提にしています。 ですが、Elixir は型付けが動的なゆえに、このままでは任意の値を格納した Node の値を作れてしまいます。

そこで Tree.new_node/2Node の値を作るときに、引数に渡せる値を制限することにします。

まず、ガードの is_struct/1 を使うことで、引数に与えることができる値を構造体の値に制限します。

is_struct/2 を使えば、構造体の種類も限定できますが、今回は Leaf の値も Node の値も受け付けたいので都合がよくありません。 is_struct(left, Tree.Leaf) or is_struct(left, Tree.Node) と書けなくはないですが、プロトコルを利用するときの利点であったコードの柔軟性が失われてしまいます。

プロトコルの実装状況をガードで検証したいところですが、残念ながらそのようなガードは用意されていないようです。 ただ、関数は用意されているのでこれを利用することにします。

Protocol.assert_impl!/2 は、第 1 引数にプロトコルを、第 2 引数に構造体の型になるモジュールを取ります。 また Elixir の構造体の値は、__struct__ を参照することでその値の方になるモジュールがわかるようになっています。

leaf = Tree.new_leaf(123)
#=> %Tree.Leaf{value: 123}

leaf.__struct__
#=> Tree.Leaf

node = Tree.new_node(Tree.new_leaf(123), Tree.new_leaf(456))
#=> %Tree.Node{left: %Tree.Leaf{value: 123}, right: %Tree.Leaf{value: 456}}

node.__struct__
#=> Tree.Node

これらを組み合わせてプロトコルを実装した型で引数を制限することにします。 ここでは Tree.new_node/2 の引数を検証することで、期待しない値が tree に組み込まれるのを防ぐことにします。

これで少なくとも traverse/2 を適用する時点でなく、値を構築する時点で異常を検出できるようになりました。

defmodule Tree do
  alias Tree.{Leaf, Node}

  def new_leaf(value) do
    Leaf.new(value)
  end

  def new_node(left, right) when is_struct(left) and is_struct(right) do
    Protocol.assert_impl!(Tree.Component, left.__struct__)
    Protocol.assert_impl!(Tree.Component, right.__struct__)

    Node.new(left, right)
  end
end

なお、補足として。 Tree.Node の内部構造や Tree.Node.new/2 はあくまで非公開というのが前提になります。

ElixirのGenServerでTaskを使うための補遺

前回、GenServer の渋滞を解消するために Task を利用する方法を紹介しました。

blog.emattsan.org

これは Task のプロセスが異常終了しないことを前提にしていて、異常終了が予想されるばあいには、その対策を施しておく必要があります。

結論から言うと Task.Supervisor.async_nolink/3 を利用するとよいようです。

詳細は TaskTask.Supervisor に記載されていますので、そちらを参照してみてください。

以下は、GenServer の中で Task プロセスが異常終了したときのふるまいを検証した記録です。

対策しない

まず、対策しなかったときのふるまいを確認します。

コード

do_something/0 を呼ぶと、handle_call/3 の中で Task のプロセスを起動します。

Task のプロセスは起動するとすぐに例外を送出します。

defmodule MyApp.Worker1 do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_something do
    GenServer.call(__MODULE__, :do_something)
  end

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_call(:do_something, _from, state) do
    Task.async(fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    dbg msg
    {:noreply, state}
  end
end

実行

iex -S mix で起動し、実行します。

iex(1)> {:ok, pid} = MyApp.Worker1.start_link()
{:ok, #PID<0.167.0>}

iex(2)> Process.info(pid)
[
  registered_name: MyApp.Worker1,
  ...略...
]

iex(3)> MyApp.Worker1.do_something()

11:32:48.607 [error] Task #PID<0.168.0> started from MyApp.Worker1 terminating
** (RuntimeError) Boom!
    (my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3
    (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
    (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.95600963/0 in MyApp.Worker1.handle_call/3>
    Args: []
** (EXIT from #PID<0.166.0>) shell process exited with reason: an exception was raised:
    ** (RuntimeError) Boom!
        (my_app 0.1.0) lib/my_app/worker_1.ex:20: anonymous fn/0 in MyApp.Worker1.handle_call/3
        (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
        (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4

Interactive Elixir (1.15.6) - press Ctrl+C to exit (type h() ENTER for help)

タスクとリンクしている MyApp.Worker1 のプロセスが終了し、そのプロセスにリンクしている iex のプロセスも終了して iex が再起動していることがわかります。

EXIT をトラップする

trap_exit フラグを true にして EXIT をトラップする方法があります。

しかし一律でトラップしてしまうため、予想できない影響が出ることも考えられます。 Task.async/1 のドキュメントにも注意を促す但書がついています。

  • Setting :trap_exit to true - trapping exits should be used only in special circumstances as it would make your process immune to not only exits from the task but from any other processes.

コード

init/1Process.flag(:trap_exit, true) を実行し EXIT のトラップを指定しています。

defmodule MyApp.Worker2 do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_something do
    GenServer.call(__MODULE__, :do_something)
  end

  @impl true
  def init(_opts) do
    Process.flag(:trap_exit, true)
    {:ok, %{}}
  end

  @impl true
  def handle_call(:do_something, _from, state) do
    Task.async(fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    dbg msg
    {:noreply, state}
  end
end

実行

タスクのプロセスで例外が送出されたあと、GenServer のプロセスが :DOWN 以外に :EXIT のメッセージを受け取っているのがわかります。

iex(1)> {:ok, pid} = MyApp.Worker2.start_link()
{:ok, #PID<0.143.0>}

iex(2)> Process.info(pid)
[
  registered_name: MyApp.Worker2,
  ...略...
]

iex(3)> MyApp.Worker2.do_something()

11:34:22.007 [error] Task #PID<0.144.0> started from MyApp.Worker2 terminating
** (RuntimeError) Boom!
    (my_app 0.1.0) lib/my_app/worker_2.ex:21: anonymous fn/0 in MyApp.Worker2.handle_call/3
    (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
    (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.111817085/0 in MyApp.Worker2.handle_call/3>
    Args: []
[lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2]
msg #=> {:EXIT, #PID<0.144.0>,
 {%RuntimeError{message: "Boom!"},
  [
    {MyApp.Worker2, :"-handle_call/3-fun-0-", 0,
     [
       file: ~c"lib/my_app/worker_2.ex",
       line: 21,
       error_info: %{module: Exception}
     ]},
    {Task.Supervised, :invoke_mfa, 2,
     [file: ~c"lib/task/supervised.ex", line: 101]},
    {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
  ]}}

[lib/my_app/worker_2.ex:28: MyApp.Worker2.handle_info/2]
msg #=> {:DOWN, #Reference<0.0.18307.2349639668.3769958402.28027>, :process,
 #PID<0.144.0>,
 {%RuntimeError{message: "Boom!"},
  [
    {MyApp.Worker2, :"-handle_call/3-fun-0-", 0,
     [
       file: ~c"lib/my_app/worker_2.ex",
       line: 21,
       error_info: %{module: Exception}
     ]},
    {Task.Supervised, :invoke_mfa, 2,
     [file: ~c"lib/task/supervised.ex", line: 101]},
    {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
  ]}}

なおこのコードでは call/3 に対して reply を返していないために 5 秒ごにタイムアウトの例外が発生しますが、その点は割愛します。 対処として、タスクのプロセス終了時に GenServer.reply/2 を使って応答を返す方法を前回のブログで説明していますので、そちらも参照してみてください。

リンクしない

EXIT とトラップしないばあいに、タスクのプロセスの異常終了につられて GenServer のプロセスも異常終了するのはリンクしているためです。

Task.Supervisor.async_nolink/3 を利用すると、タスクのプロセスの管理は Task.Supervisor にまかせ、プロセスをリンクせずにタスクを利用することが可能になります。

コード

defmodule MyApp.Worker3 do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_something do
    GenServer.call(__MODULE__, :do_something)
  end

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_call(:do_something, _from, state) do
    Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

  @impl true
  def handle_info(msg, state) do
    dbg msg
    {:noreply, state}
  end
end

先に Task.Supervisor のプロセスを起動しておく必要があるので、MyApp.Application を追加しプロセスを起動する設定を記述しておきます。

defmodule MyApp.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: MyApp.TaskSupervisor}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

--sup オプションなしでプロジェクトを作成したばあいは、mix.exsapplication/0mod: {MyApp.Application, []} を追加することを忘れないでください。

defmodule MyApp.MixProject do
  use Mix.Project

  ......

  def application do
    [
      extra_applications: [:logger],
      mod: {MyApp.Application, []}
    ]
  end

  ......
end

実行

タスクのプロセスで例外が送出されたあと、:DOWN のみ受け取っていることがわかります。

iex(1)> {:ok, pid} = MyApp.Worker3.start_link()
{:ok, #PID<0.143.0>}

iex(2)> Process.info(pid)
[
  registered_name: MyApp.Worker3,
  ...略...
]

iex(3)> MyApp.Worker3.do_something()

11:36:07.211 [error] Task #PID<0.144.0> started from MyApp.Worker3 terminating
** (RuntimeError) Boom!
    (my_app 0.1.0) lib/my_app/worker_3.ex:20: anonymous fn/0 in MyApp.Worker3.handle_call/3
    (elixir 1.15.6) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
    (elixir 1.15.6) lib/task/supervised.ex:36: Task.Supervised.reply/4
Function: #Function<0.16324509/0 in MyApp.Worker3.handle_call/3>
    Args: []
[lib/my_app/worker_3.ex:27: MyApp.Worker3.handle_info/2]
msg #=> {:DOWN, #Reference<0.0.18307.1968646231.2159083530.717>, :process,
 #PID<0.144.0>,
 {%RuntimeError{message: "Boom!"},
  [
    {MyApp.Worker3, :"-handle_call/3-fun-0-", 0,
     [
       file: ~c"lib/my_app/worker_3.ex",
       line: 20,
       error_info: %{module: Exception}
     ]},
    {Task.Supervised, :invoke_mfa, 2,
     [file: ~c"lib/task/supervised.ex", line: 101]},
    {Task.Supervised, :reply, 4, [file: ~c"lib/task/supervised.ex", line: 36]}
  ]}}

Scalability and partitioning

なおドキュメントにあるように、Task.Supervisor はシングルプロセスのため、それがボトルネックになる可能性があるとのこと。

それを対処するために PartitionSupervisor を利用する例が記載されています。 実装してみます。

MyApp.Application で PartitionSupervisor のプロセスを起動するように変更します。

defmodule MyApp.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {PartitionSupervisor, child_spec: Task.Supervisor, name: MyApp.TaskSupervisors}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

handle_call/3 でタスクのプロセスを起動しているコードも、PartitionSupervisor を指定して起動するように変更します。

  def handle_call(:do_something, _from, state) do
    Task.Supervisor.async_nolink({:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}}, fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

書くのも読むのも大変になってきたので、モジュールを追加してコードを移動します。

MyApp.TaskSupervisor を追加して、タスクのプロセスを起動する関数を用意します。

defmodule MyApp.TaskSupervisor do
  def async_nolink(fun, options \\ []) do
    Task.Supervisor.async_nolink(
      {:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}},
      fun,
      options
    )
  end
end

この関数を使って handle_call/3 を書き換えます。

  def handle_call(:do_something, _from, state) do
    MyApp.TaskSupervisor.async_nolink(fn ->
      raise "Boom!"
    end)
    {:noreply, state}
  end

これで見やすくなりました。

ElixirのGenServer.callの渋滞を解消するための覚書

時間のかかる処理を複数実行したいとき、各々の処理にプロセスを起動して並行に実行することがあります。 しかし、同じ GenServer プロセスに対して GenServer.call/3 で呼び出してしまうと、GenServer.handle_call/3 で処理が順番待ちになってしまい並行した恩恵を受けることができません。

例として GenServer を実装したモジュール MyApp.Worker の、処理に 1 秒かかる関数 do_heavy/1 を呼び出すことを考えます。

きちんと時間を計測するには Benchee などを利用した方がよいのですが、今回は簡易的に。

# benchmark.exs

start_time = Time.utc_now()

1..3
|> Enum.map(fn n ->
  Task.async(fn ->
    n
    |> MyApp.Worker.do_heavy()
    |> IO.inspect()
  end)
end)
|> Task.await_many()

end_time = Time.utc_now()

Time.diff(end_time, start_time, :millisecond) |> IO.puts()

MyApp.Worker を実装します。 呼び出されると 1 秒待ってから引数で渡された値を返すだけです。

defmodule MyApp.Worker do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_heavy(n) do
    GenServer.call(__MODULE__, {:do_heavy, n})
  end

  def init(_) do
    {:ok, %{}}
  end

  def handle_call({:do_heavy, n}, _from, state) do
    Process.sleep(1_000)
    {:reply, {:ok, n}, state}
  end
end

MyApp.Worker のプロセスを起動する MyApp.Application も実装します。

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [MyApp.Worker]
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

冒頭に書いたスクリプトを実行します。 Task.async/1 で別プロセスで実行しているにもかかわらず、全体で 3 秒かかりました。

$ mix run benchmark.exs
{:ok, 1}
{:ok, 2}
{:ok, 3}
3005

そこで GenServer 内部の処理を非同期化します。

まず Task.async/1を使って処理を別プロセスで実行します。 タスクのリファレンス値(タスクを識別するために利用する値)をキーにして呼び出し元を示す from の値を state に記録します。

Task.async/1 の処理が終了すると、タスクのリファレンス値と結果をタプルにしたメッセージが送られるので GenServer.handle_info/2 で値を受け取ります。 受け取ったリファレンス値で state に記録した呼び出し元の情報を取り出します。

最後に GenServer.reply/2 を使って、MyApp.Worker.do_heavy/1 を呼び出したプロセスに処理結果を返します。

注意点として。 Task.async/1 の終了時には結果のメッセージ以外に :DOWN から始まる 5 要素のタプルもメッセージとして送信するので、これもハンドルする必要があります。

メッセージの詳細やきちんとしたハンドルのしかたについては、Elixir のドキュメントの Compatibility with OTP behaviours を参照してみてください。

なお、一般的にタスクも Task.Supervisor で管理した方がよいのですが、話を簡単にするためにここでは Task を直接使っています。

defmodule MyApp.Worker do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def do_heavy(n) do
    GenServer.call(__MODULE__, {:do_heavy, n})
  end

  def init(_) do
    {:ok, %{}}
  end

  def handle_call({:do_heavy, n}, from, state) do
    task =
      Task.async(fn ->
        Process.sleep(1_000)
        {:ok, n}
      end)

    {:noreply, Map.put(state, task.ref, from)}
  end

  def handle_info({ref, {:ok, n}}, state) do
    case Map.get(state, ref) do
      nil ->
        {:noreply, state}

      from ->
        GenServer.reply(from, {:ok, n})
        {:noreply, Map.delete(state, ref)}
    end
  end

  def handle_info({:DOWN, _, _, _, _}, state) do
    {:noreply, state}
  end
end

書き換えたものを実行してみます。

$ mix run benchmark.exs
{:ok, 1}
{:ok, 2}
{:ok, 3}
1010

今度は 3 つの処理が並行実行されるので、全体でも 1 秒で完了しました。

ElixirでPNGをつくる

ウェブアプリケーションを作っていると、表示する画像を動的に生成したいケースに遭遇します。 探してみると画像ファイルを生成するツールやライブラリは見つかるのですが、ウェブアプリケーションでは画像データを HTTP のレスポンスとして送信できればよいので、ファイルを介さずに扱えると便利です。

そんなことを考えつつ画像フォーマットを調べていたら、PNG の構造が思っていたよりもずっと単純ということに気がつきました。 そんなわけで、今回は PNG の生成を Elixir で書いてみることにします。

PNG の構造

実装するにあたって、最初に最低限の PNG の情報を整理します。

ここでは設定値の細かな説明は省略しますので、詳しくは PNG の仕様や PNG - Wikipedia などを参照してみてください。

最小限の構成

PNG は、ファイルヘッダといくつかのチャンクと呼ばれる構造で構成されています。 チャンクはいくつか種類がありますが、ヘッダ情報を格納する IHDR チャンク、画像データを格納する IDAT チャンク、終端を表す IEND チャンクの 3 つが必須で、これらを一つずつ持つ構造が最小の構成になります(パレットカラーを利用するばあいは PLTE チャンクも必要ですが、今回は割愛)。

ファイルヘッダ

ファイルヘッダは、その内容が PNG であることをあらわす固定のデータです。 ファイルはこのデータを先頭に配置しなければなりません。

チャンク

チャンクの構造

ファイルヘッダ以外のデータは、チャンクという形式で格納されています。 チャンクは、データの長さ、チャンクタイプ、チャンクデータ、CRC からなるデータです。

長さ、タイプ、CRC はそれぞれ 4 バイトで、ビッグエンディアンで格納されます。 データは可変長で 0 のばあいもあります。

CRC はタイプとデータを連結した CRC-32 の値です。

IHDR チャンク

画像の大きさや使用する色の情報を格納する固定長のチャンクです。

width と height は画像の幅と高さをあらわす 4 バイトの数値でビッグエンディアンで格納されます。

bit depth は画素あたりのビットを表し、color type はフルカラーやグレースケール、アルファチャネルの有無などの情報を表します。

残りのデータで圧縮、フィルタリング、インタレースといった制御ができますが、この記事では省略します。

IDAT チャンク

画像イメージを格納するチャンクです。 構造は単純で、各ラインごとに適用するフィルタの種類をあらわす 1 バイトのデータを先頭に追加して、全体を Deflate で圧縮(いわゆる ZIP 圧縮)したものです。

一つのラインあたりのデータは、画像幅と画素あたりのバイト数(例えば 8 ビットフルカラーであれば、画素あたり 3 バイト)をかけた値になります。 画素あたりのビット数が 8 未満で、バイト列にしたときに端数が出るばあいは、パディングを追加してバイト区切りにそろえる必要があります。

IEND チャンク

終端をあらわすチャンクです。 チャンクデータを持たないため事実上固定データです。

実装に必要な知識

次に。 Elixir で実装するにあたり、必要な情報を整理しておきます。

画像データの圧縮

Elixir そのもにはデータ圧縮のライブラリは提供されていませんが、Erlangzlib が用意されているので、これを利用します。

www.erlang.org

CRC

CRC-32 の関数も Erlangerlang:crc32/1 が用意されているので、これを利用します。

www.erlang.org

IO data

最後に。 今回は IO data という、普段はあまり意識しないデータ構造を利用するので、その説明を補足しておきます。

IO data は Elixir のデータ構造の一つですが、ドキュメントにあるように複雑なものではありません。 ドキュメントでは次のように記載されています。

hexdocs.pm

IO data is a data type that can be used as a more efficient alternative to binaries in certain situations.

(IO data は、特定の状況でバイナリのより効率的な代替手段として使用できるデータ型です。)

A term of type IO data is a binary or a list containing bytes (integers within the 0..255 range) or nested IO data. The type is recursive.

(IO data の項目の型は、バイナリまたはバイト(0..255範囲内の整数)またはネストされた IO data を含むリストです。型は再帰的です。)

たとえば、次のデータは IO.puts/1 で出力すると ABCD と表示される IO data の例です。

  • [0x41, 0x42, 0x43, 0x44]
  • [0x41, [0x42, [0x43, [0x44]]]]
  • [[0x41, 0x42], [0x43, 0x44]]
  • [[0x41, 0x42], "CD"]
  • <<0x041, 0x42, 0x43, 0x44>>
  • [<<0x041, 0x42>>, <<0x43, 0x44>>]

このように柔軟な構造をしているので、「バイナリか、0 から 255 までの整数か、IO data そのものをリストで連結すればなんとかなる」便利なデータ構造になっています。 この柔軟さのおかげで、リストのネストやバイナリとの混在を、処理の途中でほとんど気にすることなく扱うことが可能になります。

先に紹介した zlib の関数に入力するデータや出力されるデータも IO data の形式になっています。

これらを踏まえて。 PNG 画像の生成を Elixir で実装してみます。

実装

チャンクを構築する

まず、チャンクを構築する関数を書きます。

引数の type がチャンクタイプ、data がデータ本体です。 どちらも IO data 形式です。

チャンクの仕様にのっとり、長さ、タイプ、データ、CRC を連結したものを返します。 長さと CRC はビッグエンディアンの 32 ビット(4 バイト)のデータにしたいため、::big-32 を指定してバイナリデータに変換しています。

返却する値も、バイナリや IO data をリストにしたものなので、これも IO data 形式です。

  defp chunk(type, data) do
    length = IO.iodata_length(data)
    crc = :erlang.crc32([type, data])

    [<<length::big-32>>, type, data, <<crc::big-32>>]
  end

圧縮する

Erlangzlib を利用してデータを圧縮します。 一連の手続きを一つの関数にまとめただけでライブラリの使い方そのままです。

ここで引数の data の値も戻り値も IO data 形式です。

  defp zip(data) do
    z = :zlib.open()
    :ok = :zlib.deflateInit(z)
    compressed = :zlib.deflate(z, data, :finish)
    :ok = :zlib.deflateEnd(z)
    :zlib.close(z)
    compressed
  end

画像を作成する

サイズが、幅width 、高さ height の 8 ビットフルカラーの画像データ bitmap から PNG のデータを作成する関数です。 bit_depthcolor_type の値を変更すれば、他の画像フォーマットの PNG データを作成できます。

関数の処理を簡単にするために、ここでは bitmap は、RGB の 3 バイトごとにリストかバイナリにしたものを要素としたリストで受け取ることを前提にしています。 具体的には [[r, g, b], [r, g, b], ...] もしくは [<<r, g, b>>, <<r, g, b>>, ...] という構造になっている必要があります。

data の値を作成する部分で、bitmap をラインごとに分割して、その先頭に 1 バイトの filter type を挿入しています。 その後 data を圧縮したものが idat の値になります

chunk/2zip/1 は先に説明した関数と同じものです。

ここでも戻り値は IO data 形式です。

defmodule Png do
  @file_header <<0x89, "PNG", 0x0D, 0x0A, 0x1A, 0x0A>>

  def generate(width, height, bitmap) do
    bit_depth = 8          # 8 bits per pixel
    color_type = 2         # 2 = true color
    compression_method = 0 # compression: none
    filter_method = 0      # filter: none
    interlace_method = 0   # no interlace

    ihdr = <<
      width::big-32,
      height::big-32,
      bit_depth,
      color_type,
      compression_method,
      filter_method,
      interlace_method
    >>

    data =
      bitmap
      |> Enum.chunk_every(width)
      |> Enum.map(fn row ->
        [filter_method, row]
      end)

    idat = zip(data)

    [
      @file_header,
      chunk("IHDR", ihdr),
      chunk("IDAT", idat),
      chunk("IEND", "")
    ]
  end

  defp chunk(type, data) do
    length = IO.iodata_length(data)
    crc = :erlang.crc32([type, data])

    [<<length::big-32>>, type, data, <<crc::big-32>>]
  end

  defp zip(data) do
    z = :zlib.open()
    :ok = :zlib.deflateInit(z)
    compressed = :zlib.deflate(z, data, :finish)
    :ok = :zlib.deflateEnd(z)
    :zlib.close(z)
    compressed
  end
end

実行

PNG ファイルを作成する

作成したモジュールを利用して実際に PNG ファイルを作成します。

ここでは 512x512 のグラデーションパタンのファイルを作成しています。

データは IO data 形式で生成されるため、File.write/2 を使ってそのままファイルに書き出すことが可能です。

width = 512
height = 512

bitmap =
  for row <- 0..(height - 1), col <- 0..(width - 1) do
    [
      min(row, 255),
      min(col, 255),
      min(min(511 - row, 511 - col), 255)
    ]
  end

png = Png.generate(width, height, bitmap)

File.write("grad.png", png)

Livebook で使う

Livebook で Kino をインストールすれば、生成した PNG データを Livebook 上で確認することができます。

hexdocs.pm

Livebook で新しいノートブックを開いて、Setup で Kino をインストールします。

Mix.install([:kino])

次に PNG ファイルを作成するために記述したコードを Livebook に貼り付けます。

ここで最後に File.write/2 でファイルに保存しているコードを IO.iodata_to_binary/1 に書き換えます。

IO.iodata_to_binary/1 は名前のとおり IO data をバイナリに変換するもので、バイナリ形式になった PNG データは Kino が自動的に画像として表示してくれます。

png = Png.generate(width, height, bitmap)

IO.iodata_to_binary(png)

宣伝

最後にちょっと宣伝です。 ここまで書いた内容をパッケージにまとめたものを hex.pm で公開しています。 グレースケールやパレットカラーにも対応しています。 よろしければどうぞ。

hex.pm

いつか読むはずっと読まない:ひとのあいだと書いて人間

ソフトウェアは、人そのものから作られるプロダクトなわけですが。 人それだけではなく、人と人との関係もまたソフトウェアの源泉の一つなのだなと思う今日この頃。

まさに、「ソフトウェアは人が人のためにつくるもの」、なのだと。