Contents
  1. Modify replication factor
  2. Console consumption with credentials
  • Kafka 2.8.0 removed the dependency on ZooKeeper; the cluster manages itself through KRaft.
    Kafka 3.3.1 marked KRaft as production-ready.
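    For reference, a minimal KRaft-mode server.properties for a single node acting as both broker and controller might look like this (node id, ports, and log directory are placeholder assumptions):

```properties
# KRaft mode: this node is both broker and controller (placeholder values)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
log.dirs=/tmp/kraft-combined-logs
```

    Before first startup the log directory has to be formatted once, e.g. kafka-storage.sh format -t <cluster-uuid> -c server.properties.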

    Transport is TCP-based.
    Messaging follows the publish-subscribe model, addressed by topic + partition.

    Raft is the most popular distributed consensus algorithm today; systems such as etcd and Consul are built on it, and KRaft derives from it as well.

    To sum up Kafka's performance story, it really comes down to four points:

    • Sequential reads and writes
    • Zero-copy
    • Message compression
    • Batched sends
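    The last two points work together: a producer compresses whole record batches, and similar messages in one batch compress far better than one at a time. A stdlib-only sketch of that effect (illustrative only, not Kafka API code):

```python
import gzip
import json

# 1000 similar event payloads, like typical Kafka messages
messages = [
    json.dumps({"user_id": i, "event": "click", "page": "/home"}).encode()
    for i in range(1000)
]

# Compressing each message on its own pays the gzip overhead 1000 times
per_message = sum(len(gzip.compress(m)) for m in messages)

# Compressing one batch shares that overhead across all messages,
# which is what a Kafka producer does per record batch
batched = len(gzip.compress(b"\n".join(messages)))

print(f"per-message: {per_message} B, batched: {batched} B")
```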

    https://cloud.tencent.com/developer/article/1547380

    Apache Kafka

    $ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

    flink-kafka demo

    // Imports assumed for Flink's DataStream Scala API and the Kafka connector
    import org.apache.flink.api.common.eventtime.WatermarkStrategy
    import org.apache.flink.connector.kafka.source.KafkaSource
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.kafka.common.serialization.StringDeserializer

    // Kafka source reading string values from quickstart-events
    val ks = KafkaSource.builder[String]()
      .setBootstrapServers("localhost:9092")
      .setTopics("quickstart-events")
      .setGroupId("s")
      .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
      .build()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromSource(ks, WatermarkStrategy.noWatermarks[String](), "Kafka Source").print()
    env.execute()
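    To compile the demo, the Flink Scala API and the Kafka connector have to be on the classpath. A build.sbt sketch, assuming Flink 1.16 (artifact names and versions are assumptions and should be checked against the cluster in use):

```scala
// build.sbt — versions are assumptions; match your Flink cluster
val flinkVersion = "1.16.1"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %  "flink-connector-kafka" % flinkVersion,
  "org.apache.flink" %  "flink-clients"         % flinkVersion
)
```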

    kafka-console-consumer.sh

    Modify replication factor

    increase-replication-factor.json
    {"version": 1, "partitions": [
      {"topic": "test", "partition": 0, "replicas": [2, 3, 4]},
      {"topic": "test", "partition": 1, "replicas": [3, 4, 5]},
      {"topic": "test", "partition": 2, "replicas": [0, 1, 2]}
    ]}
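    Hand-writing this JSON gets tedious past a few partitions; a few lines of stdlib Python can emit it instead (topic name and replica lists below simply mirror the example above):

```python
import json

# Desired replica assignment per partition (values from the example above)
assignments = {0: [2, 3, 4], 1: [3, 4, 5], 2: [0, 1, 2]}

plan = {
    "version": 1,
    "partitions": [
        {"topic": "test", "partition": p, "replicas": replicas}
        for p, replicas in sorted(assignments.items())
    ],
}

# Write the file kafka-reassign-partitions.sh expects
with open("increase-replication-factor.json", "w") as f:
    json.dump(plan, f)
```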
    ./kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file increase-replication-factor.json --execute

    Re-run the same command with --verify in place of --execute to check that the reassignment has completed. On newer Kafka versions, pass --bootstrap-server <broker> instead of --zookeeper.

    Console consumption with credentials

    # Point the JVM's JAAS config at stdin so the credentials never touch disk
    export KAFKA_OPTS="-Djava.security.auth.login.config=/dev/stdin"

    echo 'KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret"; };' \
    | kafka-console-consumer.sh \
    --bootstrap-server node1:9092 \
    --topic test \
    --from-beginning \
    --consumer-property security.protocol=SASL_PLAINTEXT \
    --consumer-property sasl.mechanism=SCRAM-SHA-256 \
    --timeout-ms 3000 \
    --max-messages 5
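    Instead of piping JAAS through stdin, the same credentials can live in a properties file passed via --consumer.config (the file name here is arbitrary):

```properties
# consumer.properties — SASL/SCRAM settings for the console consumer
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="alice" \
  password="alice-secret";
```

    Then: kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test --consumer.config consumer.properties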