Kafka 2.8.0,移除了对Zookeeper的依赖,通过KRaft进行自己的集群管理
Kafka 3.3.1 Mark KRaft as Production Ready
基于TCP
发布订阅模式 topic+partition
Raft协议是当今最流行的分布式协调算法,Etcd、Consul等系统的基础,就来自于此。
总结一下其实就是四个要点
https://cloud.tencent.com/developer/article/1547380
Apache Kafka
1
| $ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
|
flink-kafka demo
1 2 3 4 5 6 7 8 9
| val ks = KafkaSource.builder[String]() .setBootstrapServers("localhost:9092") .setTopics("quickstart-events") .setGroupId("s") .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer])) .build() val env = StreamExecutionEnvironment.getExecutionEnvironment() env.fromSource[String](ks,WatermarkStrategy.noWatermarks[String](), "Kafka Source").print() env.execute()
|
kafka-console-consumer.sh
修改副本数
increase-replication-factor.json1 2 3
| {"version":1,"partitions":[ {"topic":"test","partition":0,"replicas":[2,3,4]},{"topic":"test","partition":1,"replicas":[3,4,5]},{"topic":"test","partition":2,"replicas":[0,1,2]} ]}
|
1
| ./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 --reassignment-json-file increase-replication-factor.json --execute
|
带账密终端消费
1 2 3 4 5 6 7 8 9 10 11 12
| export KAFKA_OPTS="-Djava.security.auth.login.config=/dev/stdin"
echo 'KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret"; };' \ | kafka-console-consumer.sh \ --bootstrap-server node1:9092 \ --topic test \ --from-beginning \ --consumer-property security.protocol=SASL_PLAINTEXT \ --consumer-property sasl.mechanism=SCRAM-SHA-256 \ --timeout-ms 3000 \ --max-messages 5
|