The new producer is async by default. You can see few examples of how to use it here: https://github.com/gwenshap/kafka-examples/tree/master/SimpleCounter/src/main/java/com/shapira/examples/producer/simplecounter
On Tue, Nov 24, 2015 at 10:40 AM, Amit Karyekar <akarye...@fanatics.com> wrote: > Hi folks, > > We are working with Kafka 8.2.2 and want to use producer.type as async for > sending messages to broker. > > In Kakfka 8.2.2, some new producer properties have been introduced. > However, there is no new name for the property producer.type mentioned in > the documentation. > > We’ve the following configuration for Kafka producer: > > bootstrap.servers: localhost:9092 > > serializer.class: > org.apache.kafka.common.serialization.StringSerializer > > key.serializer: org.apache.kafka.common.serialization.StringSerializer > > value.serializer: > org.apache.kafka.common.serialization.StringSerializer > > acks: 1 > > producer.type: async > > On sending messages, we are seeing following message in logs: > > 2015-11-23 18:18:50 TRACE KafkaProducer:158 - Starting the Kafka producer > 2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata version > 1 to Cluster(nodes = [Node(localhost, 9092)], partitions = []) > 2015-11-23 18:18:50 WARN ProducerConfig:121 - The configuration > producer.type = null was supplied but isn't a known config. > 2015-11-23 18:18:50 WARN ProducerConfig:121 - The configuration > serializer.class = null was supplied but isn't a known config. > 2015-11-23 18:18:50 DEBUG KafkaProducer:231 - Kafka producer started > 2015-11-23 18:18:50 DEBUG Sender:117 - Starting Kafka producer I/O thread. > 2015-11-23 18:18:50 TRACE KafkaProducer:374 - Requesting metadata update > for topic ArgosHeartbeat. > 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata > request to node -1 > 2015-11-23 18:18:50 DEBUG NetworkClient:397 - Init connection to node -1 > for sending metadata request in the next iteration > 2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to > node -1 at localhost:9092. > 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata > request to node -1 > 2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to node > -1 > 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata > request to node -1 > 2015-11-23 18:18:50 DEBUG NetworkClient:392 - Sending metadata request > ClientRequest(expectResponse=true, payload=null, > request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-9}, > body={topics=[ArgosHeartbeat]})) to node -1 > 2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata version > 2 to Cluster(nodes = [Node(0, 192.168.99.1, 9092)], partitions = > [Partition(topic = ArgosHeartbeat, partition = 2, leader = 0, replicas = > [0,], isr = [0,], Partition(topic = ArgosHeartbeat, partition = 0, leader = > 0, replicas = [0,], isr = [0,], Partition(topic = ArgosHeartbeat, partition > = 1, leader = 0, replicas = [0,], isr = [0,]]) > 2015-11-23 18:18:50 TRACE KafkaProducer:337 - Sending record > ProducerRecord(topic=ArgosHeartbeat, partition=0, key=S109, > value={"logType":"heartbeat","applicationType":"Java","StreamId":"S109","RequestId":"R109","ArgosTimestamp":1448331530502,"Timestamp":1447896190} > with callback null to topic ArgosHeartbeat partition 0 > 2015-11-23 18:18:50 TRACE RecordAccumulator:156 - Allocating a new 16384 > byte message buffer for topic ArgosHeartbeat partition 0 > 2015-11-23 18:18:50 TRACE KafkaProducer:340 - Waking up the sender since > topic ArgosHeartbeat partition 0 is either full or getting a new batch > 2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to > node 0 at 192.168.99.1:9092. > 2015-11-23 18:18:50 TRACE KafkaProducer:419 - Closing the Kafka producer. > 2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to node > 0 > 2015-11-23 18:18:50 DEBUG Sender:128 - Beginning shutdown of Kafka > producer I/O thread, sending remaining records. > 2015-11-23 18:18:50 TRACE Sender:182 - Nodes with data ready to send: > [Node(0, 192.168.99.1, 9092)] > 2015-11-23 18:18:50 TRACE Sender:183 - Created 1 produce requests: > [ClientRequest(expectResponse=true, > payload={ArgosHeartbeat-0=RecordBatch(topicPartition=ArgosHeartbeat-0, > recordCount=1)}, > request=RequestSend(header={api_key=0,api_version=0,correlation_id=1,client_id=producer-9}, > body={acks=1,timeout=30000,topic_data=[{topic=ArgosHeartbeat,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 > lim=169 cap=16384]}]}]}))] > 2015-11-23 18:18:50 TRACE Sender:223 - Received produce response from node > 0 with correlation id 1 > 2015-11-23 18:18:50 TRACE RecordBatch:81 - Produced messages to > topic-partition ArgosHeartbeat-0 with base offset offset 41 and error: null. > 2015-11-23 18:18:50 DEBUG Sender:143 - Shutdown of Kafka producer I/O > thread has completed. > 2015-11-23 18:18:50 DEBUG KafkaProducer:429 - The Kafka producer has > closed. > 2015-11-23 18:18:50 INFO ProducerConfig:113 - ProducerConfig values: > compression.type = none > metric.reporters = [] > metadata.max.age.ms = 300000 > metadata.fetch.timeout.ms = 60000 > acks = 1 > batch.size = 16384 > reconnect.backoff.ms = 10 > bootstrap.servers = [localhost:9092] > receive.buffer.bytes = 32768 > retry.backoff.ms = 100 > buffer.memory = 33554432 > timeout.ms = 30000 > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > retries = 0 > max.request.size = 1048576 > block.on.buffer.full = true > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > metrics.sample.window.ms = 30000 > send.buffer.bytes = 131072 > max.in.flight.requests.per.connection = 5 > metrics.num.samples = 2 > linger.ms = 0 > client.id = > > > In regards to above log, we wanted to know how to set producer.type > parameter to async in Kafka 8.2.2. > Also, it is unable to recognize serializer.class parameter which was a > parameter in old Kafka producer config. > > Regards, > Amit > Information contained in this e-mail message is confidential. This e-mail > message is intended only for the personal use of the recipient(s) named > above. If you are not an intended recipient, do not read, distribute or > reproduce this transmission (including any attachments). If you have > received this email in error, please immediately notify the sender by email > reply and delete the original message. >