Tech Racho エンジニアの「?」を「!」に。
  • 開発

Rails: RedisとRediSearchで時系列データを扱う(翻訳)

概要

原著者の許諾を得て翻訳・公開いたします。

Rails: RedisとRediSearchで時系列データを扱う(翻訳)

前回の記事では、時系列データをRedisとElasticSearchの間で統合する方法を探りました。今回はさらにその先に進んで、RediSeatchモジュールを用いるRedisで時系列データを検索する方法をご紹介します。

私たちは、Ruby on Railsフレームワークを用いて構築された国内規模の小売チェーン向けのと同じものを概念実証(POC)アプリとして用いることにします。Webサイトではユーザーがさまざまな対話的操作で検索(サイトに来るユーザーの郵便番号だったり、ユーザーが探している製品だったり)を行えるようにしたいと思います。

時系列データでよくあるアプローチは、何らかの周期(1日単位が多い)でインデックスを作成することです。続いて、古いインデックスを削除する(または別のデータストアに移動する)プロセスを定期的に実行して、プライマリのRedis DBには直近X日のデータのみを保持します。

インデックスを日毎に分割する

ロジックをカプセル化するため、レコードを作成して適切なインデックスに挿入するクラスを別途ひとつ作成します。

# config/initializers/redis.rb
REDI_SEARCH = Redis.new host: 'localhost', ...
# app/services/
class RediSearchClient
  def initialize time: nil, index_per_day: nil, index_pattern: nil
    @time = time || Time.now
    @index_per_day = index_per_day || true
    @index_pattern = index_pattern || 'search_log'
  end
  def create
    return if index_exists? == true
    REDI_SEARCH.call('FT.CREATE', get_index, 'SCHEMA', 'zipcode', 'TEXT',
      'product', 'TEXT')
  end
  def add id: , zipcode: , product:
    create
    REDI_SEARCH.call('FT.ADD', get_index, id, '1.0', 'FIELDS',
      'zipcode', zipcode, 'product', product)
  end
private
  def index_exists?
    return true if REDI_SEARCH.call("FT.INFO", get_index)
    # インデックスがない場合のエラーハンドリングが必要
  end
  def get_index
    return "#{@index_pattern}:#{@time.strftime("%Y-%m-%d")}" if @index_per_day
    return @index_pattern
  end
end

データ処理が遅延する可能性があるのと、昨日のデータを本日のインデックスに挿入したくないため、渡すのが遅れないようにしています。インデックスで使う命名パターンを決定して日毎のインデックスを作成するかどうかを決定する(これはインデックスパターンに日付スタンプを追加することで行う)ためのオプションパラメータも渡します。

複数のインデックスに対してクエリをかけるには、FT.SEARCHコマンドでRedisに個別のリクエストを作成し、それらの結果をコードでマージしなければなりません。インデックスのリストを返すために、パターンにマッチするRedisキーを取得します。

class RediSearchClient
  ...
  def search query: , limit:
    output = {}
    get_indexes.each do |index|
      result = REDI_SEARCH.call('FT.SEARCH', index, query, 'LIMIT', 0, limit).drop(1)
      output.merge! ( Hash[result.each_slice(2).to_a] ) unless result.empty?
    end
    return output
  end
private
  ...
  def get_indexes
    REDI_SEARCH.call("keys", "idx:#{@index_pattern}*").map do |index|
      index.split(':').drop(1).join(':')
    end
  end
end

Redis内のデータは以下のような感じになります。日付ごとにft_index0がひとつ、ft_invidxが複数ある形です。

idx:search_log:YYYY-MM-DD      ft_index0
ft:search_log:YYYY-MM-DD/java  ft_invidx
ft:search_log:YYYY-MM-DD/redis ft_invidx

古いインデックスを廃棄するために、FT.DROPに適切な日付(Time.now - X.days)を渡してクラスの初期化を呼び出します。

class RediSearchClient
  ...
  def drop
    REDI_SEARCH.call('FT.DROP', get_index)
  end
end

インデックスが廃棄されると、RediSearchはft_index0ft_invidxキー(複数)を削除するほか、ドキュメント自身を保管するRedisハッシュも削除します。

RedisSearch内の他のメソッドをサポートしたり、インデックスのSCHEMAやドキュメントフィールドをさらに抽象化するには、このコードにまだまだ手を加える必要があります。しかしこのコードは単に、アプリ内でこうした複数の関連インデックスを管理できるようになることを示すためのものです。

1つのインデックスですべてのデータをカバーする

複数のインデックスを管理し、複数の検索結果をマージするのに手間暇はかけたくないものです。代わりに、1つのインデックスを用い、古いドキュメントを削除するロジックを構築する手も考えられます。

今のところ同じクラスを使っているので、レコード作成時にインデックス名から日付スタンプを除外するために@index_per_day = falseを指定しています。

FT.SEARCHは、クエリに一致するレコード件数を最初のパラメータとして返します。これを用いてインデックス内の全ドキュメントをループし、ID(タイムスタンプから導出)をチェックし、FT.DELコマンドを実行して各ドキュメントを削除します。

class RediSearchClient
  ...
  def purge
    ttl = ((Time.now - X.days).to_f.round(3)*1000).to_i
    deleted_count = 0
    num = 10
    total_docs = REDI_SEARCH.call('FT.SEARCH', get_index, '*', 'NOCONTENT').first
    (total_docs/num).times do |i|
      offset = (num * i) - deleted_count
      document_ids = REDI_SEARCH.call('FT.SEARCH', get_index, '*', 'NOCONTENT',
        'LIMIT', offset, num).drop(1)
      document_ids.each do |id|
        if id.to_i < ttl
          REDI_SEARCH.call('FT.DEL', get_index, id, 'DD')
          deleted_count += 1
        end
      end
    end
  end
end

DDを指定すると(Redisハッシュ内に保存されている)ドキュメントも削除されます。ft:search_logキーとredis ft_invidxキーについてはそのままです。

この手法の大きな欠点は、クエリの実行やドキュメントの削除のためにRedis呼び出しを多数実行する必要がある点です。この手法は、Redisでキーの期限切れを用いるTTLアプローチと比べて著しく複雑になります。

参考リンク

関連記事

Railsのフラグメントキャッシュを分解調査する(翻訳)

RabbitMQはSidekiqと同等以上だと思う: 前編(翻訳)


CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。