エンジニアのソフトウェア的愛情

または私は如何にして心配するのを止めてプログラムを・愛する・ようになったか

Ruby でオブジェクトへ非同期メッセージを送る

動機

先日、会社の同僚との話の中で非同期メッセージが話題になりました。 非同期メッセージにあまり馴染みがないために、動作の表現に苦労した様子。

普段 Elixir に触れていると非同期メッセージは当たり前のように使っているわけですが、話題にしていた Ruby の環境では意識した上でないとなかなか使わないかもしれません。

あとになって考えを巡らしてみました。 Ruby で非同期メッセージを使おうと思ったらどのようになるか。

試しにコードにしてみました。

同期メッセージ再確認

まず前提として。 Ruby では、オブジェクトの間のやり取りは同期メッセージで行われます。 コードで レシーバー.メッセージ と書くと、メッセージがレシーバー(受信者)へ送られ、メッセージに対応するレシーバーのメソッドが起動します。 レシーバーのメソッドの処理が終わるまでセンダー(送信者)の処理はブロック、待たされることになります。

メッセージは通常セレクタと引数からなり、Ruby ではセレクタにメソッドの名前を利用します(シグネチャも関わってきますがその点は省略)。

これは レシーバ.メッセージ つまり レシーバー.セレクタ(引数) というコードを レシーバ.send(セレクタ, 引数) と書き換えられることで確認できます。

'abc'.sub('b', 'B')
#=> "aBc"

'abc'.send(:sub, 'b', 'B')
#=> "aBc"

非同期メッセージ再確認

これが非同期メッセージになると、レシーバーの処理が終わらなくてもセンダーの処理はブロックされません。

このばあい、レシーバーの処理が終わったことをセンダーが知るための仕組みも考える必要があるのですが、今回はその点は脇に置いてメッセージの送信だけに話を絞りたいと思います。

ActiveJob を使った非同期処理

Ruby on Rails では、非同期処理の仕組みとして ActiveJob が標準で用意されています。

rubygems.org

今回はこれを単独で利用します。

まず ActiveJob をインストール。

$ gem install activejob

適当なジョブを作成します。

require 'active_job'

class FooJob < ActiveJob::Base
  def perform(a, b, c)
    sleep 5

    puts "foo performing (#{a}, #{b}, #{c})"
  end
end

ここで単純に ruby -r'./foo_job' -e 'FooJob.perform_later' などと実行すると、ジョブの実行を待たずに終了してしまうので、簡単なスクリプトを用意することにします。

# example.rb

require_relative './foo_job'

FooJob.perform_later(1, 2, 3)

7.times do |i|
  puts i

  sleep 1
end

実行します。

$ ruby example.rb
[ActiveJob] Enqueued FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) to Async(default) with arguments: 1, 2, 3
0
[ActiveJob] [FooJob] [1fa190b5-f0bc-4d53-98c5-619bbce99400] Performing FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) from Async(default) enqueued at 2026-01-28T09:38:47.493669000Z with argumen
ts: 1, 2, 3
1
2
3
4
foo performing (1, 2, 3)
[ActiveJob] [FooJob] [1fa190b5-f0bc-4d53-98c5-619bbce99400] Performed FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) from Async(default) in 5005.63ms
5
6

結果を確認しましょう。

0 から 6 までの数字は呼び出し元が出力しています。 sleep 1 しているのでおよそ 1 秒ごとに出力されます。

それ以外はジョブの処理が出力しています。

最初にジョブがキューイングされたことがログに出力されます。

[ActiveJob] Enqueued FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) to Async(default) with arguments: 1, 2, 3

次にジョブが起動したことがログに出力されます。

[ActiveJob] [FooJob] [1fa190b5-f0bc-4d53-98c5-619bbce99400] Performing FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) from Async(default) enqueued at 2026-01-28T09:38:47.493669000Z with argumen
ts: 1, 2, 3

そして FooJob#perform 内の puts が実行され、

foo performing (1, 2, 3)

最後にジョブが終了したことがログに出力されます。

