Hi Oleksandr,

State is mapped to operators based on the operator id, not on the
operator's name. That's why changing the source's name alone might not
be enough.

That said, a check that the restored partitions still match the
configured topic/topic pattern might be a valuable addition. Would you
like to open a JIRA ticket for it?
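For reference, a minimal sketch of the workaround discussed below: give the source a fresh uid in the job code (e.g. `env.addSource(consumer).uid("kafka-source-v2")`) and resume with non-restored state allowed. The savepoint path and jar name here are purely illustrative:

```shell
# Resume from a savepoint, dropping any state that no longer maps to an
# operator in the new job graph (here: the renamed Kafka source's offsets).
# Path and jar are placeholders; --allowNonRestoredState (-n) is the real flag.
flink run -s hdfs:///savepoints/savepoint-1234 --allowNonRestoredState myJob.jar
```

The new source then starts from the configured startup offsets instead of the checkpointed ones, while all other operators keep their state.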

Best,

Dawid


On 13/09/18 11:06, Oleksandr Nitavskyi wrote:
>
> Hello Dawid,
>
>  
>
> Thank you for the answer. In our case we did change the name of the
> Kafka source, so we expected that the state for that Kafka source
> operator would not be restored.
>
>  
>
> In any case, shouldn't FlinkKafkaConsumerBase have a safeguard that
> prevents restoring KafkaTopicPartitions for topics different from the
> ones currently being consumed?
>
>  
>
> Thank you
>
> Oleksandr
>
>  
>
> *From: *Dawid Wysakowicz <dwysakow...@apache.org>
> *Date: *Thursday, 13 September 2018 at 09:59
> *To: *Juan Gentile <j.gent...@criteo.com>, "user@flink.apache.org"
> <user@flink.apache.org>
> *Cc: *R&D/Product Engineering/PRIME/Delight <deli...@criteo.com>,
> <gor...@data-artisans.com>
> *Subject: *Re: Weird behaviour after change sources in a job.
>
>  
>
> Hi Juan,
>
> I think this is expected behaviour. In order to provide proper
> processing semantics, Flink keeps track of partition offsets
> internally and checkpoints those offsets. FlinkKafkaConsumer also
> supports discovery of new partitions. With both of those features in
> mind, if you restart your job from a savepoint/checkpoint but with a
> changed topic, it will restore the old partitions with their
> checkpointed offsets and additionally discover the partitions of the
> new topic. This is why it consumes from both the old and the new topic.
>
> If you defined your source manually (i.e. you were not using
> Kafka010TableSource), what you can do is set a new uid for the source
> and enable allowNonRestoredState. This way you will keep the state of
> all other operators, but you will lose the information about offsets
> in Kafka.
>
>  
>
> I also cc @Gordon, who might want to add something to this.
>
>  
>
> On 12/09/18 18:03, Juan Gentile wrote:
>
>     Hello!
>
>      
>
>     We have found a weird issue while replacing the source in one of
>     our Flink SQL Jobs.
>
>      
>
>     We have a job which was reading from a Kafka topic (with
>     externalized checkpoints), and we needed to change the topic while
>     keeping the same logic for the job/SQL.
>
>     After we restarted the job, instead of consuming only from the new
>     Kafka topic, it consumed from both, duplicating the input of our job.
>
>     We were able to reproduce the issue, but we don't understand
>     whether this is a bug or expected behavior, in which case we
>     should have restarted from a clean state.
>
>     We are using Flink 1.4 and Kafka 0.10.2.1 at the moment.
>
>      
>
>     Thank you,
>
>     Juan
>
>
>
