Hi Hongshun,

I agree with @Etienne. Could you also describe this process and the problems in a figure? Thanks.
Best,
Shammon FY

On Wed, Mar 15, 2023 at 5:15 PM Etienne Chauchot <echauc...@apache.org> wrote:

> Hi,
>
> Why not track this in a FLIP and a ticket, and link this discussion thread?
>
> My 2 cents
>
> Etienne
>
> On 15/03/2023 at 10:01, Hongshun Wang wrote:
> > Hi devs,
> >
> > I’d like to join this discussion. CC: Qingsheng
> >
> > As discussed above, new partitions found after the first discovery should
> > be consumed from the EARLIEST offset.
> >
> > However, when KafkaSourceEnumerator restarts after a job failure, it cannot
> > tell whether an unassigned partition was first-discovered or is new,
> > because the snapshot state currently contains only the assignedPartitions
> > collection (the assigned partitions). We can solve this by adding an
> > unAssignedInitialPartitons collection to the snapshot state, which holds
> > the first-discovered partitions that have not yet been assigned.
> > Alternatively, we can combine these two collections into a single
> > collection if we add a status to each item.
> >
> > Besides, there is another problem, which often occurs in pattern mode:
> > distinguishing between the following two cases:
> >
> > 1. Case1: The first partition discovery is so slow that a checkpoint
> >    finishes and the job is restarted before it completes. At this time,
> >    the restored unAssignedInitialPartitons is an empty collection, which
> >    means no discovery has happened yet. The next discovery will be treated
> >    as the first discovery.
> > 2. Case2: The first discovery returns no partitions, and partitions are
> >    only found after several more discoveries. If a restart occurs during
> >    this period, the restored *unAssignedInitialPartitons* is also an empty
> >    collection, but here it means the first discovery was empty. However,
> >    the next discovery should be treated as a new discovery.
> >
> > We can solve this problem by adding a boolean value (*firstDiscoveryDone*)
> > to the snapshot state, which records whether the first discovery has been
> > done.
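
The state change proposed above can be sketched as follows. This is only an illustration under stated assumptions: `EnumeratorStateSketch`, `StartingOffset`, `onDiscovery`, `markAssigned` and `copyForCheckpoint` are hypothetical names, partitions are plain strings, and none of this is the actual KafkaSourceEnumerator code. It shows how a `firstDiscoveryDone` flag separates Case1 (no discovery yet) from Case2 (empty first discovery) after a restore.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the enumerator's checkpointed state (not Flink code).
final class EnumeratorStateSketch {
    enum StartingOffset { CONFIGURED_INITIAL, EARLIEST }

    final Set<String> assignedPartitions = new HashSet<>();
    final Set<String> unassignedInitialPartitions = new HashSet<>();
    boolean firstDiscoveryDone = false;

    // Returns the offset policy for every discovered partition that is not yet assigned.
    Map<String, StartingOffset> onDiscovery(Set<String> discovered) {
        Map<String, StartingOffset> result = new HashMap<>();
        for (String partition : discovered) {
            if (assignedPartitions.contains(partition)) {
                continue; // already handed to a reader
            }
            if (unassignedInitialPartitions.contains(partition)) {
                // Seen in the first discovery, still waiting for assignment.
                result.put(partition, StartingOffset.CONFIGURED_INITIAL);
            } else if (!firstDiscoveryDone) {
                // This call is the first discovery.
                unassignedInitialPartitions.add(partition);
                result.put(partition, StartingOffset.CONFIGURED_INITIAL);
            } else {
                // Found after the first discovery: consume from EARLIEST.
                result.put(partition, StartingOffset.EARLIEST);
            }
        }
        firstDiscoveryDone = true; // set even when the discovery was empty (Case2)
        return result;
    }

    void markAssigned(String partition) {
        unassignedInitialPartitions.remove(partition);
        assignedPartitions.add(partition);
    }

    // Simulates snapshot + restore by copying all three fields.
    EnumeratorStateSketch copyForCheckpoint() {
        EnumeratorStateSketch copy = new EnumeratorStateSketch();
        copy.assignedPartitions.addAll(assignedPartitions);
        copy.unassignedInitialPartitions.addAll(unassignedInitialPartitions);
        copy.firstDiscoveryDone = firstDiscoveryDone;
        return copy;
    }
}
```

Without the flag, both cases restore to two empty collections and look identical; with it, only the restored Case2 state treats the next discovered partition as new.
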
> >
> > Also, two rejected alternatives:
> >
> > 1. Change the KafkaSourceEnumerator's snapshotState method to a blocking
> >    one, which resumes only after the first-discovered partitions have been
> >    successfully assigned to KafkaSourceReader. The advantage of this
> >    approach is that the snapshot state's fields do not need to change.
> >    However, if first-discovered partitions are not assigned before
> >    checkpointing, the SourceCoordinator's event-loop thread will be
> >    blocked, but partition assignment also requires the event-loop thread
> >    to execute, so the thread would deadlock on itself.
> > 2. An alternative to the *firstDiscoveryDone* variable. If we change the
> >    first discovery to a synchronous method, we can ensure that Case1 will
> >    never happen, because when the event-loop thread starts, it first adds
> >    a discovery event to the blocking queue. By the time it executes the
> >    checkpoint event, the partitions have already been discovered
> >    successfully. However, if partition discovery is a very time-consuming
> >    operation, the SourceCoordinator cannot process other events, such as
> >    reader registration, while it waits. That is wasteful.
> >
> > Best regards,
> > Hongshun
> >
> > On 2023/01/13 03:31:20 Qingsheng Ren wrote:
> >> Hi devs,
> >>
> >> I’d like to start a discussion about enabling the dynamic partition
> >> discovery feature by default in Kafka source. Dynamic partition discovery
> >> [1] is a useful feature in Kafka source, especially when the consumed
> >> Kafka topic scales out, or the source subscribes to multiple Kafka topics
> >> with a pattern. With this feature enabled, users don’t have to restart
> >> the Flink job to consume messages in the new partitions. Currently,
> >> dynamic partition discovery is disabled by default and users have to
> >> explicitly specify the interval of discovery in order to turn it on.
> >>
> >> # Breaking changes
> >>
> >> For Kafka table source:
> >>
> >> - “scan.topic-partition-discovery.interval” will be set to 30 seconds by
> >> default.
> >> - As we need to provide a way for users to disable the feature,
> >> “scan.topic-partition-discovery.interval” = “0” will be used to turn off
> >> the discovery. Before this proposal, “0” means to enable partition
> >> discovery with interval = 0, which makes little sense in practice.
> >> Unfortunately we can't use negative values as the type of this option is
> >> Duration.
> >>
> >> For KafkaSource (DataStream API):
> >>
> >> - Dynamic partition discovery in Kafka source will be enabled by default,
> >> with the discovery interval set to 30 seconds.
> >> - To align with the table source, only a positive value for the option
> >> “partition.discovery.interval.ms” can be used to specify the discovery
> >> interval. Both negative values and zero will be interpreted as disabling
> >> the feature.
> >>
> >> # Overhead of partition discovery
> >>
> >> Partition discovery is performed by KafkaSourceEnumerator, which
> >> asynchronously fetches topic metadata from the Kafka cluster and checks
> >> whether there are any new topics or partitions. This shouldn’t introduce
> >> performance issues on the Flink side.
> >>
> >> On the Kafka broker side, partition discovery sends a MetadataRequest to
> >> the Kafka broker to fetch topic information. Considering that the Kafka
> >> broker has its own metadata cache and the default request frequency is
> >> relatively low (once per 30 seconds), this is not a heavy operation and
> >> the performance of the broker won’t be affected much. It would also be
> >> great to get some input from Kafka experts.
> >>
> >> Looking forward to your feedback!
> >>
> >> [1]
> >> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kafka/#dynamic-partition-discovery
> >>
> >> Best regards,
> >> Qingsheng
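
For reference, the proposed KafkaSource semantics for “partition.discovery.interval.ms” could be read roughly as below. This is a minimal sketch, not Flink code: `DiscoveryIntervalSketch` and `resolve` are hypothetical names, and only the option key, the 30-second default, and the "zero or negative disables discovery" rule come from the proposal quoted above.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper illustrating the proposed option semantics (not Flink API).
final class DiscoveryIntervalSketch {
    static final String KEY = "partition.discovery.interval.ms";
    static final long PROPOSED_DEFAULT_MS = 30_000L; // 30 seconds, per the proposal

    // Empty result means discovery is disabled; otherwise the interval to use.
    static Optional<Duration> resolve(Properties props) {
        String raw = props.getProperty(KEY);
        long millis = (raw == null) ? PROPOSED_DEFAULT_MS : Long.parseLong(raw.trim());
        // Only a positive value enables discovery; zero and negative disable it.
        return millis > 0 ? Optional.of(Duration.ofMillis(millis)) : Optional.empty();
    }
}
```

Under this reading, a job that sets nothing gets discovery every 30 seconds, and a job that wants the old behaviour sets the option to “0” or any negative value.
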