Oh and Mason, definitely interesting! :)

On Fri, Feb 10, 2023 at 9:51 PM Martijn Visser <martijnvis...@apache.org> wrote:
> @Qingsheng what are your next steps for this proposal?
>
> On Thu, Jan 19, 2023 at 9:14 AM Mason Chen <mas.chen6...@gmail.com> wrote:
>
>> Hi all,
>>
>> Sorry to come into the discussion late--I saw the thread earlier.
>>
>> I'm also +1 for the change in general. I think most users have this turned
>> on by default since the overhead is quite low. A default in the two-digit
>> seconds range works well for us. However, I do have two main concerns that
>> are related, but don't necessarily block this FLIP:
>>
>> 1. Timestamp Offset Initializer
>>
>> Currently, the timestamp offset initializer defaults the offset reset
>> strategy to LATEST. This can present some problems if the discovery
>> interval is set too large since records from new partitions could be
>> skipped (the set timestamp is not found in Kafka, thus resetting to the
>> latest). Here is a ticket to allow customizations:
>> https://issues.apache.org/jira/browse/FLINK-30200 (Qingsheng, you might
>> remember this from a PR review). Thanks for mentioning this in your FLIP!
>>
>> 2. AdminClient Fault Tolerance
>>
>> AdminClient, which is used for partition discovery, seems not to handle
>> Kafka timeouts as robustly as the KafkaConsumer API, and we have noticed
>> that transient network hiccups cause full job restarts (since the
>> jobmanager fails) in numerous incidents. Internally, we have introduced an
>> error handling strategy based on the number of consecutive partition
>> discovery failures. I'm interested in opening a JIRA ticket to contribute
>> this feature back to Flink and making the error handling more
>> pluggable. What do you think?
>>
>> Best,
>> Mason
>>
>> On Sun, Jan 15, 2023 at 11:39 PM Qingsheng Ren <re...@apache.org> wrote:
>>
>> > Thanks for the input Becket!
>> >
>> > I reorganized this proposal into FLIP-288 [1].
>> >
>> > [1]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
>> >
>> > Best,
>> > Qingsheng
>> >
>> > On Sun, Jan 15, 2023 at 9:18 AM Becket Qin <becket....@gmail.com> wrote:
>> >
>> > > Thanks for the proposal, Qingsheng.
>> > >
>> > > +1 to enable auto partition discovery by default. Just a reminder, we need
>> > > a FLIP for this.
>> > >
>> > > A bit more background on this.
>> > >
>> > > Most of the Kafka users simply subscribe to a topic and let the consumer
>> > > automatically adapt to partition changes. So enabling auto partition
>> > > discovery would align with that experience. The counter argument last time
>> > > when I proposed to enable auto partition discovery was mainly due to the
>> > > concern from the Flink users. There were arguments that sometimes users
>> > > don't want the partition changes to get automatically picked up, but want
>> > > to do this by restarting the job manually so they can avoid unnoticed
>> > > changes in the jobs.
>> > >
>> > > Given that in the old Flink source, by default the auto partition discovery
>> > > was disabled, and there are use cases from both sides, we simply kept the
>> > > behavior unchanged. From the discussion we have here, it looks like
>> > > enabling auto partition discovery is much preferred. So I think we should
>> > > do it.
>> > >
>> > > I am not worried about the performance. The new Kafka source will only have
>> > > the SplitEnumerator sending metadata requests when the feature is enabled.
>> > > It is actually much cheaper than the old Kafka source where every
>> > > subtask does that.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Sat, Jan 14, 2023 at 11:46 AM Yun Tang <myas...@live.com> wrote:
>> > >
>> > > > +1 for this proposal and thanks Qingsheng for driving this.
>> > > >
>> > > > Considering the interval, we also set the value as 5min, equivalent to the
>> > > > default value of metadata.max.age.ms.
>> > > >
>> > > > Best
>> > > > Yun Tang
>> > > > ________________________________
>> > > > From: Benchao Li <libenc...@apache.org>
>> > > > Sent: Friday, January 13, 2023 23:06
>> > > > To: dev@flink.apache.org <dev@flink.apache.org>
>> > > > Subject: Re: [DISCUSS] Enabling dynamic partition discovery by default in
>> > > > Kafka source
>> > > >
>> > > > +1, we've enabled this by default (10mins) in our production for years.
>> > > >
>> > > > Jing Ge <j...@ververica.com.invalid> wrote on Fri, Jan 13, 2023 at 22:22:
>> > > >
>> > > > > +1 for the proposal that makes users' daily work easier and therefore
>> > > > > makes Flink more attractive.
>> > > > >
>> > > > > Best regards,
>> > > > > Jing
>> > > > >
>> > > > > On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren <re...@apache.org> wrote:
>> > > > >
>> > > > > > Thanks everyone for joining the discussion!
>> > > > > >
>> > > > > > @Martijn:
>> > > > > >
>> > > > > > > All newly discovered partitions will be consumed from the earliest
>> > > > > > > offset possible.
>> > > > > >
>> > > > > > Thanks for the reminder! I checked the logic of KafkaSource and found
>> > > > > > that new partitions will start from the offset initializer specified
>> > > > > > by the user instead of the earliest. We need to correct this behavior
>> > > > > > to avoid dropping messages from new partitions.
>> > > > > >
>> > > > > > > Job restarts from checkpoint
>> > > > > >
>> > > > > > I think the current logic guarantees the exactly-once semantic. New
>> > > > > > partitions created after the checkpoint will be re-discovered again
>> > > > > > and picked up by the source.
>> > > > > >
>> > > > > > @John:
>> > > > > >
>> > > > > > > If you want to be a little conservative with the default, 5 minutes
>> > > > > > > might be better than 30 seconds.
>> > > > > >
>> > > > > > Thanks for the suggestion! I tried to find the equivalent config in
>> > > > > > Kafka but missed it. It would be neat to align with the default value
>> > > > > > of "metadata.max.age.ms".
>> > > > > >
>> > > > > > @Gabor:
>> > > > > >
>> > > > > > > removed partition handling is not yet added
>> > > > > >
>> > > > > > There was a detailed discussion about removing partitions [1] but it
>> > > > > > looks like this is not an easy task considering the potential data
>> > > > > > loss and state inconsistency. I'm afraid there's no clear plan on this
>> > > > > > one and maybe we could trigger a new discussion thread about how to
>> > > > > > correctly handle removed partitions.
>> > > > > >
>> > > > > > [1] https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
>> > > > > >
>> > > > > > Best regards,
>> > > > > > Qingsheng
>> > > > > >
>> > > > > > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > +1 on the overall direction, it's an important feature.
>> > > > > > >
>> > > > > > > I've had a look at the latest master and it looks like removed
>> > > > > > > partition handling is not yet added but I think this is essential.
>> > > > > > >
>> > > > > > > https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
>> > > > > > >
>> > > > > > > If a partition all of a sudden disappears then it could lead to data
>> > > > > > > loss.
>> > > > > > > Are you planning to add it?
>> > > > > > > If yes then when?
>> > > > > > >
>> > > > > > > G
>> > > > > > >
>> > > > > > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler <vvcep...@apache.org>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Thanks for this proposal, Qingsheng!
>> > > > > > > >
>> > > > > > > > If you want to be a little conservative with the default, 5
>> > > > > > > > minutes might be better than 30 seconds.
>> > > > > > > >
>> > > > > > > > The equivalent config in Kafka seems to be metadata.max.age.ms
>> > > > > > > > (https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms),
>> > > > > > > > which has a default value of 5 minutes.
>> > > > > > > >
>> > > > > > > > Other than that, I’m in favor. I agree, this should be on by
>> > > > > > > > default.
>> > > > > > > >
>> > > > > > > > Thanks again,
>> > > > > > > > John
>> > > > > > > >
>> > > > > > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
>> > > > > > > > > Thanks Qingsheng for driving this. Enabling dynamic partition
>> > > > > > > > > discovery would be very useful for scenarios where a Kafka
>> > > > > > > > > topic scales its partitions.
>> > > > > > > > >
>> > > > > > > > > +1 for the change.
>> > > > > > > > >
>> > > > > > > > > CC: Becket
>> > > > > > > > >
>> > > > > > > > > Best,
>> > > > > > > > > Leonard
>> > > > > > > > >
>> > > > > > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu <imj...@gmail.com> wrote:
>> > > > > > > > >>
>> > > > > > > > >> +1 for the change. I think this is beneficial for users and is
>> > > > > > > > >> compatible.
>> > > > > > > > >>
>> > > > > > > > >> Best,
>> > > > > > > > >> Jark
>> > > > > > > > >>
>> > > > > > > > >> On Fri, 13 Jan 2023 at 14:22, 何军 <xuehaijux...@gmail.com> wrote:
>> > > > > > > > >>
>> > > > > > > > >>>> +1 for this idea, we have enabled kafka dynamic partition
>> > > > > > > > >>>> discovery in all jobs.
>> > > >
>> > > > --
>> > > >
>> > > > Best,
>> > > > Benchao Li
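
For readers following Mason's second concern (AdminClient fault tolerance), here is a minimal sketch of what an error handling strategy "based on the number of consecutive partition discovery failures" might look like. It is only an illustration under stated assumptions: the class `ConsecutiveFailureTracker` and all of its methods are hypothetical names, nothing here is a Flink or Kafka API, and Mason's internal implementation is not shown anywhere in this thread. The idea sketched is that a failed discovery attempt is tolerated until too many happen in a row, and any successful attempt resets the count.

```java
/**
 * Hypothetical helper (not part of Flink or Kafka): tolerates up to
 * maxConsecutiveFailures failed partition discovery attempts in a row.
 */
final class ConsecutiveFailureTracker {
    private final int maxConsecutiveFailures;
    private int consecutiveFailures;

    ConsecutiveFailureTracker(int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 0) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be >= 0");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** A successful discovery attempt resets the counter. */
    void recordSuccess() {
        consecutiveFailures = 0;
    }

    /**
     * Records one failed attempt.
     *
     * @return true if the failure is still within the tolerated limit,
     *         false if the caller should escalate (for example, fail the job)
     */
    boolean recordFailure() {
        consecutiveFailures++;
        return consecutiveFailures <= maxConsecutiveFailures;
    }

    int consecutiveFailures() {
        return consecutiveFailures;
    }

    public static void main(String[] args) {
        ConsecutiveFailureTracker tracker = new ConsecutiveFailureTracker(2);
        // Simulated discovery outcomes: true = success, false = timeout.
        boolean[] attempts = {false, false, true, false, false, false};
        for (boolean succeeded : attempts) {
            if (succeeded) {
                tracker.recordSuccess();
                System.out.println("discovery succeeded, counter reset");
            } else if (tracker.recordFailure()) {
                System.out.println("tolerated failure #" + tracker.consecutiveFailures());
            } else {
                System.out.println("limit exceeded, escalating");
                break;
            }
        }
    }
}
```

In a real source, the periodic discovery callback would presumably call `recordFailure()` when the metadata request times out and only rethrow once it returns false. Whether the limit should be a count, a time window, or a pluggable policy is exactly the open question Mason raises.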