Jürgen Kreileder created FLINK-11654:
----------------------------------------
Summary: ProducerFencedExceptions from Kafka in EXACTLY_ONCE mode due to identical transactional IDs in multiple jobs
Key: FLINK-11654
URL: https://issues.apache.org/jira/browse/FLINK-11654
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.7.1
Reporter: Jürgen Kreileder
We run multiple jobs on a cluster that write heavily to the same Kafka topic
from identically named sinks. When the EXACTLY_ONCE semantic is enabled for the
Kafka producers, we get a large number of ProducerFencedExceptions and all jobs
go into a restart cycle.
Example exception from the Kafka broker log:
{code:java}
[2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing append operation on partition finding-commands-dev-1-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 483 (request epoch), 484 (server epoch)
{code}
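To illustrate why the broker rejects the request in the log above, here is a toy model of transactional.id fencing. It is a simplification, not Kafka's implementation: it assumes only that each initTransactions() call for a given transactional.id bumps the producer epoch, and that writes carrying an older epoch are rejected. The class name and the ID "sink-0" are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (NOT Kafka's implementation) of transactional.id fencing.
final class EpochFencingModel {
    // Latest epoch handed out per transactional.id, as a coordinator would track it.
    private final Map<String, Integer> serverEpochs = new HashMap<>();

    // Simulates initTransactions(): each call bumps the epoch for this ID.
    int initTransactions(String transactionalId) {
        int next = serverEpochs.getOrDefault(transactionalId, -1) + 1;
        serverEpochs.put(transactionalId, next);
        return next;
    }

    // Simulates an append: rejected if a newer producer has claimed the same ID.
    boolean append(String transactionalId, int requestEpoch) {
        int serverEpoch = serverEpochs.getOrDefault(transactionalId, -1);
        if (requestEpoch < serverEpoch) {
            System.out.println("fenced: " + requestEpoch + " (request epoch), "
                    + serverEpoch + " (server epoch)");
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        EpochFencingModel coordinator = new EpochFencingModel();
        String sharedId = "sink-0"; // hypothetical ID used by two different jobs
        int jobA = coordinator.initTransactions(sharedId); // epoch 0
        int jobB = coordinator.initTransactions(sharedId); // epoch 1, fences job A
        System.out.println(coordinator.append(sharedId, jobB)); // true
        System.out.println(coordinator.append(sharedId, jobA)); // prints the "fenced" line, then false
    }
}
```

In this model, if job A then restarts and calls initTransactions() again, it fences job B in turn, which matches the restart cycle described above when two jobs keep reusing the same IDs.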
The cause is the way FlinkKafkaProducer initializes its
TransactionalIdsGenerator: the ID prefix is built only from the task name and
the operator's unique ID, so the generated IDs are guaranteed to be unique
within a single job, but they can clash between different jobs (and clusters).
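To make the clash concrete, here is a simplified, hypothetical model of such ID generation. It assumes each ID has the form prefix-N, where the prefix is the task name plus "-" plus the operator's unique ID (as in the FlinkKafkaProducer excerpt in this issue), and each subtask owns a disjoint block of poolSize numbers. This mirrors the idea rather than Flink's exact TransactionalIdsGenerator logic; the sink name "Sink: commands" and operator ID "op-1234" are made up.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of per-subtask transactional ID generation.
final class TransactionalIdClash {

    // IDs for one subtask: a disjoint block of poolSize numbers per subtask index.
    static List<String> idsFor(String prefix, int subtaskIndex, int poolSize) {
        List<String> ids = new ArrayList<>();
        long start = (long) subtaskIndex * poolSize;
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (start + i));
        }
        return ids;
    }

    // All IDs one job would use across its parallel subtasks.
    static Set<String> idsForJob(String taskName, String operatorUniqueId,
                                 int parallelism, int poolSize) {
        // Prefix as in the quoted code: no job ID, so it is identical across jobs.
        String prefix = taskName + "-" + operatorUniqueId;
        Set<String> ids = new HashSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            ids.addAll(idsFor(prefix, subtask, poolSize));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Two different jobs with an identically named sink and the same operator ID.
        Set<String> jobA = idsForJob("Sink: commands", "op-1234", 2, 5);
        Set<String> jobB = idsForJob("Sink: commands", "op-1234", 2, 5);
        Set<String> shared = new HashSet<>(jobA);
        shared.retainAll(jobB);
        System.out.println(shared.size() + " of " + jobA.size() + " IDs are shared");
        // prints "10 of 10 IDs are shared"
    }
}
```

Within one job the per-subtask blocks never overlap, but nothing in the prefix distinguishes one job from another, so every ID is reused.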
{code:java}
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -819,6 +819,7 @@ public class FlinkKafkaProducer<IN>
         nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
             NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
         transactionalIdsGenerator = new TransactionalIdsGenerator(
+            // the prefix probably should include job id and maybe cluster id
             getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
             getRuntimeContext().getIndexOfThisSubtask(),
             getRuntimeContext().getNumberOfParallelSubtasks(),
{code}
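A minimal sketch of the direction suggested by the comment in the diff: add the job ID, and optionally a cluster identifier, to the prefix. The helper class and its clusterId/jobId parameters are hypothetical; how a real fix would obtain these values inside FlinkKafkaProducer is not specified here.

```java
// Hypothetical helper: builds a transactional ID prefix scoped to a job (and optionally a cluster).
final class JobScopedPrefix {

    static String prefix(String clusterId, String jobId, String taskName, String operatorUniqueId) {
        StringBuilder sb = new StringBuilder();
        if (!clusterId.isEmpty()) {
            sb.append(clusterId).append('-'); // optional cluster scope
        }
        // Same "-" separator as the original prefix, with the job ID in front.
        return sb.append(jobId).append('-')
                 .append(taskName).append('-')
                 .append(operatorUniqueId).toString();
    }

    public static void main(String[] args) {
        // Same sink name and operator ID, but different (hypothetical) job IDs.
        String a = prefix("", "jobA", "Sink: commands", "op-1234");
        String b = prefix("", "jobB", "Sink: commands", "op-1234");
        System.out.println(a);           // jobA-Sink: commands-op-1234
        System.out.println(a.equals(b)); // false
    }
}
```

One point to check before relying on a per-submission job ID: if the job ID changes when a job is resubmitted (for example from a savepoint), transactions left open under the old prefix may no longer be found by ID generation under the new one, so a stable, user-chosen identifier might be a safer prefix component.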