概要
原著者の許諾を得て翻訳・公開いたします。
- 英語記事: RediSearch and time series data
- 原文公開日: 2018/04/01
- 著者: Dmitry Polyakovsky(@dmitrypol)
Rails: RedisとRediSearchで時系列データを扱う(翻訳)
前回の記事では、時系列データをRedisとElasticSearchの間で統合する方法を探りました。今回はさらにその先に進んで、RediSeatchモジュールを用いるRedisで時系列データを検索する方法をご紹介します。
私たちは、Ruby on Railsフレームワークを用いて構築された国内規模の小売チェーン向けのと同じものを概念実証(POC)アプリとして用いることにします。Webサイトではユーザーがさまざまな対話的操作で検索(サイトに来るユーザーの郵便番号だったり、ユーザーが探している製品だったり)を行えるようにしたいと思います。
時系列データでよくあるアプローチは、何らかの周期(1日単位が多い)でインデックスを作成することです。続いて、古いインデックスを削除する(または別のデータストアに移動する)プロセスを定期的に実行して、プライマリのRedis DBには直近X日のデータのみを保持します。
インデックスを日毎に分割する
ロジックをカプセル化するため、レコードを作成して適切なインデックスに挿入するクラスを別途ひとつ作成します。
# config/initializers/redis.rb
REDI_SEARCH = Redis.new host: 'localhost', ...
# app/services/
class RediSearchClient
def initialize time: nil, index_per_day: nil, index_pattern: nil
@time = time || Time.now
@index_per_day = index_per_day || true
@index_pattern = index_pattern || 'search_log'
end
def create
return if index_exists? == true
REDI_SEARCH.call('FT.CREATE', get_index, 'SCHEMA', 'zipcode', 'TEXT',
'product', 'TEXT')
end
def add id: , zipcode: , product:
create
REDI_SEARCH.call('FT.ADD', get_index, id, '1.0', 'FIELDS',
'zipcode', zipcode, 'product', product)
end
private
def index_exists?
return true if REDI_SEARCH.call("FT.INFO", get_index)
# インデックスがない場合のエラーハンドリングが必要
end
def get_index
return "#{@index_pattern}:#{@time.strftime("%Y-%m-%d")}" if @index_per_day
return @index_pattern
end
end
データ処理が遅延する可能性があるのと、昨日のデータを本日のインデックスに挿入したくないため、渡すのが遅れないようにしています。インデックスで使う命名パターンを決定して日毎のインデックスを作成するかどうかを決定する(これはインデックスパターンに日付スタンプを追加することで行う)ためのオプションパラメータも渡します。
複数のインデックスに対してクエリをかけるには、FT.SEARCH
コマンドでRedisに個別のリクエストを作成し、それらの結果をコードでマージしなければなりません。インデックスのリストを返すために、パターンにマッチするRedisキーを取得します。
class RediSearchClient
...
def search query: , limit:
output = {}
get_indexes.each do |index|
result = REDI_SEARCH.call('FT.SEARCH', index, query, 'LIMIT', 0, limit).drop(1)
output.merge! ( Hash[result.each_slice(2).to_a] ) unless result.empty?
end
return output
end
private
...
def get_indexes
REDI_SEARCH.call("keys", "idx:#{@index_pattern}*").map do |index|
index.split(':').drop(1).join(':')
end
end
end
Redis内のデータは以下のような感じになります。日付ごとにft_index0
がひとつ、ft_invidx
が複数ある形です。
idx:search_log:YYYY-MM-DD ft_index0
ft:search_log:YYYY-MM-DD/java ft_invidx
ft:search_log:YYYY-MM-DD/redis ft_invidx
古いインデックスを廃棄するために、FT.DROP
に適切な日付(Time.now - X.days
)を渡してクラスの初期化を呼び出します。
class RediSearchClient
...
def drop
REDI_SEARCH.call('FT.DROP', get_index)
end
end
インデックスが廃棄されると、RediSearchはft_index0
やft_invidx
キー(複数)を削除するほか、ドキュメント自身を保管するRedisハッシュも削除します。
RedisSearch内の他のメソッドをサポートしたり、インデックスのSCHEMA
やドキュメントフィールドをさらに抽象化するには、このコードにまだまだ手を加える必要があります。しかしこのコードは単に、アプリ内でこうした複数の関連インデックスを管理できるようになることを示すためのものです。
1つのインデックスですべてのデータをカバーする
複数のインデックスを管理し、複数の検索結果をマージするのに手間暇はかけたくないものです。代わりに、1つのインデックスを用い、古いドキュメントを削除するロジックを構築する手も考えられます。
今のところ同じクラスを使っているので、レコード作成時にインデックス名から日付スタンプを除外するために@index_per_day = false
を指定しています。
FT.SEARCH
は、クエリに一致するレコード件数を最初のパラメータとして返します。これを用いてインデックス内の全ドキュメントをループし、ID(タイムスタンプから導出)をチェックし、FT.DEL
コマンドを実行して各ドキュメントを削除します。
class RediSearchClient
...
def purge
ttl = ((Time.now - X.days).to_f.round(3)*1000).to_i
deleted_count = 0
num = 10
total_docs = REDI_SEARCH.call('FT.SEARCH', get_index, '*', 'NOCONTENT').first
(total_docs/num).times do |i|
offset = (num * i) - deleted_count
document_ids = REDI_SEARCH.call('FT.SEARCH', get_index, '*', 'NOCONTENT',
'LIMIT', offset, num).drop(1)
document_ids.each do |id|
if id.to_i < ttl
REDI_SEARCH.call('FT.DEL', get_index, id, 'DD')
deleted_count += 1
end
end
end
end
end
DD
を指定すると(Redisハッシュ内に保存されている)ドキュメントも削除されます。ft:search_log
キーとredis ft_invidx
キーについてはそのままです。
この手法の大きな欠点は、クエリの実行やドキュメントの削除のためにRedis呼び出しを多数実行する必要がある点です。この手法は、Redisでキーの期限切れを用いるTTLアプローチと比べて著しく複雑になります。