[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16945485#comment-16945485
 ] 

Jiangjie Qin commented on FLINK-14302:
--------------------------------------

[~tonywei] Really sorry for the late response. I was on a business trip and 
just saw this ping. Thanks for keeping debugging this. The current way Flink 
constructs the KafkaProducer on recovery is indeed a little flaky. I am not 
sure if the NOT_COORDINATOR error code was caused by empty partition list in 
AddPartitionsToTxnRequest.

More specifically, I am not sure why the following log was printed. 
 
*kafka-broker-1:*
{quote} [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] 
Initialized transactionalId map -> Sink: 
sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with producerId 1008 and producer epoch 
1 on partition __transaction_state-37 
(kafka.coordinator.transaction.TransactionCoordinator){quote}
{quote}*[2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning 
NOT_COORDINATOR error code to client for map -> Sink: 
sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request 
(kafka.coordinator.transaction.TransactionCoordinator)*
[2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting append 
of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR 
error to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's 
EndTransaction request 
(kafka.coordinator.transaction.TransactionCoordinator){quote}
Can you double check the version of the Kafka broker? I'd like to reproduce the 
problem if possible.

Thanks.

> FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
> `newPartitionsInTransaction` is empty when enable EoS
> -----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14302
>                 URL: https://issues.apache.org/jira/browse/FLINK-14302
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Wei-Che Wei
>            Assignee: Wei-Che Wei
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> As the survey in this mailing list thread [1], kafka server will bind the 
> error with topic-partition list when it handles `AddPartitionToTxnRequest`. 
> So when the request body contains no topic-partition, the error won't be sent 
> back to kafka producer client. Moreover, it producer internal api, it always 
> check if `newPartitionsInTransaction` is empty before sending 
> ADD_PARTITIONS_TO_TXN request to kafka cluster. We should apply it as well if 
> you need to explicitly call it in the first commit phase of two-phase commit 
> sink.
> [1] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-producer-failed-with-InvalidTxnStateException-when-performing-commit-transaction-td29384.html]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to