Greg Harris created KAFKA-17871:
-----------------------------------

             Summary: Source task source offset reads can block herder task cancellation
                 Key: KAFKA-17871
                 URL: https://issues.apache.org/jira/browse/KAFKA-17871
             Project: Kafka
          Issue Type: Bug
          Components: connect
    Affects Versions: 2.5.0
            Reporter: Greg Harris


In KAFKA-9051, reading of source task offsets was changed so that in-progress 
read Futures can be cancelled during task shutdown. 
OffsetStorageReaderImpl#offsetReadFutures is accessed under explicit 
synchronization to prevent data races between task cancellation and connectors 
reading offsets.
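To make the bookkeeping concrete, here is a minimal sketch of the pattern described above: in-progress reads are tracked in a Set guarded by its own monitor, and close() cancels whatever is still tracked. The class and method names (CancellableReadTracker, register, unregister) are hypothetical and this is a simplified model under that assumption, not the actual OffsetStorageReaderImpl source.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Future;

// Hypothetical model of the bookkeeping described above; names are
// illustrative and this is not the actual OffsetStorageReaderImpl source.
class CancellableReadTracker {
    // All access goes through this Set's own monitor.
    private final Set<Future<?>> readFutures = new HashSet<>();
    private boolean closed = false; // guarded by readFutures

    /** Tracks an in-progress read; returns false (and cancels it) if already closed. */
    boolean register(Future<?> read) {
        synchronized (readFutures) {
            if (closed) {
                read.cancel(true);
                return false;
            }
            readFutures.add(read);
            return true;
        }
    }

    /** Stops tracking a read once it has completed. */
    void unregister(Future<?> read) {
        synchronized (readFutures) {
            readFutures.remove(read);
        }
    }

    /** Invoked on task cancellation: cancels every read that is still tracked. */
    void close() {
        synchronized (readFutures) {
            closed = true;
            for (Future<?> read : readFutures) {
                read.cancel(true);
            }
            readFutures.clear();
        }
    }
}
```

close() needs the same monitor as register(). That is harmless only as long as no caller holds the monitor across a slow call.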

A task thread executing OffsetStorageReaderImpl#offsets can lock this Set and 
then, while still holding the lock, call Producer#flush inside 
KafkaBasedLog#flush.

At the same time, the herder thread may try to shut down the task, time out, 
and call AbstractWorkerSourceTask#cancel. Cancellation calls 
OffsetStorageReaderImpl#close, which tries to lock the same Set and therefore 
must wait for Producer#flush to complete. If the task's producer is unhealthy, 
this can block the herder thread indefinitely.
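The interaction can be reproduced in miniature with plain JDK primitives. In this hypothetical sketch (LockHeldAcrossFlushReader and HerderBlockingDemo are illustrative names, not Kafka Connect classes), a CountDownLatch that is never released stands in for a Producer#flush that does not complete; the "task" thread holds the HashSet monitor while waiting on it, and the "herder" thread's close() then has to wait for that monitor.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

// Hypothetical model of the locking shape in the report, not Kafka Connect code.
class LockHeldAcrossFlushReader {
    private final Set<Future<?>> readFutures = new HashSet<>();
    // Not counted down until the demo finishes: stands in for a Producer#flush
    // that does not complete because the producer is unhealthy.
    final CountDownLatch producerFlush = new CountDownLatch(1);
    final CountDownLatch lockHeld = new CountDownLatch(1);

    /** Task thread: locks the Set, then blocks in the simulated flush while holding it. */
    void offsets() throws InterruptedException {
        synchronized (readFutures) {
            lockHeld.countDown();
            producerFlush.await();
        }
    }

    /** Herder thread: cancellation needs the same monitor, so it queues behind the flush. */
    void close() {
        synchronized (readFutures) {
            for (Future<?> read : readFutures) {
                read.cancel(true);
            }
            readFutures.clear();
        }
    }
}

class HerderBlockingDemo {
    /** Returns the herder thread's state while the task thread is stuck in the simulated flush. */
    static Thread.State observeHerderState() {
        LockHeldAcrossFlushReader reader = new LockHeldAcrossFlushReader();
        Thread task = new Thread(() -> {
            try {
                reader.offsets();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "task-thread");
        Thread herder = new Thread(reader::close, "herder-thread");
        try {
            task.start();
            reader.lockHeld.await(); // the task thread now owns the Set's monitor
            herder.start();
            Thread.State state = herder.getState();
            // Poll for up to ~5 s until the herder parks on the monitor.
            for (int i = 0; i < 500 && state != Thread.State.BLOCKED; i++) {
                Thread.sleep(10);
                state = herder.getState();
            }
            return state;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            // Let the simulated flush finish so both threads can exit.
            reader.producerFlush.countDown();
            try {
                task.join();
                herder.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

observeHerderState() should report BLOCKED, matching the "BLOCKED (on object monitor)" state of the herder thread in the report. In this model the herder can proceed only after the simulated flush completes.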

See the following stack traces, first from the herder thread and then from the 
task thread:

 
{noformat}
    java.lang.Thread.State: BLOCKED (on object monitor)
         at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.close(OffsetStorageReaderImpl.java:148)
         - waiting to lock <0x00000006e6ce0748> (a java.util.HashSet)
         at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.cancel(AbstractWorkerSourceTask.java:288)
         at 
org.apache.kafka.connect.runtime.Worker.awaitStopTask(Worker.java:1036)
         at 
org.apache.kafka.connect.runtime.Worker.awaitStopTasks(Worker.java:1054)
         at 
org.apache.kafka.connect.runtime.Worker.stopAndAwaitTask(Worker.java:1082)
         at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$restartTask$23(DistributedHerder.java:1369)
         at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$1647/0x00007f3d01941b28.call(Unknown
 Source)
         at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2240)
         at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
Oct 24 15:55:10 hh-stage-aiven-cdc-kafka-connect-4 java[1180436]:         at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
         at 
java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.12/Executors.java:539)
         at 
java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
         at 
java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
         at java.lang.Thread.run(java.base@17.0.12/Thread.java:840){noformat}
and

{noformat}
    java.lang.Thread.State: WAITING (parking)
         at jdk.internal.misc.Unsafe.park(java.base@17.0.12/Native Method)
         - parking to wait for  <0x00000006e4f9d610> (a java.util.concurrent.CountDownLatch$Sync)
         at java.util.concurrent.locks.LockSupport.park(java.base@17.0.12/LockSupport.java:211)
         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@17.0.12/AbstractQueuedSynchronizer.java:715)
         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@17.0.12/AbstractQueuedSynchronizer.java:1047)
         at java.util.concurrent.CountDownLatch.await(java.base@17.0.12/CountDownLatch.java:230)
         at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
         at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:1075)
         at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1215)
         at org.apache.kafka.connect.util.KafkaBasedLog$$Lambda$861/0x00007f3d017dc9d8.accept(Unknown Source)
         at java.util.Optional.ifPresent(java.base@17.0.12/Optional.java:178)
         at org.apache.kafka.connect.util.KafkaBasedLog.flush(KafkaBasedLog.java:345)
         at org.apache.kafka.connect.util.KafkaBasedLog.readToEnd(KafkaBasedLog.java:334)
         at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.get(KafkaOffsetBackingStore.java:295)
         at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.lambda$getFromStore$5(ConnectorOffsetBackingStore.java:348)
         at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore$$Lambda$1277/0x00007f3d0184d600.apply(Unknown Source)
         at java.util.Optional.map(java.base@17.0.12/Optional.java:260)
         at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.getFromStore(ConnectorOffsetBackingStore.java:348)
         at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.get(ConnectorOffsetBackingStore.java:208)
         at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:96)
         - locked <0x00000006e6ce0748> (a java.util.HashSet)
         at io.debezium.connector.common.OffsetReader.offsets(OffsetReader.java:42)
         at io.debezium.connector.common.BaseSourceTask.getPreviousOffsets(BaseSourceTask.java:365)
         at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:112)
         at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:251)
         at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:178)
         at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
         at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
         at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
         at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
         at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$990/0x00007f3d01814e08.run(Unknown Source)
         at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.12/Executors.java:539)
         at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
         at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)
{noformat}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)