Apache Kafka(以下 Kafka)ではメッセージ送信をトランザクションにまとめてアトミックに行うことができます。
さらに Kafka から取り出したメッセージに何らかの処理を行い再度 Kafka に書き込む、という処理を行うにあたっては Offset Commit もトランザクションに含めることができます。
この記事ではトランザクションを用いて送信したメッセージが後段で読まれるまでの流れを説明し、さらに Offset Commit をトランザクション内で行う際の流れについても説明します。
Kafka バージョンは 4.1.0 を用います。
トランザクションでメッセージを送る流れ
メッセージ送信だけでなく Offset Commit も一つのトランザクションにまとめられるのが Kafka のトランザクションの特徴ですが、まずトランザクションを用いて単純にメッセージを送る流れから説明します。
トランザクションを使用してメッセージを送信する際の大まかな流れは以下の通りです (すべて正常に送信できた場合)。

- トランザクションを初期化
- トランザクションを開始
- メッセージを送信 (ここでは送信先は Topic A)
- トランザクションを Commit
このように大まかな流れはデータベースのトランザクションと同様です。そして各ステップでそれぞれ以下のメソッドが対応します。
- トランザクションを初期化 (initTransactions メソッド)
- トランザクションを開始 (beginTransaction メソッド)
- メッセージを送信 (send メソッド)
- トランザクションを Commit (commitTransaction メソッド)
また送信中に例外が起きた場合などは abortTransaction メソッドによりトランザクションを Abort します。これらはすべて Producer のメソッドです。
トランザクションの挙動
ここからは各メソッドがどのように動作するかを説明します。
1. トランザクションの初期化 (initTransactions)
まずトランザクションを利用する前に initTransactions を呼び出します。このメソッドでは Transaction Coordinator に InitProducerId リクエストを送信してトランザクション初期化の処理を行います。InitProducerId リクエストには TransactionalID が含まれています。これは Producer のパラメータで、トランザクションを識別できるようにクラスタ内で一意になるよう設定します。初期化処理ではこの ID をもとに Transaction Coordinator から Producer ID を取得します。initTransactions は Producer 起動時に 1 回実行する必要があります。

2. トランザクションの開始 (beginTransaction)
次に beginTransaction メソッドによってトランザクションを開始します。このメソッドでは Transaction Coordinator にリクエストを送るわけではなく Producer 内部の状態を変更することでトランザクションを開始します。

3. メッセージを送信 (send)
メッセージの送信は通常の送信と同様 send メソッドを用います。基本的には通常の送信と同じ挙動なのですが、Broker はトランザクション中の Produce リクエストを受け取るとメッセージの書き込みの前に (未実施であれば) AddPartitionsToTxn というリクエストを Transaction Coordinator に送ります。このリクエストでは書き込み先の Topic と Partition を Transaction Coordinator に知らせ、__transaction_state というトランザクション管理用内部 Topic に記録します。これによってどの Partition がトランザクション中なのか記録することができます。

4. トランザクションを Commit/Abort (commitTransaction/abortTransaction)
必要なメッセージ送信が完了したら commitTransaction によってトランザクションを確定させます。commitTransaction ではトランザクションを Commit する旨を添えて EndTxn リクエストを Transaction coordinator に送ります。Transaction coordinator がリクエストを受け取るとまず __transaction_state に Commit する旨を記録します。さらに AddPartitionsToTxn によって記録されている Partition に対して Commit したことを示すメッセージを送ります。このメッセージは Control Batch と呼ばれる、トランザクションの終了を示す特別なメッセージです。Control Batch の送信は commitTransaction とは非同期で行われます。

トランザクションを Abort するには abortTransaction を用います。abortTransaction ではトランザクションを Abort する旨を添えて EndTxn リクエストを送り、関連する Partition に Abort したことを示す Control Batch を送ります。
Commit / Abort がされた状態の Partition は下図のようになっています。Control Batch も通常のメッセージと同じように記録されます。

