LGTM

Best,
Xintong

On Mon, Feb 19, 2024 at 10:48 AM weijie guo <guoweijieres...@gmail.com> wrote:
> Hi All,
>
> Based on the discussion thread of FLIP-409, I made a corresponding update to this one. In short, I added content related to `TwoInputBroadcastStreamProcessFunction`.
>
> Best regards,
>
> Weijie
>
> weijie guo <guoweijieres...@gmail.com> wrote on Wed, Jan 31, 2024 at 15:00:
> > Hi Xintong,
> >
> > Thanks for the quick reply.
> >
> > > Why introduce a new `MetricManager` rather than just return `MetricGroup` from `RuntimeContext`?
> >
> > This is to facilitate possible future extensions. But having thought it through, `MetricGroup` itself also plays the role of a manager. So I think you are right: I will add a `getMetricGroup` method directly to `RuntimeContext`.
> >
> > Best regards,
> >
> > Weijie
> >
> > Xintong Song <tonysong...@gmail.com> wrote on Wed, Jan 31, 2024 at 14:02:
> > > > > How can users define custom metrics within the `ProcessFunction`? Will there be a method like `getMetricGroup` available in the `RuntimeContext`?
> > > >
> > > > I think this is a reasonable request. For extensibility, I have added `getMetricManager` instead of `getMetricGroup` to `RuntimeContext`; we can use it to get the `MetricGroup`.
> > >
> > > Why introduce a new `MetricManager` rather than just return `MetricGroup` from `RuntimeContext`?
> > >
> > > > > Q2. The FLIP describes the interface for handling processing timers (`ProcessingTimeManager`), but it does not mention how to delete or update an existing timer. The V1 API provides `TimerService`, which can delete a timer. Does this mean that once a timer is registered, it cannot be changed?
> > > >
> > > > I think we do need to introduce a method to delete the timer, but I'm kind of curious why we would need to update a timer instead of registering a new one. Anyway, I have updated the FLIP to support deleting timers.
> > >
> > > Registering a new timer does not mean the old timer should be removed. There could be multiple timers.
> > >
> > > If we don't support deleting timers, developers can still decide to do nothing when the timer is triggered. In that case, they will need additional logic in `onProcessingTimer` to decide whether the timer should be skipped. Besides, frequently calling and then skipping the callback could also add performance overhead.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > > On Tue, Jan 30, 2024 at 3:26 PM weijie guo <guoweijieres...@gmail.com> wrote:
> > > > Hi Wencong,
> > > >
> > > > > Q1. In the "Configuration" section, it is mentioned that configurations can be set by chaining the withXXX methods. Are these configuration options the same as those provided by DataStream V1, or might there be different options compared to V1?
> > > >
> > > > I haven't considered options that don't exist in V1 yet, but we may add some new options as development continues.
> > > >
> > > > > Q2. The FLIP describes the interface for handling processing timers (`ProcessingTimeManager`), but it does not mention how to delete or update an existing timer. The V1 API provides `TimerService`, which can delete a timer. Does this mean that once a timer is registered, it cannot be changed?
> > > >
> > > > I think we do need to introduce a method to delete the timer, but I'm kind of curious why we would need to update a timer instead of registering a new one. Anyway, I have updated the FLIP to support deleting timers.
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 14:35:
> > > > > Hi Xuannan,
> > > > >
> > > > > > 1. +1 to only use XXXPartitionStream if users only need to use the configurable PartitionStream.
> > > > > > If there are use cases for both, perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or `ConfigurableNonKeyedPartitionStream` for simplicity.
> > > > >
> > > > > As for why we need both, please refer to my reply to Yunfeng's first question. As for the name, I can accept `ProcessConfigurableNonKeyedPartitionStream` or keep the status quo. But I don't want to change it to `ConfigurableNonKeyedPartitionStream`, for the same reason: the configuration applies to the Process rather than the Stream.
> > > > >
> > > > > > Should we allow users to set custom configurations through the `ProcessConfigurable` interface and access these configurations in the `ProcessFunction` via `RuntimeContext`? I believe it would be useful for process function developers to be able to define custom configurations.
> > > > >
> > > > > If I understand you correctly, you want to set custom properties for processing. The current configurations are mostly for the runtime engine, such as the underlying operator's parallelism and slot sharing group (SSG). But I'm not aware of a need to pass in a custom value (independent of the framework itself) and then retrieve it at runtime from `RuntimeContext`. Could you give some examples?
> > > > >
> > > > > > How can users define custom metrics within the `ProcessFunction`? Will there be a method like `getMetricGroup` available in the `RuntimeContext`?
> > > > >
> > > > > I think this is a reasonable request. For extensibility, I have added `getMetricManager` instead of `getMetricGroup` to `RuntimeContext`; we can use it to get the `MetricGroup`.
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Weijie
> > > > >
> > > > > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 13:45:
> > > > > > Thanks Yunfeng,
> > > > > >
> > > > > > Let me try to answer your questions :)
> > > > > >
> > > > > > > 1. Would it be better to have all XXXPartitionStream classes implement ProcessConfigurable, instead of defining both XXXPartitionStream and ProcessConfigurableAndXXXPartitionStream? I wonder whether users would need to operate on a non-configurable PartitionStream.
> > > > > >
> > > > > > I thought about this for a while and decided to separate DataStream from ProcessConfigurable. The core reason is that streams and configurations are completely orthogonal concepts, and configuration is only responsible for the `Process`, not the `Stream`. This is why only `process`/`connectAndProcess` return a configurable stream, while partitioning operations like `keyBy` return a plain DataStream. This may also answer your second question in passing.
> > > > > >
> > > > > > > Apart from the detailed withConfigFoo(foo)/withConfigBar(bar) methods, would it be better to also add a general withConfig(configKey, configValue) method to the ProcessConfigurable interface? Adding a method for each configuration might harm the readability and compatibility of configurations.
> > > > > >
> > > > > > Sorry, I may not fully understand this question. ProcessConfigurable simply refers to the configuration of the Process, such as its name and parallelism. It is not the `Configuration` (which contains many `ConfigOption`s) that we usually talk about. It is closer to `SingleOutputStreamOperator` in DataStream V1.
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Weijie
> > > > > >
> > > > > > Xuannan Su <suxuanna...@gmail.com> wrote on Mon, Jan 29, 2024 at 18:45:
> > > > > > > Hi Weijie,
> > > > > > >
> > > > > > > Thanks for the FLIP! I have a few questions about it.
> > > > > > >
> > > > > > > 1. +1 to only use XXXPartitionStream if users only need to use the configurable PartitionStream. If there are use cases for both, perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or `ConfigurableNonKeyedPartitionStream` for simplicity.
> > > > > > >
> > > > > > > 2. Should we allow users to set custom configurations through the `ProcessConfigurable` interface and access these configurations in the `ProcessFunction` via `RuntimeContext`? I believe it would be useful for process function developers to be able to define custom configurations.
> > > > > > >
> > > > > > > 3. How can users define custom metrics within the `ProcessFunction`? Will there be a method like `getMetricGroup` available in the `RuntimeContext`?
> > > > > > >
> > > > > > > Best,
> > > > > > > Xuannan
> > > > > > >
> > > > > > > On Fri, Jan 26, 2024 at 2:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > > > > > > > Hi Weijie,
> > > > > > > >
> > > > > > > > Thanks for introducing this FLIP! I have a few questions about the proposed design.
> > > > > > > >
> > > > > > > > 1. Would it be better to have all XXXPartitionStream classes implement ProcessConfigurable, instead of defining both XXXPartitionStream and ProcessConfigurableAndXXXPartitionStream? I wonder whether users would need to operate on a non-configurable PartitionStream.
> > > > > > > >
> > > > > > > > 2. The name "ProcessConfigurable" seems a little ambiguous to me.
> > > > > > > > Will there be classes other than XXXPartitionStream that implement this interface? Will "Process" be accurate enough to describe PartitionStream and those classes?
> > > > > > > >
> > > > > > > > 3. Apart from the detailed withConfigFoo(foo)/withConfigBar(bar) methods, would it be better to also add a general withConfig(configKey, configValue) method to the ProcessConfigurable interface? Adding a method for each configuration might harm the readability and compatibility of configurations.
> > > > > > > >
> > > > > > > > Looking forward to your response.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Yunfeng Zhou
> > > > > > > >
> > > > > > > > On Tue, Dec 26, 2023 at 2:47 PM weijie guo <guoweijieres...@gmail.com> wrote:
> > > > > > > > > Hi devs,
> > > > > > > > >
> > > > > > > > > I'd like to start a discussion about FLIP-410: Config, Context and Processing Timer Service of DataStream API V2 [1]. This is the second sub-FLIP of DataStream API V2.
> > > > > > > > >
> > > > > > > > > In FLIP-409 [2], we defined the most basic primitives of DataStream V2. Building on that, this FLIP answers several important questions closely related to them:
> > > > > > > > >
> > > > > > > > > 1. How to configure the processing over the datastreams, such as setting the parallelism.
> > > > > > > > > 2. How to access runtime contextual information and services from inside the process functions.
> > > > > > > > > 3. How to work with processing-time timers.
> > > > > > > > >
> > > > > > > > > You can find more details in this FLIP. Its relationship with other sub-FLIPs can be found in the umbrella FLIP [3].
> > > > > > > > >
> > > > > > > > > Looking forward to hearing from you, thanks!
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > >
> > > > > > > > > Weijie
> > > > > > > > >
> > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
> > > > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
> > > > > > > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
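Xintong's argument for a delete operation on processing-time timers contrasts two approaches. In one, the timer fires anyway and `onProcessingTimer` must skip it. In the other, the timer is removed before it fires. The self-contained Java sketch below models that difference without Flink. `SimpleTimerManager`, `registerTimer`, `deleteTimer`, `advanceTo` and `Result` are hypothetical names chosen for illustration. They are not the FLIP-410 `ProcessingTimeManager` API. For comparison, DataStream V1 exposes `TimerService#registerProcessingTimeTimer(long)` and `TimerService#deleteProcessingTimeTimer(long)`. The model assumes a single key and an in-memory set of pending timers.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.LongConsumer;

/**
 * Illustrative model only: SimpleTimerManager is a hypothetical, in-memory
 * stand-in for a processing-time timer service, not the FLIP-410 API.
 */
public class TimerDeletionSketch {

    /** Hypothetical timer manager holding pending timestamps for a single key. */
    static final class SimpleTimerManager {
        // Ordered by timestamp; registering the same timestamp twice keeps one timer.
        private final TreeSet<Long> pending = new TreeSet<>();

        void registerTimer(long timestamp) {
            pending.add(timestamp);
        }

        void deleteTimer(long timestamp) {
            pending.remove(timestamp);
        }

        /** Fires and removes every pending timer whose timestamp is <= now, in ascending order. */
        void advanceTo(long now, LongConsumer onProcessingTimer) {
            while (!pending.isEmpty() && pending.first() <= now) {
                onProcessingTimer.accept(pending.pollFirst());
            }
        }
    }

    /** Which timers did real work, and how often the callback was invoked. */
    static final class Result {
        final List<Long> handled = new ArrayList<>();
        int callbackInvocations = 0;
    }

    /** No delete available: the user tracks "cancelled" timers and skips them in the callback. */
    static Result withoutDelete() {
        SimpleTimerManager timers = new SimpleTimerManager();
        Set<Long> cancelled = new HashSet<>(); // extra user-maintained state
        Result result = new Result();
        timers.registerTimer(100L);
        timers.registerTimer(200L);
        cancelled.add(100L); // the timer at 100 is no longer wanted
        timers.advanceTo(300L, ts -> {
            result.callbackInvocations++;
            if (cancelled.remove(ts)) {
                return; // the callback ran only to be skipped
            }
            result.handled.add(ts);
        });
        return result;
    }

    /** Delete available: the unwanted timer never reaches the callback. */
    static Result withDelete() {
        SimpleTimerManager timers = new SimpleTimerManager();
        Result result = new Result();
        timers.registerTimer(100L);
        timers.registerTimer(200L);
        timers.deleteTimer(100L);
        timers.advanceTo(300L, ts -> {
            result.callbackInvocations++;
            result.handled.add(ts);
        });
        return result;
    }

    public static void main(String[] args) {
        Result a = withoutDelete();
        Result b = withDelete();
        System.out.println("without delete: handled=" + a.handled + ", callbacks=" + a.callbackInvocations);
        System.out.println("with delete:    handled=" + b.handled + ", callbacks=" + b.callbackInvocations);
    }
}
```

Running `main` prints `handled=[200], callbacks=2` for the variant without deletion and `handled=[200], callbacks=1` for the variant with deletion. Both variants end up doing real work only for the timer at 200. Without deletion, however, the user must maintain the `cancelled` set, and the callback still runs once for the unwanted timer. This mirrors the extra skip logic and callback overhead mentioned in the thread, under the simplifying assumptions stated above.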