誰でも試せる Apache Kafka

この記事は SRA Advent Calendar 202412月24日の記事を一部改変したものです。
本記事においては Apache Kafka 4.0 におけるコードを参照しています。

Apache Kafka について

Apache Kafka(以下、Kafka)はストリーミング処理に欠かせない重要な OSS です。Kafka を利用することにより高可用性、スケーラブル、低レイテンシといった様々なメリットを得ることができます。
並列分散処理の環境を構築するためには複数のサーバを用意したり、環境設定用の Puppet、Ansible を準備したり、といった作業を想定される方もいらっしゃるかと思います。しかし、Kafka には動作を簡単に確認できる Docker 環境などが用意されており、テスト用に試すのであれば意外と簡単に環境を立てることが可能です。
また、性能測定のために Producer と Consumer をそれぞれ用意する必要がありますが、テスト用のツールについてもコミュニティ提供のものが存在しています。
本記事においては、Kafka に興味のある方に向けて、Docker 環境を用いて簡単に Kafka を試すための手順についてご説明いたします。

 Kafka の Docker image

Kafka では3.7.0のリリース以降、コミュニティからKafkaのDocker イメージ
このイメージを利用することで、簡単に Broker を起動することができます。
 
docker run -d --name broker apache/kafka:latest
また、最近では Kafka の中でも GraalVM ベースで開発されたKafka-native というイメージが登場しています。
こちらについては本記事では詳しく取り上げませんが、GraalVM ベースの Kafka ではさらに軽量で高速に起動することができます。
興味のある方はぜひKIP-974をご確認ください。

 Docker Compose ファイルについて

Docker Compose ファイルについても、コミュニティ付帯のものを利用することが可能です。
Kafka のリポジトリには docker/examples というディレクトリがあり、その中は以下のとおりのディレクトリ構造をしています。このディレクトリのなかに docker-compose.yml がそれぞれ用意されており、目的に応じて使い分けることが可能です。
├── examples
│   ├── docker-compose-files
│   │   ├── cluster
│   │   │   ├── combined
│   │   │   │   ├── plaintext
│   │   │   │   └── ssl
│   │   │   └── isolated
│   │   │       ├── plaintext
│   │   │       └── ssl
│   │   └── single-node
│   │       ├── file-input
│   │       ├── plaintext
│   │       └── ssl
 
各ディレクトリの中について少し見てみましょう。
まず、1ノードで起動するモード(single-node)とクラスタで起動するモード(cluster)があります。さらに、クラスタの中では Controller と Broker が別のコンテナで動作するモード(isolated)と同じコンテナで動作するモード(combined)にさらに分かれています。それぞれのパターンに対し、Kafka 内での通信が暗号化されているモード(ssl)と平文で通信するモード(plaintext)が用意されている、といった構造になっています。
一例として Compose ファイルの中身を見てみます。
今回は Compose ファイルの中から、cluster -> isolate -> plaintext のものを確認します。
 
  controller-1:
  image: ${IMAGE}
  environment:
    KAFKA_NODE_ID: 1
    KAFKA_PROCESS_ROLES: 'controller'
    KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093'
    KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
    KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
...
kafka-1:
  image: ${IMAGE}
  ports:
    - 29092:9092
  hostname: kafka-1
  container_name: kafka-1
  environment:
    KAFKA_NODE_ID: 4
    KAFKA_PROCESS_ROLES: 'broker'
    KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
 
利用する Docker イメージは IMAGE 変数によって指定することが可能です。バージョンを切り替えたい場合など、簡単に利用するイメージを変更することができます。
また、各ノード設定内容は environment 内の変数でそれぞれ定義されています。
controller では KAFKA_PROCESS_ROLES として controller、Broker では broker がそれぞれ定義されることにより、各ノード上での動作を切り替えています。(なお、combined モードのクラスタを利用する場合には、broker,controller が設定されます)
Broker が動作するノード上では 9092 番ポートがそれぞれフォワードされています。cluster 環境では3台の Broker が起動し、ローカル上の 29092, 39092, 49092 にアクセスすることで各 Broker に接続をすることが可能です。

 実際に動かしてみる

 Kafka クラスタの起動

