KafkaをDockerで起動してRubyスクリプトからメッセージを送受信する
タグ: rubykafka / 初版公開: 2020-03-21

概要

とりあえずKafkaをDockerで立ち上げてローカルのRubyスクリプトから接続する手順を紹介します。実際にはKafkaクラスタの構成や設定、その他Kafkaをプログラムから使う上で把握しておいた方が良いことが多々ありますが、全て割愛します。

前提ソフトウェア

ソフトウェアバージョン備考
bitnami/kafka2.4.1-debian-10-r8Dockerイメージ
bitnami/zookeeper3.6.0-debian-10-r12Dockerイメージ
ruby2.6.3p62-
ruby-kafka1.0.0-

準備

docker-compose.yml

Kafkaが動作するにはZooKeeperも必要なのでこのエントリではdocker-composeを使います。Kafkaのイメージはbitnamiが提供しているDockerイメージを使います。KafkaのDockerイメージとしてはDL数も10M+で最もメジャーなものかと思います。

以下の内容をdocker-compose.ymlに記述して下さい。この内容はbitnamiが提供するdocker-compose.ymlに対してDocker Hubに書いてあるAccessing Kafka with internal and external clientsの変更を加えたものになります。

DockerボリュームにZooKeeper, Kafkaのデータを永続化していますので手順をゼロからやり直す場合はご注意下さい。プログラムからはローカルホストのポート29092番でKafkaブローカーに接続できるように設定しています。

version: '2'

services:
  zookeeper:
    image: 'bitnami/zookeeper:3'
    ports:
      - '2181:2181'
    volumes:
      - 'zookeeper_data:/bitnami'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:2'
    ports:
      - '9092:9092'
      - '29092:29092'
    volumes:
      - 'kafka_data:/bitnami'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

Rubyスクリプト

Rubyのスクリプトを用意します。Kafkaクライアントのラッパーはいくつかありますが仕組みがわからないとドはまりするので最初は素のruby-kafkaを使うことを強くおすすめします。

Gemfile

Gemfileを作ります。

bundle init

Gemfileを編集して以下の1行を追加します。

gem "ruby-kafka"

Gemをインストールします。

bundle install

メッセージ送信スクリプト

produce.rbとして以下の内容を記述します。何も指定せずに接続して、時刻が入ったメッセージをtest_topicに1回送るだけです。余計なことは一切しません。

require "kafka"

kafka = Kafka.new("127.0.0.1:29092")

kafka.deliver_message("Hello at #{Time.now.to_s}", topic: "test_topic")

メッセージ受信スクリプト

consume.rbとして以下の内容を記述します。test_topicからメッセージを読み込み続けます。このスクリプトはブロッキングするので終了しません。

require "kafka"

kafka = Kafka.new("127.0.0.1:29092")

kafka.each_message(topic: "test_topic") do |message|
  puts "offset: #{message.offset}"
  puts "key: #{message.key}"
  puts "value: #{message.value}"
end

以上で準備が整いました。

動作

Kafkaの起動

docker-composeでZooKeeperとKafkaブローカーを起動します。起動には少し時間がかかりますのでログ出力が落ち着くまで待ちます。

docker-compose up

[2020-03-21 04:49:01,561] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer) というようなメッセージが出たら起動は終わっています。

Kafkaにメッセージを送信

Kafkaにメッセージを送信します。

bundle exec ruby produce.rb

Kafka::LeaderNotAvailableエラーが出るかも知れませんが無視します。不安ならもう1回実行するとエラーなく終了するはずです。

Kafkaからメッセージを受信

Kafkaからメッセージを受信します。過去に送ったメッセージがすべて出力されます。

bundle exec ruby consume.rb

ここでもう1度prodduce.rbでメッセージを送ってみると、起動中のconsume.rbで同時にメッセージが受信されることが確認できます。