Tech Racho エンジニアの「?」を「!」に。
  • 開発

Rails: ElasticSearchとRedis Streamsのストリームを使う(翻訳)

概要

原著者の許諾を得て翻訳・公開いたします。

Rails: ElasticSearchとRedis Streamsのストリームを使う(翻訳)

Redis Listsは、データをメインのデータストアからElasticSearchに移動するジョブのキューとして使えます。時系列データをRedisに残し、かつElasticSearchにコピーする必要がある場合はどうでしょうか。

前回の記事では、国内小売りチェーン向けのRails Webサイトを構築しました。このときは、Redis ListsをElasticSearchへのETLデータキューとして用い、検索頻度の高い郵便番号を記録するためにRedisSortedSetsも用いました。さらに、day_of_weekhour_of_dayによる検索でRedis Stringsによるカウンタを用いました。今回の新たな要件は、各検索とパラメータ(クエリごとの郵便番号や製品)の正確な検索時刻を追えるようにすることです。この機能は、どの製品をどのテンポにストックしておくべきかを顧客が決定する上で役に立ちます。

Redis Streams

通常、この検索データはGETリクエストの一部としてアプリのログに出力され、Logstashを用いてこれをElasticSearchに保存できます。しかし今回はこのデータをStreamsで最初にRedisに保存します。Streamsは、Redis 4.0.xでリリース予定の新しいデータ構造です。

class StoreLocator
  def initialize zipcode:, query:
    @zipcode = zipcode
    @query = query
    ...
  end
  def perform
    # search code here
    record_stats
  end
private
  def record_stats
    key = "search_log:#{Time.now.strftime("%Y-%m-%d")}"
    REDIS_CLIENT.xadd(key, '*', 'zip', @zip, 'query', @query)
  end
end

今回は新しいxaddコマンドを用いてRedisの新しいストリームを作成し、キー/バリューペア(郵便番号とクエリ)を用いて項目をそこに追加します。キーはタイムスタンプを元に1日に1件作成され、ちょうどログのローテーションに似ています。Redis内のデータは次のような感じになります。

# launch redis-cli
xrange search_log:2018-01-08 - +
1) 1) 1515465841276-0
   2) 1) "zip"
      2) "98168"
      3) "query"
      4) "Spilt Light"
2) 1) 1515465842278-0
   2) 1) "zip"
      2) "98114"
      3) "query"
      4) "Wake-up Volcano"
...

xrangeは、ストリームから項目を取得できる新しいコマンドです。-+を指定すると最初から最後まですべての項目を取り出せます。ストリームのIDは、Unixのエポックタイム(msec単位)とシーケンス番号を元に自動生成されます。REDIS_CLIENT.xadd(key, numeric_id, ...)で固有のIDを使うこともできます。

ETLからElasticSearchへ

データ量がかなり莫大なので、直近7日間のデータをRedisに残し、それより古いデータをElasticSearchに移すことにします。Streamsのスキーマは柔軟で、ElasticSearchインデックスと相性のよいさまざまなフィールドが使えます。

xrange

これに対応するElasticSearchインデックス("search_log:YYYY-MM-DD"など)を作成し、これら項目に対してバッチでループを回します。また、ストリームの項目IDをElasticSearchのドキュメントIDとして指定します。

ES_CLIENT = Elasticsearch::Client.new ...
# app/jobs/
class RedisElasticEtlJob < ApplicationJob
  def perform(num_days=7)
    key = "search_log:#{(Time.now - num_days.days).strftime("%Y-%m-%d")}"
    count = 100
    starting_id = '-'
    while true
      items = REDIS_CLIENT.xrange(key, starting_id, '+', 'count', count)
      items.each do |item|
        # => ["1515258610192-0", ["zip", "98134", "query", "Express Mug"]]
        hash = Hash[item.second.each_slice(2).to_a]
        # => {"zip": "98134", "query": "Express Mug"}
        hash['@timestamp'] = Time.strptime(item.first.to_i.to_s, '%Q')
        ES_CLIENT.index index: key, type: 'default', id: item.first, body: hash
      end
      break if items.count < count
      last_id = items.last.first.split('-')
      starting_id = [last_id.first, (last_id.second.to_i + 1).to_s].join('-')
    end
  end
end

または、REDIS_CLIENT.xrevrange('search_log:YYYY-MM-DD', '+', '-'を用いて項目を逆順で取得する方法も考えられます。Redisのデータは、別のストリームキーにTTLを設定するか、REDIS_CLIENT.del(''search_log:YYYY-MM-DD'')を手動実行することで削除できます。

xread

RedisからElasticSearchへのリアルタイムデータパイプラインがさらに欲しい場合はどうすればよいでしょうか?デイリージョブをスケジューリングしなくても、新しいxreadコマンドでデーモンを構築できます。

class RedisElasticStreamConsumer
  def perform
    while true
      key = "search_log:#{Time.now.strftime("%Y-%m-%d")}"
      data = REDIS_CLIENT.xread('BLOCK', 5000, 'STREAMS', key, '$')
      # => [["search_log:2018-01-07-21-35", [["1515389726944-0", ["zip", "98178", "query", "Red Select"]]]]]
      hash = Hash[data.first.second.first.second.each_slice(2).to_a]
      # => {"zip"=>"98178", "query"=>"Red Select"}
      id = data.first.second.first.first
      hash['@timestamp'] = Time.strptime(id.to_i.to_s, '%Q')
      ES_CLIENT.index index: key, type: 'default', id: id, body: hash
    end
  end
end

この方法で困難なのは、ElasticSearchでドキュメントを作成するスピード(ディスクアクセス)よりも、Redisがストリームに項目を記録するスピード(RAMアクセス)の方が遥かに上である点です。ジョブをスケジューリングする方がこの困難が和らげられますが、これはどちらかというと既存のETLプロセスに向いています。

データを用いる

データにアクセスするために、データソースとしてRedisとElasticSearchのいずれかを選択するロジックを別のクラスにカプセル化します。

class SearchDataSelector
  def initialize date:
    @date = date
  end
  def perform
    elasticsearch if @date > Date.today - 7.days
    redis_streams
  end
private
  def redis_streams
    REDIS_CLIENT.xrange(@date, ...)
  end
  def elasticsearch
    ES_CLIENT.search(index: @date, ...)
  end
end

必要なデータをRedisとElasticSearchの両方から特定の期間について扱えるようにするには、このコードにもう少し手を加える必要があるでしょう。ElasticSearch Kibanaを使えば、ElasticSearchでデータを興味深い形でビジュアル表示できます。上述のLogstashでは、Redis内のデータにアクセスする入出力プラグインが使えます。現時点でサポートされているのはリストとチャンネルのみですが、いずれStreamsに統合されるものと期待しています。

Streamsのその他のコマンド

xlen

xlen search_log:2018-01-08とすると、ストリーム内の項目数を取れます。日付スタンプを持つキーをループすれば、日付と検索件数を表示するテーブルをUIで表示できるでしょう。

maxlen

REDIS.xadd('last_1000_searches', 'maxlen', '~', 1000, '*', 'zip', zip, 'query', query)とすると、長さの上限を指定してストリームを作成できます。~を指定することで、直近の約1000項目をRedisに保持してパフォーマンスを上げられます。これによって、私たちの顧客に直近の検索結果を表示できるようになります。

参考リンク

関連記事

Rails: RedisキャッシュとRackミドルウェアでパフォーマンスを改善(翻訳)

RabbitMQはSidekiqと同等以上だと思う: 前編(翻訳)


CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。