[ActiveJob] [FooJob] [1fa190b5-f0bc-4d53-98c5-619bbce99400] Performed FooJob (Job ID: 1fa190b5-f0bc-4d53-98c5-619bbce99400) from Async(default) in 5005.63ms

puts の前に sleep 5 しているので、出力はジョブの起動からおよそ 5 秒後になります。

これを踏まえて。

ActiveJob を経由して非同期でメッセージを送る

これを踏まえて、こう考えました。

例えば Foo#do_something(...) というメソッドが定義されていたとして、Foo のオブジェクトへ do_something_asycn(...) というメッセージを送ったら ActiveJob 経由で Foo#do_something(...) が起動するなら、非同期でメッセージを送ったことにならないだろうか。

やってみましょう。

Foo - レシーバーの実装

Foo に任意のメソッド #do_something を定義します。

次にメソッド名に _async の接尾辞を付けたメッセージを送ったばあいに ActiveJob を経由してメッセージを送れるようにします。 仕組みは #method_missing を再定義することで実現することにしました。

受け取ったメソッド名を分解し、接尾辞が _async で接尾辞を除いた名前のメソッドをレシーバーが持つばあいにジョブを起動します。 ジョブにはレシーバーである自分自身、メソッド名、引数を渡します。

ここでレシーバーと引数をマーシャリング(シリアライズ)しているのは、ジョブを起動するときに渡せる値の型に制限があるためで話の本質ではありません。 なのでその点の詳細は省略。

require_relative './foo_job'

class Foo
  def do_something(a, b, c)
    sleep 5

    puts "foo performing (#{a}, #{b}, #{c})"
  end

  private

  def method_missing(name, *args)
    match_data = name.to_s.match(/\A(?<selector>.+)_async\z/)
    super if match_data.nil? || !respond_to?(match_data[:selector])

    marshaled_receiver = Marshal.dump(self)
    selector = match_data[:selector]
    marshaled_args = Marshal.dump(args)

    FooJob.perform_later(marshaled_receiver, selector, marshaled_args)
  end
end

FooJob - メッセージを中継する仕組みの実装

ジョブではレシーバーと引数をアンマーシャリングして復元し、レシーバーにメッセージを送ります。

require 'active_job'

class FooJob < ActiveJob::Base
  def perform(marshaled_receiver, selector, marshaled_args)
    receiver = Marshal.load(marshaled_receiver)
    args = Marshal.load(marshaled_args)

    receiver.send(selector, *args)
  end
end

実行

実行するためのスクリプトも次のように書き換えます。

require_relative './foo'

foo = Foo.new

foo.do_something_async(1, 2, 3)

7.times do |i|
  puts i

  sleep 1
end

そして実行。

[ActiveJob] Enqueued FooJob (Job ID: a388c76f-7691-4deb-a75b-bca99dace777) to Async(default) with arguments: "\x04\bo:\bFoo\x00", "do_something", "\x04\b[\bi\x06i\ai\b"
0
[ActiveJob] [FooJob] [a388c76f-7691-4deb-a75b-bca99dace777] Performing FooJob (Job ID: a388c76f-7691-4deb-a75b-bca99dace777) from Async(default) enqueued at 2026-01-28T09:47:04.117553000Z with argumen
ts: "\x04\bo:\bFoo\x00", "do_something", "\x04\b[\bi\x06i\ai\b"
1
2
3
4
foo performing (1, 2, 3)
[ActiveJob] [FooJob] [a388c76f-7691-4deb-a75b-bca99dace777] Performed FooJob (Job ID: a388c76f-7691-4deb-a75b-bca99dace777) from Async(default) in 5005.86ms
5
6

期待する雰囲気で動いてくれました。

これは本当に非同期メッセージを送ったのか?

見た目はそれっぽいのですが、これは正しく非同期メッセージを送ったとはいえません。 すでにばれていると思いますが、これは Foo オブジェクトが状態を持っているばあいに明白になります。

引数で渡した値をインスタンス変数に格納するように変更します。

