[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224721#comment-16224721 ]
ASF GitHub Bot commented on FLINK-7838: --------------------------------------- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4915#discussion_r147674418 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java --- @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() { private void flushNewPartitions() { LOG.info("Flushing new partitions"); + enqueueNewPartitions().await(); + } + + private TransactionalRequestResult enqueueNewPartitions() { Object transactionManager = getValue(kafkaProducer, "transactionManager"); - Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); - invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler}); - TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result"); - Object sender = getValue(kafkaProducer, "sender"); - invoke(sender, "wakeup"); - result.await(); + synchronized (transactionManager) { + Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); + invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler}); + TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result"); + Object sender = getValue(kafkaProducer, "sender"); + invoke(sender, "wakeup"); --- End diff -- Good catch. It shouldn't make a difference, but I will change it to better match original code. > Kafka011ProducerExactlyOnceITCase do not finish > ----------------------------------------------- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Affects Versions: 1.4.0 > Reporter: Piotr Nowojski > Assignee: Piotr Nowojski > Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)