This sounds like a no-brainer, +1.

Two things that seem obvious, but that might be good to double-check:
1. All newly discovered partitions will be consumed from the earliest possible offset. That is how it is documented for version 1.12 [1], but not for later versions, which is why I would like to double-check. If so, I think it would also be good to restore that statement in the documentation.

2. Let's assume the following situation:
   * Dynamic partition discovery is enabled, with the interval set to 30 seconds
   * The job runs
   * The job completes checkpoint X
   * New partitions are discovered and consumption from them starts
   * The job crashes before a new checkpoint has been created

   My assumption is that the job restarts from checkpoint X, which only includes the old set of partitions. Either directly when the job restarts, or at the next partition discovery, the new partitions (unknown to the restored job) would be picked up and consumed. We still have exactly-once guarantees, since no new checkpoint was made before the job crashed.

Best regards,

Martijn

[1] https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery

On Fri, 13 Jan 2023 at 08:27, Leonard Xu <xbjt...@gmail.com> wrote:

> Thanks Qingsheng for driving this. Enabling dynamic partition discovery
> would be very useful in scenarios where the partitions of a Kafka topic are scaled.
>
> +1 for the change.
>
> CC: Becket
>
> Best,
> Leonard
>
> > On Jan 13, 2023, at 3:15 PM, Jark Wu <imj...@gmail.com> wrote:
> >
> > +1 for the change. I think this is beneficial for users and is compatible.
> >
> > Best,
> > Jark
> >
> > On Fri, 13 Jan 2023 at 14:22, 何军 <xuehaijux...@gmail.com> wrote:
> >
> >>> +1 for this idea, we have enabled Kafka dynamic partition discovery in
> >>> all jobs.
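To make the reasoning in point 2 concrete, here is a toy Python model of that scenario. It is not Flink's API or the actual Kafka source/enumerator logic; it assumes (hypothetically) that a topic is a dict of partition id to records, that newly discovered partitions start at offset 0 as in point 1, and that a checkpoint snapshots assigned partitions, offsets and operator state together so a restore rolls all of them back atomically. It models only the source side, not the sink.

```python
"""Toy model of the recovery scenario from point 2 (not Flink's API).

Hypothetical assumptions, for illustration only:
- a "topic" is a dict mapping partition id -> list of records;
- newly discovered partitions start at the earliest offset (0), per point 1;
- a checkpoint snapshots assigned partitions, offsets and operator state
  together, and a restore rolls all of them back atomically.
"""
import copy


class ToySource:
    def __init__(self, topic):
        self.topic = topic
        self.offsets = {}  # partition -> next offset to read
        self.state = {}    # partition -> number of records processed

    def discover(self):
        # Any partition not yet assigned starts from the earliest offset.
        for p in self.topic:
            if p not in self.offsets:
                self.offsets[p] = 0
                self.state[p] = 0

    def poll_all(self):
        # Read everything currently available in the assigned partitions.
        for p, off in self.offsets.items():
            records = self.topic[p]
            self.state[p] += len(records) - off
            self.offsets[p] = len(records)

    def checkpoint(self):
        return copy.deepcopy((self.offsets, self.state))

    def restore(self, snapshot):
        self.offsets, self.state = copy.deepcopy(snapshot)


def run_scenario():
    topic = {0: ["a", "b"], 1: ["c"]}
    src = ToySource(topic)
    src.discover()
    src.poll_all()
    checkpoint_x = src.checkpoint()  # knows partitions 0 and 1 only

    topic[2] = ["d", "e", "f"]       # a new partition appears
    src.discover()
    src.poll_all()                   # consumption of partition 2 starts ...
    # ... and the job crashes before the next checkpoint.

    src.restore(checkpoint_x)        # back to the old partition set
    src.discover()                   # partition 2 is unknown again
    src.poll_all()                   # re-read from offset 0
    return src.state


if __name__ == "__main__":
    print(run_scenario())  # prints {0: 2, 1: 1, 2: 3}
```

Each record is counted exactly once in the restored state, because the work done on partition 2 after checkpoint X is discarded on restore and redone from the earliest offset.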