class Foo
  # インスタンス変数の初期化を追加
  def initialize
    @a, @b, @c = 0, 0, 0
  end

  def do_something(a, b, c)
    sleep 5

    puts "foo performing (#{a}, #{b}, #{c})"
    @a, @b, @c = a, b, c # 追加
    pp self              # 実行後の自分の状態を出力
  end

  # ... 以下略
end

メッセージを送る側でも foo の状態を確認するコードを追加します。

require_relative './foo'

foo = Foo.new

foo.do_something_async(1, 2, 3)

7.times do |i|
  puts i

  sleep 1
end

pp foo # 追加

実行。

[ActiveJob] Enqueued FooJob (Job ID: 0849abfd-0075-4645-a135-9075a82f9947) to Async(default) with arguments: "\x04\bo:\bFoo\b:\a@ai\x00:\a@bi\x00:\a@ci\x00", "do_something", "\x04\b[\bi\x06i\ai\b"
0
[ActiveJob] [FooJob] [0849abfd-0075-4645-a135-9075a82f9947] Performing FooJob (Job ID: 0849abfd-0075-4645-a135-9075a82f9947) from Async(default) enqueued at 2026-01-28T09:51:35.721457000Z with argumen
ts: "\x04\bo:\bFoo\b:\a@ai\x00:\a@bi\x00:\a@ci\x00", "do_something", "\x04\b[\bi\x06i\ai\b"
1
2
3
4
foo performing (1, 2, 3)
5
#<Foo:0x0000000120f77fd0 @a=1, @b=2, @c=3>
[ActiveJob] [FooJob] [0849abfd-0075-4645-a135-9075a82f9947] Performed FooJob (Job ID: 0849abfd-0075-4645-a135-9075a82f9947) from Async(default) in 5022.28ms
6
#<Foo:0x0000000120c8b830 @a=0, @b=0, @c=0>

予想通りですが、メッセージを送った相手である foo の状態は変化していません。

マーシャリング / アンマーシャリングをしているわけですから、当然復元されたオブジェクトは元のオブジェクトの複製であり、元のオブジェクトではありません。 センダーがいるプロセスと ActiveJob が実行されるプロセスでオブジェクトを共有できない以上、オブジェクトを複製するしかなく、複製である以上同じオブジェクトではないことは明白です。

これでも構わないケースがある

値オブジェクトのばあい

値オブジェクトは不変であることが特徴の一つで、状態が同じであれば同じ値と扱われます。 複製であっても同一の値として扱えるので、このばあいは複製されても問題になりません。

もっともそれは、オブジェクトに非同期メッセージを送ることと関数を並行で実行することとで大差がなくなってしまうので、あえてこのような仕組みを考えるまでもないかもしれません。

ActiveRecord など状態がオブジェクトの外部にあるばあい

ActiveRecord のオブジェクトはデータベース上のレコードをオブジェクトに映し取ったものです。 メモリ上では異なるオブジェクトでも同じレコードを参照するのであれば同じ値として扱えます。 もちろんレコードとオブジェクトが適切に同期されていることが前提ですが。

実際、Rails アプリケーションでジョブを利用するケースではそれを前提としていることが普通と思います。

本当の非同期メッセージを実現するには?

オブジェクトがプロセスをまたげないのなら、最初からオブジェクトを別のプロセスに用意してそこに向かってメッセージを送る、ことになると思います。

そしてこちらのプロセスには、さも向こうのプロセスにいるオブジェクトのようにふるまうプロキシオブジェクトを用意し、 プロキシ内に非同期送信の仕組みを押し込むのがよさそうです。

そんなわけで。 次回は、 Ruby に導入された平行処理の仕組みの中では一番新しい Ractor を使って非同期メッセージ送信を実現する方法を考えてみようと思います。

いつか読むはずっと読まない:分岐する自己

人の意識が肉体を捨てソフトウェアとして生きるようになった時代の話。

記事で、複製されるオブジェクトについて書いているうちに思い出したので。