Re: Change Kafka cluster without loosing the Flink's state

2017-09-14 Thread kla
Hi Gordon, Thanks again for your answer. But I am not sure if I understood this part: "The workaround, for now, would be to explicitly disable chaining of the consumer source with any stateful operators before taking the savepoint and changing the operator UID." So my code looks like this:

Re: Change Kafka cluster without loosing the Flink's state

2017-09-14 Thread Tzu-Li (Gordon) Tai
Simply like this: env.addSource(new FlinkKafkaConsumer(...)).uid(“some-unique-id”) The same goes for any other operator. However, do keep in mind this bug that was just recently uncovered:  https://issues.apache.org/jira/browse/FLINK-7623. What I described in my previous reply would not work as e

Re: Change Kafka cluster without loosing the Flink's state

2017-09-14 Thread kla
Hi Gordon, Thanks for your quick reply. I have following consumer: jobConfiguration.getEnv().addSource( new FlinkKafkaConsumer010<>(properties.getProperty(TOPIC), deserializer, properties)); How can I set the UID for the consumer ? Thanks again for help! Regards, Konstantin -- Sent from: h

Re: Change Kafka cluster without loosing the Flink's state

2017-09-14 Thread Tzu-Li (Gordon) Tai
Hi Konstantin, After migrating and connecting to the new Kafka cluster, do you want the Kafka consumer to start fresh without any partition offset state (and therefore will re-establish its partition-to-subtask assignments), while keeping all other operator state in the pipeline intact? If so,