[ 
https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu-Jhe Li updated KAFKA-13404:
------------------------------
    Description: 
Kafka sink connectors don't commit offsets up to the latest log-end offset if 
the messages are produced in a transaction.

From the code of [WorkerSinkTask.java|#L467], we found that the sink connector 
gets the offset from each message and commits it to Kafka after the messages 
are processed successfully. But messages produced in a transaction are 
followed by additional [control 
batches|http://kafka.apache.org/documentation/#controlbatch] that indicate 
whether the transaction was committed or aborted. These control batches occupy 
offsets in the log but are never delivered to the connector, so the committed 
offset stays behind the log-end offset.
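
The offset arithmetic behind this can be sketched with a toy model. This is 
not Kafka or Connect code: the class `TransactionalOffsetSketch` and all of 
its methods are hypothetical names made up for illustration. It assumes one 
partition, one committed transaction, and a sink task that derives its 
committed offset only from the records it was handed (last record offset + 1), 
as described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a single partition log; hypothetical, not Kafka's real classes. */
public class TransactionalOffsetSketch {

    /** One log entry: a data record or a transaction control marker. */
    static final class Entry {
        final long offset;
        final boolean controlMarker;

        Entry(long offset, boolean controlMarker) {
            this.offset = offset;
            this.controlMarker = controlMarker;
        }
    }

    /** Appends `count` data records, then one commit marker, like a committed transaction. */
    static List<Entry> appendCommittedTransaction(List<Entry> log, int count) {
        List<Entry> out = new ArrayList<>(log);
        for (int i = 0; i < count; i++) {
            out.add(new Entry(out.size(), false));
        }
        // The control marker occupies an offset of its own.
        out.add(new Entry(out.size(), true));
        return out;
    }

    /** Log-end offset: the offset the next appended entry would receive. */
    static long logEndOffset(List<Entry> log) {
        return log.size();
    }

    /** Offset committed when only delivered data records are tracked: last record offset + 1. */
    static long committedOffsetFromRecords(List<Entry> log) {
        long committed = 0;
        for (Entry e : log) {
            if (!e.controlMarker) {
                committed = e.offset + 1;
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<Entry> log = appendCommittedTransaction(new ArrayList<>(), 10);
        long end = logEndOffset(log);
        long committed = committedOffsetFromRecords(log);
        System.out.println("log-end offset   = " + end);
        System.out.println("committed offset = " + committed);
        System.out.println("lag              = " + (end - committed));
    }
}
```

In this model, 10 transactional records take offsets 0 to 9 and the commit 
marker takes offset 10, so the log-end offset is 11 while the committed offset 
is 10. The remaining lag of 1 matches the symptom reported here.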

 

You can reproduce it by running `connect-file-sink` with the following 
properties:
{noformat}
/opt/kafka/bin/connect-standalone.sh /connect-standalone.properties 
/connect-file-sink.properties{noformat}
 
{code:java}
# connect-standalone.properties
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

# for testing
offset.flush.interval.ms=10000

consumer.isolation.level=read_committed
consumer.auto.offset.reset=latest
{code}
 
{code:java}
# connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=test{code}
And use Java producer to produce 10 messages to 

 

 

 

  was:
The Kafka sink connectors don't commit offset to the latest log-end offset if 
the messages are produced in a transaction.

From the code of [WorkerSinkTask.java|#L467]], we found that the sink 
connector gets offset from messages and commits it to Kafka after the messages 
are processed successfully.

 


> Kafka sink connectors do not commit offset correctly if messages are produced 
> in transaction
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13404
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13404
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.6.1
>            Reporter: Yu-Jhe Li
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
