Tech Racho エンジニアの「?」を「!」に。
  • Ruby / Rails関連

Railsで"脳に優しい"シンプルなイベント駆動システムを構築しよう(翻訳)

概要

元サイトの許諾を得て翻訳・公開いたします。

日本語タイトルは内容に即したものにしました。

Railsで"脳に優しい"シンプルなイベント駆動システムを構築しよう(翻訳)

イベントソーシング(event sourcing)といえば、専門用語が山盛りでややこしく、多くの開発者にとって理解しにくいものです。基本的な概念を説明するときにも「集約ルート(aggregate root)」や「投影(projection)」といった難解な用語が飛び交いがちです。

ハイレベルな「完全装備のイベントソーシング」では、この概念を中心に据えてアプリケーション全体を構築することが推奨されますが、最初のうちはコードベースの狭い範囲に導入する方がうまくいくことが多いでしょう。

以前の私は、イベントソーシングの最も広範な部分については馴染みがあったものの、JavaコードやKafkaストリームが大量に登場し、分散システムを最終的に統合するときのあらゆる苦労がつきまとうオーバーキルな設計のように思えて仕方がありませんでした。

しかし最近の私は、とことん簡素化した基本的なイベントソーシング(私はこれを「脳に優しい(smooth brains)イベントソーシング」と呼んでいます)でアプリケーションを構築するようになっていて、このモデルは「退屈なRailsモノリス1」に最適であることがわかってきたのです。

🔗 "脳に優しい"イベントソーシングを選んだ理由

アプリケーションでは大量のイベントが生成されます。開発者がイベントだと思っていなくても、やはりイベントは発生しているのです。GitHubサイト上のプロジェクトで言えば、「新規issueの作成」「コメントの追加」「ラベルの追加」など、さまざまなイベントが存在します。

こうしたイベントを何らかのリストとしてフィードする必要が生じることはよくあります。アプリケーションが複雑になってくれば、イベント発生時に行わなければならない「処理」もどんどん増えてきます。

再びGitHub issueの例を思い出してみましょう。
issueにコメントが追加されたら、issueの作者にメールで通知したり、チームメンバーに通知を送信したり、カウンタを更新したり、自動化アクションをトリガーしたり、スパムチェックを走らせたり、コメント投稿者の貢献グラフを更新したりする必要が生じるでしょう。

このようにしてあっという間に、さまざまな処理を行うコードを山ほど書くはめになってきます。そういうわけで、イベントとやりとりするための何らかの標準的なパターンが必要になってきます。

ここでは、私たちがArrowsというサービスで行っている、シンプルなイベントソーシングについて説明します。私たちの運用規模はGitHubなどの大規模アプリケーションほどではありませんが、このシンプルなイベントソーシングは十分機能しています。パターンがシンプルなおかげで、さらに複雑なものを追加する必要が生じるまで長期的にスケール可能です。

🔗 イベントだけに注目する設計

「投影(projection)」「集約(aggregate)」「リアクター(reactor)」「コマンドクエリの責任分離(command query responsibility separation)」「読み取りモデル(read model)」といった概念に煩わされることなく、ひたすらイベントに着目します。

また、「GitHub issuesの基本バージョン」という特定のビジネスドメインに関連するイベントにも着目します。

以下のスキーマを持つissue_eventsテーブルを作成します(私は基本的にこの構造を用いますが、好みに応じて調整してください)。

create_table :issue_events do |t|
  t.references :issue, null: false, foreign_key: true

  t.references :actor, null: false, foreign_key: { to_table: :users }
  t.string :action, null: false, index: true
  t.references :record, polymorphic: true, null: true

  t.jsonb :extra, null: false, default: "{}"

  t.datetime :occurred_at, null: false, default: -> { "CURRENT_TIMESTAMP" }
  t.timestamps
end

Railsアプリの場合、私はイベントモデルをIssue名前空間の下に配置するのが好みです(特に"event"のような一般的な名前が使われている場合)。

