Hi All,

Based on the discussion in the FLIP-409 thread, I have updated this FLIP to
match. In short, I added the content related to
`TwoInputBroadcastStreamProcessFunction`.


Best regards,

Weijie


weijie guo <guoweijieres...@gmail.com> wrote on Wed, Jan 31, 2024 at 15:00:

> Hi Xintong,
>
> Thanks for the quick reply.
>
> > Why introduce a new `MetricManager` rather than just return `MetricGroup`
> from `RuntimeContext`?
>
> This was to facilitate possible future extensions. But having thought it
> through, `MetricGroup` itself already plays the role of a manager.
> So I think you are right; I will add a `getMetricGroup` method directly to
> `RuntimeContext`.
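For readers of the archive, here is a minimal sketch of how a custom metric could be defined once `RuntimeContext` exposes `getMetricGroup` directly. The `Counter`, `MetricGroup` and `RuntimeContext` types below are simplified stand-ins written only for this example, and `CountingFunction` with its `open`/`processRecord` methods is hypothetical; none of this is the actual Flink or FLIP-410 interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; not the real Flink interfaces.
interface Counter {
    void inc();
    long getCount();
}

interface MetricGroup {
    Counter counter(String name);
}

interface RuntimeContext {
    // The accessor agreed on in this thread: hand out the MetricGroup directly.
    MetricGroup getMetricGroup();
}

final class SimpleCounter implements Counter {
    private long count;
    @Override public void inc() { count++; }
    @Override public long getCount() { return count; }
}

final class SimpleMetricGroup implements MetricGroup {
    private final Map<String, Counter> counters = new HashMap<>();

    // Returns the same counter instance for repeated lookups of one name.
    @Override public Counter counter(String name) {
        return counters.computeIfAbsent(name, n -> new SimpleCounter());
    }
}

// Hypothetical user function: registers a metric once, updates it per record.
final class CountingFunction {
    private Counter seen;

    void open(RuntimeContext ctx) {
        seen = ctx.getMetricGroup().counter("recordsSeen");
    }

    void processRecord(String record) {
        seen.inc();
    }
}

class MetricGroupSketch {
    public static void main(String[] args) {
        MetricGroup group = new SimpleMetricGroup();
        RuntimeContext ctx = () -> group; // single-method interface, so a lambda works
        CountingFunction fn = new CountingFunction();
        fn.open(ctx);
        fn.processRecord("a");
        fn.processRecord("b");
        System.out.println(group.counter("recordsSeen").getCount());
    }
}
```

With the group returned directly, user code needs no intermediate manager object to reach its counters.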
>
> Best regards,
>
> Weijie
>
>
> Xintong Song <tonysong...@gmail.com> wrote on Wed, Jan 31, 2024 at 14:02:
>
>> >
>> > > How can users define custom metrics within the `ProcessFunction`?
>> > Will there be a method like `getMetricGroup` available in the
>> > `RuntimeContext`?
>> >
>> > I think this is a reasonable request. For extensibility, I have added
>> > getMetricManager instead of getMetricGroup to RuntimeContext; we can use
>> > it to get the MetricGroup.
>> >
>>
>> Why introduce a new `MetricManager` rather than just return `MetricGroup`
>> from `RuntimeContext`?
>>
>> > Q2. The FLIP describes the interface for handling processing
>> >  timers (ProcessingTimeManager), but it does not mention
>> > how to delete or update an existing timer. V1 API provides TimeService
>> > that could delete a timer. Does this mean that
>> >  once a timer is registered, it cannot be changed?
>> >
>> > I think we do need to introduce a method to delete the timer, but I'm kind
>> > of curious why we need to update the timer instead of registering a new
>> > one. Anyway, I have updated the FLIP to support deleting timers.
>> >
>>
>> Registering a new timer does not mean the old timer should be removed.
>> There could be multiple timers.
>>
>> If we don't support deleting timers, developers can still decide to do
>> nothing when the timer is triggered. In that case, they will need
>> additional logic to decide whether the timer should be skipped or not in
>> `onProcessingTimer`. Besides, there could also be additional performance
>> overhead from frequently calling and skipping the callback.
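To make the trade-off above concrete, here is a minimal sketch comparing "skip inside the callback" with "delete the timer". `InMemoryTimerManager`, its `registerTimer`/`deleteTimer`/`advanceTo` methods and the `invocations` field are hypothetical names made up for this example and are not the FLIP's interface; the callback passed to the constructor stands in for `onProcessingTimer`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.LongConsumer;

// Hypothetical, simplified timer manager; not the FLIP's final API.
final class InMemoryTimerManager {
    private final TreeSet<Long> timers = new TreeSet<>();
    private final LongConsumer callback; // stand-in for onProcessingTimer
    int invocations;                     // how often the callback was invoked

    InMemoryTimerManager(LongConsumer callback) {
        this.callback = callback;
    }

    // Several timers can coexist; registering one does not replace another.
    void registerTimer(long timestamp) { timers.add(timestamp); }

    void deleteTimer(long timestamp) { timers.remove(timestamp); }

    // Fires every timer whose timestamp is <= now, in ascending order.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            long ts = timers.pollFirst();
            invocations++;
            callback.accept(ts);
        }
    }
}

class TimerSketch {
    public static void main(String[] args) {
        // Variant A: no deletion. User code tracks stale timers and skips them.
        Set<Long> cancelled = new HashSet<>();
        List<Long> handledA = new ArrayList<>();
        InMemoryTimerManager a = new InMemoryTimerManager(ts -> {
            if (!cancelled.contains(ts)) {
                handledA.add(ts);
            }
        });
        a.registerTimer(100L);
        a.registerTimer(200L);
        cancelled.add(100L); // extra bookkeeping in user code
        a.advanceTo(300L);

        // Variant B: deletion supported. The stale timer never fires.
        List<Long> handledB = new ArrayList<>();
        InMemoryTimerManager b = new InMemoryTimerManager(handledB::add);
        b.registerTimer(100L);
        b.registerTimer(200L);
        b.deleteTimer(100L);
        b.advanceTo(300L);

        System.out.println("A: handled=" + handledA + ", callbacks=" + a.invocations);
        System.out.println("B: handled=" + handledB + ", callbacks=" + b.invocations);
    }
}
```

Both variants handle the same timers, but variant A pays for an extra callback invocation and needs its own bookkeeping set, which is the overhead described above.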
>>
>> Best,
>>
>> Xintong
>>
>>
>>
>> On Tue, Jan 30, 2024 at 3:26 PM weijie guo <guoweijieres...@gmail.com>
>> wrote:
>>
>> > Hi Wencong,
>> >
>> > > Q1. In the "Configuration" section, it is mentioned that
>> > configurations can be set continuously using the withXXX methods.
>> > Are these configuration options the same as those provided by DataStream
>> > V1,
>> > or might there be different options compared to V1?
>> >
>> > I haven't considered options that don't exist in V1 yet, but we may have
>> > some new options as we continue to develop.
>> >
>> > > Q2. The FLIP describes the interface for handling processing
>> >  timers (ProcessingTimeManager), but it does not mention
>> > how to delete or update an existing timer. V1 API provides TimeService
>> > that could delete a timer. Does this mean that
>> >  once a timer is registered, it cannot be changed?
>> >
>> > I think we do need to introduce a method to delete the timer, but I'm kind
>> > of curious why we need to update the timer instead of registering a new
>> > one. Anyway, I have updated the FLIP to support deleting timers.
>> >
>> >
>> >
>> > Best regards,
>> >
>> > Weijie
>> >
>> >
>> > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 14:35:
>> >
>> > > Hi Xuannan,
>> > >
>> > > > 1. +1 to only use XXXPartitionStream if users only need to use the
>> > > configurable PartitionStream.  If there are use cases for both,
>> > > perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or
>> > > `ConfigurableNonKeyedPartitionStream` for simplicity.
>> > >
>> > > As for why we need both, you can refer to my reply to Yunfeng's first
>> > > question. As for the name, I can accept
>> > > ProcessConfigurableNonKeyedPartitionStream or keep the status quo. But I
>> > > don't want to change it to ConfigurableNonKeyedPartitionStream, for the
>> > > same reason: the configuration is applied to the Process rather than
>> > > the Stream.
>> > >
>> > > > Should we allow users to set custom configurations through the
>> > > `ProcessConfigurable` interface and access these configurations in the
>> > > `ProcessFunction` via `RuntimeContext`? I believe it would be useful
>> > > for process function developers to be able to define custom
>> > > configurations.
>> > >
>> > > If I understand you correctly, you want to set custom properties for
>> > > processing. The current configurations are mostly for the runtime engine,
>> > > such as determining the underlying operator's parallelism and slot
>> > > sharing group (SSG). But I'm not aware of the need to pass in a custom
>> > > value (independent of the framework itself) and then get it at runtime
>> > > from RuntimeContext. Could you give some examples?
>> > >
>> > > > How can users define custom metrics within the `ProcessFunction`?
>> > > Will there be a method like `getMetricGroup` available in the
>> > > `RuntimeContext`?
>> > >
>> > > I think this is a reasonable request. For extensibility, I have added
>> > > getMetricManager instead of getMetricGroup to RuntimeContext; we can use
>> > > it to get the MetricGroup.
>> > >
>> > >
>> > > Best regards,
>> > >
>> > > Weijie
>> > >
>> > >
>> > > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 13:45:
>> > >
>> > >> Thanks Yunfeng,
>> > >>
>> > >> Let me try to answer your question :)
>> > >>
>> > >> > 1. Would it be better to have all XXXPartitionStream classes implement
>> > >> ProcessConfigurable, instead of defining both XXXPartitionStream and
>> > >> ProcessConfigurableAndXXXPartitionStream? I wonder whether users would
>> > >> need to operate on a non-configurable PartitionStream.
>> > >>
>> > >> I thought about this for a while and decided to separate DataStream from
>> > >> ProcessConfigurable. At the core of this is that streams and
>> > >> configurations are completely orthogonal concepts, and configuration is
>> > >> only responsible for the `Process`, not the `Stream`. This is why only
>> > >> `process/connectAndProcess` returns a configurable stream, while
>> > >> partitioning like `KeyBy` returns a pure DataStream. This may also answer
>> > >> your second question in passing.
>> > >>
>> > >>
>> > >> > Apart from the detailed withConfigFoo(foo)/withConfigBar(bar)
>> > >> methods, would it be better to also add a general
>> > >> withConfig(configKey, configValue) method to the ProcessConfigurable
>> > >> interface? Adding a method for each configuration might harm the
>> > >> readability and compatibility of configurations.
>> > >>
>> > >> Sorry, I may not fully understand this question. ProcessConfigurable
>> > >> simply refers to the configuration of the Process, which can include the
>> > >> name, parallelism, etc. of the process. It's not actually the
>> > >> Configuration (containing a lot of ConfigOptions) that we usually talk
>> > >> about, but more like `SingleOutputStreamOperator` in DataStream V1.
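A minimal sketch of the separation described in these two answers, assuming a self-typed `ProcessConfigurable` with `withName`/`withParallelism` (illustrative picks following the withXXX convention) and a hypothetical `shuffle()` as the partitioning operation. `SimpleStream` is a toy implementation made up for this example; the actual interfaces are defined in the FLIP, not here.

```java
import java.util.function.UnaryOperator;

// Hypothetical sketch: configuration of the process, kept apart from the stream.
interface ProcessConfigurable<T extends ProcessConfigurable<T>> {
    T withName(String name);
    T withParallelism(int parallelism);
}

interface NonKeyedPartitionStream<V> {
    // Applying a process yields a stream that can also be configured.
    ProcessConfigurableAndNonKeyedPartitionStream<V> process(UnaryOperator<V> fn);

    // Partitioning (hypothetical shuffle) yields a plain stream with nothing to configure.
    NonKeyedPartitionStream<V> shuffle();
}

interface ProcessConfigurableAndNonKeyedPartitionStream<V>
        extends NonKeyedPartitionStream<V>,
                ProcessConfigurable<ProcessConfigurableAndNonKeyedPartitionStream<V>> {}

// Toy implementation that only records the configuration; fn is ignored here.
final class SimpleStream<V> implements ProcessConfigurableAndNonKeyedPartitionStream<V> {
    String name = "unnamed";
    int parallelism = 1;

    @Override public SimpleStream<V> process(UnaryOperator<V> fn) {
        return new SimpleStream<>();
    }

    @Override public NonKeyedPartitionStream<V> shuffle() {
        return new SimpleStream<>();
    }

    @Override public SimpleStream<V> withName(String name) {
        this.name = name;
        return this;
    }

    @Override public SimpleStream<V> withParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }
}

class ConfigSketch {
    public static void main(String[] args) {
        SimpleStream<String> configured = new SimpleStream<String>()
                .process(s -> s.toUpperCase())
                .withName("upper")
                .withParallelism(4);

        NonKeyedPartitionStream<String> plain = configured.shuffle();
        // plain.withName("x"); // would not compile: the plain stream type has no withXXX methods

        System.out.println(configured.name + " / " + configured.parallelism);
    }
}
```

The point of the typing is that the withXXX methods are only reachable on the result of `process`, so the configuration attaches to the process and never to a bare stream.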
>> > >>
>> > >> Best regards,
>> > >>
>> > >> Weijie
>> > >>
>> > >>
>> > >> Xuannan Su <suxuanna...@gmail.com> wrote on Mon, Jan 29, 2024 at 18:45:
>> > >>
>> > >>> Hi Weijie,
>> > >>>
>> > >>> Thanks for the FLIP! I have a few questions regarding the FLIP.
>> > >>>
>> > >>> 1. +1 to only use XXXPartitionStream if users only need to use the
>> > >>> configurable PartitionStream.  If there are use cases for both,
>> > >>> perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or
>> > >>> `ConfigurableNonKeyedPartitionStream` for simplicity.
>> > >>>
>> > >>> 2. Should we allow users to set custom configurations through the
>> > >>> `ProcessConfigurable` interface and access these configurations in
>> the
>> > >>> `ProcessFunction` via `RuntimeContext`? I believe it would be useful
>> > >>> for process function developers to be able to define custom
>> > >>> configurations.
>> > >>>
>> > >>> 3. How can users define custom metrics within the `ProcessFunction`?
>> > >>> Will there be a method like `getMetricGroup` available in the
>> > >>> `RuntimeContext`?
>> > >>>
>> > >>> Best,
>> > >>> Xuannan
>> > >>>
>> > >>>
>> > >>> On Fri, Jan 26, 2024 at 2:38 PM Yunfeng Zhou
>> > >>> <flink.zhouyunf...@gmail.com> wrote:
>> > >>> >
>> > >>> > Hi Weijie,
>> > >>> >
>> > >>> > Thanks for introducing this FLIP! I have a few questions about the
>> > >>> > designs proposed.
>> > >>> >
>> > >>> > 1. Would it be better to have all XXXPartitionStream classes implement
>> > >>> > ProcessConfigurable, instead of defining both XXXPartitionStream and
>> > >>> > ProcessConfigurableAndXXXPartitionStream? I wonder whether users would
>> > >>> > need to operate on a non-configurable PartitionStream.
>> > >>> >
>> > >>> > 2. The name "ProcessConfigurable" seems a little ambiguous to me. Will
>> > >>> > there be classes other than XXXPartitionStream that implement this
>> > >>> > interface? Will "Process" be accurate enough to describe
>> > >>> > PartitionStream and those classes?
>> > >>> >
>> > >>> > 3. Apart from the detailed withConfigFoo(foo)/withConfigBar(bar)
>> > >>> > methods, would it be better to also add a general
>> > >>> > withConfig(configKey, configValue) method to the ProcessConfigurable
>> > >>> > interface? Adding a method for each configuration might harm the
>> > >>> > readability and compatibility of configurations.
>> > >>> >
>> > >>> > Looking forward to your response.
>> > >>> >
>> > >>> > Best regards,
>> > >>> > Yunfeng Zhou
>> > >>> >
>> > >>> > On Tue, Dec 26, 2023 at 2:47 PM weijie guo <guoweijieres...@gmail.com> wrote:
>> > >>> > >
>> > >>> > > Hi devs,
>> > >>> > >
>> > >>> > >
>> > >>> > > I'd like to start a discussion about FLIP-410: Config, Context and
>> > >>> > > Processing Timer Service of DataStream API V2 [1]. This is the second
>> > >>> > > sub-FLIP of DataStream API V2.
>> > >>> > >
>> > >>> > >
>> > >>> > > In FLIP-409 [2], we have defined the most basic primitive of
>> > >>> > > DataStream V2. On this basis, this FLIP will further answer several
>> > >>> > > important questions closely related to it:
>> > >>> > >
>> > >>> > >    1. How to configure the processing over the datastreams, such as
>> > >>> > >       setting the parallelism.
>> > >>> > >    2. How to get access to the runtime contextual information and
>> > >>> > >       services from inside the process functions.
>> > >>> > >    3. How to work with processing-time timers.
>> > >>> > >
>> > >>> > > You can find more details in this FLIP. Its relationship with other
>> > >>> > > sub-FLIPs can be found in the umbrella FLIP [3].
>> > >>> > >
>> > >>> > >
>> > >>> > > Looking forward to hearing from you, thanks!
>> > >>> > >
>> > >>> > >
>> > >>> > > Best regards,
>> > >>> > >
>> > >>> > > Weijie
>> > >>> > >
>> > >>> > >
>> > >>> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
>> > >>> > >
>> > >>> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
>> > >>> > >
>> > >>> > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
>> > >>>
>> > >>
>> >
>>
>
