[ https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yu-Jhe Li updated KAFKA-13404:
------------------------------
Description:
The Kafka sink connectors don't commit offsets up to the latest log-end offset
if the messages are produced in a transaction.
From the code of [WorkerSinkTask.java|#L467], we found that the sink connector
takes the offset from each message and commits it to Kafka after the messages
are processed successfully. But messages produced in a transaction are followed
by additional [control batches|http://kafka.apache.org/documentation/#controlbatch]
that indicate whether the transaction was committed or aborted. These control
batches occupy offsets in the log but are never delivered to the sink task, so
the committed offset stays behind the log-end offset.
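The offset arithmetic can be illustrated with a toy model. This is only a sketch and not Kafka or Connect code: the class {{TransactionalOffsetSketch}} and its methods are hypothetical, and it assumes that a committed transaction appends exactly one control marker after its records and that the sink commits the offset of the last delivered record plus one.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a single partition log; hypothetical, not Kafka code. */
final class TransactionalOffsetSketch {

    /**
     * Returns a copy of the log with `count` data records and one trailing
     * control marker appended. Each element is one offset; `true` marks a
     * control marker, `false` a data record.
     */
    static List<Boolean> appendTransaction(List<Boolean> log, int count) {
        List<Boolean> out = new ArrayList<>(log);
        for (int i = 0; i < count; i++) {
            out.add(Boolean.FALSE);   // data record
        }
        out.add(Boolean.TRUE);        // assumed: one commit marker per transaction
        return out;
    }

    /** Log-end offset: the offset the next appended entry would get. */
    static long logEndOffset(List<Boolean> log) {
        return log.size();
    }

    /**
     * Offset a sink would commit under the assumption above: last delivered
     * data record offset + 1. Control markers are skipped because they are
     * never delivered. Returns 0 if no data record exists.
     */
    static long committedOffset(List<Boolean> log) {
        long committed = 0;
        for (int offset = 0; offset < log.size(); offset++) {
            if (!log.get(offset)) {
                committed = offset + 1L;
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<Boolean> log = appendTransaction(new ArrayList<>(), 10);
        long end = logEndOffset(log);
        long committed = committedOffset(log);
        System.out.println("log-end offset   = " + end);        // 11
        System.out.println("committed offset = " + committed);  // 10
        System.out.println("apparent lag     = " + (end - committed)); // 1
    }
}
```

In this model, 10 records produced in one transaction leave the consumer group with an apparent lag of 1 even though every record has been processed.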
You can reproduce it by running `connect-file-sink` with the following
properties:
{noformat}
/opt/kafka/bin/connect-standalone.sh /connect-standalone.properties /connect-file-sink.properties{noformat}
{code:java}
# connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# for testing
offset.flush.interval.ms=10000
consumer.isolation.level=read_committed
consumer.auto.offset.reset=latest
{code}
{code:java}
# connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=test{code}
And use Java producer to produce 10 messages to
was:
The Kafka sink connectors don't commit offset to the latest log-end offset if
the messages are produced in a transaction.
From the code of [WorkerSinkTask.java|#L467]], we found that the sink
connector gets offset from messages and commits it to Kafka after the messages
are processed successfully.
> Kafka sink connectors do not commit offset correctly if messages are produced
> in transaction
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-13404
> URL: https://issues.apache.org/jira/browse/KAFKA-13404
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.6.1
> Reporter: Yu-Jhe Li
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)