トランザクションで送信したメッセージの読み出し
次にメッセージを読み出す際にトランザクション中のメッセージや Abort したメッセージをどのように扱うかについて説明します。
すでに述べたように、トランザクションの Commit / Abort は Control Batch によって行うのでトランザクション中のメッセージも Partition 内には記録されています。また Abort したメッセージも Partition 内には残っています。これらのメッセージをどのように扱うかは Consumer の設定である isolation.level によって決まります。
isolation.level には read_uncommitted (デフォルト値) あるいは read_committed を設定することができます。read_uncommitted の場合、poll メソッドはトランザクション中かどうかや Abort されているかに関わらずすべてのメッセージを取得することができます。
次に read_committed の場合ですが、この説明のためには Last Stable Offset (LSO) について説明する必要があります。LSO はトランザクションが未確定の Offset のうち一番小さい Offset のこと (下図では Offset 6) です。これより前の Offset はすべて確定していることになり、Stable Offsets といいます。逆にこれより後の Offset の領域では確定と未確定が混在していることになります。混在しうるのは同じ Partition に複数の Producer がトランザクションでメッセージを送信する場合です。read_committed の場合、poll メソッドは Stable Offsets のメッセージのみ取得することができます。さらに Abort されたメッセージを取り除いた上でユーザーに返します。

このように、トランザクションを使って送信したメッセージを読み出す際は isolation.level を read_committed にすることで Commit 済みのメッセージのみ読むことができます。
トランザクションと Offset Commit
冒頭で述べたように、Offset Commit をトランザクションに含められる点が Kafka のトランザクションの特徴です。この仕組みは、Kafka から取得したメッセージに何らかの処理を行い、再度 Kafka に書き込むような処理で利用されます。

この例では以下の流れで処理が進みます。
- クライアント AP が Topic A からメッセージを取得
- 取得したメッセージを AP で処理し、その結果を Topic A’ に送信
- すべての処理結果送信が完了したら Topic A の Offset Commit
ここで Offset Commit がトランザクションに入らない場合を考えてみましょう。処理結果の送信がアトミックにできたとしても Offset Commit に失敗した場合メッセージの取得からやり直すことになり、処理結果が二重に記録されてしまいます。Offset Commit がトランザクションに入れば、Offset Commit に失敗した時に Abort することができます。
このような利点があるため、Kafka から取得したメッセージに何らかの処理を行い、再度 Kafka に書き込むという処理では Offset Commit をトランザクション内で行います。
Offset Commit をトランザクション内で行う際の挙動
Kafka ではトランザクション内で Offset Commit をするために Producer 側で sendOffsetsToTransaction というメソッドが用意されています。全体の流れで見るとトランザクションを Commit する前に sendOffsetsToTransaction をすることになります。

- トランザクションを初期化 (initTransactions メソッド)
- Topic A からメッセージを取り出す (Consumer が poll メソッドで取得)
- トランザクションを開始 (beginTransaction メソッド)
- 処理結果のメッセージを Topic A’ に送信 (send メソッド)
- Topic A について Offset Commit (sendOffsetsToTransaction メソッド)
- トランザクションを Commit (commitTransaction メソッド)
sendOffsetsToTransaction では TxnOffsetCommit リクエストを Consumer Group Coordinator に送ります。Offset Commit のやり方自体は通常の Consumer から行うものと同じで、__consumer_offsets という内部 Topic へ Offset Commit の情報が記録されます。異なるのは内部 Topic への記録の前に (未実施であれば) AddPartitionsToTxn リクエストが送信される点です。send と同様に、この操作によってトランザクションに含まれる Partition の情報が __transaction_state に記録されます。

Offset Commit 情報の Commit (あるいは Abort) も、__consumer_offsets の対応する Partition に Control Batch を送ることによって記録されます。
終わりに
Kafka でトランザクションを用いてメッセージを送信し確定させるまでの挙動と、トランザクションで送信したメッセージを読み出す際の挙動について解説しました。さらに Offset Commit をトランザクション内で行う際の挙動についても解説しました。
Kafka のトランザクションで Offset Commit を行う場合、本来 Consumer から行うものを Producer から行うため混乱しますが、単にメッセージを送るのみであれば使う流れはデータベースのトランザクションと同じようなものです。メッセージを送るのみの流れ、Offset Commit もする場合の流れと段階的に理解するのがよいでしょう。