Hello,
I have a Flink 1.20 streaming app that consumes from one Kafka topic, does some
processing, and then produces to another Kafka topic. I’m using AWS MSK.
Recently, we switched to exactly-once semantics, and as part of that migration we
set the transactional ID prefix to the application name (which is stable between
deploys). The issue we’re experiencing is that sometimes, on redeploys, the new
app (or, more specifically, its sink) gets stuck in the INITIALIZING state. It
does eventually resolve on its own given enough time (hours, though I don’t have
an exact number), but that’s obviously not desirable. Changing the transactional
ID prefix to something else, however, fixes the problem.
I enabled debug logging, and here’s what I see on the task managers:
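For context, the transactional IDs in the logs below look like
<prefix>-<subtask>-<counter>, e.g. service_app.event_bridge-0-14982; I’m assuming
the last part is a checkpoint ID. A small sketch of that naming scheme (the class
and method names are made up, this is not Flink’s code) shows why a stable prefix
yields the same IDs for the same checkpoints on every deploy, while a new prefix
yields IDs the brokers have never seen:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how the sink names its transactions. The
// "<prefix>-<subtask>-<checkpoint>" format is inferred from IDs in the logs,
// e.g. service_app.event_bridge-0-14982; it is an assumption, not Flink's code.
public class TransactionalIds {

    // Assumed format: <prefix>-<subtaskIndex>-<checkpointId>
    static String build(String prefix, int subtaskIndex, long checkpointId) {
        return prefix + "-" + subtaskIndex + "-" + checkpointId;
    }

    // IDs a single subtask would use for an inclusive range of checkpoints.
    static List<String> idsFor(String prefix, int subtaskIndex, long fromCheckpoint, long toCheckpoint) {
        List<String> ids = new ArrayList<>();
        for (long cp = fromCheckpoint; cp <= toCheckpoint; cp++) {
            ids.add(build(prefix, subtaskIndex, cp));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Same prefix across deploys: identical IDs for the same checkpoints.
        List<String> oldDeploy = idsFor("service_app.event_bridge", 0, 14982, 14984);
        List<String> newDeploy = idsFor("service_app.event_bridge", 0, 14982, 14984);
        System.out.println(oldDeploy.equals(newDeploy)); // true

        // A different prefix produces IDs that never collide with the old ones.
        List<String> renamed = idsFor("service_app.event_bridge.v2", 0, 14982, 14984);
        System.out.println(renamed.contains(oldDeploy.get(0))); // false
    }
}
```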
21:12:42,201 DEBUG org.apache.kafka.clients.producer.internals.Sender
- [Producer clientId=producer-service_app.event_bridge-0-1,
transactionalId=service_app.event_bridge-0-1] Sending transactional request
FindCoordinatorRequestData(key='service_app.event_bridge-0-14982', keyType=1,
coordinatorKeys=[]) to node
b-2.kafka-cluster.example.c11.kafka.region.amazonaws.com:9094 (id: 2 rack:
az-6) with correlation ID 44825
21:12:42,201 DEBUG org.apache.kafka.clients.NetworkClient
- [Producer clientId=producer-service_app.event_bridge-0-1,
transactionalId=service_app.event_bridge-0-1] Sending FIND_COORDINATOR request
with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4,
clientId=producer-service_app.event_bridge-0-1, correlationId=44825,
headerVersion=2) and timeout 30000 to node 2:
FindCoordinatorRequestData(key='', keyType=1,
coordinatorKeys=[service_app.event_bridge-0-14982])
21:12:42,202 DEBUG org.apache.kafka.clients.NetworkClient
- [Producer clientId=producer-service_app.event_bridge-0-1,
transactionalId=service_app.event_bridge-0-1] Received FIND_COORDINATOR
response from node 2 for request with header
RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4,
clientId=producer-service_app.event_bridge-0-1, correlationId=44825,
headerVersion=2): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0,
errorMessage='', nodeId=0, host='', port=0,
coordinators=[Coordinator(key='service_app.event_bridge-0-14982', nodeId=2,
host='b-2.kafka-cluster.example.c11.kafka.region.amazonaws.com', port=9094,
errorCode=0, errorMessage='')])
21:12:42,242 DEBUG org.apache.kafka.clients.NetworkClient
- [Consumer clientId=service_app.event_bridge-0,
groupId=service_app.event_bridge] Received FETCH response from node 2 for
request with header RequestHeader(apiKey=FETCH, apiVersion=15,
clientId=service_app.event_bridge-0, correlationId=3108, headerVersion=2):
FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1756029163,
responses=[])
21:12:42,242 DEBUG org.apache.kafka.clients.FetchSessionHandler
- [Consumer clientId=service_app.event_bridge-0,
groupId=service_app.event_bridge] Node 2 sent an incremental fetch response
with throttleTimeMs = 0 for session 1756029163 with 0 response partition(s), 1
implied partition(s)
21:12:42,242 DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch
- [Consumer clientId=service_app.event_bridge-0,
groupId=service_app.event_bridge] Removing pending request for node
b-2.kafka-cluster.example.c6.kafka.region.amazonaws.com:9094 (id: 2 rack: az-2)
21:12:42,242 DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch
- [Consumer clientId=service_app.event_bridge-0,
groupId=service_app.event_bridge] Added READ_COMMITTED fetch request for
partition app_domain.event_type.v1-0 at position FetchPosition{offset=4730884,
offsetEpoch=Optional[198],
currentLeader=LeaderAndEpoch{leader=Optional[b-2.kafka-cluster.example.c6.kafka.region.amazonaws.com:9094
(id: 2 rack: az-2)], epoch=198}} to node
b-2.kafka-cluster.example.c6.kafka.region.amazonaws.com:9094 (id: 2 rack: az-2)
21:12:42,242 DEBUG org.apache.kafka.clients.FetchSessionHandler
- [Consumer clientId=service_app.event_bridge-0,
groupId=service_app.event_bridge] Built incremental fetch
(sessionId=1756029163, epoch=3098) for node 2. Added 0 partition(s), altered 0
partition(s), removed 0 partition(s), replaced 0 partition(s) out of 1
partition(s)
21:12:42,242 DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch
- [Consumer clientId=service_app.event_bridge-0,
groupId=service_app.event_bridge] Sending READ_COMMITTED
IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(),
implied=(app_domain.event_type.v1-0), canUseTopicIds=True) to broker
b-2.kafka-cluster.example.c6.kafka.region.amazonaws.com:9094 (id: 2 rack: az-2)
21:12:42,242 DEBUG org.apache.kafka.clients.consumer.internals.AbstractFetch
- [Consumer clientId=service_app.event_bridge-0,
groupId=service_app.event_bridge] Adding pending request for node
b-2.kafka-cluster.example.c6.kafka.region.amazonaws.com:9094 (id: 2 rack: az-2)
21:12:42,242 DEBUG org.apache.kafka.clients.NetworkClient
- [Consumer clientId=service_app.event_bridge-0,
groupId=service_app.event_bridge] Sending FETCH request with header
RequestHeader(apiKey=FETCH, apiVersion=15, clientId=service_app.event_bridge-0,
correlationId=3109, headerVersion=2) and timeout 30000 to node 2:
FetchRequestData(clusterId=null, replicaId=-1,
replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500,
minBytes=1, maxBytes=52428800, isolationLevel=1, sessionId=1756029163,
sessionEpoch=3098, topics=[], forgottenTopicsData=[], rackId='')
21:12:42,302 DEBUG org.apache.kafka.clients.producer.internals.Sender
- [Producer clientId=producer-service_app.event_bridge-0-1,
transactionalId=service_app.event_bridge-0-1] Sending transactional request
InitProducerIdRequestData(transactionalId='service_app.event_bridge-0-14982',
transactionTimeoutMs=3600000, producerId=-1, producerEpoch=-1) to node
b-2.kafka-cluster.example.c11.kafka.region.amazonaws.com:9094 (id: 2 rack:
null) with correlation ID 44826
21:12:42,302 DEBUG org.apache.kafka.clients.NetworkClient
- [Producer clientId=producer-service_app.event_bridge-0-1,
transactionalId=service_app.event_bridge-0-1] Sending INIT_PRODUCER_ID request
with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4,
clientId=producer-service_app.event_bridge-0-1, correlationId=44826,
headerVersion=2) and timeout 30000 to node 2:
InitProducerIdRequestData(transactionalId='service_app.event_bridge-0-14982',
transactionTimeoutMs=3600000, producerId=-1, producerEpoch=-1)
21:12:42,304 DEBUG org.apache.kafka.clients.NetworkClient
- [Producer clientId=producer-service_app.event_bridge-0-1,
transactionalId=service_app.event_bridge-0-1] Received INIT_PRODUCER_ID
response from node 2 for request with header
RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4,
clientId=producer-service_app.event_bridge-0-1, correlationId=44826,
headerVersion=2): InitProducerIdResponseData(throttleTimeMs=0, errorCode=0,
producerId=13234488, producerEpoch=14)
21:12:42,304 DEBUG
org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer - Change
transaction id from service_app.event_bridge-0-14982 to
service_app.event_bridge-0-14983
21:12:42,304 DEBUG org.apache.kafka.clients.producer.internals.Sender
- [Producer clientId=producer-service_app.event_bridge-0-1,
transactionalId=service_app.event_bridge-0-1] Sending transactional request
InitProducerIdRequestData(transactionalId='service_app.event_bridge-0-14983',
transactionTimeoutMs=3600000, producerId=-1, producerEpoch=-1) to node
b-2.kafka-cluster.example.c11.kafka.region.amazonaws.com:9094 (id: 2 rack:
null) with correlation ID 44827
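My reading of the last few lines: the sink sends InitProducerId for
service_app.event_bridge-0-14982, gets back producerEpoch=14, then switches to
service_app.event_bridge-0-14983 and repeats. If it keeps doing that until it hits
an ID the broker has never seen (epoch 0), which is my guess and not something the
logs confirm, a simplified model would look like this (AbortProbe and
FakeCoordinator are made-up names, not Flink or Kafka classes):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the sequence in the logs: InitProducerId on
// <prefix>-<subtask>-<checkpoint>, then move on to the next checkpoint ID.
// The stop condition (epoch == 0, i.e. an ID the coordinator has never seen)
// is an assumption about the sink, not something the logs show.
public class AbortProbe {

    // Stand-in for the transaction coordinator: transactional ID -> last epoch.
    static final class FakeCoordinator {
        private final Map<String, Integer> epochs = new HashMap<>();

        void markUsed(String transactionalId, int epoch) {
            epochs.put(transactionalId, epoch);
        }

        // Bumps the epoch of a known ID (fencing the previous producer)
        // and returns 0 for a brand-new ID.
        int initProducerId(String transactionalId) {
            int next = epochs.containsKey(transactionalId) ? epochs.get(transactionalId) + 1 : 0;
            epochs.put(transactionalId, next);
            return next;
        }
    }

    // Number of InitProducerId round trips one subtask makes before it
    // reaches an unused ID.
    static long probe(FakeCoordinator coordinator, String prefix, int subtask, long startCheckpoint) {
        long requests = 0;
        for (long cp = startCheckpoint; ; cp++) {
            requests++;
            int epoch = coordinator.initProducerId(prefix + "-" + subtask + "-" + cp);
            if (epoch == 0) {
                return requests;
            }
        }
    }

    public static void main(String[] args) {
        FakeCoordinator coordinator = new FakeCoordinator();
        // Pretend earlier deploys used checkpoints 1..20000 under this prefix.
        for (long cp = 1; cp <= 20_000; cp++) {
            coordinator.markUsed("service_app.event_bridge-0-" + cp, 13);
        }
        System.out.println(probe(coordinator, "service_app.event_bridge", 0, 1));    // 20001
        System.out.println(probe(coordinator, "service_app.event_bridge.v2", 0, 1)); // 1
    }
}
```

If that guess is right, the time spent initializing would grow with the number of
IDs previously used under the same prefix, which would fit both the long delay and
the fact that a new prefix avoids the problem.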
This looks like some sort of transaction state conflict, but I’m not sure what’s
going on. I checked for hanging transactions but found none.
Any help would be greatly appreciated!
Best,
Konstantin