Kafka を試してみた
最近、仕事で、Kafkaを利用することが多くなってちょっと簡単に調べてみたので残す。 Kafka のサイトを見ると以下のように歌っているのでとにかく、高性能なmessage systemだとあることは知っていた。
Apache Kafka - "A high-throughput distributed messaging system."
ブローカー1つで試してみた
何はともあれ、まずはkafkaのセットアップをしてみた。 とりあえず、最新の0.9.x.xを自分のMacへインストール。
まず、zookeeperのインストールと起動
$ wget http//ftp.jaist.ac.jp/pub/apache/zookeeper/stable/zookeeper-3.4.6.tar.gz $ tar xvf zookeeper-3.4.6.tar.gz $ ln -s zookeeper-3.4.6 zookeeper $ cd zookeeper $ bin/zkServer.sh start
zoo_sample.cfg をそのまま zoo.cfgにコピーしてから起動させてる
次に、kafkaのインストールと起動。特にserver.propertiesは編集していないでもとりあえず大丈夫。 この場合だと、ブローカーは1ノードになる。
$ wget http://ftp.riken.jp/net/apache/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz $ tar xvf kafka_2.11-0.9.0.0.tgz $ ln -s kafka_2.11-0.9.0.0 kafka $ cd kafka $ bin/kafka-server-start.sh config/server.properties
そして、topicを作成する。この時作成したのは、パーティションが1つだけのものと、パーティションが10個のもの
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic study-kafka Created topic "study-kafka". $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic kafka-partitions Created topic "kafka-partitions". $ bin/kafka-topics.sh --list --zookeeper localhost:2181 kafka-partitions study-kafka
作成した結果のkafkaのlog.dirs
にはこんな感じにデータのディレクトリが作成されていた。topic名 + パーティション数になっている
ちなみに、brokerが1つなので、--replication-factor
を2とかにするとこんな感じで怒られる。
そんでもって、簡単な Java アプリケーションを使って試してみる。作成したJavaアプリケーションはgithubで
作成したアプリケーションでメッセージを送るとこんな感じのログがでる。
ログをみると、コンシューマー側のスレッドは、固定のパーティションに紐付くから、パーティション単位での順序性は確保されるということがわかる。そして、offsetもパーティション単位で管理している。 keyの設定でパーティションが決定されるとすると、keyの設計が大事になりそう。例えばホスト単位とか、メトリクス単位とか?
ブローカーを複数にする
実際のところブローカーを1つだけってことは、サービス上でありえないしkafkaの意味がないと思うので、今度は、ブローカーを複数にしてみる。 zookeeperとかの設定を初期化しちゃってから以下のようなプロパティファイルを作成して。ブローカーを3つ(3プロセス)起動した。
config/server-1.properties | config/server-2.properties | config/server-3.properties | |
---|---|---|---|
broker.id |
1 | 2 | 3 |
listeners |
PLAINTEXT://:19092 | PLAINTEXT://:29092 | PLAINTEXT://:39092 |
port |
19092 | 29092 | 39092 |
log.dirs |
~/apps/kafka/data/server-1 | ~/apps/kafka/data/server-2 | ~/apps/kafka/data/server-3 |
さっそく、ブローカーが1つの場合に失敗した、--relication-factor
を2でtopicを作成
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 10 --topic replication-topic Created topic "replication-topic" $ bin/kafka-topics.sh --list --zookeeper localhost:2181 replication-topic
やった成功!そして、予想通り、--replication-factor 2 --partitions 10
なので、20個のデータのディレクトリが作成されて、それぞれのパーティションが他のサーバで作成されていた。
あとはKafkaPrducerのブローカーリストの設定を変更してJavaアプリを実行しても問題なくできた。
1プロセスを落としてみる
じゃあ、レプリケーション2なので1つ死んでも大丈夫かと思って1つプロセス(39092)を殺してやってみた。
参考までに殺したときのzookeeperと死んでないブローカーのログは以下のようになった。
停止したブローカーの担当していたパーティションは、1,2,3,4,7,8,9
なので、ログを見ると、生きていたブローカーではレプリカがあるブローカーで担当する対となるパーティションのReplicaFetcherThread
なるものが停止されている。
- zookeeper
- Broker(kafka) - 19092, 29092
そしてJavaアプリケーション側も以下のようにブローカー(39092)が切断されたことを検知した後も問題なくメッセージをやり取りできることを確認できた。
その後、ブローカー(39092)を復帰させた後もJava側でも復帰したことを検知していた。
全体として、簡単に導入できたし性能も報告をみるととても良いとある。障害性や分散も簡単にできそうなのでとても良さそうに感じた。