Hi Dan, sorry for the mix-up. I think the idleness definition [1] is orthogonal to which source interface is used. The new source interface just makes it more obvious to the user that they can override the watermark strategy.
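In case it helps, a minimal sketch of overriding the strategy (the event type and durations here are placeholders of mine, not from your job):

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    // MyEvent is a placeholder; substitute your record class.
    // withIdleness marks a source split idle after 1 minute without
    // records, so it no longer holds back the overall watermark.
    WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withIdleness(Duration.ofMinutes(1));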
I'd still recommend having a look at the new Kafka source, though. One
interesting thing is that you can now specify end offsets, so you could
define a time at which you switch from old to new, and the old source
would then close automatically.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources

On Fri, Jul 30, 2021 at 3:17 AM Dan Hill <quietgol...@gmail.com> wrote:

> Are there any docs that talk about the new idleness support? I want to
> understand it better.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
>
> On Thu, Jul 29, 2021 at 6:15 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Thanks, JING and Arvid!
>>
>> Interesting. That's good to know. I've been planning for incompatible
>> schema changes. I'll look into the new source too.
>>
>> On Thu, Jul 29, 2021 at 4:56 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> I'm assuming you have an incompatible schema change. If you don't,
>>> there are several easy ways.
>>>
>>> Your plan actually looks like the best option. Of course, it requires
>>> that you eventually union the inputs. If you can do that without a
>>> custom mapper and with one read schema only, you may even use one
>>> source with two topics and the same reader schema.
>>>
>>> For idleness detection, I recommend the new Kafka source in 1.12.4+,
>>> built on the new source interface, which supports idleness out of the
>>> box.
>>>
>>> On Mon, Jul 26, 2021 at 5:52 AM JING ZHANG <beyond1...@gmail.com> wrote:
>>>
>>>> Hi Dan,
>>>> Do you plan to continue reading from a new Kafka topic after you have
>>>> finished reading the current one? If yes, your plan could work.
>>>>
>>>> BTW, if the data in the new topic and the current topic share the
>>>> same schema and only the topic names differ, you could try the
>>>> following, simpler method:
>>>> 1. Take a savepoint (without drain) for the job after it finishes the
>>>> current topic.
>>>> 2. Update the Kafka source topic name to the new topic name.
>>>> 3. Restore the job from the savepoint.
>>>> After the restore, the job would read the new topic from
>>>> earliest_offset: because the new topic name differs from the previous
>>>> one, its KafkaTopicPartition entries cannot be found in the restored
>>>> state. The restored state would then be overwritten with the new
>>>> Kafka topic and offsets at the next checkpoint.
>>>> Please ensure that the UIDs of the successor operators are not
>>>> changed.
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>> Dan Hill <quietgol...@gmail.com> wrote on Sun, Jul 25, 2021 at 3:56 AM:
>>>>
>>>>> Hi!
>>>>>
>>>>> *Scenario*
>>>>> I want to eventually make a breaking change to a Kafka source (major
>>>>> version change) that requires a new Kafka topic.
>>>>>
>>>>> *Question*
>>>>> What utilities exist in Flink to help with this? What best practices
>>>>> exist?
>>>>>
>>>>> My plan is roughly the following:
>>>>> 1. Change my Flink job to support both Kafka sources and union them.
>>>>> Deal with idle data sources (either temporarily with a flag or by
>>>>> forcing through watermark events).
>>>>> 2. Change the Kafka producer to write to the new topic.
>>>>> 3. When enough time has passed, delete the old operator (using
>>>>> allowNonRestoredState).
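To make the end-offset switch-over from the top of this mail concrete, a
minimal sketch with the new KafkaSource. The broker address, topic names,
cut-over timestamp, and deserializer are placeholders of mine, not taken
from your job:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    // Cut-over instant: the old source stops here, the new one starts here.
    long switchMillis = 1_627_776_000_000L; // placeholder epoch millis

    // Old topic: bounded at the cut-over timestamp, so this source
    // finishes automatically once it reaches the switch point.
    KafkaSource<String> oldSource = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")          // placeholder
        .setTopics("events-v1")                      // placeholder old topic
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.timestamp(switchMillis))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

    // New topic: starts at the cut-over timestamp and runs unbounded.
    KafkaSource<String> newSource = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")          // placeholder
        .setTopics("events-v2")                      // placeholder new topic
        .setStartingOffsets(OffsetsInitializer.timestamp(switchMillis))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

You would then union env.fromSource(oldSource, ...) with
env.fromSource(newSource, ...), using an idleness-aware watermark strategy
as in [1], and the bounded source closes itself after passing switchMillis.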
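For the savepoint-based steps discussed above (JING ZHANG's topic rename
and step 3 of your plan), the CLI side would roughly be as follows; job ID,
savepoint path, and jar name are placeholders:

    # Take a savepoint (without drain) once the current topic is consumed.
    flink savepoint <jobId>

    # Restore after changing the topic name in the job.
    flink run -s <savepointPath> my-job.jar

    # For step 3: restore while dropping the state of the deleted old source.
    flink run -s <savepointPath> --allowNonRestoredState my-job.jar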