Hi,

I found that the Kafka source code [1] always checks whether
`newPartitionsInTransaction` is empty before calling
`enqueueRequest(addPartitionsToTransactionHandler())`. The Flink Kafka
producer code [2] does not apply this check.
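To make the difference concrete, here is a toy model that only counts how many AddPartitions requests get queued. It is not Kafka's or Flink's actual code: `ToyTxnManager`, `flushGuarded`, `flushUnguarded` and `requestsAfterFlush` are hypothetical names. The guarded path mirrors the emptiness check described for [1]; the unguarded path mirrors always enqueuing, as described for [2].

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Toy model only: names echo the discussion, but this is not Kafka's TransactionManager.
class GuardModel {

    static final class ToyTxnManager {
        final Set<String> newPartitionsInTransaction = new HashSet<>();
        final Queue<List<String>> queuedAddPartitionsRequests = new ArrayDeque<>();

        // Drains the buffered partitions into one AddPartitions "request".
        List<String> addPartitionsToTransactionHandler() {
            List<String> request = new ArrayList<>(newPartitionsInTransaction);
            newPartitionsInTransaction.clear();
            return request;
        }

        void enqueueRequest(List<String> request) {
            queuedAddPartitionsRequests.add(request);
        }

        // Guarded: a request is queued only if there is at least one new partition.
        void flushGuarded() {
            if (!newPartitionsInTransaction.isEmpty()) {
                enqueueRequest(addPartitionsToTransactionHandler());
            }
        }

        // Unguarded: a request is queued even when its partition list is empty.
        void flushUnguarded() {
            enqueueRequest(addPartitionsToTransactionHandler());
        }
    }

    // Returns how many requests end up queued after one flush.
    static int requestsAfterFlush(boolean guarded, Collection<String> newPartitions) {
        ToyTxnManager tm = new ToyTxnManager();
        tm.newPartitionsInTransaction.addAll(newPartitions);
        if (guarded) {
            tm.flushGuarded();
        } else {
            tm.flushUnguarded();
        }
        return tm.queuedAddPartitionsRequests.size();
    }

    public static void main(String[] args) {
        List<String> noRecords = Collections.emptyList();
        System.out.println(requestsAfterFlush(true, noRecords));  // 0: nothing is sent to the coordinator
        System.out.println(requestsAfterFlush(false, noRecords)); // 1: a request with partitions=[]
        System.out.println(requestsAfterFlush(true, Arrays.asList("topic-0"))); // 1
    }
}
```

The only behavioral difference is the `isEmpty()` check; in the unguarded case, the queued request corresponds to the `partitions=[]` request that appears in the DEBUG log quoted in this thread.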

I wrote a simple producer with `flushNewPartitions` copied from the Flink
Kafka producer and successfully reproduced this exception. Then I modified
the logic in `enqueueNewPartitions` to check whether
`newPartitionsInTransaction` contains anything before making this request.
With that change, it worked well even when I restarted the broker hosting
this transaction's coordinator, since an empty transaction no longer sends
any request to the server.
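Because the Flink producer reaches into Kafka's internals via reflection, the check described above also has to read `newPartitionsInTransaction` reflectively. The sketch below shows that pattern against a hypothetical `ToyTransactionManager` with a private field and a private method. It is not the attached Util.java and not Flink's code; `getValue` and `enqueueNewPartitionsIfAny` are illustrative names.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

class ReflectiveGuardSketch {

    // Hypothetical stand-in for an internal class whose state is only reachable via reflection.
    static final class ToyTransactionManager {
        private final Set<String> newPartitionsInTransaction = new HashSet<>();
        private int enqueuedRequests = 0;

        void addPartition(String topicPartition) {
            newPartitionsInTransaction.add(topicPartition);
        }

        int enqueuedRequests() {
            return enqueuedRequests;
        }

        // Private on purpose: callers outside this class must use reflection.
        private void enqueueAddPartitions() {
            enqueuedRequests++;
            newPartitionsInTransaction.clear();
        }
    }

    // Reads a private field declared directly on the target's class.
    static Object getValue(Object target, String fieldName) throws ReflectiveOperationException {
        Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        return field.get(target);
    }

    // Proposed guard: enqueue the AddPartitions request only if there is something to add.
    // Returns true if a request was enqueued.
    static boolean enqueueNewPartitionsIfAny(Object transactionManager) {
        try {
            synchronized (transactionManager) {
                Collection<?> newPartitions =
                        (Collection<?>) getValue(transactionManager, "newPartitionsInTransaction");
                if (newPartitions.isEmpty()) {
                    return false; // empty transaction: send nothing to the coordinator
                }
                Method enqueue = transactionManager.getClass().getDeclaredMethod("enqueueAddPartitions");
                enqueue.setAccessible(true);
                enqueue.invoke(transactionManager);
                return true;
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Reflective access failed", e);
        }
    }

    public static void main(String[] args) {
        ToyTransactionManager tm = new ToyTransactionManager();
        System.out.println(enqueueNewPartitionsIfAny(tm)); // false
        tm.addPartition("topic-0");
        System.out.println(enqueueNewPartitionsIfAny(tm)); // true
        System.out.println(tm.enqueuedRequests());         // 1
    }
}
```

Holding the manager's monitor makes the emptiness check and the enqueue atomic with respect to any other code that synchronizes on the same object; whether the real transaction manager needs exactly this depends on its own locking, which this sketch does not model.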

The attachments contain my simple producer code. Please help me verify
whether my reasoning is correct. Thanks.

Best,
Tony Wei

[1] https://github.com/apache/kafka/blob/c0019e653891182d7a95464175c9b4ef63f8bae1/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L316
[2] https://github.com/apache/flink/blob/09f96b339f4890d7a44ae92c915ea8c0f6f244cb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L273

On Fri, Sep 20, 2019 at 11:56 AM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi,
>
> To find out why the broker returned `Errors.NOT_COORDINATOR`, I set the log
> level of Flink's Kafka producer to DEBUG and found some Flink-side logs
> related to this error. Below is a log snippet.
>
> It seems that the producer client did not catch this error and retry to
> find the new coordinator. This left the transaction state inconsistent
> between the client side and the server side.
> Could the problem be caused by FlinkKafkaInternalProducer using Java
> reflection to send the `addPartitionsToTransactionHandler` request in
> `FlinkKafkaInternalProducer#flushNewPartitions`? Could an expert familiar
> with both Kafka and Flink's Kafka connector help me solve this?
> Thanks very much.
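One possible reading of the logs below, offered as an assumption rather than a confirmed explanation: if the broker reports NOT_COORDINATOR per partition in the AddPartitionsToTxn response, then a request with `partitions=[]` comes back with no errors at all, and the client logs "Successfully added partitions []". The toy model below (hypothetical names throughout, not Kafka code) shows how that would hide the error from any retry logic.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class EmptyAddPartitionsModel {

    enum Code { NONE, NOT_COORDINATOR }

    // Assumed broker behavior: one error code per requested partition, no request-level error.
    static Map<String, Code> brokerHandleAddPartitions(List<String> partitions, boolean isCoordinator) {
        Map<String, Code> errors = new LinkedHashMap<>();
        for (String tp : partitions) {
            errors.put(tp, isCoordinator ? Code.NONE : Code.NOT_COORDINATOR);
        }
        return errors;
    }

    // Assumed client behavior: success unless some partition reports an error.
    static boolean clientTreatsAsSuccess(Map<String, Code> perPartitionErrors) {
        for (Code code : perPartitionErrors.values()) {
            if (code != Code.NONE) {
                return false; // here a client could rediscover the coordinator and retry
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Empty request to a broker that is no longer the coordinator: the error is never surfaced.
        Map<String, Code> empty = brokerHandleAddPartitions(Collections.<String>emptyList(), false);
        System.out.println(clientTreatsAsSuccess(empty)); // true

        // Same broker, one real partition: NOT_COORDINATOR becomes visible.
        Map<String, Code> one = brokerHandleAddPartitions(Collections.singletonList("topic-0"), false);
        System.out.println(clientTreatsAsSuccess(one)); // false
    }
}
```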
>
> The attachment contains my code to reproduce this problem.
> The cluster versions are the same as those in my first email.
>
> Best,
> Tony Wei
>
> *flink taskmanager:*
>
>> 2019-09-20 02:32:45,927 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Flushing new partitions
>> 2019-09-20 02:32:45,927 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[])
>> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.producer.internals.Sender            - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1, partitions=[]) to node *kafka-broker-1:9092* (id: 1 rack: null)
>> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient                   - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3,producer_id=1008,producer_epoch=1,topics=[]} with correlation id 12 to node 1
>> 2019-09-20 02:32:45,937 DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-29, transactionalId=map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successfully added partitions [] to transaction
>
>
> *kafka-broker-1:*
>
>> [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] Initialized transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with producerId 1008 and producer epoch 1 on partition __transaction_state-37 (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning NOT_COORDINATOR error code to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting append of COMMIT to transaction log with coordinator and returning NOT_COORDINATOR error to client for map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3's EndTransaction request (kafka.coordinator.transaction.TransactionCoordinator)
>
> On Thu, Sep 19, 2019 at 6:25 PM, Tony Wei <tony19920...@gmail.com> wrote:
>
>> Hi Becket,
>>
>> I found that transactions tended to fail with InvalidTxnStateException if
>> they never sent any records but were committed after some brokers had been
>> restarted.
>>
>> Because the failing state transition was always from EMPTY to COMMIT, I ran
>> a job with a parallelism of one, both with and without output to Kafka. I
>> restarted brokers in each case and found that I could not make the job
>> fail when it continuously emitted output to Kafka, but it could fail when
>> it did not send any output to Kafka.
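The two experiments can be summarized with a small state model. The coordinator-side rule is taken from the broker-3 log in this thread (a COMMIT for a transaction whose state is Empty is rejected with INVALID_TXN_STATE). The client-side rule, that a plain producer skips EndTxn when it believes nothing was added, is an assumption consistent with the plain Kafka producer not reproducing the error. All names are hypothetical.

```java
class EndTxnStateModel {

    enum TxnState { EMPTY, ONGOING }

    enum Outcome { SKIPPED, COMMITTED, INVALID_TXN_STATE }

    // Coordinator rule seen in the broker log: COMMIT from Empty is rejected.
    static Outcome coordinatorEndTxn(TxnState coordinatorState) {
        return coordinatorState == TxnState.ONGOING ? Outcome.COMMITTED : Outcome.INVALID_TXN_STATE;
    }

    // Assumed client rule: EndTxn is only sent if the client believes the transaction started.
    static Outcome commit(boolean clientThinksStarted, TxnState coordinatorState) {
        if (!clientThinksStarted) {
            return Outcome.SKIPPED;
        }
        return coordinatorEndTxn(coordinatorState);
    }

    public static void main(String[] args) {
        // Plain producer, no records: no EndTxn is sent, so nothing can fail.
        System.out.println(commit(false, TxnState.EMPTY));   // SKIPPED
        // Empty AddPartitions "succeeded" locally, but the coordinator still has the transaction as Empty.
        System.out.println(commit(true, TxnState.EMPTY));    // INVALID_TXN_STATE
        // Job that keeps writing records: the coordinator sees an Ongoing transaction.
        System.out.println(commit(true, TxnState.ONGOING));  // COMMITTED
    }
}
```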
>>
>> I'm not familiar with FlinkKafkaProducer's internals. I tried to reproduce
>> the exception with the plain Kafka Java producer, but it worked fine. My
>> observation may be wrong, but that is what the experiment suggests. Do you
>> have any thoughts on this?
>>
>> Best,
>> Tony Wei
>>
>> On Thu, Sep 19, 2019 at 11:08 AM, Tony Wei <tony19920...@gmail.com> wrote:
>>
>>> Hi Becket,
>>>
>>> One more thing: I also tried restarting brokers other than the one
>>> hosting the active controller, and this exception could still happen. So
>>> it should be independent of the active controller, as you said.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> On Wed, Sep 18, 2019 at 6:14 PM, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>>> Hi Becket,
>>>>
>>>> I have reproduced this problem in our development environment. Below are
>>>> the DEBUG-level log messages. It seems that the exception came from
>>>> broker-3, and I also found other error codes on broker-2 during the same
>>>> period.
>>>>
>>>> There are other INVALID_TXN_STATE errors for other transactional ids; I
>>>> list only one of them. The log messages below only show messages
>>>> containing the `kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's`
>>>> substring before `2019-09-18 07:14`.
>>>>
>>>> I didn't find any other information explaining why the producer tried to
>>>> move the transaction state from EMPTY to COMMIT, or what caused
>>>> NOT_COORDINATOR. Do you have any thoughts about what's happening? Thanks.
>>>>
>>>> *Number of Kafka brokers: 3*
>>>> *logging config for kafka:*
>>>>
>>>>> log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
>>>>> log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
>>>>> log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
>>>>> log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
>>>>> log4j.appender.transactionAppender.MaxFileSize=10MB
>>>>> log4j.appender.transactionAppender.MaxBackupIndex=10
>>>>> log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
>>>>> log4j.additivity.kafka.coordinator.transaction=true
>>>>>
>>>>
>>>>
>>>> *flink-ui*
>>>>>
>>>>> Timestamp: 2019-09-18, 07:13:43
>>>>>
>>>>
>>>>
>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>>     ... 5 more
>>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
>>>>>
>>>>
>>>>
>>>> *broker-3*
>>>>>
>>>>> [2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3]
>>>>> TransactionalId: blacklist -> Sink:
>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but
>>>>> received transaction marker result to send: COMMIT
>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>> [2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting
>>>>> append of COMMIT to transaction log with coordinator and returning
>>>>> INVALID_TXN_STATE error to client for blacklist -> Sink:
>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request
>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3]
>>>>> TransactionalId: blacklist -> Sink:
>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's state is Empty, but
>>>>> received transaction marker result to send: COMMIT
>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting
>>>>> append of COMMIT to transaction log with coordinator and returning
>>>>> INVALID_TXN_STATE error to client for blacklist -> Sink:
>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request
>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>> [2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]:
>>>>> Updating blacklist -> Sink:
>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to
>>>>> TxnTransitMetadata(producerId=7019, producerEpoch=4, txnTimeoutMs=5400000,
>>>>> txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1,
>>>>> txnLastUpdateTimestamp=1568790826831) with coordinator epoch 4 for
>>>>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7
>>>>> succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>>
>>>>
>>>> *broker-2*
>>>>
>>>>> [2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]:
>>>>> Updating blacklist -> Sink:
>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to
>>>>> TxnTransitMetadata(producerId=7019, producerEpoch=0, txnTimeoutMs=5400000, txnState=Empty,
>>>>> topicPartitions=Set(), txnStartTimestamp=-1,
>>>>> txnLastUpdateTimestamp=1568789126318) with coordinator epoch 0 for
>>>>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7
>>>>> succeeded
>>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]:
>>>>> Updating blacklist -> Sink:
>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to
>>>>> TxnTransitMetadata(producerId=7019, producerEpoch=1, txnTimeoutMs=5400000,
>>>>> txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1,
>>>>> txnLastUpdateTimestamp=1568789667979) with coordinator epoch 0 for
>>>>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7
>>>>> succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2019-09-18 07:06:25,419] DEBUG [Transaction State Manager 2]:
>>>>> Updating blacklist -> Sink:
>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to
>>>>> TxnTransitMetadata(producerId=7019, producerEpoch=2, txnTimeoutMs=5400000,
>>>>> txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1,
>>>>> txnLastUpdateTimestamp=1568790385417) with coordinator epoch 0 for
>>>>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7
>>>>> succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2019-09-18 07:11:42,981] DEBUG [Transaction State Manager 2]:
>>>>> Updating blacklist -> Sink:
>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's transaction state to
>>>>> TxnTransitMetadata(producerId=7019, producerEpoch=3, txnTimeoutMs=5400000,
>>>>> txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1,
>>>>> txnLastUpdateTimestamp=1568790702969) with coordinator epoch 0 for
>>>>> blacklist -> Sink: kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7
>>>>> succeeded (kafka.coordinator.transaction.TransactionStateManager)
>>>>> [2019-09-18 07:13:42,779] DEBUG [TransactionCoordinator id=2]
>>>>> Returning NOT_COORDINATOR error code to client for blacklist -> Sink:
>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's AddPartitions request
>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>> [2019-09-18 07:13:43,633] DEBUG [TransactionCoordinator id=2] Aborting
>>>>> append of COMMIT to transaction log with coordinator and returning
>>>>> NOT_COORDINATOR error to client for blacklist -> Sink:
>>>>> kafka-sink-xxxx-eba862242e60de7e4744f3307058f865-7's EndTransaction request
>>>>> (kafka.coordinator.transaction.TransactionCoordinator)
>>>>>
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>>
>>>> On Mon, Sep 2, 2019 at 10:03 PM, Becket Qin <becket....@gmail.com> wrote:
>>>>
>>>>> Hi Tony,
>>>>>
>>>>> From the symptom it is not quite clear to me what may cause this
>>>>> issue. Supposedly the TransactionCoordinator is independent of the active
>>>>> controller, so bouncing the active controller should not have a special
>>>>> impact on the transactions (at least not every time). If this is
>>>>> consistently reproducible, could you turn on debug-level logging for
>>>>> kafka.coordinator.transaction.TransactionCoordinator to see what the
>>>>> broker says?
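For context on why the active controller should not matter: to my understanding, the transaction coordinator for a transactional id is the leader of one partition of the internal `__transaction_state` topic, chosen by hashing the id, so restarting whichever broker leads that partition moves the coordinator regardless of which broker is the controller. The sketch below assumes the mapping is `abs(transactionalId.hashCode()) % transaction.state.log.num.partitions` with 50 partitions (the broker default). It is an illustration, not Kafka's code, and its printed partition should be compared against the `__transaction_state-37` entry in the broker-1 log rather than taken on trust.

```java
class CoordinatorPartitionSketch {

    // Absolute value that maps Integer.MIN_VALUE to 0 instead of staying negative (assumed helper).
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Assumed mapping from a transactional id to a __transaction_state partition.
    static int partitionFor(String transactionalId, int numPartitions) {
        return safeAbs(transactionalId.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        String transactionalId = "map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3";
        int numPartitions = 50; // assumption: broker default for transaction.state.log.num.partitions
        System.out.println("__transaction_state-" + partitionFor(transactionalId, numPartitions));
    }
}
```

If the assumption holds, the leader of the printed partition is the coordinator, so any broker restart that moves that leadership can produce NOT_COORDINATOR, whether or not that broker is the controller.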
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <tony19920...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Has anyone run into the same problem? I have increased my producer
>>>>>> transaction timeout to 1.5 hours, but the problem still happened when I
>>>>>> restarted the broker hosting the active controller. So it might not be
>>>>>> caused by a checkpoint taking so long that the transaction times out. I
>>>>>> have no further clues about what's wrong with my Kafka producer. Could
>>>>>> someone help me, please?
>>>>>>
>>>>>> Best,
>>>>>> Tony Wei
>>>>>>
>>>>>> On Fri, Aug 16, 2019 at 4:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Tony,
>>>>>>>
>>>>>>> I'm sorry I cannot help you with this issue, but Becket (in CC)
>>>>>>> might have an idea what went wrong here.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> On Wed, Aug 14, 2019 at 7:00 AM, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am currently trying to update our Kafka cluster with a larger
>>>>>>>> `transaction.max.timeout.ms`. The original setting was Kafka's
>>>>>>>> default value (i.e. 15 minutes), and I tried to set it to 3 hours.
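The relationship between the broker cap and the producer timeout can be checked with plain arithmetic. As far as I know, the broker rejects a producer whose `transaction.timeout.ms` exceeds the broker's `transaction.max.timeout.ms`, so raising the broker cap is what makes a longer producer timeout possible. The sketch uses only `java.time.Duration`; `accepted` is a hypothetical helper, and the 1.5-hour producer timeout is the value from Tony's Aug 29 message, which matches `txnTimeoutMs=5400000` in the broker logs.

```java
import java.time.Duration;

class TxnTimeoutCheck {

    // Assumed broker rule: the producer's requested timeout must not exceed the broker-side cap.
    static boolean accepted(Duration producerTxnTimeout, Duration brokerMaxTxnTimeout) {
        return producerTxnTimeout.compareTo(brokerMaxTxnTimeout) <= 0;
    }

    public static void main(String[] args) {
        Duration brokerDefault = Duration.ofMinutes(15); // transaction.max.timeout.ms default, 900000 ms
        Duration brokerRaised = Duration.ofHours(3);     // 10800000 ms
        Duration producer = Duration.ofMinutes(90);      // 1.5 hours

        System.out.println(producer.toMillis());               // 5400000
        System.out.println(accepted(producer, brokerDefault)); // false
        System.out.println(accepted(producer, brokerRaised));  // true
    }
}
```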
>>>>>>>>
>>>>>>>> While doing a rolling restart of my brokers, I hit this exception on
>>>>>>>> the next checkpoint after I restarted the broker hosting the active
>>>>>>>> controller.
>>>>>>>>
>>>>>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>>>>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>>>>>>     ... 5 more
>>>>>>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
>>>>>>>>
>>>>>>>>
>>>>>>>> I have no idea why it happened, and I didn't find any error logs on
>>>>>>>> the brokers. Has anyone seen this exception before? How can I prevent
>>>>>>>> it when restarting the Kafka cluster? Does this exception mean that I
>>>>>>>> will lose data in some of these transactions?
>>>>>>>>
>>>>>>>> flink cluster version: 1.8.1
>>>>>>>> kafka cluster version: 1.0.1
>>>>>>>> flink kafka producer version: universal
>>>>>>>> producer transaction timeout: 15 minutes
>>>>>>>> checkpoint interval: 5 minutes
>>>>>>>> number of concurrent checkpoint: 1
>>>>>>>> max checkpoint duration before and after the exception occurred: < 2 minutes
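A rough check that the timeout itself is not the problem, under the simplifying assumption that a sink transaction stays open for about one checkpoint interval plus one checkpoint's duration before it is committed (ignoring restarts and any pause between checkpoints). `worstCaseLifetime` is a hypothetical helper, not a Flink API.

```java
import java.time.Duration;

class TxnLifetimeEstimate {

    // Assumed upper bound on how long one sink transaction stays open before commit.
    static Duration worstCaseLifetime(Duration checkpointInterval, Duration maxCheckpointDuration) {
        return checkpointInterval.plus(maxCheckpointDuration);
    }

    public static void main(String[] args) {
        Duration lifetime = worstCaseLifetime(Duration.ofMinutes(5), Duration.ofMinutes(2));
        Duration producerTimeout = Duration.ofMinutes(15);
        System.out.println(lifetime.toMinutes());                    // 7
        System.out.println(lifetime.compareTo(producerTimeout) < 0); // true: below the timeout
    }
}
```

With these numbers the estimate stays well below the 15-minute timeout, which fits the later observation that raising the timeout did not make the exception go away.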
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Tony Wei
>>>>>>>>
>>>>>>>

Attachment: Util.java
Description: Binary data

Attachment: Main.scala
Description: Binary data