[ 
https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161666#comment-17161666
 ] 

Greg Harris commented on KAFKA-5756:
------------------------------------

I was able to reproduce this race condition with the following setup:
 * ConnectDistributedTest.test_bounce
 * clean bounces = True
 * tests/kafkatest/tests/connect/templates/connect-distributed.properties
   edited to include offset.flush.interval.ms=1

This setup makes collisions between the periodic commitOffsets call and the 
commitOffsets call from stop()/close() extremely likely. In a single run of 9 
bounces, I observed 7 instances of duplication (12 records in 7 
non-consecutive groups).
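
To make the collision concrete, here is a minimal sketch of the interleaving. 
ToyOffsetWriter is a hypothetical stand-in written for this comment, not 
Kafka's OffsetStorageWriter; it only assumes that a second beginFlush() is 
rejected while an earlier flush is still in flight, which is what the error 
in the log below reports. The two callers are replayed sequentially so the 
outcome is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

class FlushGuardSketch {
    /** Toy stand-in for an offset writer (hypothetical, not Kafka's class). */
    static class ToyOffsetWriter {
        private final Map<String, Long> data = new HashMap<>();
        private Map<String, Long> toFlush = null;

        /** Record the latest offset for a source partition. */
        synchronized void offset(String partition, long seqno) {
            data.put(partition, seqno);
        }

        /** Snapshot pending offsets; refuse to start while a flush is in flight. */
        synchronized boolean beginFlush() {
            if (toFlush != null) {
                throw new IllegalStateException("already flushing");
            }
            if (data.isEmpty()) {
                return false;
            }
            toFlush = new HashMap<>(data);
            data.clear();
            return true;
        }

        /** Complete the in-flight flush and return what was written. */
        synchronized Map<String, Long> finishFlush() {
            Map<String, Long> flushed = toFlush;
            toFlush = null;
            return flushed;
        }
    }

    /** Replays: the periodic commit begins, then the stop-path commit begins before it finishes. */
    static String replayCollision() {
        ToyOffsetWriter writer = new ToyOffsetWriter();
        writer.offset("id=1", 16209L);
        writer.beginFlush();            // periodic committer starts its flush
        writer.offset("id=1", 16211L);  // the task keeps producing meanwhile
        try {
            writer.beginFlush();        // stop-path commit starts before the first finishes
            return "no collision";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replayCollision());
    }
}
```

In this toy model the offset recorded between the two beginFlush() calls 
stays pending and is never written if the caller gives up after the 
exception.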

Below is an example log excerpt from one occurrence of this race condition. 
I've interspersed the VerifiableSourceTask's stdout messages to indicate when 
the records are produced and committed.
{noformat}
[2020-07-20 22:45:47,619] DEBUG Submitting 1 entries to backing store. The 
offsets are: {{id=1}={seqno=15709}
[2020-07-20 22:45:52,622] DEBUG Submitting 1 entries to backing store. The 
offsets are: {{id=1}={seqno=16209}}
[2020-07-20 22:45:52.622] 
{"task":1,"seqno":16210,"time_ms":1595285152622,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,623] DEBUG [Producer 
clientId=connector-producer-verifiable-source-1] Sending PRODUCE request with 
header RequestHeader(apiKey=PRODUCE, apiVersion=8, 
clientId=connector-producer-verifiable-source-1, correlationId=1395) and 
timeout 2147483647 to node 1: 
{acks=-1,timeout=2147483647,partitionSizes=[test-0=185]}
[2020-07-20 22:45:52,623] INFO Stopping task verifiable-source-1
[2020-07-20 22:45:52.627] 
{"task":1,"seqno":16211,"time_ms":1595285152627,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,627] INFO WorkerSourceTask{id=verifiable-source-1} 
Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-07-20 22:45:52,627] DEBUG [Producer 
clientId=connector-producer-verifiable-source-1] Received PRODUCE response from 
node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, 
clientId=connector-producer-verifiable-source-1, correlationId=1395): 
org.apache.kafka.common.requests.ProduceResponse@464a5bec 
(org.apache.kafka.clients.NetworkClient)
[2020-07-20 22:45:52,627] ERROR Invalid call to OffsetStorageWriter flush() 
while already flushing, the framework should not allow this 
(org.apache.kafka.connect.storage.OffsetStorageWriter)
[2020-07-20 22:45:52,630] ERROR WorkerSourceTask{id=verifiable-source-1} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is 
already flushing
        at 
org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
        at 
org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:490)
        at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:274)
        at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-07-20 22:45:52.630] 
{"committed":true,"task":1,"seqno":16210,"time_ms":1595285152630,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,630] DEBUG [Producer 
clientId=connector-producer-verifiable-source-1] Sending PRODUCE request with 
header RequestHeader(apiKey=PRODUCE, apiVersion=8, 
clientId=connector-producer-verifiable-source-1, correlationId=1396) and 
timeout 2147483647 to node 1: 
{acks=-1,timeout=2147483647,partitionSizes=[test-0=185]}
[2020-07-20 22:45:52,630] ERROR WorkerSourceTask{id=verifiable-source-1} Task 
is being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask)
[2020-07-20 22:45:52,631] INFO WorkerSourceTask{id=verifiable-source-1} 
Finished commitOffsets successfully in 9 ms 
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-07-20 22:45:52,631] INFO [Producer 
clientId=connector-producer-verifiable-source-1] Closing the Kafka producer 
with timeoutMillis = 30000 ms.
[2020-07-20 22:45:52,632] DEBUG [Producer 
clientId=connector-producer-verifiable-source-1] Received PRODUCE response from 
node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, 
clientId=connector-producer-verifiable-source-1, correlationId=1396): 
org.apache.kafka.common.requests.ProduceResponse@6f98d76
[2020-07-20 22:45:52,632] DEBUG [Producer 
clientId=connector-producer-verifiable-source-1] Beginning shutdown of Kafka 
producer I/O thread, sending remaining records.
[2020-07-20 22:45:52.632] ducker10 
{"committed":true,"task":1,"seqno":16211,"time_ms":1595285152632,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,635] DEBUG [Producer 
clientId=connector-producer-verifiable-source-1] Shutdown of Kafka producer I/O 
thread has completed.
[2020-07-20 22:45:52,636] DEBUG [Producer 
clientId=connector-producer-verifiable-source-1] Kafka producer has been closed 
        
[2020-07-20 22:45:52,639] DEBUG Graceful stop of task verifiable-source-1 
succeeded.
{noformat}
Sequence numbers 16210 and 16211 are then duplicated once the connector 
restarts, since the later commit including those offsets is discarded due to 
the exception.
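
The arithmetic behind the duplication can be sketched as follows. This is 
an illustration only: duplicatedSeqnos is a hypothetical helper, and it 
assumes the restarted task resumes at the sequence number right after the 
last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

class DuplicateOnRestartSketch {
    /**
     * Returns the seqnos written to the topic twice when the task restarts
     * from lastCommittedSeqno although records up to lastProducedSeqno had
     * already been produced.
     */
    static List<Long> duplicatedSeqnos(long lastCommittedSeqno, long lastProducedSeqno) {
        List<Long> topic = new ArrayList<>();
        // First run: everything up to lastProducedSeqno reaches the topic.
        for (long s = lastCommittedSeqno; s <= lastProducedSeqno; s++) {
            topic.add(s);
        }
        // Restart (assumption): resume at the seqno after the last committed
        // offset and produce a couple of records past the old high-water mark.
        List<Long> duplicates = new ArrayList<>();
        for (long s = lastCommittedSeqno + 1; s <= lastProducedSeqno + 2; s++) {
            if (topic.contains(s)) {
                duplicates.add(s);
            }
            topic.add(s);
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Values taken from the log above: last committed 16209, last produced 16211.
        System.out.println(duplicatedSeqnos(16209L, 16211L));
    }
}
```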

When the normal interval of 5000ms is used, the test is flaky but passes ~90% 
of the time. To resolve that flakiness, we need to resolve this race 
condition.

> Synchronization issue on flush
> ------------------------------
>
>                 Key: KAFKA-5756
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5756
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0
>            Reporter: Oleg Kuznetsov
>            Priority: Major
>             Fix For: 0.11.0.1, 1.0.0
>
>
> Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* 
> method, whereas this collection can be accessed from 2 different threads:
> - *WorkerSourceTask.execute()*, finally block
> - *SourceTaskOffsetCommitter*, from periodic flush task



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
