Rumel created KAFKA-9547:
----------------------------
Summary: Kafka transaction - skips one offset when the application
stops and is started again
Key: KAFKA-9547
URL: https://issues.apache.org/jira/browse/KAFKA-9547
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.4.0
Environment: I am using kafka-clients 2.4.0 and
wurstmeister/kafka:2.12-2.3.0
Reporter: Rumel
For comparison, I tested a plain Kafka producer without transactions, and it
does not skip any offsets, even when I rerun ProducerTest many times.
{code:java}
import java.util.Properties

// Assumed import: LazyLogging from the scala-logging library.
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerTest extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka.local:9092")
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "all")
    props.put("retries", "3")
    // Plain producer: neither idempotence nor a transactional.id is configured.
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("zxc", "key", "value")
    val record2 = new ProducerRecord[String, String]("zxc", "key2", "value2")
    val record3 = new ProducerRecord[String, String]("zxc", "key3", "value3")
    producer.send(record)
    producer.send(record2)
    producer.send(record3)
    // Give the asynchronous sends time to complete before the JVM exits.
    Thread.sleep(3000)
  }
}
{code}
However, when I enable transactions on the producer, one offset is skipped each
time the ProducerTestWithTransaction application is rerun. For example, the
first run writes records at offsets 0, 1, 2; after a rerun, the new records
land at offsets 4, 5, 6, skipping 3, and the same pattern repeats on every
subsequent run.
{code:java}
import java.util.Properties

// Assumed import: LazyLogging from the scala-logging library.
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors.ProducerFencedException

object ProducerTestWithTransaction extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka.local:9092")
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    // Transactional producer: idempotence plus a fixed transactional.id.
    props.put("enable.idempotence", "true")
    props.put("transactional.id", "alona")
    props.put("acks", "all")
    props.put("retries", "3")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("wew", "key", "value")
    val record2 = new ProducerRecord[String, String]("wew", "key2", "value2")
    val record3 = new ProducerRecord[String, String]("wew", "key3", "value3")
    producer.initTransactions()
    try {
      // All three records are sent inside a single transaction.
      producer.beginTransaction()
      producer.send(record)
      producer.send(record2)
      producer.send(record3)
      producer.commitTransaction()
    } catch {
      // A fenced producer cannot be reused, so close it.
      case _: ProducerFencedException => producer.close()
      // Any other failure aborts the open transaction.
      case _: Exception => producer.abortTransaction()
    }
  }
}
{code}
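To make the gap explicit, the sketch below computes which offsets are missing from the ones reported above (0, 1, 2 on the first run, then 4, 5, 6 after the rerun). It only does arithmetic on offsets that were already observed and does not talk to Kafka; {{OffsetGaps}} and {{missingOffsets}} are hypothetical names introduced for illustration, not part of kafka-clients.
{code:java}
object OffsetGaps {
  // Hypothetical helper (not a Kafka API): returns every offset between the
  // smallest and largest observed offset that does not appear in the input.
  def missingOffsets(observed: Seq[Long]): Seq[Long] =
    if (observed.isEmpty) Seq.empty
    else {
      val seen = observed.toSet
      (observed.min to observed.max).filterNot(seen.contains)
    }

  def main(args: Array[String]): Unit = {
    // Offsets reported for topic "wew": first run, followed by the rerun.
    val observed = Seq(0L, 1L, 2L) ++ Seq(4L, 5L, 6L)
    println(missingOffsets(observed).mkString(", ")) // prints "3"
  }
}
{code}
Feeding it the offsets collected after further reruns would list every skipped offset in one place.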
Could someone explain why this happens? Is this the expected behavior when
using transactions? Is there a workaround to avoid skipping an offset? Thanks!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)