Hello,

I have a Flink 1.20 streaming app that ingests from one Kafka topic, does some 
processing, and then writes to another Kafka topic. The cluster is AWS MSK.

Recently we switched to exactly-once semantics, and as part of that migration we 
set the transactional ID prefix to the application name (which is stable between 
deploys). The issue we’re experiencing is that sometimes, on redeploys, the new 
app (more specifically, its sink) gets stuck in the initializing state. It does 
eventually resolve itself given enough time (hours, though I don’t have an exact 
number), but that’s obviously not desirable. Changing the transactional ID 
prefix to something else, however, fixes the problem.
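
For context on why the prefix matters: judging by the IDs in the log below 
(e.g. service_app.event_bridge-0-14982), the sink seems to name its 
transactional IDs <prefix>-<subtask index>-<counter>, with the counter 
presumably advancing per checkpoint. Here is a minimal Java sketch of that 
naming as I understand it; the TransactionalIds class is purely illustrative 
and not Flink code. With a stable prefix, a new deployment can land on exactly 
the same ID strings as the previous one, while a renamed prefix cannot.

```java
// Illustrative sketch (assumption, inferred from the ids in the DEBUG log):
// transactional ids look like <prefix>-<subtaskIndex>-<counter>.
// This is not the actual Flink implementation.
class TransactionalIds {

    // Builds one transactional id from its three assumed components.
    static String build(String prefix, int subtaskIndex, long counter) {
        return prefix + "-" + subtaskIndex + "-" + counter;
    }

    public static void main(String[] args) {
        // Ids a deployment with the stable prefix would use for subtask 0.
        for (long n = 14982; n <= 14983; n++) {
            System.out.println(build("service_app.event_bridge", 0, n));
        }
        // Renaming the prefix moves every subtask into a fresh id namespace.
        System.out.println(build("service_app.event_bridge-v2", 0, 14982));
    }
}
```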

I enabled DEBUG logging, and here’s what I see in the task manager logs:

21:12:42,201 DEBUG org.apache.kafka.clients.producer.internals.Sender           
 - [Producer clientId=producer-service_app.event_bridge-0-1, 
transactionalId=service_app.event_bridge-0-1] Sending transactional request 
FindCoordinatorRequestData(key='service_app.event_bridge-0-14982', keyType=1, 
coordinatorKeys=[]) to node 
b-2.kafka-cluster.example.c11.kafka.region.amazonaws.com:9094 (id: 2 rack: 
az-6) with correlation ID 44825
21:12:42,201 DEBUG org.apache.kafka.clients.NetworkClient                       
 - [Producer clientId=producer-service_app.event_bridge-0-1, 
transactionalId=service_app.event_bridge-0-1] Sending FIND_COORDINATOR request 
with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, 
clientId=producer-service_app.event_bridge-0-1, correlationId=44825, 
headerVersion=2) and timeout 30000 to node 2: 
FindCoordinatorRequestData(key='', keyType=1, 
coordinatorKeys=[service_app.event_bridge-0-14982])
21:12:42,202 DEBUG org.apache.kafka.clients.NetworkClient                       
 - [Producer clientId=producer-service_app.event_bridge-0-1, 
transactionalId=service_app.event_bridge-0-1] Received FIND_COORDINATOR 
response from node 2 for request with header 
RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, 
clientId=producer-service_app.event_bridge-0-1, correlationId=44825, 
headerVersion=2): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, 
errorMessage='', nodeId=0, host='', port=0, 
coordinators=[Coordinator(key='service_app.event_bridge-0-14982', nodeId=2, 
host='b-2.kafka-cluster.example.c11.kafka.region.amazonaws.com', port=9094, 
errorCode=0, errorMessage='')])
21:12:42,242 DEBUG org.apache.kafka.clients.NetworkClient                       
 - [Consumer clientId=service_app.event_bridge-0, 
groupId=service_app.event_bridge] Received FETCH response from node 2 for 
request with header RequestHeader(apiKey=FETCH, apiVersion=15, 
clientId=service_app.event_bridge-0, correlationId=3108, headerVersion=2): 
FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1756029163, 
responses=[])
21:12:42,242 DEBUG org.apache.kafka.clients.FetchSessionHandler                 
 - [Consumer clientId=service_app.event_bridge-0, 
groupId=service_app.event_bridge] Node 2 sent an incremental fetch response 
with throttleTimeMs = 0 for session 1756029163 with 0 response partition(s), 1 
implied partition(s)
21:12:42,242 DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch    
 - [Consumer clientId=service_app.event_bridge-0, 
groupId=service_app.event_bridge] Removing pending request for node 
b-2.kafka-cluster.example.c6.kafka.region.amazonaws.com:9094 (id: 2 rack: az-2)
21:12:42,242 DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch    
 - [Consumer clientId=service_app.event_bridge-0, 
groupId=service_app.event_bridge] Added READ_COMMITTED fetch request for 
partition app_domain.event_type.v1-0 at position FetchPosition{offset=4730884, 
offsetEpoch=Optional[198], 
currentLeader=LeaderAndEpoch{leader=Optional[b-2.kafka-cluster.example.c6.kafka.region.amazonaws.com:9094
 (id: 2 rack: az-2)], epoch=198}} to node 
b-2.kafka-cluster.example.c6.kafka.region.amazonaws.com:9094 (id: 2 rack: az-2)
21:12:42,242 DEBUG org.apache.kafka.clients.FetchSessionHandler                 
 - [Consumer clientId=service_app.event_bridge-0, 
groupId=service_app.event_bridge] Built incremental fetch 
(sessionId=1756029163, epoch=3098) for node 2. Added 0 partition(s), altered 0 
partition(s), removed 0 partition(s), replaced 0 partition(s) out of 1 
partition(s)
21:12:42,242 DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch    
 - [Consumer clientId=service_app.event_bridge-0, 
groupId=service_app.event_bridge] Sending READ_COMMITTED 
IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), 
implied=(app_domain.event_type.v1-0), canUseTopicIds=True) to broker 
b-2.kafka-cluster.example.c6.kafka.region.amazonaws.com:9094 (id: 2 rack: az-2)
21:12:42,242 DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch    
 - [Consumer clientId=service_app.event_bridge-0, 
groupId=service_app.event_bridge] Adding pending request for node 
b-2.kafka-cluster.example.c6.kafka.region.amazonaws.com:9094 (id: 2 rack: az-2)
21:12:42,242 DEBUG org.apache.kafka.clients.NetworkClient                       
 - [Consumer clientId=service_app.event_bridge-0, 
groupId=service_app.event_bridge] Sending FETCH request with header 
RequestHeader(apiKey=FETCH, apiVersion=15, clientId=service_app.event_bridge-0, 
correlationId=3109, headerVersion=2) and timeout 30000 to node 2: 
FetchRequestData(clusterId=null, replicaId=-1, 
replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
minBytes=1, maxBytes=52428800, isolationLevel=1, sessionId=1756029163, 
sessionEpoch=3098, topics=[], forgottenTopicsData=[], rackId='')
21:12:42,302 DEBUG org.apache.kafka.clients.producer.internals.Sender           
 - [Producer clientId=producer-service_app.event_bridge-0-1, 
transactionalId=service_app.event_bridge-0-1] Sending transactional request 
InitProducerIdRequestData(transactionalId='service_app.event_bridge-0-14982', 
transactionTimeoutMs=3600000, producerId=-1, producerEpoch=-1) to node 
b-2.kafka-cluster.example.c11.kafka.region.amazonaws.com:9094 (id: 2 rack: 
null) with correlation ID 44826
21:12:42,302 DEBUG org.apache.kafka.clients.NetworkClient                       
 - [Producer clientId=producer-service_app.event_bridge-0-1, 
transactionalId=service_app.event_bridge-0-1] Sending INIT_PRODUCER_ID request 
with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
clientId=producer-service_app.event_bridge-0-1, correlationId=44826, 
headerVersion=2) and timeout 30000 to node 2: 
InitProducerIdRequestData(transactionalId='service_app.event_bridge-0-14982', 
transactionTimeoutMs=3600000, producerId=-1, producerEpoch=-1)
21:12:42,304 DEBUG org.apache.kafka.clients.NetworkClient                       
 - [Producer clientId=producer-service_app.event_bridge-0-1, 
transactionalId=service_app.event_bridge-0-1] Received INIT_PRODUCER_ID 
response from node 2 for request with header 
RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
clientId=producer-service_app.event_bridge-0-1, correlationId=44826, 
headerVersion=2): InitProducerIdResponseData(throttleTimeMs=0, errorCode=0, 
producerId=13234488, producerEpoch=14)
21:12:42,304 DEBUG 
org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer  - Change 
transaction id from service_app.event_bridge-0-14982 to 
service_app.event_bridge-0-14983
21:12:42,304 DEBUG org.apache.kafka.clients.producer.internals.Sender           
 - [Producer clientId=producer-service_app.event_bridge-0-1, 
transactionalId=service_app.event_bridge-0-1] Sending transactional request 
InitProducerIdRequestData(transactionalId='service_app.event_bridge-0-14983', 
transactionTimeoutMs=3600000, producerId=-1, producerEpoch=-1) to node 
b-2.kafka-cluster.example.c11.kafka.region.amazonaws.com:9094 (id: 2 rack: 
null) with correlation ID 44827
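
If I read the log correctly, the sink is walking through transactional IDs one 
at a time: it calls InitProducerId for service_app.event_bridge-0-14982, gets 
back producerEpoch=14, switches to ...-0-14983, and repeats. My assumption is 
that it stops at the first ID that comes back with epoch 0 (one the coordinator 
has never seen), which would make startup time grow with the number of IDs 
previously used under this prefix. The sketch below models that assumed loop 
in plain Java; FakeCoordinator is a hypothetical stand-in for the broker, and 
the 15,000 previously used IDs are an invented number.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the startup probing the DEBUG log appears to show.
// Assumption: ids are probed in order and probing stops at the first id whose
// InitProducerId response has epoch 0. Not actual Flink or Kafka code.
class ProbeModel {

    // Stand-in for the transaction coordinator (hypothetical).
    static class FakeCoordinator {
        private final Map<String, Integer> epochs = new HashMap<>();

        // Unknown id -> epoch 0; each later call for the same id bumps the epoch.
        int initProducerId(String transactionalId) {
            return epochs.merge(transactionalId, 0, (old, unused) -> old + 1);
        }
    }

    // Counts previously used ids probed before reaching a never-seen one.
    static long countUsedIds(FakeCoordinator broker, String prefix, int subtask, long start) {
        long used = 0;
        for (long n = start; ; n++) {
            int epoch = broker.initProducerId(prefix + "-" + subtask + "-" + n);
            if (epoch == 0) {
                return used; // first never-seen id: stop probing
            }
            used++;
        }
    }

    public static void main(String[] args) {
        FakeCoordinator broker = new FakeCoordinator();
        String prefix = "service_app.event_bridge";
        // Previous deployment: one id per checkpoint 1..15000 (invented numbers).
        for (long cp = 1; cp <= 15_000; cp++) {
            broker.initProducerId(prefix + "-0-" + cp);
        }
        // Redeploy with the same prefix: every old id is probed again.
        System.out.println(countUsedIds(broker, prefix, 0, 1)); // 15000
        // Redeploy with a new prefix: the very first id is unused.
        System.out.println(countUsedIds(broker, prefix + "-v2", 0, 1)); // 0
    }
}
```

In the log above, one step (FindCoordinator plus InitProducerId) takes roughly 
100 ms, so if this model is right, the wait scales with the number of 
previously used IDs.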


This looks like some sort of transaction state conflict, but I’m not sure 
what’s going on. I checked for hanging transactions but found none.
Any help would be greatly appreciated!

Best,
Konstantin
