Hi Oleksandr,

The mapping of state to operator is done based on the operator id, not on its name. That's why changing the source's name might not be enough.
It might actually be a valuable addition to check whether the restored partitions still match the provided topic/topic pattern. Would you like to open a Jira ticket for it?

Best,

Dawid

On 13/09/18 11:06, Oleksandr Nitavskyi wrote:
> Hello Dawid,
>
> Thank you for the answer. In our case we did change the name of the
> Kafka source, so we expected that it would not restore state for the
> given Kafka source operator.
>
> Anyway, shouldn’t FlinkKafkaConsumerBase have a safeguard that does not
> allow restoring KafkaTopicPartitions from topics that are different
> from the currently consumed one?
>
> Thank you
> Oleksandr
>
> *From: *Dawid Wysakowicz <dwysakow...@apache.org>
> *Date: *Thursday, 13 September 2018 at 09:59
> *To: *Juan Gentile <j.gent...@criteo.com>, "user@flink.apache.org" <user@flink.apache.org>
> *Cc: *R&D/Product Engineering/PRIME/Delight <deli...@criteo.com>, <gor...@data-artisans.com>
> *Subject: *Re: Weird behaviour after change sources in a job.
>
> Hi Juan,
>
> I think this is somehow expected behaviour. Flink, in order to provide
> proper processing semantics, keeps track of partition offsets
> internally and checkpoints those offsets. FlinkKafkaConsumer also
> supports discovery of new partitions. With both of those features in
> mind, if you restart your job from a savepoint/checkpoint but with a
> changed topic, it will restore the old partitions with their offsets
> from the checkpoint, and it will discover partitions from the new
> topic. This is why it consumes from both the old and the new topic.
>
> If you defined your source manually (you were not using
> Kafka010TableSource), what you can do is set a new uid for the source
> and enable allowNonRestoredState. This way you will keep the state for
> all other operators, but you will lose the information about offsets
> in Kafka.
>
> I also cc @Gordon, who might want to add something to this.
>
> On 12/09/18 18:03, Juan Gentile wrote:
> > Hello!
> >
> > We have found a weird issue while replacing the source in one of
> > our Flink SQL Jobs.
> >
> > We have a job which was reading from a Kafka topic (with
> > externalized checkpoints) and we needed to change the topic while
> > keeping the same logic for the job/SQL.
> >
> > After we restarted the job, instead of consuming from the new
> > Kafka topic, it consumed from both, duplicating the input of our job.
> >
> > We were able to reproduce the issue, but we don’t understand if
> > this is a bug or expected behavior, in which case we should have
> > restarted from a clean state.
> >
> > We are using Flink 1.4 at the moment and Kafka 0.10.2.1.
> >
> > Thank you,
> > Juan
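
For illustration, the safeguard discussed in this thread (dropping restored partitions whose topic no longer matches the configured topic list or topic pattern) could look roughly like the sketch below. This is plain Java and not Flink code: `RestoredOffsetsFilter`, its nested `TopicPartition` class, and both method names are hypothetical stand-ins, and how such a check would actually be wired into FlinkKafkaConsumerBase is left open.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch (not part of Flink): filters restored partition offsets
 * so that only partitions of currently subscribed topics are kept.
 */
final class RestoredOffsetsFilter {

    private RestoredOffsetsFilter() {}

    /** Minimal stand-in for a (topic, partition) pair. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = Objects.requireNonNull(topic);
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Keeps only restored entries whose topic is in the fixed topic list. */
    static Map<TopicPartition, Long> filterByTopics(
            Map<TopicPartition, Long> restored, Set<String> topics) {
        Map<TopicPartition, Long> kept = new LinkedHashMap<>();
        for (Map.Entry<TopicPartition, Long> e : restored.entrySet()) {
            if (topics.contains(e.getKey().topic)) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }

    /** Keeps only restored entries whose topic fully matches the topic pattern. */
    static Map<TopicPartition, Long> filterByPattern(
            Map<TopicPartition, Long> restored, Pattern topicPattern) {
        Map<TopicPartition, Long> kept = new LinkedHashMap<>();
        for (Map.Entry<TopicPartition, Long> e : restored.entrySet()) {
            if (topicPattern.matcher(e.getKey().topic).matches()) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }
}
```

With restored offsets for both an old and a new topic, calling `filterByTopics` with only the new topic would keep just the new topic's partitions, so the old topic would no longer be consumed after the restore.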