让我们使用Java客户端创建一个用于发布和使用消息的应用程序。 Kafka生产者客户端包括以下API。
让我们了解本节中最重要的一组Kafka生产者API。 KafkaProducer API的中心部分是 KafkaProducer 类。 KafkaProducer类提供了一个选项,用于将其构造函数中的Kafka代理连接到以下方法。
KafkaProducer类提供send方法以异步方式将消息发送到主题。 send()的签名如下
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
ProducerRecord - 生产者管理等待发送的记录的缓冲区。
回调 - 当服务器确认记录时执行的用户提供的回调(null表示无回调)。
KafkaProducer类提供了一个flush方法,以确保所有先前发送的消息都已实际完成。 flush方法的语法如下 -
public void flush()KafkaProducer类提供了partitionFor方法,这有助于获取给定主题的分区元数据。 这可以用于自定义分区。 这种方法的签名如下 -
public Map metrics()它返回由生产者维护的内部度量的映射。
public void close() - KafkaProducer类提供关闭方法块,直到所有先前发送的请求完成。
生产者API的中心部分是生产者类。 生产者类提供了一个选项,通过以下方法在其构造函数中连接Kafka代理。
生产者类提供send方法以使用以下签名向单个或多个主题发送消息。
public void send(KeyedMessaget<k,v> message) - sends the data to a single topic,par-titioned by key using either sync or async producer. public void send(List<KeyedMessage<k,v>>messages) - sends data to multiple topics. Properties prop = new Properties(); prop.put(producer.type,"async") ProducerConfig config = new ProducerConfig(prop);