エンジニアのソフトウェア的愛情

または私は如何にして心配するのを止めてプログラムを・愛する・ようになったか

真 Ruby でオブジェクトへ非同期メッセージを送る

前回のあらすじ

Ruby で非同期メッセージを扱いたいと思い立った筆者。

ActiveJob を使ってオブジェクトへ非同期メッセージを送れたと思ったのも束の間、メッセージを送った先のオブジェクト、実は宛先にしたオブジェクトのクローンだった。

どうすればいいのだろか…?

blog.emattsan.org

Ractor

Ractor を使えばいいんじゃないだろうか。

というわけで、今回は非同期メッセージの送受信を Ractor を使って実現することを考えます。

Ruby 3.0 で導入され、Ruby 4.0 現在も熟成期間にある新機能。

プログラミング一般としてはアクターモデル自体は目新しいものではありませんが、Ruby という文脈の中で最終的にどのように成熟するのか楽しみにしている機能です。

irb(main):001> Ractor.new { puts 'Hi' }
(irb):1: warning: Ractor API is experimental and may change in future versions of Ruby.
Hi
=> #<Ractor:#2 (irb):1 terminated>

Ruby 4.0 でも experimental の警告が出る

なお Ractor そのものの解説は公式ドキュメントを参照ください。

docs.ruby-lang.org

Ractor でオブジェクトを包む

あるクラス Bar があり、そのインスタンス bar を Ractor の中で動作させることを考えます。

Ractor へメッセージを送り、Ractor は受け取ったメッセージを bar に渡します。

class Bar
  def do_something(a, b, c)
    puts "bar performing (#{a}, #{b}, #{c})"
  end
end
bar_ractor = Ractor.new do
  bar = Bar.new
  a, b, c = receive
  bar.do_something(a, b, c)
end

メッセージを送るには #<< もしくは #send を利用します。

bar_ractor << [1, 2, 3] # もしくは bar_ractor.send([1, 2, 3])
bar performing (1, 2, 3) # <- Ractor の中から標準出力へ出力された文字列

ここで、メッセージを受信して処理を実行を終えてしまえば Ractor のプロセスはそこで終了です。

終了してしまった Ractor オブジェクトへメッセージを送ると例外が発生します。

bar_ractor << [1, 2, 3]
#=> The port was already closed (Ractor::ClosedError)

メッセージを繰り返し受け付けることができるように、メッセージ待ちループを作ることにします。

bar_ractor = Ractor.new do
  bar = Bar.new
  loop do
    a, b, c = receive
    bar.do_something(a, b, c)
  end
end

これで繰り返しメッセージを送れるようになりました。

bar_ractor << [1, 2, 3]
bar performing (1, 2, 3)

bar_ractor << [2, 4, 8]
bar performing (2, 4, 8)

前回の ActiveJob を使った実装では、メッセージを送るごとにプロセスが起動していました。 メッセージ送信ごとにプロセスが起動するので、複数のメッセージを同じプロセスで処理することができません。

今回、同じプロセスでメッセージを待つ方法へ変更したことで、継続して同じオブジェクトを操作できるようになりました。

送信先のオブジェクトのようにふるまう Proxy を作る

このままでは扱いにくいので、Proxy クラスを定義します

class Proxy
  def initialize
    @ractor = Ractor.new do
      bar = Bar.new
      loop do
        method, args = receive
        bar.send(method, *args)
      end
    end
  end
  
  def method_missing(name, *args)
    @ractor << [name, args]
  end
end

これで Bar のオブジェクトのようにふるまう Proxy ができました。

bar = Proxy.new

bar.do_something(1, 2, 3)
bar performing (1, 2, 3)

bar.do_something(2, 3, 4)
bar performing (2, 3, 4)

ただし、このままではメッセージを送信する一方で相手のオブジェクトから情報を引き出すことはできません。 Logger のように一方向でよいばあいを除いて、何か結果が得られるようにしたいのが普通です。

それも非同期メッセージで実現します。

メッセージで値を返す

Ruby 4.0 から Ractor::Port という仕組みが追加されました。

docs.ruby-lang.org

Ractor::Port のオブジェクトはメッセージを送受信する経路になります。 送信はどの Ractor のプロセスからも可能ですが、受信はオブジェクトを作成したプロセスにしかできないというルールがあります。

# Ractor::Port オブジェクトを作成する
port = Ractor::Port.new

# Ractor オブジェクトに Ractor::Port オブジェクトを渡す
ractor = Ractor.new(port) do |port|
  # Ractor::Port オブジェクトにメッセージを送信する
  port << 'Hi'
end

# Ractor::Port オブジェクトからメッセージを受信する
port.receive
#=> "Hi"

Ractor::Port が導入されたことで送受信がかなりすっきりした印象です。

ここに至るまでどのような検討を重ねたのか、興味深い話がブログにまとめられています。

product.st.inc

さて、これをふまえて。

数値を一つ状態として持つオブジェクトを考えます。 #inc はその数値をカウントアップし、カウントアップ後の値を返します。

