Apache Kafka の Idempotent Producer について

kafka_logo

はじめに

この記事は SRA Advent Calendar 2025 の20日目の記事です。

Apache Kafka(以下 Kafka)には送達保証のための機能がいくつかあります。 この記事ではその中でも送達が重複するのを防止するための機能である Idempotent Producer について解説します。

Idempotent Producer は Kafka のバージョン 0.11.0.0 からある機能です。この記事ではバージョン 4.1.0 を元に解説します。

Kafka について

Kafka はストリーミング処理を実現するための OSS です。

Kafka では Producer と呼ばれるクライアントが Broker と呼ばれるサーバにデータを送信します。Broker は複数台で Kafka クラスタを構成します。Kafka ではやり取りする一つ一つのデータを Message と呼びます。 そして Consumer と呼ばれるクライアントが Broker からデータを読み出し処理、活用します。

今回紹介する Idempotent Producer は Broker に重複した Message が送られてきた時にそれを排除する機能です。 ではどのように重複は発生し、またそれはどのように排除されるのでしょうか。

Producer の自動リトライ機能

Producer から Broker に送られる Message に重複が発生するのは Producer の自動リトライ機能によるものです。まずこちらについて説明します。

Producer は Message を送ったあと Broker 側から送信が成功したかについての応答として Ack を受け取ります。この Ack をもとに Producer 側で判断して自動で Message 送信をリトライするのが自動リトライ機能です。 以下の図では送信の失敗を正しく検知し再送しています。このように自動リトライ機能は Message を確実に送るために重要な機能です。

自動リトライ機能は Producer の retries という設定値によって制御できます。 送信失敗時、Producer はこの値の回数だけ再送を試みます(0に設定すると自動再送を行いません)。ただし致命的なエラーが起こった時や delivery.timeout.ms というタイムアウト超過時などはそれ以上再送しません。 その場合はユーザー処理で再送することになりますが、そのような再送は Producer による自動リトライではないので区別されます。

さて、以上が自動リトライ機能の概要ですが、この仕組みでは以下の図のように送信には成功したのに Ack が受け取れず再送してしまう可能性があります。重複はこのようにして発生します。

Idempotent Producer(重複排除機能付きの Producer)

自動リトライによって Message が重複して記録されてしまうのを防ぐのが Idempotent Producer の機能です。次はこちらについて説明します。

この機能では Producer は各 Message に1ずつ増加するシーケンス番号を自動で割り当て、Producer ID とともに送信します。 こうすることで Broker 側では、同じ Producer から同じ Message が来たことを検知します。 以下の図のように、Broker はそれを検知した場合重複している Message を Broker 側に記録しません。 さらに、これだけでは Producer は送信が成功したか分からず再送し続けてしまうため、Broker 側は送信成功の Ack を返します。こうすることで Broker 側では Message は重複しませんし、Producer 側で再送し続けることもありません。

Idempotent Producer の重複排除で注意すべきは、検知できるのは自動リトライによる問題という点です。例えば「Producer に障害があって再起動した」という場合は元の Producer とは別の Producer として Broker は認識するので重複があっても気付けません。 また、「自動リトライでは送りきれなかったのでユーザー処理で再送する」という場合も元の Message とは別の Message として Broker は認識するので重複に気付けません。

またシーケンス番号がついていることによって、重複だけでなく Message を記録する順番も Broker は確認することができます。Producer では送信リクエストを同時に複数送ることができるため、ネットワークの問題などで送信リクエストが届く順番が変わってしまうというのが順番が入れかわる典型的なケースです。 Broker では上記の重複確認を行った上で、送られてきた Message のシーケンス番号が順番通りか(直前に記録したシーケンス番号+1と等しいか)確認します。順番通りでない場合は OUT_OF_ORDER_SEQUENCE_NUMBER というエラーによって Producer 側に通知します。この場合、他の送信リクエストを追い抜いてしまっただけという可能性があるので Producer は自動で Message を再送します。追い抜いてしまったのではなく一部の Message が送信失敗してシーケンス番号が飛んでしまったという場合も考えられますが、その場合は Producer、Broker 双方でシーケンス番号をリセットして再送します。

また Idempotent Producer を有効にするには設定値が以下の条件を満たしている必要があります。デフォルトの設定ではこれらは満たされているので、Idempotent Producer はデフォルトで有効です。

パラメータ 条件 説明
enable.idempotence true Idempotent Producer の有効化設定です。
max.in.flight.requests.per.connection 5以下 Producer では Message の送信リクエストを
同時に複数送ることができます。
同時送信数を決めるのがこのパラメータです。
Idempotent Producer では
実装上5までしか対応していません。
retries 1以上 0だと再送をしないので1以上にします。
acks all Broker でデータの複製が適切にされた状態を
保証することで重複検知を正しく行います。

重複排除のソースコード

最後に、Message の重複を検知し排除している部分を Kafka 4.1.0 のソースコードで見てみましょう。 重複の確認は UnifiedLog の append() メソッド内で行っています。このメソッドは送られてきた Message の記録をし、 記録の結果(メソッド内では appendInfo という名前)を返すメソッドです。以下はそこからの抜粋です。

// now that we have valid records, offsets assigned, and timestamps updated, we need to
// validate the idempotent/transactional state of the producers and collect some metadata
AnalyzeAndValidateProducerStateResult result = analyzeAndValidateProducerState(
        logOffsetMetadata, validRecords, origin, verificationGuard);

if (result.maybeDuplicate.isPresent()) {
    BatchMetadata duplicate = result.maybeDuplicate.get();
    appendInfo.setFirstOffset(duplicate.firstOffset());
    appendInfo.setLastOffset(duplicate.lastOffset);
    appendInfo.setLogAppendTime(duplicate.timestamp);
    appendInfo.setLogStartOffset(logStartOffset);
} else {
    // Append the records, and increment the local log end offset immediately after the append because a

analyzeAndValidateProducerState() メソッド内で重複の確認をしており、その結果を result として受け取っています。その結果で result.maybeDuplicate.isPresent() が True だと重複があることが分かります。続く if 文では重複があるか判定していますが、重複がない場合は else 節に存在する Message 記録の処理が行われ、appendInfo に値がセットされます。 重複がある場合は else 節に入らないため Message の記録は行いません。これによって重複は排除されます。また、記録はしませんが appendInfo に値をセットし正常に Message の記録が行われたときと同じ状態にします。このようにすることで Producer には正常に Message が記録されていることが伝わります。

おわりに

Idempotent Producer の仕組みについて解説しました。 重複を排除するというありがたい機能なので、仕組みを理解し適切に活用しましょう。