Hi Xintong,

Thanks for the quick reply.

> Why introduce a new `MetricManager` rather than just return `MetricGroup`
> from `RuntimeContext`?

This was meant to facilitate possible future extensions. But having thought
it through, `MetricGroup` itself also plays the role of a manager. So I think
you are right; I will add a `getMetricGroup` method directly to
`RuntimeContext`.

Best regards,

Weijie


Xintong Song <tonysong...@gmail.com> wrote on Wed, Jan 31, 2024 at 14:02:

> > > How can users define custom metrics within the `ProcessFunction`?
> > > Will there be a method like `getMetricGroup` available in the
> > > `RuntimeContext`?
> >
> > I think this is a reasonable request. For extensibility, I have added
> > getMetricManager instead of getMetricGroup to RuntimeContext; we can use
> > it to get the MetricGroup.
>
> Why introduce a new `MetricManager` rather than just return `MetricGroup`
> from `RuntimeContext`?
>
> > > Q2. The FLIP describes the interface for handling processing
> > > timers (ProcessingTimeManager), but it does not mention
> > > how to delete or update an existing timer. V1 API provides TimeService
> > > that could delete a timer. Does this mean that
> > > once a timer is registered, it cannot be changed?
> >
> > I think we do need to introduce a method to delete the timer, but I'm
> > kind of curious why we need to update the timer instead of registering a
> > new one. Anyway, I have updated the FLIP to support deleting timers.
>
> Registering a new timer does not mean the old timer should be removed.
> There could be multiple timers.
>
> If we don't support deleting timers, developers can still decide to do
> nothing when the timer is triggered. In that case, they will need
> additional logic in `onProcessingTimer` to decide whether the timer should
> be skipped. Besides, there could also be additional performance overhead
> from frequently calling and skipping the callback.
>
> Best,
>
> Xintong
>
>
> On Tue, Jan 30, 2024 at 3:26 PM weijie guo <guoweijieres...@gmail.com>
> wrote:
>
> > Hi Wencong,
> >
> > > Q1. In the "Configuration" section, it is mentioned that
> > > configurations can be set continuously using the withXXX methods.
> > > Are these configuration options the same as those provided by
> > > DataStream V1, or might there be different options compared to V1?
> >
> > I haven't considered options that don't exist in V1 yet, but we may have
> > some new options as we continue to develop.
> >
> > > Q2. The FLIP describes the interface for handling processing
> > > timers (ProcessingTimeManager), but it does not mention
> > > how to delete or update an existing timer. V1 API provides TimeService
> > > that could delete a timer. Does this mean that
> > > once a timer is registered, it cannot be changed?
> >
> > I think we do need to introduce a method to delete the timer, but I'm
> > kind of curious why we need to update the timer instead of registering a
> > new one. Anyway, I have updated the FLIP to support deleting timers.
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 14:35:
> >
> > > Hi Xuannan,
> > >
> > > > 1. +1 to only use XXXPartitionStream if users only need to use the
> > > > configurable PartitionStream. If there are use cases for both,
> > > > perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or
> > > > `ConfigurableNonKeyedPartitionStream` for simplicity.
> > >
> > > As for why we need both, you can refer to my reply to Yunfeng's first
> > > question. As for the name, I can accept
> > > ProcessConfigurableNonKeyedPartitionStream or keep the status quo. But I
> > > don't want to change it to ConfigurableNonKeyedPartitionStream, for the
> > > same reason: the configuration is applied to the Process rather
> > > than the Stream.
> > >
> > > > Should we allow users to set custom configurations through the
> > > > `ProcessConfigurable` interface and access these configurations in the
> > > > `ProcessFunction` via `RuntimeContext`? I believe it would be useful
> > > > for process function developers to be able to define custom
> > > > configurations.
> > >
> > > If I understand you correctly, you want to set custom properties for
> > > processing. The current configurations are mostly for the runtime engine,
> > > such as determining the underlying operator's parallelism and SSG. But
> > > I'm not aware of the need to pass in a custom value (independent of the
> > > framework itself) and then get it at runtime from RuntimeContext. Could
> > > you give some examples?
> > >
> > > > How can users define custom metrics within the `ProcessFunction`?
> > > > Will there be a method like `getMetricGroup` available in the
> > > > `RuntimeContext`?
> > >
> > > I think this is a reasonable request. For extensibility, I have added
> > > getMetricManager instead of getMetricGroup to RuntimeContext; we can use
> > > it to get the MetricGroup.
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 13:45:
> > >
> > >> Thanks Yunfeng,
> > >>
> > >> Let me try to answer your question :)
> > >>
> > >> > 1. Would it be better to have all XXXPartitionStream classes implement
> > >> > ProcessConfigurable, instead of defining both XXXPartitionStream and
> > >> > ProcessConfigurableAndXXXPartitionStream? I wonder whether users would
> > >> > need to operate on a non-configurable PartitionStream.
> > >>
> > >> I thought about this for a while and decided to separate DataStream from
> > >> ProcessConfigurable. At the core of this is that streams and
> > >> configurations are completely orthogonal concepts, and configuration is
> > >> only responsible for the `Process`, not the `Stream`.
> > >> This is why only
> > >> `process/connectAndProcess` returns a configurable stream, while
> > >> partitioning like `KeyBy` returns a pure DataStream. This may also
> > >> answer your second question in passing.
> > >>
> > >> > Apart from the detailed withConfigFoo(foo)/withConfigBar(bar)
> > >> > methods, would it be better to also add a general
> > >> > withConfig(configKey, configValue) method to the ProcessConfigurable
> > >> > interface? Adding a method for each configuration might harm the
> > >> > readability and compatibility of configurations.
> > >>
> > >> Sorry, I may not fully understand this question. ProcessConfigurable
> > >> simply refers to the configuration of the Process, which can include the
> > >> name, parallelism, etc. of the process. It's not actually the
> > >> Configuration (containing a lot of ConfigOptions) that we usually talk
> > >> about, but more like `SingleOutputStreamOperator` in DataStream V1.
> > >>
> > >> Best regards,
> > >>
> > >> Weijie
> > >>
> > >>
> > >> Xuannan Su <suxuanna...@gmail.com> wrote on Mon, Jan 29, 2024 at 18:45:
> > >>
> > >>> Hi Weijie,
> > >>>
> > >>> Thanks for the FLIP! I have a few questions regarding the FLIP.
> > >>>
> > >>> 1. +1 to only use XXXPartitionStream if users only need to use the
> > >>> configurable PartitionStream. If there are use cases for both,
> > >>> perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or
> > >>> `ConfigurableNonKeyedPartitionStream` for simplicity.
> > >>>
> > >>> 2. Should we allow users to set custom configurations through the
> > >>> `ProcessConfigurable` interface and access these configurations in the
> > >>> `ProcessFunction` via `RuntimeContext`? I believe it would be useful
> > >>> for process function developers to be able to define custom
> > >>> configurations.
> > >>>
> > >>> 3. How can users define custom metrics within the `ProcessFunction`?
> > >>> Will there be a method like `getMetricGroup` available in the
> > >>> `RuntimeContext`?
> > >>>
> > >>> Best,
> > >>> Xuannan
> > >>>
> > >>>
> > >>> On Fri, Jan 26, 2024 at 2:38 PM Yunfeng Zhou
> > >>> <flink.zhouyunf...@gmail.com> wrote:
> > >>> >
> > >>> > Hi Weijie,
> > >>> >
> > >>> > Thanks for introducing this FLIP! I have a few questions about the
> > >>> > designs proposed.
> > >>> >
> > >>> > 1. Would it be better to have all XXXPartitionStream classes implement
> > >>> > ProcessConfigurable, instead of defining both XXXPartitionStream and
> > >>> > ProcessConfigurableAndXXXPartitionStream? I wonder whether users would
> > >>> > need to operate on a non-configurable PartitionStream.
> > >>> >
> > >>> > 2. The name "ProcessConfigurable" seems a little ambiguous to me. Will
> > >>> > there be classes other than XXXPartitionStream that implement this
> > >>> > interface? Will "Process" be accurate enough to describe
> > >>> > PartitionStream and those classes?
> > >>> >
> > >>> > 3. Apart from the detailed withConfigFoo(foo)/withConfigBar(bar)
> > >>> > methods, would it be better to also add a general
> > >>> > withConfig(configKey, configValue) method to the ProcessConfigurable
> > >>> > interface? Adding a method for each configuration might harm the
> > >>> > readability and compatibility of configurations.
> > >>> >
> > >>> > Looking forward to your response.
> > >>> >
> > >>> > Best regards,
> > >>> > Yunfeng Zhou
> > >>> >
> > >>> > On Tue, Dec 26, 2023 at 2:47 PM weijie guo <guoweijieres...@gmail.com>
> > >>> > wrote:
> > >>> > >
> > >>> > > Hi devs,
> > >>> > >
> > >>> > > I'd like to start a discussion about FLIP-410: Config, Context and
> > >>> > > Processing Timer Service of DataStream API V2 [1]. This is the second
> > >>> > > sub-FLIP of DataStream API V2.
> > >>> > >
> > >>> > > In FLIP-409 [2], we have defined the most basic primitive of
> > >>> > > DataStream V2.
> > >>> > > On this basis, this FLIP will further answer several
> > >>> > > important questions closely related to it:
> > >>> > >
> > >>> > > 1. How to configure the processing over the datastreams, such as
> > >>> > >    setting the parallelism.
> > >>> > > 2. How to get access to the runtime contextual information and
> > >>> > >    services from inside the process functions.
> > >>> > > 3. How to work with processing-time timers.
> > >>> > >
> > >>> > > You can find more details in this FLIP. Its relationship with other
> > >>> > > sub-FLIPs can be found in the umbrella FLIP [3].
> > >>> > >
> > >>> > > Looking forward to hearing from you, thanks!
> > >>> > >
> > >>> > > Best regards,
> > >>> > >
> > >>> > > Weijie
> > >>> > >
> > >>> > > [1]
> > >>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
> > >>> > >
> > >>> > > [2]
> > >>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
> > >>> > >
> > >>> > > [3]
> > >>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
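
To make the timer point from the thread concrete (registering a new timer does not remove the old one, so without deletion the callback has to skip stale timers itself), here is a minimal Java sketch. Everything in it is an assumption made up for illustration: the `TimerService` class, the `run` method and the `TIMEOUT` constant are hypothetical stand-ins, not the `ProcessingTimeManager` interface from FLIP-410 or any other Flink API. It models an inactivity timeout that is pushed back by every event.

```java
import java.util.TreeSet;
import java.util.function.LongConsumer;

class TimerSketch {
    /** Hypothetical minimal timer service; a stand-in, not a Flink interface. */
    static class TimerService {
        private final TreeSet<Long> timers = new TreeSet<>();
        int callbacks = 0; // how many times the timer callback was invoked

        void register(long timestamp) { timers.add(timestamp); }

        void delete(long timestamp) { timers.remove(timestamp); }

        /** Fires every registered timer whose timestamp is <= now, in order. */
        void advanceTo(long now, LongConsumer onTimer) {
            while (!timers.isEmpty() && timers.first() <= now) {
                long ts = timers.pollFirst();
                callbacks++;
                onTimer.accept(ts);
            }
        }
    }

    static final long TIMEOUT = 10;

    /**
     * Replays the events and returns {callback invocations, timeouts acted on}.
     * With deleteOld == false, stale timers stay registered and the callback
     * must compare against the latest deadline to skip them.
     */
    static int[] run(boolean deleteOld, long[] eventTimes) {
        TimerService service = new TimerService();
        long[] deadline = {-1}; // latest valid deadline (user-managed state)
        int[] acted = {0};
        LongConsumer onTimer = ts -> {
            if (ts == deadline[0]) {
                acted[0]++; // only the latest deadline is a real timeout
            }               // otherwise: stale timer, do nothing
        };
        for (long t : eventTimes) {
            service.advanceTo(t, onTimer);
            if (deleteOld && deadline[0] >= 0) {
                service.delete(deadline[0]); // drop the superseded timer
            }
            deadline[0] = t + TIMEOUT;
            service.register(deadline[0]);
        }
        service.advanceTo(Long.MAX_VALUE, onTimer);
        return new int[] {service.callbacks, acted[0]};
    }

    public static void main(String[] args) {
        long[] events = {0, 3, 6, 9};
        int[] skip = run(false, events);
        int[] del = run(true, events);
        System.out.println("skip-in-callback: callbacks=" + skip[0] + ", acted=" + skip[1]);
        System.out.println("delete-old-timer: callbacks=" + del[0] + ", acted=" + del[1]);
    }
}
```

For events at 0, 3, 6 and 9, both variants act on exactly one timeout, but the skip-in-callback variant invokes the callback four times while the delete variant invokes it once. That is the extra user logic and callback overhead mentioned in the thread, under the simplifying assumptions of this sketch.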