class Bar
  def initialize
    @count = 0
  end
  
  def inc
    @count += 1
  end
end

これを Proxy で包みます。 先ほどの例では port オブジェクトを Ractor.new の引数で渡しましたが、今回は一回ごとにオブジェクトを作成して送信するメッセージに載せて送ります。

class Proxy
  def initialize
    @ractor = Ractor.new do
      bar = Bar.new
      loop do
        sender, method, args = receive
        sender << bar.send(method, *args)
      end
    end
    
    def method_missing(name, *args)
      port = Ractor::Port.new
      @ractor << [port, name, args]
      port.receive
    end
  end
end

実行。

bar = Proxy.new
bar.inc
#=> 1
bar.inc
#=> 2
bar.inc
#=> 3

これで Ractor のプロセスの中にいるオブジェクトの値を取り出すことができるようになりました。

ただ #receive は処理をブロックするので、この例では非同期メッセージを利用する利点はありません。

これが役に立つケースを考えます。

非同期と同期を分けて併用する

まず、数値のカウントアップと参照をそれぞれ #inc#count に分けます。

class Bar
  attr_reader :count

  def initialize
    @count = 0
  end

  def inc
    @count += 1
  end
end

次に、前回でも実装したように、末尾に _async をつけた場合のみ非同期になるような細工を追加します。 逆につけなかったばあいには Ractor::Port を利用してメッセージが送り返されるのを待つようにしました。

class Proxy
  def initialize
    @ractor = Ractor.new do
      bar = Bar.new

      loop do
        case receive
        in [method, args]
          # メッセージに sender が含まれないばあいは、メッセージを送り返さない
          bar.send(method, *args)

        in [sender, method, args]
          # メッセージに sender が含まれるばあいは、メソッドの値を sender へ送り返す
          sender << bar.send(method, *args)

        end
      end
    end
  end

  def method_missing(name, *args)
    case name.to_s.match(/\A(?<selector>.+)_async\z/)
    in {selector:}
       # 末尾に _async がついているばあいは、返信先をつけない
      @ractor << [selector, args]

    else
       # 末尾に _async がついていないばあいは、返信先を送り返信を待つ
      port = Ractor::Port.new
      @ractor << [port, name, args]
      port.receive

    end
  end
end
bar = Proxy.new
bar.inc_async # 非同期
bar.inc_async # 非同期
bar.inc_async # 非同期
bar.count     # 同期
#=> 3

ここで #inc の処理が相対的に時間のかかる処理と想像してください(数値がファイルに保存されていてカウントアップ毎に読み出しと書き込みが発生するとか)。 そのようなばあいに一方的にメッセージを送ることで、並列で処理をさせることができます。 送った側はその間に別の処理を進め、結果が必要になったときに #count で結果を参照すればよいわけです。

処理が終わっていなければ、port.receive でブロックされるので処理が終わるまで待たされることにはなりますが、不完全な結果が返されることもありません。

ちょっとだけ汎用的な Proxy

ここまで定義した Proxy は Bar 専用でしたが、実装を見ての通りメッセージ送受信の中継をしているだけです。 作成するオブジェクトを引数で指定できれば、ちょっとだけ汎用的な Proxy を定義できそうです。

class Proxy
  def initialize(klass, *args, **kwargs)
    @ractor = Ractor.new(klass, args, kwargs) do |klass, args, kwargs|
      bar = klass.send(:new, *args, **kwargs)

      loop do
        case receive
        in [method, args]
          bar.send(method, *args)
        in [sender, method, args]
          sender << bar.send(method, *args)
        end
      end
    end
  end

  def method_missing(name, *args)
    case name.to_s.match(/\A(?<selector>.+)_async\z/)
    in {selector:}
      @ractor << [selector, args]
    else
      port = Ractor::Port.new
      @ractor << [port, name, args]
      port.receive
    end
  end
end

引数つきのコンストラクタを持つクラスで試してみます。

class Bar
  attr_reader :count

  def initialize(init:)
    @count = init
  end

  def inc
    @count += 1
  end
end
bar = Proxy.new(Bar, init: 123)
bar.count
#=> 123
bar.inc_async
bar.count
#=> 124

よさそうな感じです。

Erlang の gen_server / Elixir の GenServer

ここで試したことは、Erlang の gen_server を念頭に置いて実装しました。 Elixir にも GenServer があり、これは gen_server を Elixir から利用しやすくラップしたライブラリです。

www.erlang.org

hexdocs.pm

先に紹介したブログ記事にも Erlang への言及があり、Erlang 由来のモニター機能が追加されたことが説明されています。

互いに影響を与え合いつつ、それぞれの言語らしさが表現されるのか、今後も楽しみです。

いつか読むはずっと読まない:自己とは

メッセージを相手に送ったつもりがクローンに送っていたり、クローンに送ったのであっても事実上相手に送ったのと違いがなかったり。 オブジェクトのアイデンティティの扱いには注意を払う必要があることを学びました。

ましてや自己とはなんなのか。 悩ましい。