Hi All,

Thanks for all the feedback.
If there are no more comments, I would like to start the vote thread. Thanks again!

Best regards,
Weijie

On Tue, Feb 20, 2024 at 2:17 PM Xintong Song <tonysong...@gmail.com> wrote:

LGTM

Best,
Xintong

On Mon, Feb 19, 2024 at 10:48 AM weijie guo <guoweijieres...@gmail.com> wrote:

Hi All,

Based on the discussion thread of FLIP-409, I made a corresponding update to this one. In short, I added content related to `TwoInputBroadcastStreamProcessFunction`.

Best regards,
Weijie

On Wed, Jan 31, 2024 at 3:00 PM weijie guo <guoweijieres...@gmail.com> wrote:

Hi Xintong,

Thanks for the quick reply.

> Why introduce a new `MetricManager` rather than just return `MetricGroup` from `RuntimeContext`?

This was meant to make future extensions easier. But having thought it through, `MetricGroup` itself already plays the role of a manager. So I think you are right, and I will add a `getMetricGroup` method directly to `RuntimeContext`.

Best regards,
Weijie

On Wed, Jan 31, 2024 at 2:02 PM Xintong Song <tonysong...@gmail.com> wrote:

> > How can users define custom metrics within the `ProcessFunction`? Will there be a method like `getMetricGroup` available in the `RuntimeContext`?
>
> I think this is a reasonable request. For extensibility, I have added `getMetricManager` instead of `getMetricGroup` to `RuntimeContext`; it can be used to get the `MetricGroup`.

Why introduce a new `MetricManager` rather than just return `MetricGroup` from `RuntimeContext`?

> > Q2. The FLIP describes the interface for handling processing timers (`ProcessingTimeManager`), but it does not mention how to delete or update an existing timer. The V1 API provides `TimerService`, which can delete a timer.
> > Does this mean that once a timer is registered, it cannot be changed?
>
> I think we do need to introduce a method to delete timers, but I'm a bit curious why we would need to update a timer instead of registering a new one. Anyway, I have updated the FLIP to support deleting timers.

Registering a new timer does not mean the old timer should be removed. There could be multiple timers.

If we don't support deleting timers, developers can still choose to do nothing when the timer is triggered. In that case, they will need additional logic in `onProcessingTimer` to decide whether the timer should be skipped. Besides, frequently invoking and then skipping the callback may also add performance overhead.

Best,
Xintong

On Tue, Jan 30, 2024 at 3:26 PM weijie guo <guoweijieres...@gmail.com> wrote:

Hi Wencong,

> Q1. In the "Configuration" section, it is mentioned that configurations can be set by chaining the withXXX methods. Are these configuration options the same as those provided by DataStream V1, or might there be different options compared to V1?

I haven't considered options that don't exist in V1 yet, but we may add some new options as development continues.

> Q2. The FLIP describes the interface for handling processing timers (`ProcessingTimeManager`), but it does not mention how to delete or update an existing timer. The V1 API provides `TimerService`, which can delete a timer. Does this mean that once a timer is registered, it cannot be changed?
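The trade-off raised in this thread, deleting a stale timer versus letting it fire and skipping it in `onProcessingTimer`, can be illustrated with a small, self-contained Java simulation. Every type and method below (`FakeProcessingTimeManager`, `registerTimer`, `deleteTimer`, `TimeoutFunction`) is a hypothetical stand-in for illustration only. None of it is Flink's or the FLIP's actual API. In the simulation, each record pushes a 10 ms timeout forward, and the function either deletes the previous timer or leaves it in place and filters out stale callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical simulation: none of these types are Flink classes.
class TimerSketch {

    /** Hypothetical stand-in for a processing-time timer service (timestamps in ms). */
    static final class FakeProcessingTimeManager {
        // A set, so registering the same timestamp twice yields a single timer.
        private final TreeSet<Long> timers = new TreeSet<>();

        void registerTimer(long timestamp) { timers.add(timestamp); }

        void deleteTimer(long timestamp) { timers.remove(timestamp); }

        /** Advances the clock to `now` and returns the due timers in timestamp order. */
        List<Long> advanceTo(long now) {
            List<Long> fired = new ArrayList<>();
            while (!timers.isEmpty() && timers.first() <= now) {
                fired.add(timers.pollFirst());
            }
            return fired;
        }
    }

    /** Each record pushes a 10 ms inactivity timeout forward. */
    static final class TimeoutFunction {
        private final FakeProcessingTimeManager timerManager;
        private final boolean useDelete;
        private long activeTimer = -1; // the only timer whose firing still matters
        int callbacks = 0;             // how often onProcessingTimer was invoked
        int timeouts = 0;              // how often a timeout actually took effect

        TimeoutFunction(FakeProcessingTimeManager timerManager, boolean useDelete) {
            this.timerManager = timerManager;
            this.useDelete = useDelete;
        }

        void processRecord(long now) {
            if (useDelete && activeTimer >= 0) {
                timerManager.deleteTimer(activeTimer); // the stale timer will never fire
            }
            activeTimer = now + 10;
            timerManager.registerTimer(activeTimer);
        }

        void onProcessingTimer(long timestamp) {
            callbacks++;
            if (timestamp != activeTimer) {
                return; // without deletion, stale timers must be detected and skipped here
            }
            timeouts++;
            activeTimer = -1;
        }
    }

    /** Feeds records at t = 0..4, then lets all timers fire. */
    static TimeoutFunction run(boolean useDelete) {
        FakeProcessingTimeManager manager = new FakeProcessingTimeManager();
        TimeoutFunction fn = new TimeoutFunction(manager, useDelete);
        for (long t = 0; t < 5; t++) {
            fn.processRecord(t);
        }
        for (long timestamp : manager.advanceTo(100)) {
            fn.onProcessingTimer(timestamp);
        }
        return fn;
    }

    public static void main(String[] args) {
        TimeoutFunction withDelete = run(true);
        TimeoutFunction withoutDelete = run(false);
        System.out.println("with deleteTimer:    callbacks=" + withDelete.callbacks + ", timeouts=" + withDelete.timeouts);
        System.out.println("without deleteTimer: callbacks=" + withoutDelete.callbacks + ", timeouts=" + withoutDelete.timeouts);
    }
}
```

Both variants time out exactly once. However, the variant without deletion invokes the callback five times instead of once. The four extra invocations are stale timers that the `timestamp != activeTimer` check has to filter out. This is the extra bookkeeping and overhead Xintong describes.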
I think we do need to introduce a method to delete timers, but I'm a bit curious why we would need to update a timer instead of registering a new one. Anyway, I have updated the FLIP to support deleting timers.

Best regards,
Weijie

On Tue, Jan 30, 2024 at 2:35 PM weijie guo <guoweijieres...@gmail.com> wrote:

Hi Xuannan,

> 1. +1 to only use XXXPartitionStream if users only need to use the configurable PartitionStream. If there are use cases for both, perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or `ConfigurableNonKeyedPartitionStream` for simplicity.

As for why we need both, please refer to my reply to Yunfeng's first question. As for the name, I can accept `ProcessConfigurableNonKeyedPartitionStream` or keep the status quo. But I don't want to change it to `ConfigurableNonKeyedPartitionStream` for the same reason: the configuration applies to the process rather than the stream.

> Should we allow users to set custom configurations through the `ProcessConfigurable` interface and access these configurations in the `ProcessFunction` via `RuntimeContext`? I believe it would be useful for process function developers to be able to define custom configurations.

If I understand you correctly, you want to set custom properties for processing. The current configurations are mostly for the runtime engine, such as determining the underlying operator's parallelism and slot sharing group (SSG).
But I'm not aware of a need to pass in a custom value (independent of the framework itself) and then retrieve it at runtime from `RuntimeContext`. Could you give some examples?

> How can users define custom metrics within the `ProcessFunction`? Will there be a method like `getMetricGroup` available in the `RuntimeContext`?

I think this is a reasonable request. For extensibility, I have added `getMetricManager` instead of `getMetricGroup` to `RuntimeContext`; it can be used to get the `MetricGroup`.

Best regards,
Weijie

On Tue, Jan 30, 2024 at 1:45 PM weijie guo <guoweijieres...@gmail.com> wrote:

Thanks Yunfeng,

Let me try to answer your questions :)

> 1. Would it be better to have all XXXPartitionStream classes implement ProcessConfigurable, instead of defining both XXXPartitionStream and ProcessConfigurableAndXXXPartitionStream? I wonder whether users would need to operate on a non-configurable PartitionStream.

I thought about this for a while and decided to separate DataStream from ProcessConfigurable. The core reason is that streams and configurations are completely orthogonal concepts, and configuration is only responsible for the `Process`, not the `Stream`. This is why only `process`/`connectAndProcess` return a configurable stream, while partitioning operations like `keyBy` return a plain DataStream. This may also answer your second question in passing.
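The separation Weijie describes can be sketched in plain Java. Configuration belongs to the process, so only processing returns a configurable stream, while `keyBy` returns a plain one. All types below are hypothetical stand-ins whose names echo the thread. Their method signatures are assumptions for illustration, not the FLIP's actual interfaces, and `process` here only creates the configurable result without running the function.

```java
import java.util.function.Function;

// Hypothetical sketch: names echo the discussion, signatures are assumptions.
class ConfigurableSketch {

    /** Settings owned by a process (operator), e.g. its name and parallelism. */
    static final class ProcessSettings {
        String name = "unnamed";
        int parallelism = 1;
    }

    /** Configures the process that produced a stream; self-typed so calls can be chained. */
    interface ProcessConfigurable<T extends ProcessConfigurable<T>> {
        T withName(String name);
        T withParallelism(int parallelism);
    }

    /** A plain stream: it can be partitioned or processed, but has nothing to configure. */
    static class NonKeyedPartitionStream<T> {
        /** Partitioning only re-routes records, so it returns a plain (non-configurable) stream. */
        <K> KeyedPartitionStream<K, T> keyBy(Function<T, K> keySelector) {
            return new KeyedPartitionStream<>(keySelector);
        }

        /** Processing creates a new process, so the result exposes that process's settings. */
        <R> ProcessConfigurableAndNonKeyedPartitionStream<R> process(Function<T, R> fn) {
            // The function itself is not executed in this sketch.
            return new ProcessConfigurableAndNonKeyedPartitionStream<>(new ProcessSettings());
        }
    }

    /** Result of keyBy: deliberately does not implement ProcessConfigurable. */
    static class KeyedPartitionStream<K, T> {
        final Function<T, K> keySelector;

        KeyedPartitionStream(Function<T, K> keySelector) { this.keySelector = keySelector; }
    }

    /** Result of process(): still a stream, plus configuration of the upstream process. */
    static class ProcessConfigurableAndNonKeyedPartitionStream<T>
            extends NonKeyedPartitionStream<T>
            implements ProcessConfigurable<ProcessConfigurableAndNonKeyedPartitionStream<T>> {
        final ProcessSettings settings;

        ProcessConfigurableAndNonKeyedPartitionStream(ProcessSettings settings) { this.settings = settings; }

        @Override
        public ProcessConfigurableAndNonKeyedPartitionStream<T> withName(String name) {
            settings.name = name;
            return this;
        }

        @Override
        public ProcessConfigurableAndNonKeyedPartitionStream<T> withParallelism(int parallelism) {
            if (parallelism <= 0) {
                throw new IllegalArgumentException("parallelism must be positive");
            }
            settings.parallelism = parallelism;
            return this;
        }
    }

    public static void main(String[] args) {
        NonKeyedPartitionStream<String> source = new NonKeyedPartitionStream<>();
        ProcessConfigurableAndNonKeyedPartitionStream<Integer> lengths =
                source.process(String::length).withName("length").withParallelism(4);
        KeyedPartitionStream<Integer, Integer> keyed = lengths.keyBy(len -> len % 2);
        // keyed.withParallelism(2); // would not compile: keyBy yields a plain stream
        System.out.println(lengths.settings.name + " @ parallelism " + lengths.settings.parallelism);
    }
}
```

Because `KeyedPartitionStream` does not implement `ProcessConfigurable`, calling `withParallelism` on the result of `keyBy` is a compile-time error. This is the property Weijie argues for: settings such as name and parallelism attach only where a new process is created.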
> Apart from the detailed withConfigFoo(foo)/withConfigBar(bar) methods, would it be better to also add a general withConfig(configKey, configValue) method to the ProcessConfigurable interface? Adding a method for each configuration might harm the readability and compatibility of configurations.

Sorry, I may not fully understand this question. `ProcessConfigurable` simply refers to the configuration of the process, such as its name and parallelism. It is not the `Configuration` (which contains many `ConfigOption`s) that we usually talk about; it is more like `SingleOutputStreamOperator` in DataStream V1.

Best regards,
Weijie

On Mon, Jan 29, 2024 at 6:45 PM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi Weijie,

Thanks for the FLIP! I have a few questions regarding the FLIP.

1. +1 to only use XXXPartitionStream if users only need to use the configurable PartitionStream. If there are use cases for both, perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or `ConfigurableNonKeyedPartitionStream` for simplicity.

2. Should we allow users to set custom configurations through the `ProcessConfigurable` interface and access these configurations in the `ProcessFunction` via `RuntimeContext`? I believe it would be useful for process function developers to be able to define custom configurations.

3. How can users define custom metrics within the `ProcessFunction`?
   Will there be a method like `getMetricGroup` available in the `RuntimeContext`?

Best,
Xuannan

On Fri, Jan 26, 2024 at 2:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:

Hi Weijie,

Thanks for introducing this FLIP! I have a few questions about the proposed design.

1. Would it be better to have all XXXPartitionStream classes implement ProcessConfigurable, instead of defining both XXXPartitionStream and ProcessConfigurableAndXXXPartitionStream? I wonder whether users would need to operate on a non-configurable PartitionStream.

2. The name "ProcessConfigurable" seems a little ambiguous to me. Will there be classes other than XXXPartitionStream that implement this interface? Will "Process" be accurate enough to describe PartitionStream and those classes?

3. Apart from the detailed withConfigFoo(foo)/withConfigBar(bar) methods, would it be better to also add a general withConfig(configKey, configValue) method to the ProcessConfigurable interface? Adding a method for each configuration might harm the readability and compatibility of configurations.

Looking forward to your response.
Best regards,
Yunfeng Zhou

On Tue, Dec 26, 2023 at 2:47 PM weijie guo <guoweijieres...@gmail.com> wrote:

Hi devs,

I'd like to start a discussion about FLIP-410: Config, Context and Processing Timer Service of DataStream API V2 [1]. This is the second sub-FLIP of DataStream API V2.

In FLIP-409 [2], we defined the most basic primitives of DataStream V2. Building on that, this FLIP answers several important questions closely related to it:

1. How to configure the processing over the datastreams, such as setting the parallelism.
2. How to access runtime contextual information and services from inside the process functions.
3. How to work with processing-time timers.

You can find more details in this FLIP. Its relationship with the other sub-FLIPs is described in the umbrella FLIP [3].

Looking forward to hearing from you, thanks!
Best regards,
Weijie

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
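The thread settled on exposing `getMetricGroup` directly on `RuntimeContext` instead of a separate `MetricManager`. Under that outcome, a user-defined function could register a custom metric roughly as in the sketch below. The `RuntimeContext`, `MetricGroup`, `Counter` and `FilterNegatives` types are hypothetical, self-contained stand-ins, not the FLIP's or Flink's actual classes. The metric name `numDroppedRecords` is also made up for illustration. As a further simplifying assumption, registering a name twice returns the same counter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: these are stand-ins, not Flink classes.
class MetricSketch {

    /** Hypothetical counter metric. */
    static final class Counter {
        private long count;

        void inc() { count++; }

        long getCount() { return count; }
    }

    /** Hypothetical metric group; a repeated name returns the same counter (assumption). */
    static final class MetricGroup {
        private final Map<String, Counter> counters = new HashMap<>();

        Counter counter(String name) {
            return counters.computeIfAbsent(name, n -> new Counter());
        }
    }

    /** Hypothetical runtime context that exposes the metric group directly. */
    interface RuntimeContext {
        MetricGroup getMetricGroup();
    }

    /** A process function that drops negative values and counts how many it dropped. */
    static final class FilterNegatives {
        private Counter dropped;

        void open(RuntimeContext ctx) {
            dropped = ctx.getMetricGroup().counter("numDroppedRecords");
        }

        boolean process(int value) {
            if (value < 0) {
                dropped.inc();
                return false;
            }
            return true;
        }
    }

    /** Runs the function over the given values and returns the dropped-record count. */
    static long runOn(int... values) {
        MetricGroup group = new MetricGroup();
        RuntimeContext ctx = () -> group;
        FilterNegatives fn = new FilterNegatives();
        fn.open(ctx);
        for (int v : values) {
            fn.process(v);
        }
        return group.counter("numDroppedRecords").getCount();
    }

    public static void main(String[] args) {
        System.out.println("dropped = " + runOn(3, -1, 4, -5, 9));
    }
}
```

With the input `3, -1, 4, -5, 9`, two values are negative, so the counter ends at 2. Returning the group itself keeps the user-facing surface to a single call, which matches Weijie's observation that `MetricGroup` already acts as the manager.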