動機
先日、会社の同僚との話の中で非同期メッセージが話題になりました。 非同期メッセージにあまり馴染みがないために、動作の表現に苦労した様子。
普段 Elixir に触れていると非同期メッセージは当たり前のように使っているわけですが、話題にしていた Ruby の環境では意識した上でないとなかなか使わないかもしれません。
あとになって考えを巡らしてみました。 Ruby で非同期メッセージを使おうと思ったらどのようになるか。
試しにコードにしてみました。
- 動機
- 同期メッセージ再確認
- 非同期メッセージ再確認
- ActiveJob を使った非同期処理
- ActiveJob を経由して非同期でメッセージを送る
- これは本当に非同期メッセージを送ったのか?
- これでも構わないケースがある
- 本当の非同期メッセージを実現するには?
- いつか読むはずっと読まない:分岐する自己
同期メッセージ再確認
まず前提として。
Ruby では、オブジェクトの間のやり取りは同期メッセージで行われます。
コードで レシーバー.メッセージ と書くと、メッセージがレシーバー(受信者)へ送られ、メッセージに対応するレシーバーのメソッドが起動します。
レシーバーのメソッドの処理が終わるまでセンダー(送信者)の処理はブロック、待たされることになります。
メッセージは通常セレクタと引数からなり、Ruby ではセレクタにメソッドの名前を利用します(シグネチャも関わってきますがその点は省略)。
これは レシーバ.メッセージ つまり レシーバー.セレクタ(引数) というコードを レシーバ.send(セレクタ, 引数) と書き換えられることで確認できます。
'abc'.sub('b', 'B') #=> "aBc" 'abc'.send(:sub, 'b', 'B') #=> "aBc"
非同期メッセージ再確認
これが非同期メッセージになると、レシーバーの処理が終わらなくてもセンダーの処理はブロックされません。
このばあい、レシーバーの処理が終わったことをセンダーが知るための仕組みも考える必要があるのですが、今回はその点は脇に置いてメッセージの送信だけに話を絞りたいと思います。
ActiveJob を使った非同期処理
Ruby on Rails では、非同期処理の仕組みとして ActiveJob が標準で用意されています。
今回はこれを単独で利用します。
まず ActiveJob をインストール。
$ gem install activejob
適当なジョブを作成します。
require 'active_job' class FooJob < ActiveJob::Base def perform(a, b, c) sleep 5 puts "foo performing (#{a}, #{b}, #{c})" end end
ここで単純に ruby -r'./foo_job' -e 'FooJob.perform_later' などと実行すると、ジョブの実行を待たずに終了してしまうので、簡単なスクリプトを用意することにします。
# example.rb require_relative './foo_job' FooJob.perform_later(1, 2, 3) 7.times do |i| puts i sleep 1 end
実行します。
$ ruby example.rb
[ActiveJob] Enqueued FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) to Async(default) with arguments: 1, 2, 3 0 [ActiveJob] [FooJob] [1fa190b5-f0bc-4d53-98c5-619bbce99400] Performing FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) from Async(default) enqueued at 2026-01-28T09:38:47.493669000Z with argumen ts: 1, 2, 3 1 2 3 4 foo performing (1, 2, 3) [ActiveJob] [FooJob] [1fa190b5-f0bc-4d53-98c5-619bbce99400] Performed FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) from Async(default) in 5005.63ms 5 6
結果を確認しましょう。
0 から 6 までの数字は呼び出し元が出力しています。
sleep 1 しているのでおよそ 1 秒ごとに出力されます。
それ以外はジョブの処理が出力しています。
最初にジョブがキューイングされたことがログに出力されます。
[ActiveJob] Enqueued FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) to Async(default) with arguments: 1, 2, 3
次にジョブが起動したことがログに出力されます。
[ActiveJob] [FooJob] [1fa190b5-f0bc-4d53-98c5-619bbce99400] Performing FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) from Async(default) enqueued at 2026-01-28T09:38:47.493669000Z with argumen ts: 1, 2, 3
そして FooJob#perform 内の puts が実行され、
foo performing (1, 2, 3)
最後にジョブが終了したことがログに出力されます。
[ActiveJob] [FooJob] [1fa190b5-f0bc-4d53-98c5-619bbce99400] Performed FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) from Async(default) in 5005.63ms
puts の前に sleep 5 しているので、出力はジョブの起動からおよそ 5 秒後になります。
これを踏まえて。
ActiveJob を経由して非同期でメッセージを送る
これを踏まえて、こう考えました。
例えば Foo#do_something(...) というメソッドが定義されていたとして、Foo のオブジェクトへ do_something_asycn(...) というメッセージを送ったら ActiveJob 経由で Foo#do_something(...) が起動するなら、非同期でメッセージを送ったことにならないだろうか。
やってみましょう。
Foo - レシーバーの実装
Foo に任意のメソッド #do_something を定義します。
次にメソッド名に _async の接尾辞を付けたメッセージを送ったばあいに ActiveJob を経由してメッセージを送れるようにします。
仕組みは #method_missing を再定義することで実現することにしました。
受け取ったメソッド名を分解し、接尾辞が _async で接尾辞を除いた名前のメソッドをレシーバーが持つばあいにジョブを起動します。
ジョブにはレシーバーである自分自身、メソッド名、引数を渡します。
ここでレシーバーと引数をマーシャリング(シリアライズ)しているのは、ジョブを起動するときに渡せる値の型に制限があるためで話の本質ではありません。 なのでその点の詳細は省略。
require_relative './foo_job' class Foo def do_something(a, b, c) sleep 5 puts "foo performing (#{a}, #{b}, #{c})" end private def method_missing(name, *args) match_data = name.to_s.match(/\A(?<selector>.+)_async\z/) super if match_data.nil? || !respond_to?(match_data[:selector]) marshaled_receiver = Marshal.dump(self) selector = match_data[:selector] marshaled_args = Marshal.dump(args) FooJob.perform_later(marshaled_receiver, selector, marshaled_args) end end
FooJob - メッセージを中継する仕組みの実装
ジョブではレシーバーと引数をアンマーシャリングして復元し、レシーバーにメッセージを送ります。
require 'active_job' class FooJob < ActiveJob::Base def perform(marshaled_receiver, selector, marshaled_args) receiver = Marshal.load(marshaled_receiver) args = Marshal.load(marshaled_args) receiver.send(selector, *args) end end
実行
実行するためのスクリプトも次のように書き換えます。
require_relative './foo' foo = Foo.new foo.do_something_async(1, 2, 3) 7.times do |i| puts i sleep 1 end
そして実行。
[ActiveJob] Enqueued FooJob (Job ID: a388c76f-7691-4deb-a75b-bca99dace777) to Async(default) with arguments: "\x04\bo:\bFoo\x00", "do_something", "\x04\b[\bi\x06i\ai\b" 0 [ActiveJob] [FooJob] [a388c76f-7691-4deb-a75b-bca99dace777] Performing FooJob (Job ID: a388c76f-7691-4deb-a75b-bca99dace777) from Async(default) enqueued at 2026-01-28T09:47:04.117553000Z with argumen ts: "\x04\bo:\bFoo\x00", "do_something", "\x04\b[\bi\x06i\ai\b" 1 2 3 4 foo performing (1, 2, 3) [ActiveJob] [FooJob] [a388c76f-7691-4deb-a75b-bca99dace777] Performed FooJob (Job ID: a388c76f-7691-4deb-a75b-bca99dace777) from Async(default) in 5005.86ms 5 6
期待する雰囲気で動いてくれました。
これは本当に非同期メッセージを送ったのか?
見た目はそれっぽいのですが、これは正しく非同期メッセージを送ったとはいえません。 すでにばれていると思いますが、これは Foo オブジェクトが状態を持っているばあいに明白になります。
引数で渡した値をインスタンス変数に格納するように変更します。
class Foo # インスタンス変数の初期化を追加 def initialize @a, @b, @c = 0, 0, 0 end def do_something(a, b, c) sleep 5 puts "foo performing (#{a}, #{b}, #{c})" @a, @b, @c = a, b, c # 追加 pp self # 実行後の自分の状態を出力 end # ... 以下略 end
メッセージを送る側でも foo の状態を確認するコードを追加します。
require_relative './foo' foo = Foo.new foo.do_something_async(1, 2, 3) 7.times do |i| puts i sleep 1 end pp foo # 追加
実行。
[ActiveJob] Enqueued FooJob (Job ID: 0849abfd-0075-4645-a135-9075a82f9947) to Async(default) with arguments: "\x04\bo:\bFoo\b:\a@ai\x00:\a@bi\x00:\a@ci\x00", "do_something", "\x04\b[\bi\x06i\ai\b" 0 [ActiveJob] [FooJob] [0849abfd-0075-4645-a135-9075a82f9947] Performing FooJob (Job ID: 0849abfd-0075-4645-a135-9075a82f9947) from Async(default) enqueued at 2026-01-28T09:51:35.721457000Z with argumen ts: "\x04\bo:\bFoo\b:\a@ai\x00:\a@bi\x00:\a@ci\x00", "do_something", "\x04\b[\bi\x06i\ai\b" 1 2 3 4 foo performing (1, 2, 3) 5 #<Foo:0x0000000120f77fd0 @a=1, @b=2, @c=3> [ActiveJob] [FooJob] [0849abfd-0075-4645-a135-9075a82f9947] Performed FooJob (Job ID: 0849abfd-0075-4645-a135-9075a82f9947) from Async(default) in 5022.28ms 6 #<Foo:0x0000000120c8b830 @a=0, @b=0, @c=0>
予想通りですが、メッセージを送った相手である foo の状態は変化していません。
マーシャリング / アンマーシャリングをしているわけですから、当然復元されたオブジェクトは元のオブジェクトの複製であり、元のオブジェクトではありません。 センダーがいるプロセスと ActiveJob が実行されるプロセスでオブジェクトを共有できない以上、オブジェクトを複製するしかなく、複製である以上同じオブジェクトではないことは明白です。
これでも構わないケースがある
値オブジェクトのばあい
値オブジェクトは不変であることが特徴の一つで、状態が同じであれば同じ値と扱われます。 複製であっても同一の値として扱えるので、このばあいは複製されても問題になりません。
もっともそれは、オブジェクトに非同期メッセージを送ることと関数を並行で実行することとで大差がなくなってしまうので、あえてこのような仕組みを考えるまでもないかもしれません。
ActiveRecord など状態がオブジェクトの外部にあるばあい
ActiveRecord のオブジェクトはデータベース上のレコードをオブジェクトに映し取ったものです。 メモリ上では異なるオブジェクトでも同じレコードを参照するのであれば同じ値として扱えます。 もちろんレコードとオブジェクトが適切に同期されていることが前提ですが。
実際、Rails アプリケーションでジョブを利用するケースではそれを前提としていることが普通と思います。
本当の非同期メッセージを実現するには?
オブジェクトがプロセスをまたげないのなら、最初からオブジェクトを別のプロセスに用意してそこに向かってメッセージを送る、ことになると思います。
そしてこちらのプロセスには、さも向こうのプロセスにいるオブジェクトのようにふるまうプロキシオブジェクトを用意し、 プロキシ内に非同期送信の仕組みを押し込むのがよさそうです。
そんなわけで。 次回は、 Ruby に導入された平行処理の仕組みの中では一番新しい Ractor を使って非同期メッセージ送信を実現する方法を考えてみようと思います。
いつか読むはずっと読まない:分岐する自己
人の意識が肉体を捨てソフトウェアとして生きるようになった時代の話。
記事で、複製されるオブジェクトについて書いているうちに思い出したので。

