Matthias:

Thanks for your reply. With your answer, I have found the cause of my
problem.

There is nothing wrong with the KafkaProducer code. The problem is
with my use of KafkaConsumer. I am storing committed offsets outside
of Kafka, and I compute the committed offset by counting the received
consumer records. Since I did not take the "commit markers" into
account, duplicated consumer records were received.
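To illustrate the mistake, here is a minimal, self-contained sketch (the class name, offset values, and method names are illustrative, not from my real code). It simulates a partition where offset 3 was a commit marker: the consumer never delivers that offset, so counting records under-estimates the next offset, while "offset of the last processed record + 1" stays correct.

```java
import java.util.Arrays;
import java.util.List;

// Simulates how a transaction commit marker "eats" an offset.
// A read_committed consumer never sees the marker, so the delivered
// offsets have a gap at 3.
public class OffsetGapDemo {
    // Offsets actually delivered to the application; offset 3 was a
    // commit marker and was filtered out by the consumer.
    static final List<Long> deliveredOffsets =
            Arrays.asList(0L, 1L, 2L, 4L, 5L, 6L);

    // Wrong: derive the committed offset by counting received records.
    // Resuming here re-delivers the record at offset 6 -> duplicate.
    static long committedByCounting(long startOffset) {
        return startOffset + deliveredOffsets.size();
    }

    // Right: commit "offset of the last processed record + 1".
    static long committedByLastOffset() {
        return deliveredOffsets.get(deliveredOffsets.size() - 1) + 1;
    }

    public static void main(String[] args) {
        System.out.println("by counting:     " + committedByCounting(0L));
        System.out.println("by last offset:  " + committedByLastOffset());
    }
}
```

Resuming from the counted value (6) re-reads the last record, which is exactly the duplication I observed; resuming from last offset + 1 (7) lands past the marker and delivers nothing twice.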
Matthias J. Sax <matth...@confluent.io> 于2018年8月23日周四 上午12:41写道:
>
> I would assume that you refer to "commit markers". Each time you call
> commitTransaction(), a special message called a commit marker is written
> to the log to indicate a successful transaction (there are also "abort
> markers" if a transaction gets aborted).
>
> Those markers "eat up" one offset each, but won't be delivered to the
> application; they are filtered out on read. Thus, using transactions, you
> cannot infer from `endOffset - startOffset` of a partition how many
> messages are actually in the topic.
>
> You can verify this by consuming the topic and inspecting the offsets of
> the returned messages -- commit/abort markers are skipped, so you won't
> receive messages with consecutive offsets.
>
> -Matthias
>
> On 8/22/18 8:34 AM, jingguo yao wrote:
> > I am sending some Kafka messages over the Internet. The message sizes
> > are about 400K. The essential logic of my code is as follows:
> >
> > Properties config = new Properties();
> > config.put("bootstrap.servers", "...");
> > config.put("client.id", "...");
> > config.put("key.serializer",
> > "org.apache.kafka.common.serialization.ByteArraySerializer");
> > config.put("value.serializer",
> > "org.apache.kafka.common.serialization.ByteArraySerializer");
> > config.put("transactional.id", "...");
> >
> > producer = new KafkaProducer<>(config);
> > producer.initTransactions();
> >
> > producer.beginTransaction();
> > for (byte[] bytesMessage : batch) {
> >   producer.send(
> >       new ProducerRecord<>("...", null, bytesMessage));
> > }
> > producer.commitTransaction();
> >
> > I found that there were two records for some bytesMessage on the
> > topic. Is there something wrong with my code? Or are duplicate message
> > deliveries still possible with transactional.id set?
> >
>


-- 
Jingguo
