Thanks, JING and Arvid!

Interesting.  That's good to know.  I've been planning for incompatible
schema changes.  I'll look into new source too.

On Thu, Jul 29, 2021 at 4:56 AM Arvid Heise <ar...@apache.org> wrote:

> I'm assuming you have an incompatible schema change. If you don't, there
> are several easy ways.
>
> Your plan actually looks like the best option. Of course, this requires
> that you eventually union the inputs. If you can do that without a custom
> mapper and with one read schema only, you may even use 1 source with 2
> topics and the same reader schema.
>
> For idleness detection, I recommend using the new Kafka Source in 1.12.4+
> with the new source interface that supports idleness out-of-the-box.
>
> On Mon, Jul 26, 2021 at 5:52 AM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Dan,
>> Do you plan to continue to read a new Kafka topic after finished read
>> current Kafka topic?
>> If yes, Your plan could works.
>>
>> BTW, if the schema of data in the new Kafka topic and the current topic
>> are same with each other, however their topic name are different with each
>> other, maybe you could try the following method, which is more simpler.
>> 1. Do a savepoint (without drain) for the job after finish all current
>> topic.
>> 2. Update the Kafka source topic name to the new topic name
>> 3. Restored job from the savepoint.
>> After restored,  the job would read the data from new topic from
>> earliest_offset because new topic name is different the previous one, so
>> those KafkaTopicPartition could not be found in restored state.
>> And restored state would be overwritten with new Kafka topic and offsets
>> after a checkpoint.
>> pease ensure that UID of the successor operators are not changed.
>>
>> Best,
>> JING ZHANG
>>
>> Dan Hill <quietgol...@gmail.com> 于2021年7月25日周日 上午3:56写道:
>>
>>> Hi!
>>>
>>> *Scenario*
>>> I want to eventually do a breaking change to a Kafka source (major
>>> version change) which requires a new Kafka topic.
>>>
>>> *Question*
>>> What utilities exist to help with this in Flink?  What best practices
>>> exist?
>>>
>>> My plan is roughly the following:
>>> 1. Change my Flink job to support both kafka sources.  Union them.  Deal
>>> with idle data sources (either temp with flag or force through watermark
>>> events).
>>> 2. Change the Kafka producer to write to the new topic.
>>> 3. When enough time has passed, delete the old operator (using
>>> allowNonRestoredState).
>>>
>>

Reply via email to