class Issues < ApplicationRecord
  belongs_to :project

  has_many :events, class_name: "Issue::Event"
end

class Issue::Event < ApplicationRecord
  belongs_to :issue

  belongs_to :actor, class_name: "User"
  belongs_to :record, polymorphic: true
end

🔗 イベントのモデル

Issue::Eventは、イベントデータを保存するシンプルなモデルです。

action
イベントの名前(例: "comment_added"、"label_added")。入力ミスや無効なイベントにならないよう、バリデーションもいくつか加えています。
actor
アクションを実行するユーザー。特定の人物ではなくアプリケーション自身によって生成されるイベントには"system"ユーザーも存在します。
occurred_at
イベントの発生時刻
record
操作が行われたレコードへのポリモーフィック関連付け(オプショナル)。例: CommentLabel
extra
必要になるかもしれない追加データ保存用のJSONBカラム。JSONBは構造化されていないので一般的にやや注意が必要だが、基本的な用途では問題ない。
class Issue::Event < ApplicationRecord
  belongs_to :issue

  belongs_to :actor, class_name: "User"
  belongs_to :record, polymorphic: true

  SUPPORTED_ACTIONS = %w[
    comment_added
    comment_deleted
    comment_viewed
    label_added
    ...
  ].freeze

  validates :action, inclusion: {
    in: SUPPORTED_ACTIONS,
    message: "%{value} is not a valid action"
  }
end

ここでは何一つ特別なことはしません。他のモデルと同様にクエリや操作を実行できる、基本的なRailsモデルです。

さて、ここでイベントを作成するための優秀なAPIが必要になったとします。実際にAPIを使ってみてすぐわかったことの1つは、イベントの種類によってはスロットリングが必要だということです。

たとえば、コメントが閲覧されたかどうかをトラッキングしたい場合、ページが表示されるたびに全部のイベントを愚直に記録する必要はありません。特定の期間を定めて1個の"comment viewed"イベントにグループ化すればよいのです。

私たちのアプリでは、「アクティビティが発生していない」(例: そのissueがしばらく誰からも閲覧されていない)といったイベントを記録したかったのですが、チェックのたびにno_activityイベントを追加し続けるようなことはしたくなかったので、スロットルをポーリング間隔よりも大きく設定しました。

本来の「適切な」イベントソーシングは、個別のイベントをすべて記録してから、それらをまとめるなり中間スナップショットを作成するなど複雑なことを行うものですが、私たちの場合は作成時にスロットリングするだけで十分でした。完全無欠の履歴は残せませんが、それらを処理するためのメカニズムを追加せずに済みます。

class Issue < ApplicationRecord
  has_many :events, -> { order(occurred_at: :desc) },
    class_name: "Issue::Event",
    dependent: :destroy

  def record_event!(
    action,
    actor: Current.user,
    record: nil,
    extra: {},
    throttle_within: nil
  )
    if throttle_within.present?
      existing = events.find_by(
        action: action,
        record: record,
        actor: actor,
        occurred_at: throttle_within.ago..
      )
      return existing if existing&.touch(:occurred_at)
    end

    events.create!(
      action: action,
      record: record,
      actor: actor,
      extra: extra
    )
  end
end

# イベントを記録
@issue.record_event!(:comment_added, actor: @comment.author, record: @comment)

# イベントをスロットリング
@issue.record_event!(:comment_viewed, record: @comment, throttle_within: 15.minutes)

# 追加のメタデータ用データを追加する
@issue.record_event!(:label_added, extra: { name: @label.name })

このイベントを作成するrecord_event!メソッドをIssueモデルに追加してから、特定の期間に収まる場合はスロットリングするメソッドを必要に応じて追加します。

スロットリングを行うために、スロットル期間内に発生する「同じアクション」「アクション」「レコード」の既存のイベントを検索して、occurred_atタイムスタンプをtouchで更新します。

