Apache Kafka 简单生产者示例

让我们使用Java客户端创建一个用于发布和使用消息的应用程序。 Kafka生产者客户端包括以下API。

KafkaProducer API

让我们了解本节中最重要的一组Kafka生产者API。 KafkaProducer API的中心部分是 KafkaProducer 类。 KafkaProducer类提供了一个选项,用于将其构造函数中的Kafka代理连接到以下方法。

  • KafkaProducer类提供send方法以异步方式将消息发送到主题。 send()的签名如下

  • producer.send(new ProducerRecord<byte[],byte[]>(topic, 
    partition, key1, value1) , callback);

    • ProducerRecord - 生产者管理等待发送的记录的缓冲区。

    • 回调 - 当服务器确认记录时执行的用户提供的回调(null表示无回调)。

    • KafkaProducer类提供了一个flush方法,以确保所有先前发送的消息都已实际完成。 flush方法的语法如下 -

    public void flush()
    KafkaProducer类提供了partitionFor方法,这有助于获取给定主题的分区元数据。 这可以用于自定义分区。 这种方法的签名如下 -

  • public Map metrics()
    它返回由生产者维护的内部度量的映射。
    • public void close() - KafkaProducer类提供关闭方法块,直到所有先前发送的请求完成。

    生产者API

    生产者API的中心部分是生产者类。 生产者类提供了一个选项,通过以下方法在其构造函数中连接Kafka代理。

    生产者类

    生产者类提供send方法以使用以下签名向单个或多个主题发送消息。

  • public void send(KeyedMessaget<k,v> message) 
    - sends the data to a single topic,par-titioned by key using either sync or async producer.
    public void send(List<KeyedMessage<k,v>>messages)
    - sends data to multiple topics.
    Properties prop = new Properties();
    prop.put(producer.type,"async")
    ProducerConfig config = new ProducerConfig(prop);

联系我们

邮箱 626512443@qq.com
电话 18611320371(微信)
QQ群 235681453

Copyright © 2015-2024

备案号:京ICP备15003423号-3