Hi,

I found that the Kafka source code [1] always checks whether `newPartitionsInTransaction` is empty before calling `enqueueRequest(addPartitionsToTransactionHandler())`, but the Flink Kafka producer code [2] does not apply this check.
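To make the difference concrete, here is a minimal sketch of the two behaviours. It is a simplified stand-in and not the actual Kafka or Flink code: the class `GuardedFlushSketch`, its methods, and the string-based "requests" are hypothetical names chosen for illustration. Only `newPartitionsInTransaction` mirrors a name from the sources linked in this mail.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Simplified, hypothetical model of the producer-side bookkeeping.
 * It does not use or reproduce the real Kafka or Flink classes.
 */
public class GuardedFlushSketch {

    // Partitions written to since the last AddPartitionsToTxn request.
    final Set<String> newPartitionsInTransaction = new LinkedHashSet<>();

    // Requests that would be sent to the transaction coordinator.
    final List<String> enqueuedRequests = new ArrayList<>();

    void addPartition(String topicPartition) {
        newPartitionsInTransaction.add(topicPartition);
    }

    /** Unguarded flush: enqueues a request even when no partition was added. */
    void flushUnguarded() {
        enqueuedRequests.add(
            "AddPartitionsToTxnRequest(partitions=" + newPartitionsInTransaction + ")");
        newPartitionsInTransaction.clear();
    }

    /** Guarded flush: returns false and sends nothing for an empty transaction. */
    boolean flushGuarded() {
        if (newPartitionsInTransaction.isEmpty()) {
            return false;
        }
        flushUnguarded();
        return true;
    }

    public static void main(String[] args) {
        GuardedFlushSketch unguarded = new GuardedFlushSketch();
        unguarded.flushUnguarded();
        System.out.println("unguarded, empty txn: " + unguarded.enqueuedRequests);

        GuardedFlushSketch guarded = new GuardedFlushSketch();
        guarded.flushGuarded();
        System.out.println("guarded, empty txn:   " + guarded.enqueuedRequests);

        guarded.addPartition("topic-0");
        guarded.flushGuarded();
        System.out.println("guarded, one partition: " + guarded.enqueuedRequests);
    }
}
```

In this model the unguarded path produces a request with `partitions=[]`, which matches the shape of the empty `AddPartitionsToTxnRequest` seen in the producer DEBUG log quoted further down, while the guarded path sends nothing for a transaction that never wrote a record.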
I wrote a simple producer with `flushNewPartitions` copied from the Flink
Kafka producer and successfully reproduced this exception. Then I modified the
logic in `enqueueNewPartitions` to check whether there is any
`newPartitionsInTransaction` before making this request. With that change it
works well even if I restart the broker that owns this transaction's
coordinator, since an empty transaction no longer sends any request to the
server.

My simple producer code is attached. Please help verify whether my
understanding is correct. Thanks.

Best,
Tony Wei

[1] https://github.com/apache/kafka/blob/c0019e653891182d7a95464175c9b4ef63f8bae1/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L316
[2] https://github.com/apache/flink/blob/09f96b339f4890d7a44ae92c915ea8c0f6f244cb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L273

Tony Wei <tony19920...@gmail.com> wrote on Fri, Sep 20, 2019 at 11:56 AM:

> Hi,
>
> Trying to dig into why `Error.NOT_COORDINATOR` happened on the broker, I set
> Flink's log level to DEBUG for the producer and found some logs on the Flink
> side regarding this error. A log snippet is below.
>
> It seems that the producer client didn't catch this error and retry to find
> the new coordinator. This left the transaction state inconsistent between
> the client side and the server side. Could the problem be caused by
> FlinkKafkaInternalProducer using Java reflection to send the
> `addPartitionsToTransactionHandler` request in
> `FlinkKafkaInternalProducer#flushNewPartitions`? Could any expert who is
> familiar with both Kafka and Flink's Kafka connector help me solve this?
> Thanks very much.
>
> The attachment is my code to reproduce this problem.
> The cluster's versions are the same as I mentioned in my first email.
>
> Best,
> Tony Wei
>
> *flink taskmanager:*
>
>> 2019-09-20 02:32:45,927 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer - Flushing new partitions
>> 2019-09-20 02:32:45,927 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[])
>> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[]) to node *kafka-broker-1:9092* (id: 1 rack: null)
>> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3,producer_id=1008,producer_epoch=1,topics=[]} with correlation id 12 to node 1
>> 2019-09-20 02:32:45,937 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successfully added partitions [] to transaction
>
> *kafka-broker-1:*
>
>> [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] Initialized transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with producerId 1008 and producer epoch 1 on partition __transaction_state-37 (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning NOT_COORDINATOR error code to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
>
> Tony Wei <tony19920...@gmail.com> wrote on Thu, Sep 19, 2019 at 6:25 PM:
>
>> Hi Becket,
>>
>> I found that those transactions tended to fail with
>> InvalidTxnStateException if they never sent any records but were committed
>> after some brokers had been restarted.
>>
>> Because the failing state transition was always from EMPTY to COMMIT, I ran
>> a job with a parallelism of one, once with and once without output to
>> Kafka. I restarted brokers to see what happened in these two situations. I
>> couldn't make the job fail when it continuously emitted output to Kafka,
>> but it could fail when it didn't send any output to Kafka.
>>
>> I'm not familiar with FlinkKafkaProducer's behavior. I tried to use the
>> Kafka Java producer to reproduce the exception, but it worked well. Maybe
>> my observation is not correct, but that is what the experiment suggests.
>> Do you have any thoughts on this?
>>
>> Best,
>> Tony Wei
>>
>> Tony Wei <tony19920...@gmail.com> wrote on Thu, Sep 19, 2019 at 11:08 AM:
>>
>>> Hi Becket,
>>>
>>> One more thing: I have tried restarting other brokers that are not the
>>> active controller, and this exception can happen as well. So it should be
>>> independent of the active controller, as you said.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> Tony Wei <tony19920...@gmail.com> wrote on Wed, Sep 18, 2019 at 6:14 PM:
>>>
>>>> Hi Becket,
>>>>
>>>> I have reproduced this problem in our development environment. Below are
>>>> the log messages at debug level. It seems that the exception came from
>>>> broker-3, and I also found another error code in broker-2 around the
>>>> same time.
>>>>
>>>> There are other INVALID_TXN_STATE errors for other transaction ids; I
>>>> list just one of them. The log messages only show entries containing the
>>>> substring `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's` before
>>>> `2019-09-18 07:14`.
>>>>
>>>> I didn't see any other information explaining why the producer tried to
>>>> move the transaction state from EMPTY to COMMIT, or what made
>>>> NOT_COORDINATOR happen. Do you have any thoughts about what's happening?
>>>> Thanks.
>>>>
>>>> *Number of Kafka brokers: 3*
>>>> *logging config for kafka:*
>>>>
>>>>> log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
>>>>> log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
>>>>> log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
>>>>> log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
>>>>> log4j.appender.transactionAppender.MaxFileSize=10MB
>>>>> log4j.appender.transactionAppender.MaxBackupIndex=10
>>>>> log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
>>>>> log4j.additivity.kafka.coordinator.transaction=true
>>>>
>>>> *flink-ui*
>>>>
>>>>> Timestamp: 2019-09-18, 07:13:43
>>>>>
>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>>     ... 5 more
>>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
>>>>
>>>> *broker-3*
>>>>
>>>>> [2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
>>>>> [2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
>>>>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] TransactionalId: blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but received transaction marker result to send: COMMIT (kafka.coordinator.transaction.TransactionCoordinator)
>>>>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting append of COMMIT to transaction log with coordinator and returning INVALID_TXN_STATE error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
>>>>> [2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>
>>>> *broker-2*
>>>>
>>>>> [2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=0, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=1, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=2, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]: Updating blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=3, txnTimeoutMs=5400000, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7 succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2] Returning NOT_COORDINATOR error code to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
>>>>> [2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> Becket Qin <becket....@gmail.com> wrote on Mon, Sep 2, 2019 at 10:03 PM:
>>>>
>>>>> Hi Tony,
>>>>>
>>>>> From the symptom it is not quite clear to me what may cause this issue.
>>>>> Supposedly the TransactionCoordinator is independent of the active
>>>>> controller, so bouncing the active controller should not have a special
>>>>> impact on the transactions (at least not every time). If this is stably
>>>>> reproducible, is it possible to turn on debug level logging on
>>>>> kafka.coordinator.transaction.TransactionCoordinator to see what the
>>>>> broker says?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <tony19920...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Has anyone run into the same problem? I have updated my producer
>>>>>> transaction timeout to 1.5 hours, but the problem still happened when
>>>>>> I restarted the broker with the active controller. So it might not be
>>>>>> caused by a long checkpoint duration leading to a transaction timeout.
>>>>>> I have no more clues about what's wrong with my Kafka producer. Could
>>>>>> someone help me please?
>>>>>>
>>>>>> Best,
>>>>>> Tony Wei
>>>>>>
>>>>>> Fabian Hueske <fhue...@gmail.com> wrote on Fri, Aug 16, 2019 at 4:10 PM:
>>>>>>
>>>>>>> Hi Tony,
>>>>>>>
>>>>>>> I'm sorry I cannot help you with this issue, but Becket (in CC)
>>>>>>> might have an idea what went wrong here.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> On Wed, Aug 14, 2019 at 07:00, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I was recently trying to update our Kafka cluster with a larger
>>>>>>>> `transaction.max.timeout.ms`. The original setting was Kafka's
>>>>>>>> default value (i.e. 15 minutes), and I tried to set it to 3 hours.
>>>>>>>>
>>>>>>>> While I was doing a rolling restart of my brokers, this exception
>>>>>>>> occurred on the next checkpoint after I restarted the broker with
>>>>>>>> the active controller.
>>>>>>>>
>>>>>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>>>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>>>>>>     ... 5 more
>>>>>>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
>>>>>>>>
>>>>>>>> I have no idea why it happened, and I didn't find any error log from
>>>>>>>> the brokers. Has anyone seen this exception before? How can I
>>>>>>>> prevent this exception when I restart the Kafka cluster? Does this
>>>>>>>> exception mean that I will lose data in some of these transactions?
>>>>>>>>
>>>>>>>> flink cluster version: 1.8.1
>>>>>>>> kafka cluster version: 1.0.1
>>>>>>>> flink kafka producer version: universal
>>>>>>>> producer transaction timeout: 15 minutes
>>>>>>>> checkpoint interval: 5 minutes
>>>>>>>> number of concurrent checkpoint: 1
>>>>>>>> max checkpoint duration before and after the exception occurred: < 2 minutes
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Tony Wei
Util.java
Description: Binary data
Main.scala
Description: Binary data