🔗 アクティビティフィードの出来上がり!

ここまでいい感じに作れました。とは言っても、それっぽいアクティビティフィードを1個作っただけです。

class Issues::FeedsController < ApplicationController
  def show
    @issue = Issue.find(params[:issue_id])
    @page, @events = pagy(@issue.events.order(occurred_at: :desc), items: 10)
  end
end

# フィード内の個別のイベントをレンダリングするビューやコンポーネントを作成してください
# 個別の項目には、行の項目を構成する`action`、`actor`、`occurred_at`、`record`があります
# 種別を示すアイコンやさまざまな色などを定義できます
# (読者への宿題とします)
render Issues::UI::Feed.new(@events)

将来イベントを手軽に追加できるようになったという意味では確かに便利ですが、このままではイベントソーシングの真の実力を示していません。

真の力を見せるには、イベントで実際に他の作業も行う必要があります。

Arrowsで私が手がけた仕事(および、それ以外のほぼすべてのRailsアプリで私が行った仕事)では、さまざまな統合や通知システム、アプリで何が起こったかを知るための軽量の「メトリック」ダッシュボードを最終的に構築することになります。

このIssueモデルの場合、たとえばコメントが追加されたらissue作成者にメールを送信し、issue作者のGitHub通知の受信箱に追加し、Slackチャンネルにも投稿するようにする必要があるとします。

Event BusやPub/Subライブラリのような重量級の手法に手を出さなくても、Railsのafter_create_commitコールバックで実現できます。

「うげ、あの邪悪なコールバックを使うの?」いえいえ、コールバックは確かに混乱の元になることもありますが、Railsの強力なツールの1つであることも確かです。コールバックは「よく切れる刃物」に例えられますが、それは「キッチンに持ち込んではならない」という意味ではなく「気をつけて使え」という意味です。

class Issue::Event < ApplicationRecord
  # ...

  after_create_commit :broadcast

  private

  def broadcast
    Email::Inbox.new(self).process_later
    AppNotification::Inbox.new(self).process_later
    Slack::Inbox.new(self).process_later
    # アプリにふさわしい処理なら何でもここに足せる
  end
end

このbroadcastメソッドは、イベントが作成された直後に呼び出されます(注: この呼び出しは、イベントが最初にスロットリングされたタイミングで行われますが、以後は呼び出されません: この振る舞いが適切でないユースケースもあります)。

続いて、そのイベントを「受信箱(inbox)」と呼ばれるさまざまなオブジェクトに送信します。個別の受信箱では、「イベントを送信すべきか」「どんなデータを送信するか」「どんな方法で送信するか」を指定できます。Railsでお馴染みの_laterサフィックスを追加しておけば、それらの処理がほぼ確実にバックグラウンドジョブとして実行されることが他の開発者にもわかります。

個別の受信箱は本記事には掲載しませんが、受信箱の一般的な構造は次のようになります。

class Email::Inbox
  def initialize(event)
    @event = event
  end

  def process_later
    Job.perform_later(@event)
  end

  def process
    case @event.action.to_sym
    when :comment_added
      # 例: issue作成者にメールを送信する
      # 例: issueをサブスクライブしたユーザーにメールを送信する
      # 例: 通知を有効にしているプロジェクトメンテナーにメールを送信する
      # ...
    when :label_added
      # ...
    when :comment_viewed
      # ...
    end
  end

  private

  class Job < ApplicationJob
    def perform(event)
      new(event).process
    end
  end
end

ご想像のとおり、さまざまな種類のイベントを大量に処理する受信箱もあれば、わずかなイベントを処理するだけの受信箱もあります。しかし、イベントを受け取って処理するためのクラスを作成するという受信箱の一般的なパターンは変わりません。

この受信箱は自由に構成できるので、今後の処理のロジックが肥大化してきたら、イベントを処理する別のクラスを抽出するといったことも可能です。これは、Slack統合などの受信トレイで特に便利です。Slack統合では、あるイベントオブジェクトを、Slackで期待されるフォーマット済みAPIペイロードに変換できるSlack::MessageBuilderのようなオブジェクトを作成できます。

