Hi all,

Following the discussion in the FLIP-409 thread, I have updated this FLIP to stay in sync. In short, I added the content related to `TwoInputBroadcastStreamProcessFunction`.

Best regards,

Weijie

weijie guo <guoweijieres...@gmail.com> wrote on Wed, Jan 31, 2024 at 15:00:

> Hi Xintong,
>
> Thanks for the quick reply.
>
> > Why introduce a new `MetricManager` rather than just return `MetricGroup`
> > from `RuntimeContext`?
>
> This was to facilitate possible future extensions. But having thought it
> through, `MetricGroup` itself also plays the role of a manager.
> So I think you are right; I will add a `getMetricGroup` method directly to
> `RuntimeContext`.
>
> Best regards,
>
> Weijie
>
> Xintong Song <tonysong...@gmail.com> wrote on Wed, Jan 31, 2024 at 14:02:
>
>> > > How can users define custom metrics within the `ProcessFunction`?
>> > > Will there be a method like `getMetricGroup` available in the
>> > > `RuntimeContext`?
>> >
>> > I think this is a reasonable request. For extensibility, I have added
>> > getMetricManager instead of getMetricGroup to RuntimeContext; we can
>> > use it to get the MetricGroup.
>>
>> Why introduce a new `MetricManager` rather than just return `MetricGroup`
>> from `RuntimeContext`?
>>
>> > > Q2. The FLIP describes the interface for handling processing
>> > > timers (ProcessingTimeManager), but it does not mention
>> > > how to delete or update an existing timer. V1 API provides TimeService
>> > > that could delete a timer. Does this mean that
>> > > once a timer is registered, it cannot be changed?
>> >
>> > I think we do need to introduce a method to delete the timer, but I'm
>> > kind of curious why we need to update the timer instead of registering
>> > a new one. Anyway, I have updated the FLIP to support deleting the timer.
>>
>> Registering a new timer does not mean the old timer should be removed.
>> There could be multiple timers.
>>
>> If we don't support deleting timers, developers can still decide to do
>> nothing when the timer is triggered. In that case, they will need
>> additional logic to decide whether the timer should be skipped or not in
>> `onProcessingTimer`.
>> Besides, there could also be additional performance overhead in
>> frequently calling and skipping the callback.
>>
>> Best,
>>
>> Xintong
>>
>> On Tue, Jan 30, 2024 at 3:26 PM weijie guo <guoweijieres...@gmail.com>
>> wrote:
>>
>> > Hi Wencong,
>> >
>> > > Q1. In the "Configuration" section, it is mentioned that
>> > > configurations can be set continuously using the withXXX methods.
>> > > Are these configuration options the same as those provided by
>> > > DataStream V1, or might there be different options compared to V1?
>> >
>> > I haven't considered options that don't exist in V1 yet, but we may have
>> > some new options as we continue to develop.
>> >
>> > > Q2. The FLIP describes the interface for handling processing
>> > > timers (ProcessingTimeManager), but it does not mention
>> > > how to delete or update an existing timer. V1 API provides TimeService
>> > > that could delete a timer. Does this mean that
>> > > once a timer is registered, it cannot be changed?
>> >
>> > I think we do need to introduce a method to delete the timer, but I'm
>> > kind of curious why we need to update the timer instead of registering
>> > a new one. Anyway, I have updated the FLIP to support deleting the timer.
>> >
>> > Best regards,
>> >
>> > Weijie
>> >
>> > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 14:35:
>> >
>> > > Hi Xuannan,
>> > >
>> > > > 1. +1 to only use XXXPartitionStream if users only need to use the
>> > > > configurable PartitionStream. If there are use cases for both,
>> > > > perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or
>> > > > `ConfigurableNonKeyedPartitionStream` for simplicity.
>> > >
>> > > As for why we need both, you can refer to my reply to Yunfeng's first
>> > > question. As for the name, I can accept
>> > > ProcessConfigurableNonKeyedPartitionStream or keep the status quo.
>> > > But I don't want to change it to ConfigurableNonKeyedPartitionStream.
>> > > The reason is the same: the configuration is applied to the Process
>> > > rather than the Stream.
>> > >
>> > > > Should we allow users to set custom configurations through the
>> > > > `ProcessConfigurable` interface and access these configurations in the
>> > > > `ProcessFunction` via `RuntimeContext`? I believe it would be useful
>> > > > for process function developers to be able to define custom
>> > > > configurations.
>> > >
>> > > If I understand you correctly, you want to set custom properties for
>> > > processing. The current configurations are mostly for the runtime
>> > > engine, such as determining the underlying operator's parallelism and
>> > > SSG. But I'm not aware of the need to pass in a custom value
>> > > (independent of the framework itself) and then get it at runtime from
>> > > RuntimeContext. Could you give some examples?
>> > >
>> > > > How can users define custom metrics within the `ProcessFunction`?
>> > > > Will there be a method like `getMetricGroup` available in the
>> > > > `RuntimeContext`?
>> > >
>> > > I think this is a reasonable request. For extensibility, I have added
>> > > getMetricManager instead of getMetricGroup to RuntimeContext; we can
>> > > use it to get the MetricGroup.
>> > >
>> > > Best regards,
>> > >
>> > > Weijie
>> > >
>> > > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 13:45:
>> > >
>> > >> Thanks Yunfeng,
>> > >>
>> > >> Let me try to answer your question :)
>> > >>
>> > >> > 1. Would it be better to have all XXXPartitionStream classes
>> > >> > implement ProcessConfigurable, instead of defining both
>> > >> > XXXPartitionStream and ProcessConfigurableAndXXXPartitionStream?
>> > >> > I wonder whether users would need to operate on a non-configurable
>> > >> > PartitionStream.
>> > >>
>> > >> I thought about this for a while and decided to separate DataStream
>> > >> from ProcessConfigurable. At the core of this is that streams and
>> > >> configurations are completely orthogonal concepts, and configuration
>> > >> is only responsible for the `Process`, not the `Stream`. This is why
>> > >> only `process/connectAndProcess` returns a configurable stream, but
>> > >> partitioning like `KeyBy` returns a pure DataStream. This may also
>> > >> answer your second question in passing.
>> > >>
>> > >> > Apart from the detailed withConfigFoo(foo)/withConfigBar(bar)
>> > >> > methods, would it be better to also add a general
>> > >> > withConfig(configKey, configValue) method to the ProcessConfigurable
>> > >> > interface? Adding a method for each configuration might harm the
>> > >> > readability and compatibility of configurations.
>> > >>
>> > >> Sorry, I may not fully understand this question. ProcessConfigurable
>> > >> simply refers to the configuration of the Process, which can include
>> > >> the name, parallelism, etc. of the process. It's not actually the
>> > >> Configuration (containing a lot of ConfigOptions) that we usually
>> > >> talk about, but more like `SingleOutputStreamOperator` in
>> > >> DataStream V1.
>> > >>
>> > >> Best regards,
>> > >>
>> > >> Weijie
>> > >>
>> > >> Xuannan Su <suxuanna...@gmail.com> wrote on Mon, Jan 29, 2024 at 18:45:
>> > >>
>> > >>> Hi Weijie,
>> > >>>
>> > >>> Thanks for the FLIP! I have a few questions regarding the FLIP.
>> > >>>
>> > >>> 1. +1 to only use XXXPartitionStream if users only need to use the
>> > >>> configurable PartitionStream. If there are use cases for both,
>> > >>> perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or
>> > >>> `ConfigurableNonKeyedPartitionStream` for simplicity.
>> > >>>
>> > >>> 2. Should we allow users to set custom configurations through the
>> > >>> `ProcessConfigurable` interface and access these configurations in
>> > >>> the `ProcessFunction` via `RuntimeContext`? I believe it would be
>> > >>> useful for process function developers to be able to define custom
>> > >>> configurations.
>> > >>>
>> > >>> 3. How can users define custom metrics within the `ProcessFunction`?
>> > >>> Will there be a method like `getMetricGroup` available in the
>> > >>> `RuntimeContext`?
>> > >>>
>> > >>> Best,
>> > >>> Xuannan
>> > >>>
>> > >>> On Fri, Jan 26, 2024 at 2:38 PM Yunfeng Zhou
>> > >>> <flink.zhouyunf...@gmail.com> wrote:
>> > >>> >
>> > >>> > Hi Weijie,
>> > >>> >
>> > >>> > Thanks for introducing this FLIP! I have a few questions about the
>> > >>> > designs proposed.
>> > >>> >
>> > >>> > 1. Would it be better to have all XXXPartitionStream classes
>> > >>> > implement ProcessConfigurable, instead of defining both
>> > >>> > XXXPartitionStream and ProcessConfigurableAndXXXPartitionStream?
>> > >>> > I wonder whether users would need to operate on a non-configurable
>> > >>> > PartitionStream.
>> > >>> >
>> > >>> > 2. The name "ProcessConfigurable" seems a little ambiguous to me.
>> > >>> > Will there be classes other than XXXPartitionStream that implement
>> > >>> > this interface? Will "Process" be accurate enough to describe
>> > >>> > PartitionStream and those classes?
>> > >>> >
>> > >>> > 3. Apart from the detailed withConfigFoo(foo)/withConfigBar(bar)
>> > >>> > methods, would it be better to also add a general
>> > >>> > withConfig(configKey, configValue) method to the ProcessConfigurable
>> > >>> > interface? Adding a method for each configuration might harm the
>> > >>> > readability and compatibility of configurations.
>> > >>> >
>> > >>> > Looking forward to your response.
>> > >>> >
>> > >>> > Best regards,
>> > >>> > Yunfeng Zhou
>> > >>> >
>> > >>> > On Tue, Dec 26, 2023 at 2:47 PM weijie guo
>> > >>> > <guoweijieres...@gmail.com> wrote:
>> > >>> > >
>> > >>> > > Hi devs,
>> > >>> > >
>> > >>> > > I'd like to start a discussion about FLIP-410: Config, Context and
>> > >>> > > Processing Timer Service of DataStream API V2 [1]. This is the
>> > >>> > > second sub-FLIP of DataStream API V2.
>> > >>> > >
>> > >>> > > In FLIP-409 [2], we have defined the most basic primitive of
>> > >>> > > DataStream V2. On this basis, this FLIP will further answer
>> > >>> > > several important questions closely related to it:
>> > >>> > >
>> > >>> > > 1. How to configure the processing over the datastreams, such as
>> > >>> > >    setting the parallelism.
>> > >>> > > 2. How to get access to the runtime contextual information and
>> > >>> > >    services from inside the process functions.
>> > >>> > > 3. How to work with processing-time timers.
>> > >>> > >
>> > >>> > > You can find more details in this FLIP. Its relationship with
>> > >>> > > other sub-FLIPs can be found in the umbrella FLIP [3].
>> > >>> > >
>> > >>> > > Looking forward to hearing from you, thanks!
>> > >>> > >
>> > >>> > > Best regards,
>> > >>> > >
>> > >>> > > Weijie
>> > >>> > >
>> > >>> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
>> > >>> > >
>> > >>> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
>> > >>> > >
>> > >>> > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
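
To make the timer point raised in the thread concrete (skipping an unwanted timer inside the callback versus deleting it up front), here is a minimal, self-contained Java sketch. `TimerSketch` and every method on it are hypothetical stand-ins invented for illustration; they are not the `ProcessingTimeManager` interface from the FLIP or any Flink API. The sketch only counts how often the callback would run under each approach.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a processing-time timer service (NOT a Flink API).
class TimerSketch {
    private final TreeSet<Long> timers = new TreeSet<>();
    int callbackInvocations = 0;                   // how often the callback ran
    final List<Long> handled = new ArrayList<>();  // timers the user logic acted on

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Only meaningful when the service supports deletion.
    void deleteTimer(long timestamp) {
        timers.remove(timestamp);
    }

    // Fires every timer due at or before `now`. `skipped` models the extra
    // bookkeeping user code needs when timers cannot be deleted.
    void advanceTo(long now, Set<Long> skipped) {
        while (!timers.isEmpty() && timers.first() <= now) {
            long ts = timers.pollFirst();
            callbackInvocations++;        // the callback is invoked either way
            if (skipped.contains(ts)) {
                continue;                 // user logic decides to do nothing
            }
            handled.add(ts);
        }
    }

    // Without deletion: timer 20 still fires and is skipped in the callback.
    static TimerSketch withoutDelete() {
        TimerSketch s = new TimerSketch();
        s.registerTimer(10L);
        s.registerTimer(20L);
        s.registerTimer(30L);
        s.advanceTo(100L, Collections.singleton(20L));
        return s;
    }

    // With deletion: timer 20 is removed and never reaches the callback.
    static TimerSketch withDelete() {
        TimerSketch s = new TimerSketch();
        s.registerTimer(10L);
        s.registerTimer(20L);
        s.registerTimer(30L);
        s.deleteTimer(20L);
        s.advanceTo(100L, Collections.<Long>emptySet());
        return s;
    }

    public static void main(String[] args) {
        TimerSketch a = withoutDelete();
        TimerSketch b = withDelete();
        System.out.println(a.callbackInvocations + " " + a.handled); // 3 [10, 30]
        System.out.println(b.callbackInvocations + " " + b.handled); // 2 [10, 30]
    }
}
```

Both variants act on the same timers, but the variant without deletion pays for one extra callback invocation plus the user-maintained skip set, which is the overhead mentioned in the discussion.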