Oh and Mason, definitely interesting! :)

On Fri, Feb 10, 2023 at 9:51 PM Martijn Visser <martijnvis...@apache.org>
wrote:

> @Qingsheng what are your next steps for this proposal?
>
> On Thu, Jan 19, 2023 at 9:14 AM Mason Chen <mas.chen6...@gmail.com> wrote:
>
>> Hi all,
>>
>> Sorry to come into the discussion late--I saw the thread earlier.
>>
>> I'm also +1 for the change in general. I think most users have this turned
>> on by default since the overhead is quite low. A default in the two digit
>> seconds range works well for us. However, I do have two main concerns that
>> are related, but don't necessarily block this FLIP:
>>
>> 1. Timestamp Offset Initializer
>>
>> Currently, the timestamp offset initializer defaults the offset reset
>> strategy to LATEST. This can present some problems if the discovery
>> interval is set too large since records from new partitions could be
>> skipped (the set timestamp is not found in Kafka, thus resetting to the
>> latest). Here is a ticket to allow customizations:
>> https://issues.apache.org/jira/browse/FLINK-30200 (Qingsheng, you might
>> remember this from a PR review). Thanks for mentioning this in your FLIP!
>>
>> 2. AdminClient Fault Tolerance
>>
>> AdminClient, which is used for partition discovery, seems not to handle
>> Kafka timeouts as robustly as the KafkaConsumer API, and we have noticed
>> that transient network hiccups cause full job restarts (since the
>> jobmanager fails) in numerous incidents. Internally, we have introduced an
>> error handling strategy based on the number of consecutive partition
>> discovery failures. I'm interested in opening a JIRA ticket to contribute
>> this feature back to Flink and open making the error handling more
>> pluggable. What do you think?
>>
>> Best,
>> Mason
>>
>> On Sun, Jan 15, 2023 at 11:39 PM Qingsheng Ren <re...@apache.org> wrote:
>>
>> > Thanks for the input Becket!
>> >
>> > I reorganized this proposal into FLIP-288 [1].
>> >
>> > [1]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
>> >
>> > Best,
>> > Qingsheng
>> >
>> > On Sun, Jan 15, 2023 at 9:18 AM Becket Qin <becket....@gmail.com>
>> wrote:
>> >
>> > > Thanks for the proposal, Qingsheng.
>> > >
>> > > +1 to enable auto partition discovery by default. Just a reminder, we
>> > need
>> > > a FLIP for this.
>> > >
>> > > A bit more background on this.
>> > >
>> > > Most of the Kafka users simply subscribe to a topic and let the
>> consumer
>> > to
>> > > automatically adapt to partition changes. So enabling auto partition
>> > > discovery would align with that experience. The counter argument last
>> > time
>> > > when I proposed to enable auto partition discovery was mainly due to
>> the
>> > > concern from the Flink users. There were arguments that sometimes
>> users
>> > > don't want the partition changes to get automatically picked up, but
>> want
>> > > to do this by restarting the job manually so they can avoid unnoticed
>> > > changes in the jobs.
>> > >
>> > > Given that in the old Flink source, by default the auto partition
>> > discovery
>> > > was disabled, and there are use cases from both sides, we simply kept
>> the
>> > > behavior unchanged. From the discussion we have here, it looks like
>> > > enabling auto partition discovery is much preferred. So I think we
>> should
>> > > do it.
>> > >
>> > > I am not worried about the performance. The new Kafka source will only
>> > have
>> > > the SplitEnumerator sending metadata requests when the feature is
>> > enabled.
>> > > It is actually much cheaper than the old Kafka source where every
>> > > subtask does that.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > >
>> > >
>> > > On Sat, Jan 14, 2023 at 11:46 AM Yun Tang <myas...@live.com> wrote:
>> > >
>> > > > +1 for this proposal and thanks Qingsheng for driving this.
>> > > >
>> > > > Considering the interval, we also set the value as 5min, equivalent
>> to
>> > > the
>> > > > default value of metadata.max.age.ms.
>> > > >
>> > > >
>> > > > Best
>> > > > Yun Tang
>> > > > ________________________________
>> > > > From: Benchao Li <libenc...@apache.org>
>> > > > Sent: Friday, January 13, 2023 23:06
>> > > > To: dev@flink.apache.org <dev@flink.apache.org>
>> > > > Subject: Re: [DISCUSS] Enabling dynamic partition discovery by
>> default
>> > in
>> > > > Kafka source
>> > > >
>> > > > +1, we've enabled this by default (10mins) in our production for
>> years.
>> > > >
>> > > > Jing Ge <j...@ververica.com.invalid> 于2023年1月13日周五 22:22写道:
>> > > >
>> > > > > +1 for the proposal that makes users' daily work easier and
>> therefore
>> > > > makes
>> > > > > Flink more attractive.
>> > > > >
>> > > > > Best regards,
>> > > > > Jing
>> > > > >
>> > > > >
>> > > > > On Fri, Jan 13, 2023 at 11:27 AM Qingsheng Ren <re...@apache.org>
>> > > wrote:
>> > > > >
>> > > > > > Thanks everyone for joining the discussion!
>> > > > > >
>> > > > > > @Martijn:
>> > > > > >
>> > > > > > > All newly discovered partitions will be consumed from the
>> > earliest
>> > > > > offset
>> > > > > > possible.
>> > > > > >
>> > > > > > Thanks for the reminder! I checked the logic of KafkaSource and
>> > found
>> > > > > that
>> > > > > > new partitions will start from the offset initializer specified
>> by
>> > > the
>> > > > > user
>> > > > > > instead of the earliest. We need to correct this behavior to
>> avoid
>> > > > > dropping
>> > > > > > messages from new partitions.
>> > > > > >
>> > > > > > > Job restarts from checkpoint
>> > > > > >
>> > > > > > I think the current logic guarantees the exactly-once semantic.
>> New
>> > > > > > partitions created after the checkpoint will be re-discovered
>> again
>> > > and
>> > > > > > picked up by the source.
>> > > > > >
>> > > > > > @John:
>> > > > > >
>> > > > > > > If you want to be a little conservative with the default, 5
>> > minutes
>> > > > > might
>> > > > > > be better than 30 seconds.
>> > > > > >
>> > > > > > Thanks for the suggestion! I tried to find the equivalent
>> config in
>> > > > Kafka
>> > > > > > but missed it. It would be neat to align with the default value
>> of
>> > "
>> > > > > > metadata.max.age.ms".
>> > > > > >
>> > > > > > @Gabor:
>> > > > > >
>> > > > > > > removed partition handling is not yet added
>> > > > > >
>> > > > > > There was a detailed discussion about removing partitions [1]
>> but
>> > it
>> > > > > looks
>> > > > > > like this is not an easy task considering the potential data
>> loss
>> > and
>> > > > > state
>> > > > > > inconsistency. I'm afraid there's no clear plan on this one and
>> > maybe
>> > > > we
>> > > > > > could trigger a new discussion thread about how to correctly
>> handle
>> > > > > removed
>> > > > > > partitions.
>> > > > > >
>> > > > > > [1]
>> > https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt
>> > > > > >
>> > > > > > Best regards,
>> > > > > > Qingsheng
>> > > > > >
>> > > > > >
>> > > > > > On Fri, Jan 13, 2023 at 4:33 PM Gabor Somogyi <
>> > > > gabor.g.somo...@gmail.com
>> > > > > >
>> > > > > > wrote:
>> > > > > >
>> > > > > > > +1 on the overall direction, it's an important feature.
>> > > > > > >
>> > > > > > > I've had a look on the latest master and looks like removed
>> > > partition
>> > > > > > > handling is not yet added but I think this is essential.
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/28c3e1a3923ba560b559a216985c1abeb794ebaa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305
>> > > > > > >
>> > > > > > > If a partition all of a sudden disappears then it could lead
>> to
>> > > data
>> > > > > > loss.
>> > > > > > > Are you planning to add it?
>> > > > > > > If yes then when?
>> > > > > > >
>> > > > > > > G
>> > > > > > >
>> > > > > > >
>> > > > > > > On Fri, Jan 13, 2023 at 9:22 AM John Roesler <
>> > vvcep...@apache.org>
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Thanks for this proposal, Qingsheng!
>> > > > > > > >
>> > > > > > > > If you want to be a little conservative with the default, 5
>> > > minutes
>> > > > > > might
>> > > > > > > > be better than 30 seconds.
>> > > > > > > >
>> > > > > > > > The equivalent config in Kafka seems to be
>> metadata.max.age.ms
>> > (
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
>> > > > > > > ),
>> > > > > > > > which has a default value of 5 minutes.
>> > > > > > > >
>> > > > > > > > Other than that, I’m in favor. I agree, this should be on by
>> > > > default.
>> > > > > > > >
>> > > > > > > > Thanks again,
>> > > > > > > > John
>> > > > > > > >
>> > > > > > > > On Fri, Jan 13, 2023, at 08:26, Leonard Xu wrote:
>> > > > > > > > > Thanks Qingsheng for driving this, enable the dynamic
>> > partition
>> > > > > > > > > discovery would be very useful for kafka topic scale
>> > partitions
>> > > > > > > > > scenarios.
>> > > > > > > > >
>> > > > > > > > > +1 for the change.
>> > > > > > > > >
>> > > > > > > > > CC: Becket
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > Best,
>> > > > > > > > > Leonard
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >> On Jan 13, 2023, at 3:15 PM, Jark Wu <imj...@gmail.com>
>> > > wrote:
>> > > > > > > > >>
>> > > > > > > > >> +1 for the change. I think this is beneficial for users
>> and
>> > is
>> > > > > > > > compatible.
>> > > > > > > > >>
>> > > > > > > > >> Best,
>> > > > > > > > >> Jark
>> > > > > > > > >>
>> > > > > > > > >> On Fri, 13 Jan 2023 at 14:22, 何军 <xuehaijux...@gmail.com
>> >
>> > > > wrote:
>> > > > > > > > >>
>> > > > > > > > >>>>
>> > > > > > > > >>>> +1 for this idea, we have enabled kafka dynamic
>> partition
>> > > > > > discovery
>> > > > > > > in
>> > > > > > > > >>> all
>> > > > > > > > >>>> jobs.
>> > > > > > > > >>>>
>> > > > > > > > >>>>
>> > > > > > > > >>>
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > > --
>> > > >
>> > > > Best,
>> > > > Benchao Li
>> > > >
>> > >
>> >
>>
>

Reply via email to