[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161666#comment-17161666 ]
Greg Harris commented on KAFKA-5756:
------------------------------------

I was able to reproduce this race condition with the following setup:
 * ConnectDistributedTest.test_bounce
 * clean bounces = True
 * tests/kafkatest/tests/connect/templates/connect-distributed.properties edited to include offset.flush.interval.ms=1

This setup makes collisions between the periodic commitOffsets and the commitOffsets call from stop()/close() extremely likely. In a single run of 9 bounces, I saw 7 instances of duplication (12 duplicated records in 7 non-consecutive groups).

Below is an example log from one of these race conditions. I've interspersed the VerifiableSourceTask's stdout messages to show when the records are produced and committed.

{noformat}
[2020-07-20 22:45:47,619] DEBUG Submitting 1 entries to backing store. The offsets are: {{id=1}={seqno=15709}}
[2020-07-20 22:45:52,622] DEBUG Submitting 1 entries to backing store. The offsets are: {{id=1}={seqno=16209}}
[2020-07-20 22:45:52.622] {"task":1,"seqno":16210,"time_ms":1595285152622,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,623] DEBUG [Producer clientId=connector-producer-verifiable-source-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=connector-producer-verifiable-source-1, correlationId=1395) and timeout 2147483647 to node 1: {acks=-1,timeout=2147483647,partitionSizes=[test-0=185]}
[2020-07-20 22:45:52,623] INFO Stopping task verifiable-source-1
[2020-07-20 22:45:52.627] {"task":1,"seqno":16211,"time_ms":1595285152627,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,627] INFO WorkerSourceTask{id=verifiable-source-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-07-20 22:45:52,627] DEBUG [Producer clientId=connector-producer-verifiable-source-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=connector-producer-verifiable-source-1, correlationId=1395): org.apache.kafka.common.requests.ProduceResponse@464a5bec (org.apache.kafka.clients.NetworkClient)
[2020-07-20 22:45:52,627] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2020-07-20 22:45:52,630] ERROR WorkerSourceTask{id=verifiable-source-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
	at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:490)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:274)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-07-20 22:45:52.630] {"committed":true,"task":1,"seqno":16210,"time_ms":1595285152630,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,630] DEBUG [Producer clientId=connector-producer-verifiable-source-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=connector-producer-verifiable-source-1, correlationId=1396) and timeout 2147483647 to node 1: {acks=-1,timeout=2147483647,partitionSizes=[test-0=185]}
[2020-07-20 22:45:52,630] ERROR WorkerSourceTask{id=verifiable-source-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2020-07-20 22:45:52,631] INFO WorkerSourceTask{id=verifiable-source-1} Finished commitOffsets successfully in 9 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-07-20 22:45:52,631] INFO [Producer clientId=connector-producer-verifiable-source-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
[2020-07-20 22:45:52,632] DEBUG [Producer clientId=connector-producer-verifiable-source-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=connector-producer-verifiable-source-1, correlationId=1396): org.apache.kafka.common.requests.ProduceResponse@6f98d76
[2020-07-20 22:45:52,632] DEBUG [Producer clientId=connector-producer-verifiable-source-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
[2020-07-20 22:45:52.632] ducker10 {"committed":true,"task":1,"seqno":16211,"time_ms":1595285152632,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,635] DEBUG [Producer clientId=connector-producer-verifiable-source-1] Shutdown of Kafka producer I/O thread has completed.
[2020-07-20 22:45:52,636] DEBUG [Producer clientId=connector-producer-verifiable-source-1] Kafka producer has been closed
[2020-07-20 22:45:52,639] DEBUG Graceful stop of task verifiable-source-1 succeeded.
{noformat}

Sequence numbers 16210 and 16211 are then duplicated once the connector restarts, since the later commit that included those offsets is discarded due to the exception. With the normal interval of 5000 ms, the test is flaky but passes ~90% of the time. In order to resolve that flakiness, we need to resolve this race condition.
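The collision in the log above can be illustrated with a minimal sketch. The class and method names below are hypothetical stand-ins that only mirror the shape of OffsetStorageWriter's beginFlush() guard (a snapshot map that stays non-null while a flush is in flight); they are not the real implementation. The point is that a second beginFlush() issued before the first flush completes throws, which is exactly what happens when the stop()-path commit collides with the periodic one:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the flush guard in OffsetStorageWriter.
class SketchOffsetWriter {
    private Map<String, Long> data = new HashMap<>();
    private Map<String, Long> toFlush = null; // non-null while a flush is in flight

    synchronized void offset(String partition, long seqno) {
        data.put(partition, seqno);
    }

    // Mirrors the beginFlush() guard: refuses to start a second flush
    // before the first one has been completed or cancelled.
    synchronized void beginFlush() {
        if (toFlush != null)
            throw new IllegalStateException("OffsetStorageWriter is already flushing");
        toFlush = data;
        data = new HashMap<>();
    }

    synchronized void completeFlush() {
        toFlush = null;
    }
}

public class FlushRaceDemo {
    public static void main(String[] args) {
        SketchOffsetWriter writer = new SketchOffsetWriter();
        writer.offset("id=1", 16209L);
        writer.beginFlush();         // periodic commit starts a flush
        writer.offset("id=1", 16211L);
        try {
            writer.beginFlush();     // stop()-path commit collides with it
        } catch (IllegalStateException e) {
            // The task dies here; the offsets accumulated since the first
            // snapshot are never committed, so they replay after restart.
            System.out.println(e.getMessage());
        }
    }
}
```

In the real code the two callers are on different threads (SourceTaskOffsetCommitter's scheduled task and the worker thread running WorkerSourceTask), but the sequential version above is enough to show why the guard alone cannot make the two commit paths safe: nothing coordinates *which* thread gets to flush, so the loser dies with the ConnectException seen in the log.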
> Synchronization issue on flush
> ------------------------------
>
>                 Key: KAFKA-5756
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5756
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0
>            Reporter: Oleg Kuznetsov
>            Priority: Major
>             Fix For: 0.11.0.1, 1.0.0
>
>
> Access to *OffsetStorageWriter#toFlush* is not synchronized in the *doFlush()*
> method, whereas this collection can be accessed from 2 different threads:
> - *WorkerSourceTask.execute()*, finally block
> - *SourceTaskOffsetCommitter*, from the periodic flush task

--
This message was sent by Atlassian Jira
(v8.3.4#803005)