[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224577#comment-16224577 ]
ASF GitHub Bot commented on FLINK-7838:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147656743

    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
    @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {

         private void flushNewPartitions() {
             LOG.info("Flushing new partitions");
    +        enqueueNewPartitions().await();
    +    }
    +
    +    private TransactionalRequestResult enqueueNewPartitions() {
             Object transactionManager = getValue(kafkaProducer, "transactionManager");
    -        Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    -        invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
    -        TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
    -        Object sender = getValue(kafkaProducer, "sender");
    -        invoke(sender, "wakeup");
    -        result.await();
    +        synchronized (transactionManager) {
    +            Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    --- End diff --

    Not related to your changes and can be done in a separate PR but I think we should add this check from the original sources to *"mitigate"* KAFKA-6119:
    ```
    if (!newPartitionsInTransaction.isEmpty())
        enqueueRequest(addPartitionsToTransactionHandler());
    ```


> Kafka011ProducerExactlyOnceITCase do not finish
> -----------------------------------------------
>
>                 Key: FLINK-7838
>                 URL: https://issues.apache.org/jira/browse/FLINK-7838
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0
>
>         Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log

--
This message was sent by Atlassian JIRA (v6.4.14#64029)