前回のあらすじ
Ruby で非同期メッセージを扱いたいと思い立った筆者。
ActiveJob を使ってオブジェクトへ非同期メッセージを送れたと思ったのも束の間、メッセージを送った先のオブジェクト、実は宛先にしたオブジェクトのクローンだった。
どうすればいいのだろか…?
Ractor
Ractor を使えばいいんじゃないだろうか。
というわけで、今回は非同期メッセージの送受信を Ractor を使って実現することを考えます。
Ruby 3.0 で導入され、Ruby 4.0 現在も熟成期間にある新機能。
プログラミング一般としてはアクターモデル自体は目新しいものではありませんが、Ruby という文脈の中で最終的にどのように成熟するのか楽しみにしている機能です。
irb(main):001> Ractor.new { puts 'Hi' } (irb):1: warning: Ractor API is experimental and may change in future versions of Ruby. Hi => #<Ractor:#2 (irb):1 terminated>
(Ruby 4.0 でも experimental の警告が出る)
なお Ractor そのものの解説は公式ドキュメントを参照ください。
Ractor でオブジェクトを包む
あるクラス Bar があり、そのインスタンス bar を Ractor の中で動作させることを考えます。
Ractor へメッセージを送り、Ractor は受け取ったメッセージを bar に渡します。
class Bar def do_something(a, b, c) puts "bar performing (#{a}, #{b}, #{c})" end end
bar_ractor = Ractor.new do bar = Bar.new a, b, c = receive bar.do_something(a, b, c) end
メッセージを送るには #<< もしくは #send を利用します。
bar_ractor << [1, 2, 3] # もしくは bar_ractor.send([1, 2, 3]) bar performing (1, 2, 3) # <- Ractor の中から標準出力へ出力された文字列
ここで、メッセージを受信して処理を実行を終えてしまえば Ractor のプロセスはそこで終了です。
終了してしまった Ractor オブジェクトへメッセージを送ると例外が発生します。
bar_ractor << [1, 2, 3] #=> The port was already closed (Ractor::ClosedError)
メッセージを繰り返し受け付けることができるように、メッセージ待ちループを作ることにします。
bar_ractor = Ractor.new do bar = Bar.new loop do a, b, c = receive bar.do_something(a, b, c) end end
これで繰り返しメッセージを送れるようになりました。
bar_ractor << [1, 2, 3] bar performing (1, 2, 3) bar_ractor << [2, 4, 8] bar performing (2, 4, 8)
前回の ActiveJob を使った実装では、メッセージを送るごとにプロセスが起動していました。 メッセージ送信ごとにプロセスが起動するので、複数のメッセージを同じプロセスで処理することができません。
今回、同じプロセスでメッセージを待つ方法へ変更したことで、継続して同じオブジェクトを操作できるようになりました。
送信先のオブジェクトのようにふるまう Proxy を作る
このままでは扱いにくいので、Proxy クラスを定義します
class Proxy def initialize @ractor = Ractor.new do bar = Bar.new loop do method, args = receive bar.send(method, *args) end end end def method_missing(name, *args) @ractor << [name, args] end end
これで Bar のオブジェクトのようにふるまう Proxy ができました。
bar = Proxy.new bar.do_something(1, 2, 3) bar performing (1, 2, 3) bar.do_something(2, 3, 4) bar performing (2, 3, 4)
ただし、このままではメッセージを送信する一方で相手のオブジェクトから情報を引き出すことはできません。 Logger のように一方向でよいばあいを除いて、何か結果が得られるようにしたいのが普通です。
それも非同期メッセージで実現します。
メッセージで値を返す
Ruby 4.0 から Ractor::Port という仕組みが追加されました。
Ractor::Port のオブジェクトはメッセージを送受信する経路になります。
送信はどの Ractor のプロセスからも可能ですが、受信はオブジェクトを作成したプロセスにしかできないというルールがあります。
# Ractor::Port オブジェクトを作成する port = Ractor::Port.new # Ractor オブジェクトに Ractor::Port オブジェクトを渡す ractor = Ractor.new(port) do |port| # Ractor::Port オブジェクトにメッセージを送信する port << 'Hi' end # Ractor::Port オブジェクトからメッセージを受信する port.receive #=> "Hi"
Ractor::Port が導入されたことで送受信がかなりすっきりした印象です。
ここに至るまでどのような検討を重ねたのか、興味深い話がブログにまとめられています。
さて、これをふまえて。
数値を一つ状態として持つオブジェクトを考えます。
#inc はその数値をカウントアップし、カウントアップ後の値を返します。
class Bar def initialize @count = 0 end def inc @count += 1 end end
これを Proxy で包みます。
先ほどの例では port オブジェクトを Ractor.new の引数で渡しましたが、今回は一回ごとにオブジェクトを作成して送信するメッセージに載せて送ります。
class Proxy def initialize @ractor = Ractor.new do bar = Bar.new loop do sender, method, args = receive sender << bar.send(method, *args) end end def method_missing(name, *args) port = Ractor::Port.new @ractor << [port, name, args] port.receive end end end
実行。
bar = Proxy.new bar.inc #=> 1 bar.inc #=> 2 bar.inc #=> 3
これで Ractor のプロセスの中にいるオブジェクトの値を取り出すことができるようになりました。
ただ #receive は処理をブロックするので、この例では非同期メッセージを利用する利点はありません。
これが役に立つケースを考えます。
非同期と同期を分けて併用する
まず、数値のカウントアップと参照をそれぞれ #inc と #count に分けます。
class Bar attr_reader :count def initialize @count = 0 end def inc @count += 1 end end
次に、前回でも実装したように、末尾に _async をつけた場合のみ非同期になるような細工を追加します。
逆につけなかったばあいには Ractor::Port を利用してメッセージが送り返されるのを待つようにしました。
class Proxy def initialize @ractor = Ractor.new do bar = Bar.new loop do case receive in [method, args] # メッセージに sender が含まれないばあいは、メッセージを送り返さない bar.send(method, *args) in [sender, method, args] # メッセージに sender が含まれるばあいは、メソッドの値を sender へ送り返す sender << bar.send(method, *args) end end end end def method_missing(name, *args) case name.to_s.match(/\A(?<selector>.+)_async\z/) in {selector:} # 末尾に _async がついているばあいは、返信先をつけない @ractor << [selector, args] else # 末尾に _async がついていないばあいは、返信先を送り返信を待つ port = Ractor::Port.new @ractor << [port, name, args] port.receive end end end
bar = Proxy.new bar.inc_async # 非同期 bar.inc_async # 非同期 bar.inc_async # 非同期 bar.count # 同期 #=> 3
ここで #inc の処理が相対的に時間のかかる処理と想像してください(数値がファイルに保存されていてカウントアップ毎に読み出しと書き込みが発生するとか)。
そのようなばあいに一方的にメッセージを送ることで、並列で処理をさせることができます。
送った側はその間に別の処理を進め、結果が必要になったときに #count で結果を参照すればよいわけです。
処理が終わっていなければ、port.receive でブロックされるので処理が終わるまで待たされることにはなりますが、不完全な結果が返されることもありません。
ちょっとだけ汎用的な Proxy
ここまで定義した Proxy は Bar 専用でしたが、実装を見ての通りメッセージ送受信の中継をしているだけです。 作成するオブジェクトを引数で指定できれば、ちょっとだけ汎用的な Proxy を定義できそうです。
class Proxy def initialize(klass, *args, **kwargs) @ractor = Ractor.new(klass, args, kwargs) do |klass, args, kwargs| bar = klass.send(:new, *args, **kwargs) loop do case receive in [method, args] bar.send(method, *args) in [sender, method, args] sender << bar.send(method, *args) end end end end def method_missing(name, *args) case name.to_s.match(/\A(?<selector>.+)_async\z/) in {selector:} @ractor << [selector, args] else port = Ractor::Port.new @ractor << [port, name, args] port.receive end end end
引数つきのコンストラクタを持つクラスで試してみます。
class Bar attr_reader :count def initialize(init:) @count = init end def inc @count += 1 end end
bar = Proxy.new(Bar, init: 123) bar.count #=> 123 bar.inc_async bar.count #=> 124
よさそうな感じです。
Erlang の gen_server / Elixir の GenServer
ここで試したことは、Erlang の gen_server を念頭に置いて実装しました。 Elixir にも GenServer があり、これは gen_server を Elixir から利用しやすくラップしたライブラリです。
先に紹介したブログ記事にも Erlang への言及があり、Erlang 由来のモニター機能が追加されたことが説明されています。
互いに影響を与え合いつつ、それぞれの言語らしさが表現されるのか、今後も楽しみです。
いつか読むはずっと読まない:自己とは
メッセージを相手に送ったつもりがクローンに送っていたり、クローンに送ったのであっても事実上相手に送ったのと違いがなかったり。 オブジェクトのアイデンティティの扱いには注意を払う必要があることを学びました。
ましてや自己とはなんなのか。 悩ましい。
