RedisストリームはRedis 5で新しく追加されたデータ型です。 Apache Kafka に類似したメッセージ処理のためのパワフルな機能を持っており、様々な応用が可能です。
既存の類似の機能
最初にRedisのメッセージ処理に適した既存の機能をいくつか紹介します。
Pub/Sub
RedisはPub/Subメッセージング機能を提供しています。SUBSCRIBE および PUBLISH コマンドにより、 Publisher(発行者)が送信したメッセージを複数の Subscriber(購読者)が受信することができます。PublisherはSubscriberを知っている必要がなく、メッセージを受け取れないPublisherがあってもSubscriberは影響を受けません。ただし、メッセージは保存されないため、SUBSCRIBEしていない期間のメッセージを受け取ることはできません。
Blocked list
Redisのリスト型はメッセージキューとして使うことができ、 BLPOP および BRPOP コマンドにより、リストの先頭もしくは末尾からデータ(メッセージ)の到着を待って受信することができます。複数のクライアントが同時に受信を待つことができますが、1つのメッセージを受け取った時点でそのメッセージは削除されるので、1つのメッセージを受け取れるのは1つのクライアントのみとなります。
ストリームの特徴
Redisストリームは前述のPub/SubおよびBlocked listに対して大幅に強化された機能をもちます。
Redisストリームは以下のような特徴があります。
- データを時系列で持つ追記型データ構造
- 各データは任意の複数のフィールドを持つことができる
- データを取得しても過去のデータを残せる
- 時間の範囲を指定して複数のデータを読み出せる
- コンシューマグループの概念によりクライアントをグループ化して協調動作できる
ストリームのデータ構造
Redisストリームのデータ構造自体は比較的シンプルです。
Redisストリームは追記専用のデータ構造です。各エントリは固有のIDを持ちます。IDは単調増加し、データ追加時にIDを指定しない場合は現在時刻から「<タイムスタンプ>-<連番>」の形式で自動生成されます。1つのエントリは任意の数のフィールドと値のペアで構成されます。
使用例
監視データをストリームに保存することを想定した場合の例
一定時間ごとに監視ツールで計測した監視データを時系列でストリームに保存するという用途を想定します。ここではストリームは単に時系列のデータ構造として使用し、1レコードにホストID、監視項目とその値を保存するものとします。
データを追加
XADDコマンドでストリーム名、ID、フィールド名と値を指定してデータを追加します。IDに「*」を指定するとIDを自動生成します。
> XADD history * hostid 1234 cpu.util 1.2 "1565915608548-0" (←自動生成されたID) > XADD history * hostid 1350 mem.used 65.3 "1565915613858-0" (←自動生成されたID) ...
全範囲のメッセージを取得
XRANGEコマンドは指定した範囲のIDのデータを取得します。「-」は最小のID、「+」は最大のIDを意味するので、以下のコマンドはすべてのデータが取得されます。
> XRANGE history - + 1) 1) "1565915608548-0" 2) 1) "hostid" 2) "1234" 3) "cpu.util" 4) "1.2" 2) 1) "1565915613858-0" 2) 1) "hostid" 2) "1350" 3) "mem.used" 4) "65.3"
最初から1つ分のメッセージを取得
XREADコマンドは指定したIDより大きいIDを持つデータを取得します。ID 0 (0-0)は最小のIDを意味するので、ストリームの最初からデータを取得することになります。COUNTオプションで一回で取得するデータの数を指定できます。
> XREAD COUNT 1 STREAMS history 0 1) 1) "1565915608548-0" 2) 1) "hostid" 2) "1234" 3) "cpu.util" 4) "1.2"
簡易チャットの例
Slackのような複数チャンネルを持ち、過去の履歴も含めて参照可能なチャットシステムを簡易的に再現してみます。
チャンネル作成、発言
XADDコマンドでチャンネル名のストリームにメッセージを追加します。ストリームが存在しない場合は自動的に作成されます。格納するデータはメッセージ本体と発言者とします。
> XADD #channel * msg "channel #channel created." "1565932257598-0" > XADD #channel * msg "こんにちは" user "Taro" "1565932327972-0" > XADD #channel * msg "@Taro こんにちは!" user "Hanako" "1565932387775-0"
新規に参加したユーザがチャンネルの最初からメッセージを取得
XREADコマンドでチャンネルの最初からすべてのメッセージを取得します。
> XREAD STREAMS #channel 0 1) 1) "#channel" 2) 1) 1) "1565932257598-0" 2) 1) "msg" 2) "channel #channel created." 2) 1) "1565932327972-0" 2) 1) "msg" 2) "こんにちは" 3) "user" 4) "Taro" 3) 1) "1565932387775-0" 2) 1) "msg" 2) "@Taro こんにちは!" 3) "user" 4) "Hanako"
※UTF-8 文字列は実際にはエスケープされて出力されますが、分かりやすくするためそのまま記述しています。なお、redis-cliコマンドに–rawオプションを付けて起動するとそのまま出力できます。
新着メッセージを待つ(タイムアウト60秒)
XREADコマンドにBLOCKオプションを付けると、新しいメッセージが追加されるまで指定時間待ちます。IDに「$」を指定すると、XREADコマンドを発行した後に追加されたメッセージのみが対象となります。
> XREAD BLOCK 60000 STREAMS #channel $
誰かが60秒以内に発言する
> XADD #channel * msg "Redis Streamsでチャットを作ってみたよ" user "Jiro" "1565932616524-0"
XREAD BLOCK が返り、メッセージが取得される
1) 1) "#channel" 2) 1) 1) "1565932616524-0" 2) 1) "msg" 2) "Redis Streamsでチャットを作ってみたよ" 3) "user" 4) "Jiro" (2.75s)
次のメッセージを待つ(既にあれば即座に取得)
連続してメッセージを取得する場合は前回取得した最後のIDを指定することに注意してください。再度「$」を指定してしまうと、途中のメッセージが抜け落ちる恐れがあります。
> XREAD BLOCK 60000 STREAMS #channel 1565932616524-0
監視データを複数クライアントで並列処理することを想定した場合の例
1つめの例で保存した監視データをコンシューマグループを利用し、複数のクライアントで協調して処理することを想定します。
ストリーム history に対してコンシューマグループ group1 を作成
XGROUPコマンドでコンシューマグループを作成します。IDに「0」を指定するとストリーム history の最初からデータを読み込みます。
> XGROUP CREATE history group1 0 OK
グループのメンバー C1 が他のメンバーによってまだ読まれていないデータを1つ読む
XREADGROUPコマンドでコンシューマグループを使用してデータを取得します。ここではコンシューマのメンバー「C1」としてデータを読みます。コンシューマがまだ存在しない場合は自動的に作成されます。IDに「>」を指定すると、他のコンシューマに送信されていない新しいメッセージを要求します。
> XREADGROUP GROUP group1 C1 COUNT 1 STREAMS history > 1) 1) "history" 2) 1) 1) "1565915608548-0" 2) 1) "hostid" 2) "1234" 3) "cpu.util" 4) "1.2"
グループのメンバー C2 が他のメンバーによってまだ読まれていないデータを1つ読む
続いてメンバーC2が同様にデータを取得します。先程C1により取得されたデータではなくその次のメッセージが返ります。
> XREADGROUP GROUP group1 C2 COUNT 1 STREAMS history > 1) 1) "history" 2) 1) 1) "1565915613858-0" 2) 1) "hostid" 2) "1350" 3) "mem.used" 4) "65.3"
C1 が取得したがペンディング状態のデータを確認
XREADGROUPでIDに「0」を指定すると、そのメンバーが取得し、かつペンディングとなっている(XACKを返していない)データが返ります。
> XREADGROUP GROUP group1 C1 STREAMS history 0 1) 1) "history" 2) 1) 1) "1565915608548-0" 2) 1) "hostid" 2) "1234" 3) "cpu.util" 4) "1.2"
取得したデータが正常に処理できたことを伝える
コンシューマが取得したデータを正常に処理できたら、そのことをサーバにXACKコマンドで伝えることで一連の処理が完了します。XACKで指定したデータはペンディングエントリリストから削除されます。
> XACK history group1 1565915608548-0 (integer) 1
> XREADGROUP GROUP group1 C1 STREAMS history 0 1) 1) "history" 2) (empty list or set)
さらにデータを読む
以降も同様にメンバーC1、C2でデータを取得し、処理完了を通知します。
> XREADGROUP GROUP group1 C1 COUNT 1 STREAMS history > 1) 1) "history" 2) 1) 1) "1565933823472-0" 2) 1) "hostid" 2) "1234" 3) "proc.num[httpd]" 4) "5" > XREADGROUP GROUP group1 C2 COUNT 1 STREAMS history > 1) 1) "history" 2) 1) 1) "1565933855023-0" 2) 1) "hostid" 2) "1350" 3) "cpu.util" 4) "0.3"
読んだことを確認
> XACK history group1 1565933823472-0 (integer) 1 > XACK history group1 1565915613858-0 1565933855023-0 (integer)
おわりに
今回はRedisストリームについて紹介しました。Redisストリームのデータ構造自体はシンプルですが、データにアクセスするための豊富な機能が用意されています。これにより、単純な時系列データの保存から大規模なメッセージングシステムまで、非常に柔軟な使い方が可能となっています。