Matthias: Thanks for your reply. With your answer, I have found the cause of my problem.
There is nothing wrong with the KafkaProducer code. The problem is with my use of KafkaConsumer. I am storing committed offsets outside of Kafka, and I am counting the received consumer records to compute the committed offset. Since I do not take the "commit markers" into account, duplicated consumer records are received.

Matthias J. Sax <matth...@confluent.io> wrote on Thursday, August 23, 2018, at 12:41 AM:
>
> I would assume that you refer to "commit markers". Each time you call
> commitTransaction(), a special message called a commit marker is written
> to the log to indicate a successful transaction (there are also "abort
> markers" if a transaction gets aborted).
>
> Those markers "eat up" one offset, but won't be delivered to the
> application; they are filtered out on read. Thus, when using transactions,
> you cannot infer from `endOffset - startOffset` of a partition how many
> messages are actually in the topic.
>
> You can verify this by consuming the topic and inspecting the offsets of
> the returned messages -- commit/abort markers are skipped, so you won't
> receive messages with consecutive offsets.
>
> -Matthias
>
> On 8/22/18 8:34 AM, jingguo yao wrote:
> > I am sending some Kafka messages over the Internet. The message sizes
> > are about 400K.
> > The essential logic of my code is as follows:
> >
> > Properties config = new Properties();
> > config.put("bootstrap.servers", "...");
> > config.put("client.id", "...");
> > config.put("key.serializer",
> >     "org.apache.kafka.common.serialization.ByteArraySerializer");
> > config.put("value.serializer",
> >     "org.apache.kafka.common.serialization.ByteArraySerializer");
> > config.put("transactional.id", "...");
> >
> > producer = new KafkaProducer<>(config);
> > producer.initTransactions();
> >
> > producer.beginTransaction();
> > for (byte[] bytesMessage : batch) {
> >   producer.send(new ProducerRecord<>("...", null, bytesMessage));
> > }
> > producer.commitTransaction();
> >
> > I found that there were two records for some bytesMessage on the
> > topic. Is there something wrong with my code? Or are duplicated message
> > deliveries still possible with transactional.id set?
> >
> > --
> > Jingguo
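The duplicate described in this thread can be avoided by deriving the stored offset from the last delivered record rather than from a record count. Below is a minimal sketch in plain Java (no broker needed). The offsets are hypothetical: it simulates two committed transactions that wrote records at offsets 0-2 and 4-6, with commit markers occupying offsets 3 and 7 but never being delivered to the consumer.

```java
import java.util.List;

// Sketch of why counting delivered records understates the resume offset
// when transactional commit markers are present. The offset values are
// hypothetical: two transactions wrote records at 0-2 and 4-6; their
// commit markers occupy offsets 3 and 7 but are filtered out on read.
public class OffsetTracking {

    // Buggy approach: assume the offset advances by one per delivered record.
    static long nextOffsetByCounting(long startOffset, List<Long> deliveredOffsets) {
        return startOffset + deliveredOffsets.size();
    }

    // Safe approach: resume from the offset after the last delivered record
    // (what ConsumerRecord.offset() + 1 would give you).
    static long nextOffsetFromLastRecord(List<Long> deliveredOffsets) {
        return deliveredOffsets.get(deliveredOffsets.size() - 1) + 1;
    }

    public static void main(String[] args) {
        List<Long> delivered = List.of(0L, 1L, 2L, 4L, 5L, 6L);

        // Counting yields 6, so a restart re-fetches the record at
        // offset 6 -- the duplicate observed in the thread.
        System.out.println(nextOffsetByCounting(0, delivered));   // 6

        // The last record's offset + 1 yields 7; fetching from offset 7
        // skips the commit marker there and delivers the next real record.
        System.out.println(nextOffsetFromLastRecord(delivered));  // 7
    }
}
```

Fetching from an offset that happens to hold a commit marker is harmless, because the broker skips markers on delivery; the only requirement is that the stored offset is never smaller than `lastRecord.offset() + 1`.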