horiga blog

とあるエンジニアのメモ

Kafka を試してみた

最近、仕事で、Kafkaを利用することが多くなってちょっと簡単に調べてみたので残す。 Kafka のサイトを見ると以下のように歌っているのでとにかく、高性能なmessage systemだとあることは知っていた。

Apache Kafka - "A high-throughput distributed messaging system."

ブローカー1つで試してみた

何はともあれ、まずはkafkaのセットアップをしてみた。 とりあえず、最新の0.9.x.xを自分のMacへインストール。

まず、zookeeperのインストールと起動

$ wget http//ftp.jaist.ac.jp/pub/apache/zookeeper/stable/zookeeper-3.4.6.tar.gz
$ tar xvf zookeeper-3.4.6.tar.gz
$ ln -s zookeeper-3.4.6 zookeeper
$ cd zookeeper
$ bin/zkServer.sh start

zoo_sample.cfg をそのまま zoo.cfgにコピーしてから起動させてる

次に、kafkaのインストールと起動。特にserver.propertiesは編集していないでもとりあえず大丈夫。 この場合だと、ブローカーは1ノードになる。

$ wget http://ftp.riken.jp/net/apache/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
$ tar xvf kafka_2.11-0.9.0.0.tgz
$ ln -s kafka_2.11-0.9.0.0 kafka
$ cd kafka
$ bin/kafka-server-start.sh config/server.properties

そして、topicを作成する。この時作成したのは、パーティションが1つだけのものと、パーティションが10個のもの

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic study-kafka
Created topic "study-kafka".
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic kafka-partitions
Created topic "kafka-partitions".
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
kafka-partitions
study-kafka

作成した結果のkafkaのlog.dirsにはこんな感じにデータのディレクトリが作成されていた。topic名 + パーティション数になっている

f:id:horiga:20160214221507p:plain

ちなみに、brokerが1つなので、--replication-factorを2とかにするとこんな感じで怒られる。

f:id:horiga:20160214222143p:plain

そんでもって、簡単な Java アプリケーションを使って試してみる。作成したJavaアプリケーションはgithub

github.com

作成したアプリケーションでメッセージを送るとこんな感じのログがでる。

f:id:horiga:20160214222042p:plain

ログをみると、コンシューマー側のスレッドは、固定のパーティションに紐付くから、パーティション単位での順序性は確保されるということがわかる。そして、offsetもパーティション単位で管理している。 keyの設定でパーティションが決定されるとすると、keyの設計が大事になりそう。例えばホスト単位とか、メトリクス単位とか?

ブローカーを複数にする

実際のところブローカーを1つだけってことは、サービス上でありえないしkafkaの意味がないと思うので、今度は、ブローカーを複数にしてみる。 zookeeperとかの設定を初期化しちゃってから以下のようなプロパティファイルを作成して。ブローカーを3つ(3プロセス)起動した。

config/server-1.properties config/server-2.properties config/server-3.properties
broker.id 1 2 3
listeners PLAINTEXT://:19092 PLAINTEXT://:29092 PLAINTEXT://:39092
port 19092 29092 39092
log.dirs ~/apps/kafka/data/server-1 ~/apps/kafka/data/server-2 ~/apps/kafka/data/server-3

さっそく、ブローカーが1つの場合に失敗した、--relication-factorを2でtopicを作成

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 10 --topic replication-topic
Created topic "replication-topic"
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
replication-topic

やった成功!そして、予想通り、--replication-factor 2 --partitions 10なので、20個のデータのディレクトリが作成されて、それぞれのパーティションが他のサーバで作成されていた。 あとはKafkaPrducerのブローカーリストの設定を変更してJavaアプリを実行しても問題なくできた。

f:id:horiga:20160214223719p:plain

f:id:horiga:20160214223902p:plain

1プロセスを落としてみる

じゃあ、レプリケーション2なので1つ死んでも大丈夫かと思って1つプロセス(39092)を殺してやってみた。

f:id:horiga:20160214224350p:plain

参考までに殺したときのzookeeperと死んでないブローカーのログは以下のようになった。 停止したブローカーの担当していたパーティションは、1,2,3,4,7,8,9なので、ログを見ると、生きていたブローカーではレプリカがあるブローカーで担当する対となるパーティションReplicaFetcherThreadなるものが停止されている。

  • zookeeper

f:id:horiga:20160214224512p:plain

  • Broker(kafka) - 19092, 29092

f:id:horiga:20160214224557p:plain

f:id:horiga:20160214224609p:plain

そしてJavaアプリケーション側も以下のようにブローカー(39092)が切断されたことを検知した後も問題なくメッセージをやり取りできることを確認できた。

f:id:horiga:20160214225050p:plain

その後、ブローカー(39092)を復帰させた後もJava側でも復帰したことを検知していた。

f:id:horiga:20160214225247p:plain

全体として、簡単に導入できたし性能も報告をみるととても良いとある。障害性や分散も簡単にできそうなのでとても良さそうに感じた。

Kafka については以下の記事がとても参考になりました。

kafka.apache.org

Apache Kafkaに入門した | SOTA

Apache Kafkaを使ったマイクロサービス基盤