[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788239#comment-16788239
 ] 

Tim edited comment on FLINK-11654 at 3/8/19 7:40 PM:
-----------------------------------------------------

It is not clear to me what the _recommended_ workaround is.   Is it to name the 
task so that different jobs will end up with different transaction IDs?

FWIW - I think the TransactionIdSeed is a good idea in that it makes it clear 
to the user that a token is used to uniquely identify the pool of 
KafkaProducers.   I have not thought about if and how that would impact 
recovery and savepoints though.

Also, adding my findings as well.   
http://mail-archives.apache.org/mod_mbox/flink-user/201903.mbox/%3C9E3F033F-0CB0-4659-A487-39BB728C4F01%40comcast.com%3E


was (Author: victtim):
It is not clear to me what the _recommended_ workaround is.   Is it to name the 
task so that different jobs will end up with different transaction IDs?

FWIW - I think the TransactionIdSeed is a good idea in that it makes it clear 
to the user that a token is used to uniquely identify the pool of 
KafkaProducers.   I have not thought about if and how that would impact 
recovery and savepoints though.

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11654
>                 URL: https://issues.apache.org/jira/browse/FLINK-11654
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.7.1
>            Reporter: Jürgen Kreileder
>            Priority: Major
>             Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer<IN>
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to