生产者

屏幕截图 2020-08-20 150154

发送消息:

Properties props = new Properties();
//kafka 集群,broker-list
props.put("bootstrap.servers", "172.24.211.140:9092");
props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
Producer<String,  String> producer  =  new
        KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
    var record =
            new ProducerRecord<>("test", "Precision Products",
                    "France");
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            System.out.println(metadata);
        }
    });

}
producer.close();

配置

  • acks
    • 定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的
    • acks=0 ,生产者在成功写入消息之前不会等待任何来自服务器的响应 当 broker 故障时有可能 丢失数据
    • acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应 如果在 follower同步成功之前 leader 故障,那么将会丢失数据
    • acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成 数据重复
  • buffer.memory
    • 设置生产者内存缓冲区的大小
  • compression.type
    • 设置消息压缩算法
  • retries
    • 决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误
  • batch.size
    • 指定了一个批次可以使用的内存大小
  • linger.ms
    • KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去
  • client.id
  • max.in.flight.requests.per.connection
    • 指定了生产者在收到服务器响应之前可以发送多少个消息
  • timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
  • max.block.ms
    • 调用send时最长的阻塞时间
    • max.request.siz
    • receive.buffer.bytes 和 send.buffer.bytes
    • 分别指定了 TCP socket 接收和发送数据包的缓冲区大小

顺序保证

  • 将max.in.flight.requests.per.connection设置为1

屏幕截图 2020-08-24 085111

保证顺序的方法就是:

  1. 每个主题只分为一个区
  2. 每次发送的消息发送到同一个分区

序列化器

  • 自定义序列化器:实现 Serializer 接口
    • 不推荐使用
  • 其他序列化
    • avro:一种将shcema嵌入在数据里的序列化方式

分区策略

分区的原因:

  • 方便扩展
  • 提高并发

分区原则:

  • 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
  • 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值
  • 否则就是随机取一个值 然后再这个值的基础上进行轮询

自定义分区器:

实现 Partitioner 接口

数据可靠性保证

  • Exactly Once

将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 AtLeast Once 语义

At Least Once + 幂等性 = Exactly Once

results matching " "

No results matching " "

results matching " "

No results matching " "