Thanks, Hongshun, for picking up the ticket and analyzing the details in depth. Since you have already discussed this with Qingsheng offline, I think we can add this content to FLIP-288[1] and then start the FLIP discussion.
Best,
Leonard

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source

> On Mar 15, 2023, at 5:25 PM, Hongshun Wang <loserwang1...@gmail.com> wrote:
>
> Thanks for your advice! I will do this later.
>
> Best,
> Hongshun
>
> On Wed, Mar 15, 2023 at 5:15 PM Etienne Chauchot <echauc...@apache.org> wrote:
>
>> Hi,
>>
>> Why not track this in a FLIP and a ticket, and link this discussion thread?
>>
>> My 2 cents
>>
>> Etienne
>>
>> On 15/03/2023 at 10:01, Hongshun Wang wrote:
>>> Hi devs,
>>> I'd like to join this discussion. CC: Qingsheng
>>> As discussed above, partitions that appear after the first discovery
>>> should be consumed from the EARLIEST offset.
>>>
>>> However, when KafkaSourceEnumerator restarts after a job failure, it
>>> cannot tell whether an unassigned partition was first-discovered or is
>>> new, because the snapshot state currently only contains the
>>> assignedPartitions collection (the partitions already assigned). We can
>>> solve this by adding an unAssignedInitialPartitions collection to the
>>> snapshot state, which holds the first-discovered partitions that have
>>> not yet been assigned. We could also merge these two collections into a
>>> single collection by adding a status to each item.
>>>
>>> Besides, there is another problem, which often occurs in topic-pattern
>>> mode: distinguishing between the following two cases.
>>>
>>> 1. Case1: The first partition discovery is too slow; a checkpoint
>>>    completes before it finishes, and then the job is restarted. In this
>>>    case, the restored unAssignedInitialPartitions is an empty
>>>    collection, which means no discovery has happened yet. The next
>>>    discovery should be treated as the first discovery.
>>> 2. Case2: The first discovery returns no partitions, and new
>>>    partitions can only be obtained after multiple partition discoveries.
>>>    If a restart occurs during this period, the restored
>>>    unAssignedInitialPartitions is also an empty collection, but here it
>>>    means the first discovery found no partitions. In this case, the next
>>>    discovery should be treated as a new discovery.
>>>
>>> We can solve this problem by adding a boolean value (*firstDiscoveryDone*)
>>> to the snapshot state, which records whether the first discovery has
>>> been done.
>>>
>>> There are also two rejected alternatives:
>>>
>>> 1. Change KafkaSourceEnumerator's snapshotState method to a blocking
>>>    one, which returns only after the first-discovered partitions have
>>>    been successfully assigned to the KafkaSourceReader. The advantage of
>>>    this approach is that the snapshot state does not need to change.
>>>    However, if the first-discovered partitions are not assigned before
>>>    checkpointing, the SourceCoordinator's event-loop thread will be
>>>    blocked; since partition assignment also has to run on the event-loop
>>>    thread, the thread would deadlock on itself.
>>> 2. An alternative to the *firstDiscoveryDone* variable: if we make the
>>>    first discovery synchronous, we can ensure that Case1 never happens.
>>>    When the event-loop thread starts, it first adds a discovery event to
>>>    the blocking queue, so by the time it executes the checkpoint event,
>>>    the partitions have already been discovered. However, if partition
>>>    discovery is very time-consuming, the SourceCoordinator cannot
>>>    process other events, such as reader registration, while it waits,
>>>    which is wasteful.
>>>
>>> Best regards,
>>> Hongshun
>>>
>>> On 2023/01/13 03:31:20 Qingsheng Ren wrote:
>>>> Hi devs,
>>>>
>>>> I'd like to start a discussion about enabling the dynamic partition
>>>> discovery feature by default in Kafka source.
>>>> Dynamic partition discovery [1] is a useful feature in Kafka source,
>>>> especially when the consumed Kafka topic scales out or the source
>>>> subscribes to multiple Kafka topics with a pattern. With this feature
>>>> enabled, users don't have to restart the Flink job to consume messages
>>>> in the new partitions. Currently, dynamic partition discovery is
>>>> disabled by default, and users have to explicitly specify the discovery
>>>> interval in order to turn it on.
>>>>
>>>> # Breaking changes
>>>>
>>>> For Kafka table source:
>>>>
>>>> - "scan.topic-partition-discovery.interval" will be set to 30 seconds by
>>>>   default.
>>>> - As we need to provide a way for users to disable the feature,
>>>>   "scan.topic-partition-discovery.interval" = "0" will be used to turn
>>>>   off discovery. Before this proposal, "0" meant enabling partition
>>>>   discovery with an interval of 0, which makes little sense in practice.
>>>>   Unfortunately, we can't use negative values because the type of this
>>>>   option is Duration.
>>>>
>>>> For KafkaSource (DataStream API):
>>>>
>>>> - Dynamic partition discovery in Kafka source will be enabled by
>>>>   default, with the discovery interval set to 30 seconds.
>>>> - To align with the table source, only a positive value of the option
>>>>   "partition.discovery.interval.ms" can be used to specify the discovery
>>>>   interval. Both negative values and zero will be interpreted as
>>>>   disabling the feature.
>>>>
>>>> # Overhead of partition discovery
>>>>
>>>> Partition discovery is performed by KafkaSourceEnumerator, which
>>>> asynchronously fetches topic metadata from the Kafka cluster and checks
>>>> whether there are any new topics or partitions. This shouldn't
>>>> introduce performance issues on the Flink side.
>>>>
>>>> On the Kafka broker side, partition discovery sends a MetadataRequest
>>>> to the Kafka broker to fetch topic info.
>>>> Considering that the Kafka broker has its own metadata cache and the
>>>> default request frequency is relatively low (once every 30 seconds),
>>>> this is not a heavy operation, and the broker's performance won't be
>>>> significantly affected. It would also be great to get some input from
>>>> Kafka experts.
>>>>
>>>> Looking forward to your feedback!
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kafka/#dynamic-partition-discovery
>>>>
>>>> Best regards,
>>>> Qingsheng
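To make Hongshun's proposal concrete, here is a minimal, dependency-free Java sketch of an enumerator state carrying the two proposed additions (the unassigned initial partitions and the firstDiscoveryDone flag) and of how they decide where a partition starts reading. Everything in it is hypothetical: EnumStateSketch, Partition (a stand-in for Kafka's TopicPartition), StartFrom and Snapshot are illustrative names, not Flink's KafkaSourceEnumerator or its checkpointed state, CONFIGURED_OFFSETS simply stands for whatever starting offsets the user configured, and reader assignment is reduced to a single markAssigned call. It assumes Java 16 or later for records.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the proposed enumerator state; not Flink's actual classes. */
public class EnumStateSketch {

    /** Stand-in for Kafka's TopicPartition so the sketch has no dependencies. */
    record Partition(String topic, int partition) {}

    /** Where a reader should start consuming a partition. */
    enum StartFrom { CONFIGURED_OFFSETS, EARLIEST }

    /** What would be checkpointed: today's assigned set plus the two proposed fields. */
    record Snapshot(Set<Partition> assigned, Set<Partition> unassignedInitial,
                    boolean firstDiscoveryDone) {}

    private final Set<Partition> assigned;
    private final Set<Partition> unassignedInitial;
    private boolean firstDiscoveryDone;

    EnumStateSketch(Snapshot restored) {
        assigned = new HashSet<>(restored.assigned());
        unassignedInitial = new HashSet<>(restored.unassignedInitial());
        firstDiscoveryDone = restored.firstDiscoveryDone();
    }

    /** Returns every discovered partition that is not yet assigned, with its start position. */
    Map<Partition, StartFrom> onDiscovery(List<Partition> discovered) {
        Map<Partition, StartFrom> toAssign = new LinkedHashMap<>();
        for (Partition p : discovered) {
            if (assigned.contains(p)) {
                continue; // already consumed by a reader
            }
            if (!firstDiscoveryDone || unassignedInitial.contains(p)) {
                // Found by the first discovery: honor the user's configured starting offsets.
                unassignedInitial.add(p);
                toAssign.put(p, StartFrom.CONFIGURED_OFFSETS);
            } else {
                // Appeared after the first discovery: read it from EARLIEST.
                toAssign.put(p, StartFrom.EARLIEST);
            }
        }
        // Set even when nothing was found; this is what separates Case2 from Case1.
        firstDiscoveryDone = true;
        return toAssign;
    }

    /** Records that a partition was handed to a reader. */
    void markAssigned(Partition p) {
        unassignedInitial.remove(p);
        assigned.add(p);
    }

    Snapshot snapshot() {
        return new Snapshot(Set.copyOf(assigned), Set.copyOf(unassignedInitial), firstDiscoveryDone);
    }

    public static void main(String[] args) {
        Partition p0 = new Partition("orders", 0);
        Snapshot empty = new Snapshot(Set.of(), Set.of(), false);

        // Case1: checkpoint taken before any discovery; p0 is still treated as initial.
        EnumStateSketch case1 = new EnumStateSketch(new EnumStateSketch(empty).snapshot());
        System.out.println("Case1: " + case1.onDiscovery(List.of(p0)));

        // Case2: the first discovery returned nothing, then a checkpoint and a restart.
        EnumStateSketch before = new EnumStateSketch(empty);
        before.onDiscovery(List.of());
        EnumStateSketch case2 = new EnumStateSketch(before.snapshot());
        System.out.println("Case2: " + case2.onDiscovery(List.of(p0)));
    }
}
```

The main method replays Case1 and Case2. In both, the restored set of unassigned initial partitions is empty, so only the firstDiscoveryDone flag tells the enumerator whether p0 belongs to the initial discovery (configured offsets) or arrived later (EARLIEST).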
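Qingsheng's proposed interval semantics can likewise be summarized in a small Java sketch. DiscoveryIntervalSketch and its two methods are hypothetical helpers, not Flink's option-parsing code. The sketch assumes that an unset option falls back to the proposed 30-second default; for the table option it treats a zero Duration as "disabled" and rejects negative durations (which, per the proposal, this Duration-typed option cannot express); for the DataStream property it reads the value as a string, as it would come from a Properties object, and treats any non-positive number of milliseconds as "disabled".

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical helper mirroring the proposed semantics; not Flink's option-parsing code. */
public final class DiscoveryIntervalSketch {

    /** Proposed default discovery interval for both the table source and KafkaSource. */
    static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(30);

    private DiscoveryIntervalSketch() {}

    /**
     * Table option "scan.topic-partition-discovery.interval" (a Duration): null stands for
     * "not set" and yields the default; zero disables discovery. Empty result = disabled.
     */
    static Optional<Duration> fromTableOption(Duration configured) {
        Duration interval = configured == null ? DEFAULT_INTERVAL : configured;
        if (interval.isNegative()) {
            throw new IllegalArgumentException("Negative durations are not accepted: " + interval);
        }
        return interval.isZero() ? Optional.empty() : Optional.of(interval);
    }

    /**
     * DataStream property "partition.discovery.interval.ms" (read as a string): null yields
     * the default, and any value <= 0 disables discovery. Empty result = disabled.
     */
    static Optional<Duration> fromDataStreamProperty(String configuredMs) {
        long ms = configuredMs == null ? DEFAULT_INTERVAL.toMillis() : Long.parseLong(configuredMs.trim());
        return ms > 0 ? Optional.of(Duration.ofMillis(ms)) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println("table, unset:      " + fromTableOption(null));
        System.out.println("table, 0:          " + fromTableOption(Duration.ZERO));
        System.out.println("datastream, -1:    " + fromDataStreamProperty("-1"));
        System.out.println("datastream, 60000: " + fromDataStreamProperty("60000"));
    }
}
```

Returning an empty Optional for "disabled" keeps the on/off decision and the interval value in one place, which mirrors the proposal's choice of reusing the interval option itself as the switch rather than adding a separate boolean option.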