Hi Hongshun,

I agree with @Etienne. Could you also illustrate this process and the problems
in a figure? Thanks!

Best,
Shammon FY

On Wed, Mar 15, 2023 at 5:15 PM Etienne Chauchot <echauc...@apache.org>
wrote:

> Hi,
>
> Why not track this in a FLIP and a ticket, and link this discussion thread?
>
> My 2 cents
>
> Etienne
>
> On 15/03/2023 at 10:01, Hongshun Wang wrote:
> > Hi devs,
> > I'd like to join this discussion. CC: Qingsheng
> > As discussed above, partitions that appear after the first discovery
> > should be consumed from the EARLIEST offset.
> >
> > However, when KafkaSourceEnumerator restarts after a job failure, it
> > cannot tell whether an unassigned partition was first-discovered or is
> > new, because the snapshot state currently contains only the
> > assignedPartitions collection (the partitions already assigned). We can
> > solve this by adding an unAssignedInitialPartitons collection to the
> > snapshot state, holding the first-discovered partitions that have not
> > yet been assigned. Alternatively, we can merge these two collections into
> > a single collection by adding a status to each item.
> >
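The "single collection with a status per item" variant can be pictured with a minimal Java sketch. The class, enum, and method names are hypothetical (not Flink's API), and partitions are plain "topic-partition" strings so the example needs no Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: the two proposed collections merged into one map with a
// status per partition. None of these names come from Flink.
class EnumeratorStateSketch {

    enum PartitionStatus {
        ASSIGNED,           // already handed to a KafkaSourceReader
        UNASSIGNED_INITIAL  // found by the first discovery, not assigned yet
    }

    private final Map<String, PartitionStatus> partitions = new HashMap<>();

    void addInitialPartition(String partition) {
        // putIfAbsent never downgrades a partition that is already assigned.
        partitions.putIfAbsent(partition, PartitionStatus.UNASSIGNED_INITIAL);
    }

    void markAssigned(String partition) {
        partitions.put(partition, PartitionStatus.ASSIGNED);
    }

    // View matching today's assignedPartitions collection.
    Set<String> assignedPartitions() {
        return withStatus(PartitionStatus.ASSIGNED);
    }

    // View matching the proposed unAssignedInitialPartitons collection.
    Set<String> unassignedInitialPartitions() {
        return withStatus(PartitionStatus.UNASSIGNED_INITIAL);
    }

    private Set<String> withStatus(PartitionStatus status) {
        Set<String> result = new TreeSet<>(); // sorted for stable output
        partitions.forEach((partition, s) -> {
            if (s == status) {
                result.add(partition);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        EnumeratorStateSketch state = new EnumeratorStateSketch();
        state.addInitialPartition("orders-0");
        state.addInitialPartition("orders-1");
        state.markAssigned("orders-0");
        System.out.println(state.assignedPartitions());          // [orders-0]
        System.out.println(state.unassignedInitialPartitions()); // [orders-1]
    }
}
```

Both existing views remain derivable from the merged map, so a snapshot only needs to serialize one structure.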
> > Besides, there is another problem, which often occurs in pattern mode:
> > we need to distinguish between the following two cases:
> >
> >     1. Case 1: The first partition discovery is too slow, so a checkpoint
> >     completes before it does, and then the job restarts. The restored
> >     unAssignedInitialPartitons is an empty collection, which here means
> >     no discovery has happened yet. The next discovery should be treated
> >     as the first discovery.
> >     2. Case 2: The first discovery returns no partitions, and partitions
> >     only appear after several more discoveries. If a restart occurs
> >     during this period, the restored *unAssignedInitialPartitons* is
> >     also an empty collection, but here it means the first discovery
> >     found nothing. The next discovery should therefore be treated as a
> >     new discovery.
> >
> > We can solve this problem by adding a boolean value (*firstDiscoveryDone*)
> > to the snapshot state, indicating whether the first discovery has been
> > done.
> >
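A minimal Java sketch of how a restored enumerator could use firstDiscoveryDone to tell Case 1 from Case 2. It assumes newly seen partitions either use the user-configured starting offsets (initial partitions) or EARLIEST (new partitions); the class, enum, and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch: a restored enumerator using firstDiscoveryDone to decide
// where newly seen partitions start. Names are illustrative, not Flink's API.
class FirstDiscoverySketch {

    enum StartingOffset {
        CONFIGURED, // user-configured starting offsets (initial partitions)
        EARLIEST    // partitions that appear after the first discovery
    }

    private final Set<String> knownPartitions = new HashSet<>();
    private boolean firstDiscoveryDone;

    FirstDiscoverySketch(Set<String> restoredPartitions, boolean restoredFirstDiscoveryDone) {
        knownPartitions.addAll(restoredPartitions);
        firstDiscoveryDone = restoredFirstDiscoveryDone;
    }

    // Returns the starting offset for each partition not seen before.
    Map<String, StartingOffset> onDiscovery(Set<String> discovered) {
        StartingOffset offset =
                firstDiscoveryDone ? StartingOffset.EARLIEST : StartingOffset.CONFIGURED;
        Map<String, StartingOffset> result = new TreeMap<>();
        for (String partition : discovered) {
            if (knownPartitions.add(partition)) { // true only for unseen partitions
                result.put(partition, offset);
            }
        }
        // An empty result still counts as a completed first discovery (Case 2).
        firstDiscoveryDone = true;
        return result;
    }

    boolean firstDiscoveryDone() {
        return firstDiscoveryDone;
    }

    public static void main(String[] args) {
        // Case 1: restored before the first discovery finished.
        FirstDiscoverySketch case1 = new FirstDiscoverySketch(Set.of(), false);
        System.out.println(case1.onDiscovery(Set.of("orders-0"))); // {orders-0=CONFIGURED}

        // Case 2: the first discovery found nothing, then the job restarted.
        FirstDiscoverySketch case2 = new FirstDiscoverySketch(Set.of(), true);
        System.out.println(case2.onDiscovery(Set.of("orders-0"))); // {orders-0=EARLIEST}
    }
}
```

In both cases the restored partition collections are empty; only the restored boolean differs, which is exactly the information the snapshot lacks today.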
> > There are also two rejected alternatives:
> >
> >     1. Make KafkaSourceEnumerator's snapshotState method blocking, so that
> >     it returns only after the first-discovered partitions have been
> >     successfully assigned to KafkaSourceReader. The advantage of this
> >     approach is that the snapshot state's variables don't need to change.
> >     However, if the first-discovered partitions are not assigned before
> >     checkpointing, the SourceCoordinator's event-loop thread will be
> >     blocked; since partition assignment also has to run on the event-loop
> >     thread, the thread would deadlock on itself.
> >     2. Replace the *firstDiscoveryDone* variable by making the first
> >     discovery synchronous, which guarantees that Case 1 never happens:
> >     when the event-loop thread starts, it first adds a discovery event to
> >     the blocking queue, so by the time it executes the checkpoint event,
> >     the partitions have already been discovered. However, if partition
> >     discovery is very time-consuming, the SourceCoordinator cannot process
> >     other events, such as reader registration, while it waits, which is
> >     wasteful.
> >
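The self-lock in the first rejected alternative can be reproduced in a few lines of Java. This sketch assumes a single-threaded executor is a fair stand-in for the SourceCoordinator event loop (the real coordinator is more involved); a timeout is added only so the demo terminates instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a "checkpoint" task blocks on an "assignment" task that
// is queued on the same single thread, so the assignment can never run.
class EventLoopSelfLockSketch {

    // Returns true if the checkpoint gave up waiting for the assignment.
    static boolean checkpointTimesOut(long timeoutMillis) throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> checkpoint = eventLoop.submit(() -> {
                // Partition assignment is queued behind the running checkpoint...
                Future<?> assignment = eventLoop.submit(() -> { /* assign partitions */ });
                try {
                    // ...so waiting for it here cannot succeed: the only thread is busy.
                    assignment.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                }
            });
            return checkpoint.get();
        } finally {
            eventLoop.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("checkpoint blocked: " + checkpointTimesOut(200)); // checkpoint blocked: true
    }
}
```

Without the timeout, the checkpoint task would wait forever, which is the self-locking described above.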
> > Best regards,
> > Hongshun
> >
> > On 2023/01/13 03:31:20 Qingsheng Ren wrote:
> >> Hi devs,
> >>
> >> I'd like to start a discussion about enabling the dynamic partition
> >> discovery feature by default in the Kafka source. Dynamic partition
> >> discovery [1] is a useful feature of the Kafka source, especially when
> >> the consumed Kafka topic scales out or the source subscribes to multiple
> >> Kafka topics with a pattern. With this feature enabled, users don't have
> >> to restart the Flink job to consume messages from new partitions.
> >> Currently, dynamic partition discovery is disabled by default, and users
> >> have to explicitly specify the discovery interval to turn it on.
> >>
> >> # Breaking changes
> >>
> >> For Kafka table source:
> >>
> >> - “scan.topic-partition-discovery.interval” will be set to 30 seconds by
> >> default.
> >> - As we need to provide a way for users to disable the feature,
> >> “scan.topic-partition-discovery.interval” = “0” will be used to turn off
> >> discovery. Before this proposal, “0” meant enabling partition discovery
> >> with an interval of 0, which makes little sense in practice.
> >> Unfortunately, we can't use negative values, as the type of this option
> >> is Duration.
> >>
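A minimal Java sketch of the proposed table-option semantics, using java.time.Duration: unset falls back to the new 30-second default, and zero disables discovery. The class and method names are hypothetical; negative values are not handled, since the proposal notes the Duration option type rules them out.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical sketch of the proposed "scan.topic-partition-discovery.interval"
// semantics. An empty Optional means partition discovery is disabled.
class TableDiscoveryIntervalSketch {

    static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(30);

    static Optional<Duration> effectiveInterval(Duration configuredOrNull) {
        // Unset option -> proposed default of 30 seconds.
        Duration interval = configuredOrNull == null ? DEFAULT_INTERVAL : configuredOrNull;
        // "0" now means "disabled" instead of "discover with interval 0".
        return interval.isZero() ? Optional.empty() : Optional.of(interval);
    }

    public static void main(String[] args) {
        System.out.println(effectiveInterval(null));                  // Optional[PT30S]
        System.out.println(effectiveInterval(Duration.ZERO));         // Optional.empty
        System.out.println(effectiveInterval(Duration.ofMinutes(5))); // Optional[PT5M]
    }
}
```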
> >> For KafkaSource (DataStream API)
> >>
> >> - Dynamic partition discovery in the Kafka source will be enabled by
> >> default, with the discovery interval set to 30 seconds.
> >> - To align with the table source, only a positive value of the option
> >> “partition.discovery.interval.ms” can be used to specify the discovery
> >> interval. Both negative values and zero will be interpreted as disabling
> >> the feature.
> >>
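The proposed DataStream rule (unset means 30 s, positive means that interval, zero or negative means disabled) fits in one small function. This Java sketch assumes the option arrives as a string property value, as Kafka client properties do; the class and method names are hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the proposed "partition.discovery.interval.ms"
// semantics. An empty OptionalLong means partition discovery is disabled.
class DataStreamDiscoveryIntervalSketch {

    static final long DEFAULT_INTERVAL_MS = 30_000L;

    static OptionalLong effectiveIntervalMs(String configuredOrNull) {
        long intervalMs = configuredOrNull == null
                ? DEFAULT_INTERVAL_MS
                : Long.parseLong(configuredOrNull.trim());
        // Only positive values enable discovery; zero and negatives disable it.
        return intervalMs > 0 ? OptionalLong.of(intervalMs) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        System.out.println(effectiveIntervalMs(null));    // OptionalLong[30000]
        System.out.println(effectiveIntervalMs("10000")); // OptionalLong[10000]
        System.out.println(effectiveIntervalMs("0"));     // OptionalLong.empty
        System.out.println(effectiveIntervalMs("-1"));    // OptionalLong.empty
    }
}
```

Treating zero and negatives the same way keeps this aligned with the table option, where "0" is the only way to disable discovery.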
> >> # Overhead of partition discovery
> >>
> >> Partition discovery is performed in KafkaSourceEnumerator, which
> >> asynchronously fetches topic metadata from the Kafka cluster and checks
> >> whether there are any new topics or partitions. This shouldn't introduce
> >> performance issues on the Flink side.
> >>
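One discovery round can be sketched in Java as an asynchronous metadata fetch followed by a set difference against the partitions already known. The fetchMetadata supplier is a hypothetical stand-in for the real Kafka metadata call, and the class and method names are illustrative only.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch: fetch partition metadata off the calling thread, then
// report only the partitions that were not known before.
class DiscoveryDiffSketch {

    static Set<String> newPartitions(Set<String> known, Set<String> fetched) {
        Set<String> result = new TreeSet<>(fetched); // sorted for stable output
        result.removeAll(known);
        return result;
    }

    // fetchMetadata stands in for a metadata request to the Kafka cluster.
    static CompletableFuture<Set<String>> discover(Set<String> known,
                                                   Supplier<Set<String>> fetchMetadata) {
        return CompletableFuture.supplyAsync(fetchMetadata)
                .thenApply(fetched -> newPartitions(known, fetched));
    }

    public static void main(String[] args) {
        Set<String> known = Set.of("orders-0", "orders-1");
        Set<String> found =
                discover(known, () -> Set.of("orders-0", "orders-1", "orders-2")).join();
        System.out.println(found); // [orders-2]
    }
}
```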
> >> On the Kafka broker side, partition discovery sends MetadataRequests to
> >> the Kafka broker to fetch topic information. Since the Kafka broker caches
> >> metadata and the default request frequency is relatively low (once every
> >> 30 seconds), this is not a heavy operation and shouldn't noticeably affect
> >> broker performance. It would also be great to get some input from Kafka
> >> experts.
> >>
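For a rough sense of scale, assuming each discovery round costs a single MetadataRequest (a pattern subscription may need more than one round trip), the default 30-second interval works out per source to:

```latex
\frac{86\,400\ \text{s/day}}{30\ \text{s/round}} = 2\,880\ \text{rounds/day} \quad (\text{2 per minute})
```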
> >> Looking forward to your feedback!
> >>
> >> [1]
> >> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kafka/#dynamic-partition-discovery
> >> Best regards,
> >> Qingsheng
> >>
>
