Thanks Hongshun for picking up the ticket and analyzing the detail deeply.

As you have discussed with Qingsheng offline, I think we can update the content 
to FLIP-288[1] and then start the FLIP discussion.


Best,
Leonard
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source

> On Mar 15, 2023, at 5:25 PM, Hongshun Wang <loserwang1...@gmail.com> wrote:
> 
> Thanks for your advise! I will do this later.
> 
> 
> Best, Hongshun
> 
> On Wed, Mar 15, 2023 at 5:15 PM Etienne Chauchot <echauc...@apache.org>
> wrote:
> 
>> Hi,
>> 
>> Why not track this in a FLIP and a ticket and link this discussion thread.
>> 
>> My 2 cents
>> 
>> Etienne
>> 
>> Le 15/03/2023 à 10:01, Hongshun Wang a écrit :
>>>  Hi devs,
>>> I’d like to join this discussion. CC:Qingsheng
>>> As discussed above, new partitions after the first discovery should be
>>> consumed from EARLIEST offset.
>>> 
>>> However, when KafkaSourceEnumerator restarts after a job failure, it
>> cannot
>>> distinguish between unassigned partitions as first-discovered or new,
>>> because the snapshot state currently only contains assignedPartitions
>>> collection (the assigned partitions). We can solve this by adding a
>>> unAssignedInitialPartitons collection to snapshot state, which represents
>>> the collection of first discovered partitions that have not yet been
>>> assigned. Also, we can combine this two collections into a single
>>> collection if we add status to each item.
>>> 
>>> Besides , there is also a problem which often occurs in pattern mode to
>>> distinguish between the following two case:
>>> 
>>>    1. Case1:  The first partition discovery is too slow, before which
>> the
>>>    checkpoint is finished and then job is restarted .At this time, the
>>>    restored unAssignedInitialPartitons is an empty collection, which
>> means
>>>    non-discovery. The next discovery will be treated as the first
>> discovery.
>>>    2. Case2:  The first time the partition is obtained is empty, and new
>>>    partitions can only be obtained after multiple partition
>> discoveries. If a
>>>    restart occurs between this period, the restored
>>>    *unAssignedInitialPartitons* is also an empty collection, which means
>>>    empty-discovery. However, the next discovery should be treated as a
>> new
>>>    discovery.
>>> 
>>> We can solve this problem by adding a boolean value(*firstDiscoveryDone*)
>>> to snapshot state, which represents whether the first-discovery has been
>>> done.
>>> 
>>> Also two rejected alternatives :
>>> 
>>>    1. Change the KafkaSourceEnumerator's snapshotState method to a
>> blocking
>>>    one, which resumes only after the first-discovered partition has been
>>>    successfully assigned to KafkaSourceReader. The advantage of this
>> approach
>>>    is no need to change the snapshot state's variable values. However,
>> if
>>>    first-discovered partitions are not assigned before checkpointing,
>> the
>>>    SourceCoordinator's event-loop thread will be blocked, but partition
>>>    assignment also requires the event-loop thread to execute, which
>> will cause
>>>    thread self-locking.
>>>    2. An alternative to the *firstDiscoveryDone* variable. If we change
>> the
>>>    first discovery method to a synchronous method, we can ensure that
>> Case1
>>>    will never happen. Because when the event-loop thread starts, it
>> first adds
>>>    a discovery event to the blocking queue. When it turns to execute the
>>>    checkpoint event, the partition has already been discovered
>> successfully.
>>>    However, If partition discovery is a heavily time-consuming
>> operation, the
>>>    SourceCoordinator cannot process other event operations during the
>> waiting
>>>    period, such as reader registration. It is a waste.
>>> 
>>> Best regards,
>>> Hongshun
>>> 
>>> On 2023/01/13 03:31:20 Qingsheng Ren wrote:
>>>> Hi devs,
>>>> 
>>>> I’d like to start a discussion about enabling the dynamic partition
>>>> discovery feature by default in Kafka source. Dynamic partition
>> discovery
>>>> [1] is a useful feature in Kafka source especially under the scenario
>> when
>>>> the consuming Kafka topic scales out, or the source subscribes to
>> multiple
>>>> Kafka topics with a pattern. Users don’t have to restart the Flink job
>> to
>>>> consume messages in the new partition with this feature enabled.
>>> Currently,
>>>> dynamic partition discovery is disabled by default and users have to
>>>> explicitly specify the interval of discovery in order to turn it on.
>>>> 
>>>> # Breaking changes
>>>> 
>>>> For Kafka table source:
>>>> 
>>>> - “scan.topic-partition-discovery.interval” will be set to 30 seconds by
>>>> default.
>>>> - As we need to provide a way for users to disable the feature,
>>>> “scan.topic-partition-discovery.interval” = “0” will be used to turn off
>>>> the discovery. Before this proposal, “0” means to enable partition
>>>> discovery with interval = 0, which is a bit senseless in practice.
>>>> Unfortunately we can't use negative values as the type of this option is
>>>> Duration.
>>>> 
>>>> For KafkaSource (DataStream API)
>>>> 
>>>> - Dynamic partition discovery in Kafka source will be enabled by
>> default,
>>>> with discovery interval set to 30 seconds.
>>>> - To align with table source, only a positive value for option “
>>>> partition.discovery.interval.ms” could be used to specify the discovery
>>>> interval. Both negative and zero will be interpreted as disabling the
>>>> feature.
>>>> 
>>>> # Overhead of partition discovery
>>>> 
>>>> Partition discovery is made on KafkaSourceEnumerator, which
>> asynchronously
>>>> fetches topic metadata from Kafka cluster and checks if there’s any new
>>>> topic and partition. This shouldn’t introduce performance issues on the
>>>> Flink side.
>>>> 
>>>> On the Kafka broker side, partition discovery makes MetadataRequest to
>>>> Kafka broker for fetching topic infos. Considering Kafka broker has its
>>>> metadata cache and the default request frequency is relatively low (per
>> 30
>>>> seconds), this is not a heavy operation and the performance of the
>> broker
>>>> won’t be affected a lot. It'll also be great to get some inputs from
>> Kafka
>>>> experts.
>>>> 
>>>> Looking forward to your feedback!
>>>> 
>>>> [1]
>>>> 
>>> 
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kafka/#dynamic-partition-discovery
>>>> Best regards,
>>>> Qingsheng
>>>> 
>> 

Reply via email to