では、実際に Kafka クラスタを動作させてみましょう。
Compose ファイルを使って Kafka クラスタを起動するには、以下のように IMAGE 変数に docker イメージのタグを指定します。
今回は Compose ファイルの中から、前節の「docker-compose-ファイルについて」で中身を参照した cluster -> isolate -> plaintext のものを選択して利用します。
 
$ IMAGE=apache/kafka:latest docker compose -f docker/examples/docker-compose-files/cluster/isolated/plaintext/docker-compose.yml up -d
[+] Running 6/6
✔ Container plaintext-controller-3-1  Started                                                                     0.7s
✔ Container plaintext-controller-2-1  Started                                                                     0.7s
✔ Container plaintext-controller-1-1  Started                                                                     0.7s
✔ Container kafka-3                   Started                                                                     1.4s
✔ Container kafka-1                   Started                                                                     1.3s
✔ Container kafka-2                   Started                                                                     1.4s

$ docker ps
CONTAINER ID   IMAGE                 COMMAND                  CREATED         STATUS         PORTS                                           NAMES
f6c59323b6dc   apache/kafka:latest   "/__cacert_entrypoin…"   4 seconds ago   Up 3 seconds   0.0.0.0:39092->9092/tcp, [::]:39092->9092/tcp   kafka-2
9c8aea6a29e5   apache/kafka:latest   "/__cacert_entrypoin…"   4 seconds ago   Up 3 seconds   0.0.0.0:29092->9092/tcp, [::]:29092->9092/tcp   kafka-1
22124b348942   apache/kafka:latest   "/__cacert_entrypoin…"   4 seconds ago   Up 3 seconds   0.0.0.0:49092->9092/tcp, [::]:49092->9092/tcp   kafka-3
fceba9972dca   apache/kafka:latest   "/__cacert_entrypoin…"   4 seconds ago   Up 4 seconds   9092/tcp                                        plaintext-controller-2-1
9f097d2dffe7   apache/kafka:latest   "/__cacert_entrypoin…"   4 seconds ago   Up 4 seconds   9092/tcp                                        plaintext-controller-3-1
3e326f5b2031   apache/kafka:latest   "/__cacert_entrypoin…"   4 seconds ago   Up 4 seconds   9092/tcp                                        plaintext-controller-1-1

メッセージの送受信

起動した Kafka クラスタに対してデータの送受信を行ってみます。
自作のアプリケーションを用いてもよいですが、今回は簡単に kafka-console-producer.sh, kafka-console-consumer.sh を利用します。
なお、ソースコードをビルドできる環境でれば自分でを実行した上で、bin ディレクトリ以下のスクリプトを利用できます。ビルド済みのバイナリもコミュニティページに配布されていますので、こちらのバイナリを利用しても実行できます。
まず、送受信用のトピックを作成しておきましょう。
 
$ bin/kafka-topics.sh --bootstrap-server localhost:29092,localhost:39092,localhost:49092 --create --topic testTopic --partitions 3
...
Created topic testTopic.
produce, consume のテストのためには別途コンソールを開いておき、
それぞれのコンソールで以下のコマンドを実行します。
 
・コンソール1 (Producer)
$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:29092,localhost:39092,localhost:49092 --topic testTopic
> hoge  # プロンプトが表示されたら入力
 
・ コンソール2 (Consumer)
 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:29092,localhost:39092,localhost:49092 --topic testTopic --from-beginning
  hoge  # コンソール1で入力すると表示される

デバッガの接続

Kafka 内部での細かい動作を確認したい場合、動作中のプロセスに対してデバッガを接続したくなることがあります。
コミュニティの Compose ファイルそのままではデバッガ用のポートは解放されていませんが、設定を追加することでデバッガをアタッチすることも可能です。
Kafka においては daemon の起動など、スクリプトによる操作時に kafka-run-class.sh が利用されており、kafka-run-class.sh では環境変数に応じて起動時のオプションが設定されるようになっています。
# Set Debug options if enabled
if [ "x$KAFKA_DEBUG" != "x" ]; then
  # Use default ports
  DEFAULT_JAVA_DEBUG_PORT="5005"
  if [ -z "$JAVA_DEBUG_PORT" ]; then
      JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
  fi
  # Use the defaults if JAVA_DEBUG_OPTS was not set
  DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
  if [ -z "$JAVA_DEBUG_OPTS" ]; then
      JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
  fi
  echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
  KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
