2017-07-15
Download and installation
Download address: the latest release at the time of writing is kafka_2.12-0.11.0.0.tgz.
zhouhh@/Users/zhouhh/java $ curl http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz -o kafka_2.12-0.11.0.0.tgz
zhouhh@/Users/zhouhh/java $ tar zxvf kafka_2.12-0.11.0.0.tgz kafka_2.12-0.11.0.0/
zhouhh@/Users/zhouhh/java $ ln -s kafka_2.12-0.11.0.0 kafka
zhouhh@/Users/zhouhh/java $ vi ~/.zshrc
# kafka
export KAFKA_HOME="/Users/zhouhh/java/kafka"
export PATH="$KAFKA_HOME/bin:$PATH"
zhouhh@/Users/zhouhh/java $ source ~/.zshrc
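To confirm the PATH change took effect, a quick sanity check:
which kafka-topics.sh
# should print something like /Users/zhouhh/java/kafka/bin/kafka-topics.sh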
Install ZooKeeper
Install ZooKeeper and configure Kafka to connect to it; for a quick test, the ZooKeeper bundled with Kafka is sufficient.
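The connection between the two is driven by two settings: the client port in config/zookeeper.properties and zookeeper.connect in config/server.properties. The bundled defaults look roughly like this (adjust if you run an external ZooKeeper):
# config/zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181

# config/server.properties
zookeeper.connect=localhost:2181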
Start ZooKeeper
zhouhh@/Users/zhouhh/java/kafka $ zookeeper-server-start.sh config/zookeeper.properties
Start Kafka
zhouhh@/Users/zhouhh/java/kafka $ kafka-server-start.sh config/server.properties
Working with Kafka
Create a topic
zhouhh@/Users/zhouhh/java $ kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic zhhtest
Created topic "zhhtest".
List topics
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --list --zookeeper localhost:2181
zhhtest
Produce messages
zhouhh@/Users/zhouhh/java/kafka $ kafka-console-producer.sh --broker-list localhost:9092 --topic zhhtest
>hello
>中文
Consume messages
zhouhh@/Users/zhouhh/java/kafka $ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic zhhtest --from-beginning
hello
中文
Kafka cluster
To simulate a multi-broker cluster on one machine, copy the broker config and give each copy its own broker.id, listener port and log directory.
zhouhh@/Users/zhouhh/java/kafka/config $ cp server.properties server-1.properties
zhouhh@/Users/zhouhh/java/kafka/config $ vi server-1.properties
broker.id=1
log.dirs=/tmp/kafka-logs-1
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
listeners=PLAINTEXT://:9093
zhouhh@/Users/zhouhh/java/kafka/config $ cp server-1.properties server-2.properties
zhouhh@/Users/zhouhh/java/kafka/config $ vi server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
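To recap, the three brokers differ only in these settings (the unmodified server.properties keeps the defaults: broker.id=0, port 9092, /tmp/kafka-logs):
broker 0  server.properties    broker.id=0  PLAINTEXT://:9092  /tmp/kafka-logs
broker 1  server-1.properties  broker.id=1  PLAINTEXT://:9093  /tmp/kafka-logs-1
broker 2  server-2.properties  broker.id=2  PLAINTEXT://:9094  /tmp/kafka-logs-2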
Start the brokers
zhouhh@/Users/zhouhh/java/kafka $ kafka-server-start.sh config/server-1.properties
zhouhh@/Users/zhouhh/java/kafka $ kafka-server-start.sh config/server-2.properties
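Each broker runs in the foreground in its own terminal here; to background them instead, the start script also accepts a -daemon flag, for example:
kafka-server-start.sh -daemon config/server-1.properties
kafka-server-start.sh -daemon config/server-2.properties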
Create a topic
Create a topic with three replicas and a single partition.
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic zhh-replicated-topic
Created topic "zhh-replicated-topic".
Describe the topic
Use --describe to see how this topic is distributed across the brokers in the cluster.
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic zhh-replicated-topic
Topic:zhh-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: zhh-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
The first line is a summary: the topic has 1 partition with 3 replicas. Each following line describes one partition; for partition 0 the leader is broker 2 and the replicas are on brokers 2, 0 and 1.
- Leader: the broker that handles all reads and writes for the partition. Each partition has its own leader, picked from its replicas.
- Replicas: the brokers that hold a copy of this partition, whether or not they are currently alive.
- Isr: the "in-sync" replicas, i.e. the replicas that are alive and caught up with the leader.
Multiple replicas, multiple partitions
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic zhh-replicated-partitions-topic
Created topic "zhh-replicated-partitions-topic".
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic zhh-replicated-partitions-topic
Topic:zhh-replicated-partitions-topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: zhh-replicated-partitions-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: zhh-replicated-partitions-topic Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: zhh-replicated-partitions-topic Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Notice that the leaders of the different partitions are spread across different brokers.
Details of the topic without replication
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
connect-test
zhh-replicated-partitions-topic
zhh-replicated-topic
zhhtest
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic zhhtest
Topic:zhhtest PartitionCount:1 ReplicationFactor:1 Configs:
Topic: zhhtest Partition: 0 Leader: 0 Replicas: 0 Isr: 0
It has only one replica and one partition.
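If needed, the partition count of an existing topic can be increased later with --alter (this only adds partitions; changing the replication factor requires the partition reassignment tool), for example:
kafka-topics.sh --alter --zookeeper localhost:2181 --topic zhhtest --partitions 3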
Message test
zhouhh@/Users/zhouhh/java/kafka $ kafka-console-producer.sh --broker-list localhost:9092 --topic zhh-replicated-topic
>第一个消息
>second
zhouhh@/Users/zhouhh/java/kafka $ kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic zhh-replicated-topic
第一个消息
second
Testing fault tolerance
Kill a broker
zhouhh@/Users/zhouhh/java/kafka_2.12-0.11.0.0 $ ps aux | grep server.properties
zhouhh 73370 0.2 2.1 6239704 175116 s000 S+ 11:37上午 1:34.39 ...
zhouhh@/Users/zhouhh/java/kafka_2.12-0.11.0.0 $ kill -9 73370
[1] 73370 killed kafka-server-start.sh config/server.properties
The other two brokers print error messages:
[2017-07-15 16:17:54,838] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2017-07-15 16:17:57,662] INFO Partition [zhh-replicated-partitions-topic,2] on broker 1: Shrinking ISR from 1,2,0 to 1,2 (kafka.cluster.Partition)
[2017-07-15 16:18:05,858] WARN [ReplicaFetcherThread-0-0]: Error in fetch to broker 0, request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={zhh-replicated-partitions-topic-1=(offset=0, logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 0 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
[2017-07-15 16:18:07,310] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions zhh-replicated-partitions-topic-1 (kafka.server.ReplicaFetcherManager)
[2017-07-15 16:18:07,310] INFO Partition [zhh-replicated-partitions-topic,1] on broker 1: zhh-replicated-partitions-topic-1 starts at Leader Epoch 1 from offset 0. Previous Leader Epoch was: 0 (kafka.cluster.Partition)
[2017-07-15 16:18:07,312] INFO [ReplicaFetcherThread-0-0]: Shutting down (kafka.server.ReplicaFetcherThread)
[2017-07-15 16:18:07,322] INFO [ReplicaFetcherThread-0-0]: Stopped (kafka.server.ReplicaFetcherThread)
[2017-07-15 16:18:07,323] INFO [ReplicaFetcherThread-0-0]: Shutdown completed (kafka.server.ReplicaFetcherThread)
ZooKeeper error messages:
[2017-07-15 16:17:54,394] WARN caught end of stream exception (org.apache.zookeeper.server.NIOServerCnxn)
EndOfStreamException: Unable to read additional data from client sessionid 0x15d453002070003, likely client has closed socket
at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
at java.lang.Thread.run(Thread.java:745)
[2017-07-15 16:17:54,404] INFO Closed socket connection for client /0:0:0:0:0:0:0:1:49913 which had sessionid 0x15d453002070003 (org.apache.zookeeper.server.NIOServerCnxn)
Consumer-side errors. The consumer stops receiving messages at this point, because it is connected to localhost:9092 and that broker has just been killed:
[2017-07-15 16:18:05,872] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-07-15 16:18:05,878] WARN Auto-commit of offsets {zhh-replicated-topic-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group console-consumer-97557: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: null (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
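--bootstrap-server is only used for initial cluster discovery, so listing several brokers keeps a consumer from failing to start just because the single broker in its list is down, for example:
kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --topic zhh-replicated-topic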
Check the partition status
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic zhh-replicated-topic
Topic:zhh-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: zhh-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,1
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic zhh-replicated-partitions-topic
Topic:zhh-replicated-partitions-topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: zhh-replicated-partitions-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,1
Topic: zhh-replicated-partitions-topic Partition: 1 Leader: 1 Replicas: 0,1,2 Isr: 1,2
Topic: zhh-replicated-partitions-topic Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2
Consuming messages
zhouhh@/Users/zhouhh/java/kafka_2.12-0.11.0.0 $ kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic zhh-replicated-topic
[2017-07-15 16:32:36,078] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
zhouhh@/Users/zhouhh/java/kafka_2.12-0.11.0.0 $ kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic zhh-replicated-topic
Neither consumer receives anything; they only work again once the first broker is restarted. The consumer pointed at localhost:9092 cannot even bootstrap, since that broker is down. A likely explanation for the localhost:9093 case is that the internal __consumer_offsets topic has only a single replica, on broker 0 (the bundled server.properties sets offsets.topic.replication.factor=1), so a new consumer cannot find its group coordinator while broker 0 is down, even though the topic data itself is fully replicated.
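One way to verify that guess is to describe the internal offsets topic and check where its replicas live:
kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets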
Killing either of the other brokers does not affect message delivery.
The producer prints a warning; the consumer does not:
>[2017-07-15 16:37:39,148] WARN Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic zhh-replicated-topic
Topic:zhh-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: zhh-replicated-topic Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic zhh-replicated-partitions-topic
Topic:zhh-replicated-partitions-topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: zhh-replicated-partitions-topic Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1
Topic: zhh-replicated-partitions-topic Partition: 1 Leader: 1 Replicas: 0,1,2 Isr: 1,0
Topic: zhh-replicated-partitions-topic Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,0
zhouhh@/Users/zhouhh/java/kafka $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic zhhtest
Topic:zhhtest PartitionCount:1 ReplicationFactor:1 Configs:
Topic: zhhtest Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Importing and exporting data with Kafka Connect
The console tools are handy for demos and ad-hoc operations, but real deployments usually need to exchange data with external systems: importing data into Kafka and exporting data out of it. That is the job of Kafka Connect.
The example below runs a file-based source and sink; the corresponding topic is created in Kafka automatically.
zhouhh@/Users/zhouhh/java/kafka $ cat config/connect-standalone.properties
bootstrap.servers=localhost:9092
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
zhouhh@/Users/zhouhh/java/kafka $ cat config/connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
zhouhh@/Users/zhouhh/java/kafka $ cat config/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
zhouhh@/Users/zhouhh/java/kafka $ connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
zhouhh@/Users/zhouhh/java/kafka $ echo -e "foo\nbar" > test.txt
zhouhh@/Users/zhouhh/java/kafka $ cat test.sink.txt
foo
bar
zhouhh@/Users/zhouhh/java/kafka $ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
zhouhh@/Users/zhouhh/java/kafka $ echo -e "中文" >> test.txt
zhouhh@/Users/zhouhh/java/kafka $ cat test.sink.txt
foo
bar
中文
zhouhh@/Users/zhouhh/java/kafka $ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"中文"}
References
http://kafka.apache.org/quickstart