こうしておけば、アプリケーションに機能を追加するときに、イベントが発生したときの処理をどこに置けばよいかが明確にわかる便利な場所が用意されていることに気づけるでしょう。

🔗 まいた種を刈り取る

基本的なセットアップが終わったので、一見非常に複雑そうに思えた機能でもずっと手軽に構築できるようになりました。

外部APIへの統合を新たに構築したければ、以下のようにお膳立てはもう整っています。

class Linear::Inbox
  #...

  def process
    return unless @event.issue.synced_to_linear?

    case @event.action.to_sym
    when :comment_added
      Linear::API.add_comment!(@event.issue, @event.record.body)
      #...
    end
  end

  # ...
end

基本的なワークフローを自動化したい場合も、これを元に構築できます。

class Issue::Workflow < ApplicationRecord
  belongs_to :issue
  has_many :conditions
  has_many :actions

  attribute :triggered_on
  validates :triggered_on, inclusion: {
    in: Issue::Event::SUPPORTED_ACTIONS
  }
end

@issue.workflows.create!(
  triggered_on: "comment_added",
  conditions: [
    { attribute: "created_at", operator: "lt", value: "2022-01-01" }
  ],
  actions: [
    { type: "reply", message: "This issue is stale, open a new one" }
  ]
)

機能がどのぐらいの頻度で使われているかをチェックするために基本的な「履歴」クエリを実行できるようにしたい場合にも、強固な基盤として使えます。

Issue::Event.where(action: "comment_deleted")
  .where(issue: @account.issues)
  .count

データを埋め戻すためにイベントを「リプレイ」する機能が必要になったら、通常のActive Recordモデルと同様にイベントをクエリしてから独自の処理を書けばよいのです。

last_commented_at = @issue.events
  .where(action: "comment_added")
  .maximum(:occurred_at)
@issue.update(last_commented_at: last_commented_at)

この書き方は、Arrowsで強力なパターンとして定着しています。そのおかげで、少数精鋭のチームが複数の「システム」を短期間で構築できるようになりました。私たちの主要なドメインオブジェクトには50種類ほどのイベント種別がありますが、時間が経っても実に快適に扱えることがわかってきました。

新機能の追加も、コードのメンテナンスも簡単です。イベントの作成と処理が分離されているおかげでテストも書きやすく、既存の振る舞いを壊す心配なしに安全に作業できます。

最後に、このコードは(本格的なイベントソーシングや大量の追加サービスによる方法と異なり)基本的かつシンプルなので理解もしやすく、しかも実際に私たちの現場で使われているのです。

🔗 謝辞と参考文献

イベントソーシングというアイデアに立ち戻ったきっかけは、LaravelのVerbsパッケージに関連する文脈でDaniel CoulbourneChris Morrellの話を聞いたことでした。

いつも素晴らしいMartin Fowlerのブログにもイベントソーシングの見事な記事があり、プロダクトエンジニアの思考法と学術的なイベントソーシングの仕組みの間のギャップを埋めるのに役立ちました。

さらに、私が駆け出しで.NETやJavaのコードと取り組んでいた頃に読んだものの、当時はさっぱり理解できなかったイベントソーシングやCQRSの書籍にも感謝したいと思います。当時の自分にはピンときませんでしたが、その一部を理解できるようになったことを嬉しく思います。


本記事が役に立つと思った方は、ぜひフォームでニュースレターの登録をお願いします。余分な文章を削ぎ落とした読みやすく価値ある情報だけをニュースレターで配信いたします。

関連記事

Railsの技: Action Mailerを活用する良い書き方を改めておさらいする(翻訳)


  1. 訳注: 同サイトの名前にもなっている「boring」は、「退屈なまでに安心して使える」といったポジティブな意味で使われています。 

CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。