fi
 
そのため、以下のように Compose ファイルに記載を追加することで、ローカルの 25005 から Broker プロセスにデバッガを接続することができます。
(docker-compose ファイルの修正後はクラスタを再作成してください)
kafka-1:
  image: ${IMAGE}
  ports:
     - 29092:9092
+      - 25005:5005
   environment:
+      KAFKA_DEBUG: 'y'
+      JAVA_DEBUG_PORT: '*:5005'
...

テストに便利なツール群

これまでで Kafka クラスタ環境は構築できましたが、より本格的にテストを行うためのツールも Kafka には付帯しています。現在の Kafka の bin ディレクトリ以下には、約40のスクリプトが用意されていますが、今回はそのうちのいくつかについてご紹介します。
 

kafka-[producer,consumer]-perf-test.sh

Producer と Consumer で性能がどの程度かを簡単に測定するためには、perf-test.sh というツールが利用できます。
Producer と Consumer で別のスクリプトが用意されており、それぞれメッセージのサイズや多重度を指定することができます。
任意のパラメータを指定することで、任意の負荷を簡単に掛けることが可能です。
コマンドの実行例としては以下の通りです。
$ bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=localhost:29092,localhost:39092,localhost:49092 --topic testTopic --num-records 1000 --record-size 10 --throughput 10
...
1000 records sent, 99.930049 records/sec (0.00 MB/sec), 3.31 ms avg latency, 263.00 ms max latency, 2 ms 50th, 4 ms 95th, 38 ms 99th, 263 ms 99.9th.

$ bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:29092,localhost:39092,localhost:49092 --topic testTopic --messages 1000
...
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2024-12-23 11:15:57:371, 2024-12-23 11:15:57:808, 0.0095, 0.0218, 1000, 2288.3295, 375, 62, 0.1537, 16129.0323
 
送受信したデータに対して、どの程度のレイテンシとなっているか等を測定することができます。

kafka-e2e-latency.sh

kafka-[producer,consumer]-perf-test.sh は Producer と Consumer の性能を確認するためのツールでしたが、送信から受信までの End-to-End でのレイテンシを計測したい場合もあります。
他の Producer などと異なり、引数ごとの指定内容が決まっていますので注意が必要です。
各引数についてはコマンド実行時にヘルプが表示され、以下の通りの内容となっています。
  •  broker_list: Broker
  •  topic: 送信先トピック
  •  num_messages: 送信するメッセージ
  •  producer_acks: producer が送信の際に利用する Acks の設定
  •  message_size_bytes: 送信する1メッセージのサイズ(バイト数)
$./bin/kafka-e2e-latency.sh
USAGE: java org.apache.kafka.tools.EndToEndLatency broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file
$ ./bin/kafka-e2e-latency.sh localhost:29092,localhost:39092,localhost:49092 testTopic 1000 all 10
...
0       142.06386600000002
Avg latency: 3.2986 ms
Percentiles: 50th = 2, 99th = 7, 99.9th = 142

trogdor.sh

Kafka における障害試験の実施を行うためのツールです。
trogdor 用の Coordinator,Agent と呼ばれるプロセスを事前に動作させておきます。
また、別途試験用のファイルを定義しておき、ジョブとして登録することでディスク障害・ネットワーク障害などを疑似的に再現することが可能です。
詳しくは trogdor の README、もしくは Wiki を参照ください。
 

まとめ

何かと面倒に感じることの多い並列分散環境の構築ですが、最近では用途に応じて簡単に環境を構築することが可能です。これを機に、みなさんもApache Kafkaの世界に飛び込んでみてはいかがでしょうか。