Hi All,

Thanks for all the feedback.

If there are no more comments, I would like to start the vote thread.
Thanks again!

Best regards,

Weijie


Xintong Song <tonysong...@gmail.com> wrote on Tue, Feb 20, 2024 at 14:17:

> LGTM
>
> Best,
>
> Xintong
>
>
>
> On Mon, Feb 19, 2024 at 10:48 AM weijie guo <guoweijieres...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > Based on the discussion thread of FLIP-409, I have made a corresponding
> > update to this one. In short, I added content related to
> > `TwoInputBroadcastStreamProcessFunction`.
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > weijie guo <guoweijieres...@gmail.com> wrote on Wed, Jan 31, 2024 at 15:00:
> >
> > > Hi Xintong,
> > >
> > > Thanks for the quick reply.
> > >
> > > > Why introduce a new `MetricManager` rather than just return
> > > > `MetricGroup` from `RuntimeContext`?
> > >
> > > This was meant to facilitate possible future extensions. But having
> > > thought it through, `MetricGroup` itself already plays the role of a
> > > manager. So I think you are right; I will add a `getMetricGroup` method
> > > directly to `RuntimeContext`.
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Xintong Song <tonysong...@gmail.com> wrote on Wed, Jan 31, 2024 at 14:02:
> > >
> > >> >
> > >> > > How can users define custom metrics within the `ProcessFunction`?
> > >> > Will there be a method like `getMetricGroup` available in the
> > >> > `RuntimeContext`?
> > >> >
> > >> > I think this is a reasonable request. For extensibility, I have added
> > >> > getMetricManager instead of getMetricGroup to RuntimeContext; we can
> > >> > use it to get the MetricGroup.
> > >> >
> > >>
> > >> Why introduce a new `MetricManager` rather than just return
> > >> `MetricGroup` from `RuntimeContext`?
> > >>
> > >> > Q2. The FLIP describes the interface for handling processing
> > >> > timers (ProcessingTimeManager), but it does not mention
> > >> > how to delete or update an existing timer. The V1 API provides a
> > >> > TimerService that can delete a timer. Does this mean that
> > >> > once a timer is registered, it cannot be changed?
> > >> >
> > >> > I think we do need to introduce a method to delete the timer, but I'm
> > >> > kind of curious why we need to update the timer instead of registering
> > >> > a new one. Anyway, I have updated the FLIP to support deleting timers.
> > >> >
> > >>
> > >> Registering a new timer does not mean the old timer should be removed.
> > >> There could be multiple timers.
> > >>
> > >> If we don't support deleting timers, developers can still decide to do
> > >> nothing when the timer is triggered. In that case, they will need
> > >> additional logic in `onProcessingTimer` to decide whether the timer
> > >> should be skipped. Besides, frequently invoking the callback only to
> > >> skip it could add performance overhead.
> > >>
> > >> Best,
> > >>
> > >> Xintong
> > >>
> > >>
> > >>
> > >> On Tue, Jan 30, 2024 at 3:26 PM weijie guo <guoweijieres...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi Wencong,
> > >> >
> > >> > > Q1. In the "Configuration" section, it is mentioned that
> > >> > > configurations can be set continuously using the withXXX methods.
> > >> > > Are these configuration options the same as those provided by
> > >> > > DataStream V1, or might there be different options compared to V1?
> > >> >
> > >> > I haven't yet considered options that don't exist in V1, but we may
> > >> > add some new options as development continues.
> > >> >
> > >> > > Q2. The FLIP describes the interface for handling processing
> > >> > > timers (ProcessingTimeManager), but it does not mention
> > >> > > how to delete or update an existing timer. The V1 API provides a
> > >> > > TimerService that can delete a timer. Does this mean that
> > >> > > once a timer is registered, it cannot be changed?
> > >> >
> > >> > I think we do need to introduce a method to delete the timer, but I'm
> > >> > kind of curious why we need to update the timer instead of registering
> > >> > a new one. Anyway, I have updated the FLIP to support deleting timers.
> > >> >
> > >> >
> > >> >
> > >> > Best regards,
> > >> >
> > >> > Weijie
> > >> >
> > >> >
> > >> > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 14:35:
> > >> >
> > >> > > Hi Xuannan,
> > >> > >
> > >> > > > 1. +1 to only use XXXPartitionStream if users only need to use the
> > >> > > > configurable PartitionStream. If there are use cases for both,
> > >> > > > perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or
> > >> > > > `ConfigurableNonKeyedPartitionStream` for simplicity.
> > >> > >
> > >> > > As for why we need both, you can refer to my reply to Yunfeng's first
> > >> > > question. As for the name, I can accept
> > >> > > ProcessConfigurableNonKeyedPartitionStream or keep the status quo.
> > >> > > But I don't want to change it to ConfigurableNonKeyedPartitionStream,
> > >> > > for the same reason: the configuration is applied to the Process
> > >> > > rather than the Stream.
> > >> > >
> > >> > > > Should we allow users to set custom configurations through the
> > >> > > > `ProcessConfigurable` interface and access these configurations in
> > >> > > > the `ProcessFunction` via `RuntimeContext`? I believe it would be
> > >> > > > useful for process function developers to be able to define custom
> > >> > > > configurations.
> > >> > >
> > >> > > If I understand you correctly, you want to set custom properties for
> > >> > > processing. The current configurations are mostly for the runtime
> > >> > > engine, such as determining the underlying operator's parallelism and
> > >> > > SSG (slot sharing group). But I'm not aware of a need to pass in a
> > >> > > custom value (independent of the framework itself) and then get it at
> > >> > > runtime from RuntimeContext. Could you give some examples?
> > >> > >
> > >> > > > How can users define custom metrics within the `ProcessFunction`?
> > >> > > > Will there be a method like `getMetricGroup` available in the
> > >> > > > `RuntimeContext`?
> > >> > >
> > >> > > I think this is a reasonable request. For extensibility, I have added
> > >> > > getMetricManager instead of getMetricGroup to RuntimeContext; we can
> > >> > > use it to get the MetricGroup.
> > >> > >
> > >> > >
> > >> > > Best regards,
> > >> > >
> > >> > > Weijie
> > >> > >
> > >> > >
> > >> > > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 13:45:
> > >> > >
> > >> > >> Thanks Yunfeng,
> > >> > >>
> > >> > >> Let me try to answer your question :)
> > >> > >>
> > >> > >> > 1. Would it be better to have all XXXPartitionStream classes
> > >> > >> > implement ProcessConfigurable, instead of defining both
> > >> > >> > XXXPartitionStream and ProcessConfigurableAndXXXPartitionStream?
> > >> > >> > I wonder whether users would need to operate on a
> > >> > >> > non-configurable PartitionStream.
> > >> > >>
> > >> > >> I thought about this for a while and decided to separate DataStream
> > >> > >> from ProcessConfigurable. At the core of this is that streams and
> > >> > >> configurations are completely orthogonal concepts, and configuration
> > >> > >> is only responsible for the `Process`, not the `Stream`. This is why
> > >> > >> only `process/connectAndProcess` returns a configurable stream, while
> > >> > >> partitioning like `KeyBy` returns a pure DataStream. This may also
> > >> > >> answer your second question in passing.
> > >> > >>
> > >> > >>
> > >> > >> > Apart from the detailed withConfigFoo(foo)/withConfigBar(bar)
> > >> > >> > methods, would it be better to also add a general
> > >> > >> > withConfig(configKey, configValue) method to the
> > >> > >> > ProcessConfigurable interface? Adding a method for each
> > >> > >> > configuration might harm the readability and compatibility of
> > >> > >> > configurations.
> > >> > >>
> > >> > >> Sorry, I may not fully understand this question. ProcessConfigurable
> > >> > >> simply refers to the configuration of the Process, which can include
> > >> > >> the name, parallelism, etc. of the process. It's not actually the
> > >> > >> Configuration (which contains a lot of ConfigOptions) that we usually
> > >> > >> talk about, but more like `SingleOutputStreamOperator` in
> > >> > >> DataStream V1.
> > >> > >>
> > >> > >> Best regards,
> > >> > >>
> > >> > >> Weijie
> > >> > >>
> > >> > >>
> > >> > >> Xuannan Su <suxuanna...@gmail.com> wrote on Mon, Jan 29, 2024 at 18:45:
> > >> > >>
> > >> > >>> Hi Weijie,
> > >> > >>>
> > >> > >>> Thanks for the FLIP! I have a few questions regarding the FLIP.
> > >> > >>>
> > >> > >>> 1. +1 to only use XXXPartitionStream if users only need to use the
> > >> > >>> configurable PartitionStream. If there are use cases for both,
> > >> > >>> perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or
> > >> > >>> `ConfigurableNonKeyedPartitionStream` for simplicity.
> > >> > >>>
> > >> > >>> 2. Should we allow users to set custom configurations through the
> > >> > >>> `ProcessConfigurable` interface and access these configurations in
> > >> > >>> the `ProcessFunction` via `RuntimeContext`? I believe it would be
> > >> > >>> useful for process function developers to be able to define custom
> > >> > >>> configurations.
> > >> > >>>
> > >> > >>> 3. How can users define custom metrics within the `ProcessFunction`?
> > >> > >>> Will there be a method like `getMetricGroup` available in the
> > >> > >>> `RuntimeContext`?
> > >> > >>>
> > >> > >>> Best,
> > >> > >>> Xuannan
> > >> > >>>
> > >> > >>>
> > >> > >>> On Fri, Jan 26, 2024 at 2:38 PM Yunfeng Zhou
> > >> > >>> <flink.zhouyunf...@gmail.com> wrote:
> > >> > >>> >
> > >> > >>> > Hi Weijie,
> > >> > >>> >
> > >> > >>> > Thanks for introducing this FLIP! I have a few questions about the
> > >> > >>> > designs proposed.
> > >> > >>> >
> > >> > >>> > 1. Would it be better to have all XXXPartitionStream classes
> > >> > >>> > implement ProcessConfigurable, instead of defining both
> > >> > >>> > XXXPartitionStream and ProcessConfigurableAndXXXPartitionStream?
> > >> > >>> > I wonder whether users would need to operate on a
> > >> > >>> > non-configurable PartitionStream.
> > >> > >>> >
> > >> > >>> > 2. The name "ProcessConfigurable" seems a little ambiguous to me.
> > >> > >>> > Will there be classes other than XXXPartitionStream that implement
> > >> > >>> > this interface? Will "Process" be accurate enough to describe
> > >> > >>> > PartitionStream and those classes?
> > >> > >>> >
> > >> > >>> > 3. Apart from the detailed withConfigFoo(foo)/withConfigBar(bar)
> > >> > >>> > methods, would it be better to also add a general
> > >> > >>> > withConfig(configKey, configValue) method to the
> > >> > >>> > ProcessConfigurable interface? Adding a method for each
> > >> > >>> > configuration might harm the readability and compatibility of
> > >> > >>> > configurations.
> > >> > >>> >
> > >> > >>> > Looking forward to your response.
> > >> > >>> >
> > >> > >>> > Best regards,
> > >> > >>> > Yunfeng Zhou
> > >> > >>> >
> > >> > >>> > On Tue, Dec 26, 2023 at 2:47 PM weijie guo
> > >> > >>> > <guoweijieres...@gmail.com> wrote:
> > >> > >>> > >
> > >> > >>> > > Hi devs,
> > >> > >>> > >
> > >> > >>> > >
> > >> > >>> > > I'd like to start a discussion about FLIP-410: Config, Context
> > >> > >>> > > and Processing Timer Service of DataStream API V2 [1]. This is
> > >> > >>> > > the second sub-FLIP of DataStream API V2.
> > >> > >>> > >
> > >> > >>> > >
> > >> > >>> > > In FLIP-409 [2], we have defined the most basic primitive of
> > >> > >>> > > DataStream V2. On this basis, this FLIP will further answer
> > >> > >>> > > several important questions closely related to it:
> > >> > >>> > >
> > >> > >>> > >    1. How to configure the processing over the datastreams, such
> > >> > >>> > >       as setting the parallelism.
> > >> > >>> > >    2. How to get access to the runtime contextual information and
> > >> > >>> > >       services from inside the process functions.
> > >> > >>> > >    3. How to work with processing-time timers.
> > >> > >>> > >
> > >> > >>> > > You can find more details in this FLIP. Its relationship with
> > >> > >>> > > other sub-FLIPs can be found in the umbrella FLIP [3].
> > >> > >>> > >
> > >> > >>> > >
> > >> > >>> > > Looking forward to hearing from you, thanks!
> > >> > >>> > >
> > >> > >>> > >
> > >> > >>> > > Best regards,
> > >> > >>> > >
> > >> > >>> > > Weijie
> > >> > >>> > >
> > >> > >>> > >
> > >> > >>> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
> > >> > >>> > >
> > >> > >>> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
> > >> > >>> > >
> > >> > >>> > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
> > >> > >>>
> > >> > >>
> > >> >
> > >>
> > >
> >
>
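
For anyone reading the timer discussion quoted above, here is a minimal sketch of the trade-off Xintong describes: without a delete method, user code has to track cancelled timers and skip them inside the callback, and the callback still gets invoked. Everything below is hypothetical and only for illustration. `SketchTimerManager`, `registerTimer`, `deleteTimer`, and `advanceTo` are made-up names, not the FLIP-410 `ProcessingTimeManager` interface, and processing time is simulated by an explicit `advanceTo` call.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.LongConsumer;

// Hypothetical stand-in for a processing-time timer service (NOT the FLIP-410 API).
final class SketchTimerManager {
    // A sorted set, so registering the same timestamp twice keeps one timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private int callbackInvocations = 0;

    void registerTimer(long timestamp) { timers.add(timestamp); }

    void deleteTimer(long timestamp) { timers.remove(timestamp); }

    // Fires every registered timer whose timestamp is <= now, in ascending order.
    void advanceTo(long now, LongConsumer onTimer) {
        while (!timers.isEmpty() && timers.first() <= now) {
            long ts = timers.pollFirst();
            callbackInvocations++;
            onTimer.accept(ts);
        }
    }

    int callbackInvocations() { return callbackInvocations; }
}

final class TimerDeletionSketch {
    // No delete support: the user keeps a "cancelled" set and filters in the callback.
    static int[] withoutDelete() {
        SketchTimerManager manager = new SketchTimerManager();
        Set<Long> cancelled = new HashSet<>();
        List<Long> handled = new ArrayList<>();
        manager.registerTimer(100);
        manager.registerTimer(200);
        cancelled.add(100L); // user-side bookkeeping instead of deleting
        manager.advanceTo(300, ts -> {
            if (!cancelled.contains(ts)) {
                handled.add(ts);
            }
        });
        return new int[] {manager.callbackInvocations(), handled.size()};
    }

    // Delete support: the cancelled timer never reaches the callback.
    static int[] withDelete() {
        SketchTimerManager manager = new SketchTimerManager();
        List<Long> handled = new ArrayList<>();
        manager.registerTimer(100);
        manager.registerTimer(200);
        manager.deleteTimer(100);
        manager.advanceTo(300, ts -> handled.add(ts));
        return new int[] {manager.callbackInvocations(), handled.size()};
    }

    public static void main(String[] args) {
        int[] a = withoutDelete();
        int[] b = withDelete();
        System.out.println("without delete: " + a[0] + " callbacks, " + a[1] + " handled");
        System.out.println("with delete: " + b[0] + " callbacks, " + b[1] + " handled");
    }
}
```

Both variants handle only the timer at 200, but the variant without deletion invokes the callback twice and needs the extra `cancelled` set, which is the additional logic and overhead mentioned in the thread.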
