概要
原著者の許諾を得て翻訳・公開いたします。
- 英語記事: ElasticSearch and Redis streams
- 原文公開日: 2018/01/16
- 著者: Dmitry Polyakovsky(@dmitrypol)
Rails: ElasticSearchとRedis Streamsのストリームを使う(翻訳)
Redis Listsは、データをメインのデータストアからElasticSearchに移動するジョブのキューとして使えます。時系列データをRedisに残し、かつElasticSearchにコピーする必要がある場合はどうでしょうか。
前回の記事では、国内小売りチェーン向けのRails Webサイトを構築しました。このときは、Redis ListsをElasticSearchへのETLデータキューとして用い、検索頻度の高い郵便番号を記録するためにRedisSortedSetsも用いました。さらに、day_of_week
やhour_of_day
による検索でRedis Stringsによるカウンタを用いました。今回の新たな要件は、各検索とパラメータ(クエリごとの郵便番号や製品)の正確な検索時刻を追えるようにすることです。この機能は、どの製品をどのテンポにストックしておくべきかを顧客が決定する上で役に立ちます。
Redis Streams
通常、この検索データはGET
リクエストの一部としてアプリのログに出力され、Logstashを用いてこれをElasticSearchに保存できます。しかし今回はこのデータをStreamsで最初にRedisに保存します。Streamsは、Redis 4.0.xでリリース予定の新しいデータ構造です。
class StoreLocator
def initialize zipcode:, query:
@zipcode = zipcode
@query = query
...
end
def perform
# search code here
record_stats
end
private
def record_stats
key = "search_log:#{Time.now.strftime("%Y-%m-%d")}"
REDIS_CLIENT.xadd(key, '*', 'zip', @zip, 'query', @query)
end
end
今回は新しいxadd
コマンドを用いてRedisの新しいストリームを作成し、キー/バリューペア(郵便番号とクエリ)を用いて項目をそこに追加します。キーはタイムスタンプを元に1日に1件作成され、ちょうどログのローテーションに似ています。Redis内のデータは次のような感じになります。
# launch redis-cli
xrange search_log:2018-01-08 - +
1) 1) 1515465841276-0
2) 1) "zip"
2) "98168"
3) "query"
4) "Spilt Light"
2) 1) 1515465842278-0
2) 1) "zip"
2) "98114"
3) "query"
4) "Wake-up Volcano"
...
xrange
は、ストリームから項目を取得できる新しいコマンドです。-
と+
を指定すると最初から最後まですべての項目を取り出せます。ストリームのIDは、Unixのエポックタイム(msec単位)とシーケンス番号を元に自動生成されます。REDIS_CLIENT.xadd(key, numeric_id, ...)
で固有のIDを使うこともできます。
ETLからElasticSearchへ
データ量がかなり莫大なので、直近7日間のデータをRedisに残し、それより古いデータをElasticSearchに移すことにします。Streamsのスキーマは柔軟で、ElasticSearchインデックスと相性のよいさまざまなフィールドが使えます。
xrange
これに対応するElasticSearchインデックス("search_log:YYYY-MM-DD"など)を作成し、これら項目に対してバッチでループを回します。また、ストリームの項目IDをElasticSearchのドキュメントIDとして指定します。
ES_CLIENT = Elasticsearch::Client.new ...
# app/jobs/
class RedisElasticEtlJob < ApplicationJob
def perform(num_days=7)
key = "search_log:#{(Time.now - num_days.days).strftime("%Y-%m-%d")}"
count = 100
starting_id = '-'
while true
items = REDIS_CLIENT.xrange(key, starting_id, '+', 'count', count)
items.each do |item|
# => ["1515258610192-0", ["zip", "98134", "query", "Express Mug"]]
hash = Hash[item.second.each_slice(2).to_a]
# => {"zip": "98134", "query": "Express Mug"}
hash['@timestamp'] = Time.strptime(item.first.to_i.to_s, '%Q')
ES_CLIENT.index index: key, type: 'default', id: item.first, body: hash
end
break if items.count < count
last_id = items.last.first.split('-')
starting_id = [last_id.first, (last_id.second.to_i + 1).to_s].join('-')
end
end
end
または、REDIS_CLIENT.xrevrange('search_log:YYYY-MM-DD', '+', '-'
を用いて項目を逆順で取得する方法も考えられます。Redisのデータは、別のストリームキーにTTLを設定するか、REDIS_CLIENT.del(''search_log:YYYY-MM-DD'')
を手動実行することで削除できます。
xread
RedisからElasticSearchへのリアルタイムデータパイプラインがさらに欲しい場合はどうすればよいでしょうか?デイリージョブをスケジューリングしなくても、新しいxread
コマンドでデーモンを構築できます。
class RedisElasticStreamConsumer
def perform
while true
key = "search_log:#{Time.now.strftime("%Y-%m-%d")}"
data = REDIS_CLIENT.xread('BLOCK', 5000, 'STREAMS', key, '$')
# => [["search_log:2018-01-07-21-35", [["1515389726944-0", ["zip", "98178", "query", "Red Select"]]]]]
hash = Hash[data.first.second.first.second.each_slice(2).to_a]
# => {"zip"=>"98178", "query"=>"Red Select"}
id = data.first.second.first.first
hash['@timestamp'] = Time.strptime(id.to_i.to_s, '%Q')
ES_CLIENT.index index: key, type: 'default', id: id, body: hash
end
end
end
この方法で困難なのは、ElasticSearchでドキュメントを作成するスピード(ディスクアクセス)よりも、Redisがストリームに項目を記録するスピード(RAMアクセス)の方が遥かに上である点です。ジョブをスケジューリングする方がこの困難が和らげられますが、これはどちらかというと既存のETLプロセスに向いています。
データを用いる
データにアクセスするために、データソースとしてRedisとElasticSearchのいずれかを選択するロジックを別のクラスにカプセル化します。
class SearchDataSelector
def initialize date:
@date = date
end
def perform
elasticsearch if @date > Date.today - 7.days
redis_streams
end
private
def redis_streams
REDIS_CLIENT.xrange(@date, ...)
end
def elasticsearch
ES_CLIENT.search(index: @date, ...)
end
end
必要なデータをRedisとElasticSearchの両方から特定の期間について扱えるようにするには、このコードにもう少し手を加える必要があるでしょう。ElasticSearch Kibanaを使えば、ElasticSearchでデータを興味深い形でビジュアル表示できます。上述のLogstashでは、Redis内のデータにアクセスする入出力プラグインが使えます。現時点でサポートされているのはリストとチャンネルのみですが、いずれStreamsに統合されるものと期待しています。
Streamsのその他のコマンド
xlen
xlen search_log:2018-01-08
とすると、ストリーム内の項目数を取れます。日付スタンプを持つキーをループすれば、日付と検索件数を表示するテーブルをUIで表示できるでしょう。
maxlen
REDIS.xadd('last_1000_searches', 'maxlen', '~', 1000, '*', 'zip', zip, 'query', query)
とすると、長さの上限を指定してストリームを作成できます。~
を指定することで、直近の約1000項目をRedisに保持してパフォーマンスを上げられます。これによって、私たちの顧客に直近の検索結果を表示できるようになります。
参考リンク
- Redis Streams and the Unified Log — Brandur Leach
- Kafka & Redis Streams – Hacker Noon
- redis-rcp/RCP11.md at master · redis/redis-rcp
- Redis Streams: consumer groups v2 specification document