Hello Dawid,

Thank you for the answer. In our case we did change the name of the Kafka
source, so we expected that the state for that Kafka source operator would
not be restored.

In any case, shouldn't FlinkKafkaConsumerBase have a safeguard that prevents
restoring KafkaTopicPartitions for topics that differ from the ones currently
being consumed?
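
For illustration, a sketch of what such a safeguard could look like (this is
hypothetical code, not the actual FlinkKafkaConsumerBase internals;
restoredState and subscribedTopics are assumed names): on restore, drop any
checkpointed offset whose topic is no longer subscribed.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

    public final class RestoredStateFilter {

        // Hypothetical safeguard, not actual Flink code: keep only the
        // restored offsets whose topic is still among the topics the
        // consumer is currently subscribed to.
        public static Map<KafkaTopicPartition, Long> filterRestoredOffsets(
                Map<KafkaTopicPartition, Long> restoredState,
                Set<String> subscribedTopics) {
            Map<KafkaTopicPartition, Long> filtered = new HashMap<>();
            for (Map.Entry<KafkaTopicPartition, Long> entry : restoredState.entrySet()) {
                if (subscribedTopics.contains(entry.getKey().getTopic())) {
                    filtered.put(entry.getKey(), entry.getValue());
                }
            }
            return filtered;
        }
    }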

Thank you
Oleksandr

From: Dawid Wysakowicz <dwysakow...@apache.org>
Date: Thursday, 13 September 2018 at 09:59
To: Juan Gentile <j.gent...@criteo.com>, "user@flink.apache.org" 
<user@flink.apache.org>
Cc: R&D/Product Engineering/PRIME/Delight <deli...@criteo.com>, 
<gor...@data-artisans.com>
Subject: Re: Weird behaviour after change sources in a job.


Hi Juan,

I think this is somewhat expected behaviour. Flink, in order to provide proper
processing semantics, keeps track of partition offsets internally and
checkpoints those offsets. The FlinkKafkaConsumer also supports discovery of
new partitions. With both of those features in mind, if you restart your job
from a savepoint/checkpoint but with a changed topic, it will restore the old
partitions with their checkpointed offsets and will also discover the
partitions of the new topic. This is why it consumes from both the old and the
new topic.

If you defined your source manually (i.e. you were not using
Kafka010TableSource), what you can do is set a new uid for the source and
enable allowNonRestoredState. This way you will keep the state of all other
operators, but you will lose the information about offsets in Kafka.
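
A minimal sketch of what I mean (the topic name, properties, and uid below
are made up, and SimpleStringSchema is just a placeholder deserializer):

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class NewTopicJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
            props.setProperty("group.id", "my-group");             // placeholder

            env.addSource(new FlinkKafkaConsumer010<>(
                        "new-topic", new SimpleStringSchema(), props))
                // A fresh uid means the checkpointed state of the old source
                // no longer matches any operator, so its offsets are not
                // restored when non-restored state is allowed to be skipped.
                .uid("kafka-source-new-topic")
                .print();

            env.execute("Job with new Kafka source uid");
        }
    }

When resubmitting from the savepoint/checkpoint, the now-unmatched source
state has to be explicitly allowed to be skipped, e.g.
flink run -s <savepointPath> -n ...
(-n is the shorthand for --allowNonRestoredState).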



I also cc @Gordon, who might want to add something to this.

On 12/09/18 18:03, Juan Gentile wrote:
Hello!

We have found a weird issue while replacing the source in one of our Flink SQL 
Jobs.

We have a job which was reading from a Kafka topic (with externalized
checkpoints), and we needed to change the topic while keeping the same logic
for the job/SQL.
After we restarted the job, instead of consuming only from the new Kafka
topic, it consumed from both, duplicating the input of our job.
We were able to reproduce the issue, but we don't know whether this is a bug
or expected behavior, in which case we should have restarted from a clean
state.
We are using Flink 1.4 at the moment, with Kafka 0.10.2.1.
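
For reference, the relevant configuration looks roughly like this (a sketch;
the checkpoint interval and topic names here are placeholders):

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint once a minute and retain the externalized checkpoints so
    // the job can be resubmitted from one of them after cancellation.
    env.enableCheckpointing(60_000);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // Originally the source read "topic-a"; we resubmitted the same job from
    // the externalized checkpoint with the source pointing at "topic-b"
    // instead, and it consumed from both topics.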

Thank you,
Juan

