[ https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717662#comment-17717662 ]
krishnendu Das commented on KAFKA-14947:
----------------------------------------

[~yash.mayya] Thanks for your comment. I have two points based on your comments/suggestions.

With our existing Connect API code and the Kafka server (2.6.2), our ingestion mechanism was working fine in the live environment. We checked the Kafka server (2.6.2) WorkerSourceTask::execute() method, and it follows this execution path:
# Poll the task for new data.
# If there is any data, write it to the Kafka topic.
# Commit the offset.

Even today we are able to ingest huge volumes of data in the live environment using Kafka server version 2.6.2, and no data duplication happens.

Now we have to upgrade Kafka to version 3.1.1. The WorkerSourceTask::execute() method in Kafka server v3.1.1 follows this execution path:
# Update the committableOffsets variable (if any offset commit is pending), which is shared between the execute() and commitOffsets() functions.
# Poll the task for new data.
# If there is any data, write it to the Kafka topic.

Because of this ordering, the shared committableOffsets variable is only set with the latest offset value during the 2nd poll, and it is committed in the next call to WorkerSourceTask::commitOffsets(). So the data read in the 2nd poll is the same data that was already read in the 1st poll and written to the topic, while the offset commit only happens some time later. Because of this execution flow of execute(), we are getting duplicate data in the topics. (A simplified sketch of both loop orderings appears at the end of this message.)

As you mentioned, "In your provided scenario, why can't the connector simply read from its previous position in the second poll since it should be maintaining some internal state?" We can store the current per-file offset in an in-memory object, but that will not be persistent: at every start, the object is reset. Any suggestion on how we can make it persistent with the new Kafka server (3.1.1)?
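One way to make the position persistent, independent of when the framework commits offsets, is to keep the per-file position in memory across poll() calls, attach it to every SourceRecord as its source offset, and read it back from context.offsetStorageReader() only once, in start(). Below is a minimal sketch using the standard Kafka Connect SourceTask API; the class name, the "filename"/"position" offset keys, and the readNextLine() helper are hypothetical placeholders, not the connector discussed here.

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Minimal sketch. The in-memory `position` is authoritative while the
// task runs; the offset store is consulted only once, at startup, so a
// stale read there cannot cause a re-read within a running task.
public class FilePositionSourceTask extends SourceTask {

    // Hypothetical offset keys; any stable map of strings works.
    private Map<String, Object> sourcePartition;
    private long position; // next line to read, kept across poll() calls
    private String topic;

    @Override
    public void start(Map<String, String> props) {
        topic = props.get("topic");
        sourcePartition = Collections.singletonMap("filename", props.get("filename"));

        // Restore the last persisted position once, at startup only.
        Map<String, Object> stored = context.offsetStorageReader().offset(sourcePartition);
        position = (stored != null && stored.get("position") != null)
                ? (Long) stored.get("position")
                : 0L;
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        String line = readNextLine(position); // hypothetical read helper
        if (line == null) {
            return null; // nothing new; the framework will call poll() again
        }
        position += 1; // advance the in-memory position immediately
        Map<String, Object> sourceOffset = Collections.singletonMap("position", position);
        // The framework persists this source offset once the record is delivered.
        return Collections.singletonList(new SourceRecord(
                sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
    }

    // Placeholder standing in for the connector's actual read logic.
    private String readNextLine(long pos) {
        return null;
    }

    @Override
    public void stop() { }

    @Override
    public String version() {
        return "0.1";
    }
}
{code}

With this approach the position survives restarts because the framework persists the source offsets of delivered records, while during normal operation the in-memory position (not the possibly stale offset store) drives the reads.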
> Duplicate records are getting created in the topic.
> ----------------------------------------------------
>
> Key: KAFKA-14947
> URL: https://issues.apache.org/jira/browse/KAFKA-14947
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 3.1.1
> Reporter: krishnendu Das
> Priority: Blocker
> Attachments: Kafka_server_3.1.1_data_duplication_issue_log
>
> We are using the Kafka Connect API (version 2.3.0) and Kafka (version 3.1.1) for data ingestion purposes. Previously we were using Kafka (version 2.6.2) with the same Kafka Connect API (version 2.3.0), and data ingestion was happening properly.
>
> Recently we updated the Kafka version from 2.6.2 to 3.1.1. After the update we are facing duplicate data issues from the source connector into the Kafka topic. After debugging the 3.1.1 code, we saw that one new function, {*}updateCommittableOffsets{*}(), was added and is called inside {*}WorkerSourceTask::execute{*}() as part of the bug fix "KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323)".
>
> Because of this function, we are observing the following scenario:
> # Inside execute(), at the start of the flow, the call goes to updateCommittableOffsets() to check whether there is any pending offset to commit. As the first poll has not yet happened, this function finds nothing to commit.
> # Then the Kafka Connect API poll() method is called from {*}WorkerSourceTask::execute(){*}. *---------> 1st poll*
> # The Kafka Connect API (using the sleepy policy) reads one source file from the cloud source directory.
> # It reads the whole content of the file and sends the result set to the Kafka server to write to the Kafka topic.
> # During the 2nd poll, updateCommittableOffsets() finds offsets to commit and updates the shared variable committableOffsets, which is later used by the WorkerSourceTask::commitOffsets() function to perform the actual offset commit.
> # Then the Kafka Connect API poll() method is called from {*}WorkerSourceTask::execute(){*}. *---------> 2nd poll*
> # The Kafka Connect API (using the sleepy policy) reads the same source file again from the start, because offsetStorageReader::offset() did not return the latest offset.
> # It reads the whole content of the file and sends the result set to the Kafka server to write to the Kafka topic. ---> This creates duplicate data in the topic.
> ................................................
> ................................................
> # WorkerSourceTask::commitOffsets() commits the offset.
> ................................................
> ................................................
> # Then the Kafka Connect API poll() method is called from {*}WorkerSourceTask::execute(){*}. *---------> 3rd poll*
> # This time offsetStorageReader::offset() is able to return the latest offset.
> # The Kafka Connect API (using the sleepy policy) reads the same source file from the last read position.
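For reference, here is a simplified, illustrative sketch of the two execute() loop orderings described in this issue. This is not the actual WorkerSourceTask code; all method and field names below are placeholders.

{code:java}
import java.util.List;

// Illustrative-only sketch of the loop orderings described in this issue.
public class ExecuteLoopSketch {

    private volatile boolean stopping = false;

    // Kafka 2.6.2-style ordering: poll, send, then commit, so the committed
    // offsets are already visible before the next poll starts.
    void executeLikeKafka262() {
        while (!stopping) {
            List<Object> records = pollTask();
            sendRecords(records); // write to the topic
            commitOffsets();      // offsets visible before the next poll
        }
    }

    // Kafka 3.1.1-style ordering (after KAFKA-12226): the offsets produced
    // by poll N are only published to the shared committableOffsets variable
    // at the top of iteration N+1 and committed asynchronously after that,
    // so a poll that runs in between can re-read already delivered data.
    void executeLikeKafka311() {
        while (!stopping) {
            updateCommittableOffsets(); // publish offsets from the previous batch
            List<Object> records = pollTask();
            sendRecords(records);       // the actual commit happens later, on a timer
        }
    }

    // Placeholders standing in for the real worker internals.
    private List<Object> pollTask() { return List.of(); }
    private void sendRecords(List<Object> records) { }
    private void commitOffsets() { }
    private void updateCommittableOffsets() { }
}
{code}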