Just trying to understand.

> Is there a particular reason we do not support a
> `TwoInputProcessFunction` to combine a KeyedStream with a
> BroadcastStream to result in a KeyedStream? There seems to be a valid
> use case where a KeyedStream is enriched with a BroadcastStream and
> returns a Stream that is partitioned in the same way.

> The key point here is that if the returned stream is a KeyedStream, we
> require that the partition of input and output be the same. As for the
> data on the broadcast edge, it will be broadcast to all parallelism, we
> cannot keep the data partition consistent. For example, if a specific
> record is sent to both SubTask1 and SubTask2, after processing, the
> partition index calculated by the new KeySelector is `1`, then the data
> distribution of SubTask2 has obviously changed.

Does this mean that if we want to support (KeyedStream, BroadcastStream) ->
(KeyedStream), we must make sure that no data can be output upon processing
records from the input BroadcastStream? That's probably a reasonable
limitation. The question is whether this limitation would be too implicit
for users to understand.

> I noticed that there are two `collect` methods in the Collector,
> one with a timestamp and one without. Could you elaborate on the
> differences between them? Additionally, in what use case would one use
> the method that includes the timestamp?

> That's a good question, and it's mostly used with time-related operators
> such as Window. First, we want to give the process function the ability to
> reset timestamps, which makes it more flexible than the original
> API. Second, we don't want to take the timestamp extraction
> operator/function as a base primitive, it's more like a high-level
> extension. Therefore, the framework must provide this functionality.

1. I'd suggest renaming the method with timestamp to something like
`collectAndOverwriteTimestamp`. That might help users understand that they
don't always need to call this method, unless they explicitly want to
overwrite the timestamp.

2. While this method provides a way to set timestamps, how would users read
timestamps from the records?

Best,

Xintong


On Tue, Jan 30, 2024 at 12:45 PM weijie guo <guoweijieres...@gmail.com> wrote:

> Hi Xuannan,
>
> Thank you for your attention.
>
> > In the partitioning section, it says that "broadcast can only be
> > used as a side-input of other Inputs." Could you clarify what is meant
> > by "side-input"? If I understand correctly, it refers to one of the
> > inputs of the `TwoInputStreamProcessFunction`. If that is the case,
> > the term "side-input" may not be accurate.
>
> Yes, you got it right! I have rewritten this sentence to avoid
> misunderstanding.
>
> > Is there a particular reason we do not support a
> > `TwoInputProcessFunction` to combine a KeyedStream with a
> > BroadcastStream to result in a KeyedStream? There seems to be a valid
> > use case where a KeyedStream is enriched with a BroadcastStream and
> > returns a Stream that is partitioned in the same way.
>
> The key point here is that if the returned stream is a KeyedStream, we
> require that the partition of input and output be the same. As for the
> data on the broadcast edge, it will be broadcast to all parallelism, we
> cannot keep the data partition consistent. For example, if a specific
> record is sent to both SubTask1 and SubTask2, after processing, the
> partition index calculated by the new KeySelector is `1`, then the data
> distribution of SubTask2 has obviously changed.
>
> > 3. There appears to be a typo in the example code. The
> > `SingleStreamProcessFunction` should probably be
> > `OneInputStreamProcessFunction`.
>
> Yes, good catch. I have updated this FLIP.
>
> > 4. How do we set the global configuration for the
> > ExecutionEnvironment? Currently, we have the
> > StreamExecutionEnvironment.getExecutionEnvironment(Configuration)
> > method to provide the global configuration in the API.
>
> This is because we don't want to allow setting config programmatically in
> the new API; everything should ideally come from configuration files.
> However, this may be too idealistic, and the specifics need further
> discussion, so I propose to devote a new sub-FLIP to this issue later.
> We can easily provide the `getExecutionEnvironment(Configuration)` or
> `withConfiguration(Configuration)` method later.
>
> > I noticed that there are two `collect` methods in the Collector,
> > one with a timestamp and one without. Could you elaborate on the
> > differences between them? Additionally, in what use case would one use
> > the method that includes the timestamp?
>
> That's a good question, and it's mostly used with time-related operators
> such as Window. First, we want to give the process function the ability to
> reset timestamps, which makes it more flexible than the original
> API. Second, we don't want to take the timestamp extraction
> operator/function as a base primitive, it's more like a high-level
> extension. Therefore, the framework must provide this functionality.
>
> Best regards,
>
> Weijie
>
>
> weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 11:45:
>
> > Hi Yunfeng,
> >
> > Thank you for your attention
> >
> > > 1. Will we provide any API to support choosing which input to consume
> > > between the two inputs of TwoInputStreamProcessFunction? It would be
> > > helpful in online machine learning cases, where a process function
> > > needs to receive the first machine learning model before it can start
> > > predictions on input data. Similar requirements might also exist in
> > > Flink CEP, where a rule set needs to be consumed by the process
> > > function first before it can start matching the event stream against
> > > CEP patterns.
> >
> > Good point! I think we can provide a `nextInputSelection()` method for
> > `TwoInputStreamProcessFunction`. It returns a `First/Second` enum that
> > determines which Input the mailbox thread will read next. But I'm
> > considering putting it in the sub-FLIP related to Join, since features
> > like HashJoin have a more specific need for this.
> >
> > > A typo might exist in the current FLIP describing the API to
> > > generate a global stream, as I can see either global() or coalesce()
> > > in different places of the FLIP. These two methods might need to be
> > > unified into one method.
> >
> > Good catch! I have updated this FLIP to fix this typo.
> >
> > > The order of parameters in the current ProcessFunction is (record,
> > > context, output), while this FLIP proposes to change the order into
> > > (record, output, context). Is there any reason to make this change?
> >
> > No, it's just the order we decided on. But please note that there is no
> > relationship between the two ProcessFunctions anyway. I think it's okay
> > to use our own order of parameters in the new API.
> >
> > > 4. Why does this FLIP propose to use connectAndProcess() instead of
> > > connect() (+ keyBy()) + process()? The latter looks simpler to me.
> >
> > I actually also considered this way at first, but it would have to
> > introduce some concepts like ConnectedStreams. But we hope that streams
> > will be more clearly defined in the DataStream API, otherwise we will end
> > up going the same way as the original API, in which you have to
> > understand `JoinedStreams/ConnectedStreams` and so on.
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 11:20:
> >
> >> Hi Wencong:
> >>
> >> Thank you for your attention
> >>
> >> > Q1. Other DataStream types are converted into
> >> > Non-Keyed DataStreams by using a "shuffle" operation
> >> > to convert Input into output. Does this "shuffle" include the
> >> > various repartition operations (rebalance/rescale/shuffle)
> >> > from DataStream V1?
> >>
> >> Yes. The name `shuffle` is used only to represent the transformation of
> >> an arbitrary stream into a non-keyed partitioned stream and does not
> >> restrict how the data is partitioned.
> >>
> >> > Q2. Why is the design for TwoOutputStreamProcessFunction,
> >> > when dealing with a KeyedStream, only outputting combinations
> >> > of (Keyed + Keyed) and (Non-Keyed + Non-Keyed)?
> >>
> >> In theory, we could only provide functions that return Non-Keyed streams.
> >> If you do want a KeyedStream, you explicitly convert it to a KeyedStream
> >> via keyBy. However, because sometimes data is processed without changing
> >> the partition, we choose to provide an additional KeyedStream counterpart
> >> to reduce the shuffle overhead. We didn't introduce the non-keyed +
> >> keyed combo here simply because it's not very common, and if we really
> >> see a lot of users asking for it later on, it's easy to support it then.
> >>
> >> Best regards,
> >>
> >> Weijie
> >>
> >>
> >> Xuannan Su <suxuanna...@gmail.com> wrote on Mon, Jan 29, 2024 at 18:28:
> >>
> >>> Hi Weijie,
> >>>
> >>> Thank you for driving the design of the new DataStream API. I have a
> >>> few questions regarding the FLIP:
> >>>
> >>> 1. In the partitioning section, it says that "broadcast can only be
> >>> used as a side-input of other Inputs." Could you clarify what is meant
> >>> by "side-input"? If I understand correctly, it refers to one of the
> >>> inputs of the `TwoInputStreamProcessFunction`. If that is the case,
> >>> the term "side-input" may not be accurate.
> >>>
> >>> 2. Is there a particular reason we do not support a
> >>> `TwoInputProcessFunction` to combine a KeyedStream with a
> >>> BroadcastStream to result in a KeyedStream? There seems to be a valid
> >>> use case where a KeyedStream is enriched with a BroadcastStream and
> >>> returns a Stream that is partitioned in the same way.
> >>>
> >>> 3. There appears to be a typo in the example code. The
> >>> `SingleStreamProcessFunction` should probably be
> >>> `OneInputStreamProcessFunction`.
> >>>
> >>> 4. How do we set the global configuration for the
> >>> ExecutionEnvironment? Currently, we have the
> >>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration)
> >>> method to provide the global configuration in the API.
> >>>
> >>> 5. I noticed that there are two `collect` methods in the Collector,
> >>> one with a timestamp and one without. Could you elaborate on the
> >>> differences between them? Additionally, in what use case would one use
> >>> the method that includes the timestamp?
> >>>
> >>> Best regards,
> >>> Xuannan
> >>>
> >>>
> >>> On Fri, Jan 26, 2024 at 2:21 PM Yunfeng Zhou
> >>> <flink.zhouyunf...@gmail.com> wrote:
> >>> >
> >>> > Hi Weijie,
> >>> >
> >>> > Thanks for raising discussions about the new DataStream API. I have a
> >>> > few questions about the content of the FLIP.
> >>> >
> >>> > 1. Will we provide any API to support choosing which input to consume
> >>> > between the two inputs of TwoInputStreamProcessFunction? It would be
> >>> > helpful in online machine learning cases, where a process function
> >>> > needs to receive the first machine learning model before it can start
> >>> > predictions on input data. Similar requirements might also exist in
> >>> > Flink CEP, where a rule set needs to be consumed by the process
> >>> > function first before it can start matching the event stream against
> >>> > CEP patterns.
> >>> >
> >>> > 2. A typo might exist in the current FLIP describing the API to
> >>> > generate a global stream, as I can see either global() or coalesce()
> >>> > in different places of the FLIP. These two methods might need to be
> >>> > unified into one method.
> >>> >
> >>> > 3. The order of parameters in the current ProcessFunction is (record,
> >>> > context, output), while this FLIP proposes to change the order into
> >>> > (record, output, context). Is there any reason to make this change?
> >>> >
> >>> > 4. Why does this FLIP propose to use connectAndProcess() instead of
> >>> > connect() (+ keyBy()) + process()? The latter looks simpler to me.
> >>> >
> >>> > Looking forward to discussing these questions with you.
> >>> >
> >>> > Best regards,
> >>> > Yunfeng Zhou
> >>> >
> >>> > On Tue, Dec 26, 2023 at 2:44 PM weijie guo <guoweijieres...@gmail.com> wrote:
> >>> > >
> >>> > > Hi devs,
> >>> > >
> >>> > > I'd like to start a discussion about FLIP-409: DataStream V2 Building
> >>> > > Blocks: DataStream, Partitioning and ProcessFunction [1].
> >>> > >
> >>> > > As the first sub-FLIP for DataStream API V2, we'd like to discuss and
> >>> > > try to answer some of the most fundamental questions in stream
> >>> > > processing:
> >>> > >
> >>> > > 1. What kinds of data streams do we have?
> >>> > > 2. How to partition data over the streams?
> >>> > > 3. How to define a processing on the data stream?
> >>> > >
> >>> > > The answer to these questions involves three core concepts: DataStream,
> >>> > > Partitioning and ProcessFunction. In this FLIP, we will discuss the
> >>> > > definitions and related API primitives of these concepts in detail.
> >>> > >
> >>> > > You can find more details in FLIP-409 [1]. This sub-FLIP is at the
> >>> > > heart of the entire DataStream API V2, and its relationship with other
> >>> > > sub-FLIPs can be found in the umbrella FLIP [2].
> >>> > >
> >>> > > Looking forward to hearing from you, thanks!
> >>> > >
> >>> > > Best regards,
> >>> > >
> >>> > > Weijie
> >>> > >
> >>> > >
> >>> > > [1]
> >>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
> >>> > >
> >>> > > [2]
> >>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
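
For reference, the partitioning concern raised at the top of this thread (records on the broadcast edge reach every subtask, so an output keyed by a new KeySelector cannot land only on the subtask that owns that key) can be sketched in plain Java. This is only an illustration under stated assumptions: it uses no Flink API, `partitionOf`, `misplacedSubtasks` and the key `user-42` are hypothetical names, and hash-modulo is a simplified stand-in for Flink's real key-to-subtask assignment.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; no Flink classes are used.
class BroadcastPartitionSketch {

    // Assumption: a simple hash-modulo stand-in for key-based partitioning.
    static int partitionOf(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // A broadcast record is delivered to every subtask. If each subtask
    // emits an output carrying the same key, only one subtask is the owner
    // of that key. Returns the subtasks whose output would violate the
    // keyed partitioning.
    static List<Integer> misplacedSubtasks(String outputKey, int parallelism) {
        int expected = partitionOf(outputKey, parallelism);
        List<Integer> misplaced = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (subtask != expected) {
                misplaced.add(subtask);
            }
        }
        return misplaced;
    }

    public static void main(String[] args) {
        int parallelism = 2;
        String outputKey = "user-42"; // hypothetical key
        System.out.println("key is owned by subtask "
                + partitionOf(outputKey, parallelism));
        System.out.println("misplaced outputs on subtasks "
                + misplacedSubtasks(outputKey, parallelism));
    }
}
```

With a parallelism of N, every broadcast record that produces output is misplaced on N - 1 subtasks, which is why forbidding output while processing broadcast records is one way to keep (KeyedStream, BroadcastStream) -> (KeyedStream) consistent.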