Hi All, Thanks for all the feedback.
If there are no more comments, I would like to start the vote thread.
Thanks again!

Best regards,

Weijie


On Tue, Feb 20, 2024 at 14:17 Xintong Song <tonysong...@gmail.com> wrote:

Thanks for the updates. LGTM.

Best,

Xintong


On Mon, Feb 19, 2024 at 10:51 AM weijie guo <guoweijieres...@gmail.com>
wrote:

Thanks for the reply, Xintong.

Based on your comments, I made the following changes to this FLIP:

1. Renaming `TwoInputStreamProcessFunction` and
`BroadcastTwoInputStreamProcessFunction` to
`TwoInputNonBroadcastStreamProcessFunction` and
`TwoInputBroadcastStreamProcessFunction`, respectively.

2. Making `NonPartitionedContext` extend `RuntimeContext`.

> Some of these changes also affect FLIP-410. I noticed that FLIP-410 is
> also updated accordingly. It would be nice to also mention those
> changes in the FLIP-410 discussion thread.

Yes, I've now mentioned those updates in the FLIP-410 discussion thread.

Best regards,

Weijie


On Mon, Feb 5, 2024 at 10:58 Xintong Song <tonysong...@gmail.com> wrote:

Thanks for updating the FLIP, Weijie.

I think separating the TwoInputProcessFunction according to whether the
input stream contains BroadcastStream makes sense.

I have a few more comments.
1. I'd suggest the names `TwoInputNonBroadcastStreamProcessFunction` and
`TwoInputBroadcastStreamProcessFunction` for the separated methods.
2. I'd suggest making `NonPartitionedContext` extend `RuntimeContext`.
Otherwise, for all the functionalities that `RuntimeContext` provides, we
need to duplicate them for `NonPartitionedContext`.
3. Some of these changes also affect FLIP-410. I noticed that FLIP-410 is
also updated accordingly. It would be nice to also mention those changes
in the FLIP-410 discussion thread.

Best,

Xintong


On Sun, Feb 4, 2024 at 11:23 AM weijie guo <guoweijieres...@gmail.com>
wrote:

Hi Xuannan and Xintong,

Good point! After further consideration, I feel that we should make the
Broadcast + NonKeyed/Keyed process function different from the normal
TwoInputProcessFunction. This is because records from the broadcast input
indeed correspond to all partitions, while records from the non-broadcast
edge have explicit partitions.

When we consider the data of the broadcast input, it is only valid to do
something on all the partitions at once, such as `applyToKeyedState`.
Similarly, other operations (e.g., endOfInput) that do not determine the
current partition should also only be allowed to act on all partitions.
This FLIP has been updated.

Best regards,

Weijie


On Thu, Feb 1, 2024 at 11:31 Xintong Song <tonysong...@gmail.com> wrote:

OK, I see your point.

I think the demand for updating states and emitting outputs upon
receiving a broadcast record makes sense. However, the way
`KeyedBroadcastProcessFunction` supports this may not be optimal. E.g.,
if `Collector#collect` is called in `processBroadcastElement` but outside
of `Context#applyToKeyedState`, the result can be undefined.

Currently in this FLIP, a `TwoInputStreamProcessFunction` is not aware of
which input is KeyedStream and which is BroadcastStream, which makes
supporting things like `applyToKeyedState` difficult. I think we can
provide a built-in function similar to `KeyedBroadcastProcessFunction` on
top of `TwoInputStreamProcessFunction` to address this demand.

WDYT?

Best,

Xintong


On Thu, Feb 1, 2024 at 10:41 AM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi Weijie and Xintong,

Thanks for the reply! Please see my comments below.

> Does this mean if we want to support (KeyedStream, BroadcastStream) ->
> (KeyedStream), we must make sure that no data can be output upon
> processing records from the input BroadcastStream? That's probably a
> reasonable limitation.

I don't think that the requirement for supporting (KeyedStream,
BroadcastStream) -> (KeyedStream) is that no data can be output upon
processing the BroadcastStream. For instance, in the current
`KeyedBroadcastProcessFunction`, we use Context#applyToKeyedState to
produce output results, which can be keyed in the same manner as the
keyed input stream, upon processing data from the BroadcastStream.
Therefore, I believe it only requires that the user must ensure that
the output is keyed in the same way as the input, in this case, the
same way as the keyed input stream. I think this requirement is
consistent with that of (KeyedStream, KeyedStream) -> (KeyedStream).
Thus, I believe that supporting (KeyedStream, BroadcastStream) ->
(KeyedStream) will not introduce complexity for the users. WDYT?

Best regards,
Xuannan


On Tue, Jan 30, 2024 at 3:12 PM weijie guo <guoweijieres...@gmail.com>
wrote:

Hi Xintong,

Thanks for your reply.

> Does this mean if we want to support (KeyedStream, BroadcastStream) ->
> (KeyedStream), we must make sure that no data can be output upon
> processing records from the input BroadcastStream? That's probably a
> reasonable limitation.

I think so. This is the restriction that has to be imposed in order to
avoid re-partitioning (i.e., a shuffle).
If one just wants to get a keyed stream and doesn't care about the data
distribution, then explicit KeyBy partitioning works as expected.

> The problem is whether this limitation would be too implicit for users
> to understand.

Since we can't check for this limitation at compile time, if we were to
add support for this case, we would have to introduce additional runtime
checks to ensure program correctness. For now, I'm inclined not to
support it, as it's hard for users to understand this restriction unless
we have something better. And we can always add it later if we do realize
there's a strong demand for it.

> 1. I'd suggest renaming the method with timestamp to something like
> `collectAndOverwriteTimestamp`. That might help users understand that
> they don't always need to call this method, unless they explicitly want
> to overwrite the timestamp.

Makes sense. I have updated this FLIP to use the new method name.

> 2.
> While this method provides a way to set timestamps, how would users
> read timestamps from the records?

Ah, good point. I will introduce a new method to get the timestamp of
the current record in RuntimeContext.

Best regards,

Weijie


On Tue, Jan 30, 2024 at 14:04 Xintong Song <tonysong...@gmail.com> wrote:

Just trying to understand.

> Is there a particular reason we do not support a
> `TwoInputProcessFunction` to combine a KeyedStream with a
> BroadcastStream to result in a KeyedStream? There seems to be a valid
> use case where a KeyedStream is enriched with a BroadcastStream and
> returns a Stream that is partitioned in the same way.
>
> The key point here is that if the returned stream is a KeyedStream, we
> require that the partition of input and output be the same. The data on
> the broadcast edge, however, is broadcast to all parallel subtasks, so
> we cannot keep the data partition consistent. For example, if a
> specific record is sent to both SubTask1 and SubTask2 and, after
> processing, the partition index calculated by the new KeySelector is
> `1`, then the data distribution of SubTask2 has obviously changed.
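
The scenario in the quoted explanation can be illustrated with a small,
framework-free sketch. It assumes a plain hash-modulo partitioner
(`partitionIndex` and `misplacedSubtasks` are hypothetical helpers, not
Flink's actual key-group assignment) and lists the subtasks on which an
output produced from a broadcast record would sit in the wrong partition
if it were forwarded without a shuffle:

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: models key-to-subtask assignment with a plain hash. */
final class BroadcastPartitionSketch {

    /** Hypothetical partitioner: maps a key to a subtask index in [0, parallelism). */
    static int partitionIndex(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /**
     * A broadcast record reaches every subtask. If each subtask emits an
     * output with the given key, only the subtask that owns that key keeps
     * the keyed partitioning intact; all others are returned here.
     */
    static List<Integer> misplacedSubtasks(String outputKey, int parallelism) {
        int owner = partitionIndex(outputKey, parallelism);
        List<Integer> misplaced = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (subtask != owner) {
                misplaced.add(subtask);
            }
        }
        return misplaced;
    }

    public static void main(String[] args) {
        int parallelism = 2;
        String outputKey = "user-42";
        System.out.println("owning subtask: " + partitionIndex(outputKey, parallelism));
        System.out.println("misplaced on:   " + misplacedSubtasks(outputKey, parallelism));
    }
}
```

With any parallelism above one, at least one subtask ends up holding a
record that belongs to another partition, which is why the output cannot
be treated as a KeyedStream without extra guarantees.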

Does this mean if we want to support (KeyedStream, BroadcastStream) ->
(KeyedStream), we must make sure that no data can be output upon
processing records from the input BroadcastStream? That's probably a
reasonable limitation. The problem is whether this limitation would be
too implicit for users to understand.

> I noticed that there are two `collect` methods in the Collector,
> one with a timestamp and one without. Could you elaborate on the
> differences between them? Additionally, in what use case would one use
> the method that includes the timestamp?
>
> That's a good question, and it's mostly used with time-related
> operators such as Window. First, we want to give the process function
> the ability to reset timestamps, which makes it more flexible than the
> original API. Second, we don't want to take the timestamp extraction
> operator/function as a base primitive; it's more like a high-level
> extension. Therefore, the framework must provide this functionality.

1. I'd suggest renaming the method with timestamp to something like
`collectAndOverwriteTimestamp`. That might help users understand that
they don't always need to call this method, unless they explicitly want
to overwrite the timestamp.

2. While this method provides a way to set timestamps, how would users
read timestamps from the records?

Best,

Xintong


On Tue, Jan 30, 2024 at 12:45 PM weijie guo <guoweijieres...@gmail.com>
wrote:

Hi Xuannan,

Thank you for your attention.

> In the partitioning section, it says that "broadcast can only be
> used as a side-input of other Inputs." Could you clarify what is meant
> by "side-input"? If I understand correctly, it refers to one of the
> inputs of the `TwoInputStreamProcessFunction`. If that is the case,
> the term "side-input" may not be accurate.

Yes, you got it right! I have rewritten this sentence to avoid
misunderstanding.

> Is there a particular reason we do not support a
> `TwoInputProcessFunction` to combine a KeyedStream with a
> BroadcastStream to result in a KeyedStream? There seems to be a valid
> use case where a KeyedStream is enriched with a BroadcastStream and
> returns a Stream that is partitioned in the same way.

The key point here is that if the returned stream is a KeyedStream, we
require that the partition of input and output be the same. The data on
the broadcast edge, however, is broadcast to all parallel subtasks, so
we cannot keep the data partition consistent.
For example, if a specific record is sent to both SubTask1 and SubTask2
and, after processing, the partition index calculated by the new
KeySelector is `1`, then the data distribution of SubTask2 has obviously
changed.

> 3. There appears to be a typo in the example code. The
> `SingleStreamProcessFunction` should probably be
> `OneInputStreamProcessFunction`.

Yes, good catch. I have updated this FLIP.

> 4. How do we set the global configuration for the
> ExecutionEnvironment? Currently, we have the
> StreamExecutionEnvironment.getExecutionEnvironment(Configuration)
> method to provide the global configuration in the API.

This is because we don't want to allow setting config programmatically
in the new API; ideally, everything comes from configuration files.
However, this may be too idealistic, and the specifics need to be
considered and discussed in more detail. I propose to devote a new
sub-FLIP to this issue later. We can easily provide the
`getExecutionEnvironment(Configuration)` or
`withConfiguration(Configuration)` method later.

> I noticed that there are two `collect` methods in the Collector,
> one with a timestamp and one without. Could you elaborate on the
> differences between them? Additionally, in what use case would one use
> the method that includes the timestamp?

That's a good question, and it's mostly used with time-related
operators such as Window. First, we want to give the process function
the ability to reset timestamps, which makes it more flexible than the
original API. Second, we don't want to take the timestamp extraction
operator/function as a base primitive; it's more like a high-level
extension. Therefore, the framework must provide this functionality.

Best regards,

Weijie


On Tue, Jan 30, 2024 at 11:45 weijie guo <guoweijieres...@gmail.com>
wrote:

Hi Yunfeng,

Thank you for your attention.

> 1. Will we provide any API to support choosing which input to consume
> between the two inputs of TwoInputStreamProcessFunction? It would be
> helpful in online machine learning cases, where a process function
> needs to receive the first machine learning model before it can start
> predictions on input data. Similar requirements might also exist in
> Flink CEP, where a rule set needs to be consumed by the process
> function first before it can start matching the event stream against
> CEP patterns.

Good point! I think we can provide a `nextInputSelection()` method for
`TwoInputStreamProcessFunction`.
It returns a `First/Second` enum that determines which input the mailbox
thread will read next. But I'm considering putting it in the sub-FLIP
related to Join, since features like HashJoin have a more specific need
for this.

> A typo might exist in the current FLIP describing the API to
> generate a global stream, as I can see either global() or coalesce()
> in different places of the FLIP. These two methods might need to be
> unified into one method.

Good catch! I have updated this FLIP to fix this typo.

> The order of parameters in the current ProcessFunction is (record,
> context, output), while this FLIP proposes to change the order into
> (record, output, context). Is there any reason to make this change?

No, it's just the order we decided on. But please note that there is no
relationship between the two ProcessFunctions anyway. I think it's okay
to use our own order of parameters in the new API.

> 4. Why does this FLIP propose to use connectAndProcess() instead of
> connect() (+ keyBy()) + process()? The latter looks simpler to me.

I actually also considered this way at first, but it would have to
introduce some concepts like ConnectedStreams.
But we hope that streams will be more clearly defined in the DataStream
API; otherwise we will end up going the same way as the original API,
where you have to understand `JoinedStreams/ConnectedStreams` and so on.

Best regards,

Weijie


On Tue, Jan 30, 2024 at 11:20 weijie guo <guoweijieres...@gmail.com>
wrote:

Hi Wencong:

Thank you for your attention.

> Q1. Other DataStream types are converted into
> Non-Keyed DataStreams by using a "shuffle" operation
> to convert Input into output. Does this "shuffle" include the
> various repartition operations (rebalance/rescale/shuffle)
> from DataStream V1?

Yes. The name `shuffle` is used only to represent the transformation of
an arbitrary stream into a non-keyed partitioned stream and does not
restrict how the data is partitioned.

> Q2. Why is the design for TwoOutputStreamProcessFunction,
> when dealing with a KeyedStream, only outputting combinations
> of (Keyed + Keyed) and (Non-Keyed + Non-Keyed)?

In theory, we could only provide functions that return Non-Keyed streams.
If you do want a KeyedStream, you explicitly convert it to a KeyedStream
via keyBy. However, because sometimes data is processed without changing
the partition, we choose to provide an additional KeyedStream counterpart
to reduce the shuffle overhead. We didn't introduce the non-keyed +
keyed combo here simply because it's not very common, and if we really
see a lot of users asking for it later on, it's easy to support it then.

Best regards,

Weijie


On Mon, Jan 29, 2024 at 18:28 Xuannan Su <suxuanna...@gmail.com> wrote:

Hi Weijie,

Thank you for driving the design of the new DataStream API. I have a
few questions regarding the FLIP:

1. In the partitioning section, it says that "broadcast can only be
used as a side-input of other Inputs." Could you clarify what is meant
by "side-input"? If I understand correctly, it refers to one of the
inputs of the `TwoInputStreamProcessFunction`. If that is the case,
the term "side-input" may not be accurate.

2.
Is there a particular reason we do not support a
`TwoInputProcessFunction` to combine a KeyedStream with a
BroadcastStream to result in a KeyedStream? There seems to be a valid
use case where a KeyedStream is enriched with a BroadcastStream and
returns a Stream that is partitioned in the same way.

3. There appears to be a typo in the example code. The
`SingleStreamProcessFunction` should probably be
`OneInputStreamProcessFunction`.

4. How do we set the global configuration for the
ExecutionEnvironment? Currently, we have the
StreamExecutionEnvironment.getExecutionEnvironment(Configuration)
method to provide the global configuration in the API.

5. I noticed that there are two `collect` methods in the Collector,
one with a timestamp and one without. Could you elaborate on the
differences between them? Additionally, in what use case would one use
the method that includes the timestamp?

Best regards,
Xuannan


On Fri, Jan 26, 2024 at 2:21 PM Yunfeng Zhou
<flink.zhouyunf...@gmail.com> wrote:

Hi Weijie,

Thanks for raising discussions about the new DataStream API. I have a
few questions about the content of the FLIP.

1. Will we provide any API to support choosing which input to consume
between the two inputs of TwoInputStreamProcessFunction? It would be
helpful in online machine learning cases, where a process function
needs to receive the first machine learning model before it can start
predictions on input data. Similar requirements might also exist in
Flink CEP, where a rule set needs to be consumed by the process
function first before it can start matching the event stream against
CEP patterns.

2. A typo might exist in the current FLIP describing the API to
generate a global stream, as I can see either global() or coalesce()
in different places of the FLIP. These two methods might need to be
unified into one method.

3. The order of parameters in the current ProcessFunction is (record,
context, output), while this FLIP proposes to change the order into
(record, output, context). Is there any reason to make this change?

4. Why does this FLIP propose to use connectAndProcess() instead of
connect() (+ keyBy()) + process()? The latter looks simpler to me.

Looking forward to discussing these questions with you.

Best regards,
Yunfeng Zhou


On Tue, Dec 26, 2023 at 2:44 PM weijie guo <guoweijieres...@gmail.com>
wrote:

Hi devs,

I'd like to start a discussion about FLIP-409: DataStream V2 Building
Blocks: DataStream, Partitioning and ProcessFunction [1].

As the first sub-FLIP for DataStream API V2, we'd like to discuss and
try to answer some of the most fundamental questions in stream
processing:

   1. What kinds of data streams do we have?
   2. How do we partition data over the streams?
   3. How do we define processing on a data stream?

The answers to these questions involve three core concepts: DataStream,
Partitioning and ProcessFunction. In this FLIP, we will discuss the
definitions and related API primitives of these concepts in detail.

You can find more details in FLIP-409 [1].
This sub-FLIP is at the heart of the entire DataStream API V2, and its
relationship with other sub-FLIPs can be found in the umbrella FLIP [2].

Looking forward to hearing from you, thanks!

Best regards,

Weijie


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction

[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
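
As a rough illustration of the `collect` / `collectAndOverwriteTimestamp`
split discussed in this thread, here is a minimal, framework-free sketch.
Only the two method names come from the discussion; the interface shape,
the `Stamped` and `ListCollector` helpers, and the rule that plain
`collect` reuses the timestamp of the record currently being processed
are assumptions made for the example, not the FLIP's actual definitions:

```java
import java.util.ArrayList;
import java.util.List;

final class CollectorSketch {

    /** Hypothetical collector shape; the real signatures may differ. */
    interface Collector<OUT> {
        /** Emits a record without touching its timestamp (assumed default). */
        void collect(OUT record);

        /** Emits a record and explicitly replaces its timestamp. */
        void collectAndOverwriteTimestamp(OUT record, long timestamp);
    }

    /** An emitted value together with the timestamp it was emitted with. */
    static final class Stamped<T> {
        final T value;
        final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Test double that buffers emitted records in a list. */
    static final class ListCollector<T> implements Collector<T> {
        private final List<Stamped<T>> buffer = new ArrayList<>();
        private long currentInputTimestamp;

        /** Simulates the framework setting the timestamp of the current input. */
        void setCurrentInputTimestamp(long timestamp) {
            this.currentInputTimestamp = timestamp;
        }

        @Override
        public void collect(T record) {
            buffer.add(new Stamped<>(record, currentInputTimestamp));
        }

        @Override
        public void collectAndOverwriteTimestamp(T record, long timestamp) {
            buffer.add(new Stamped<>(record, timestamp));
        }

        List<Stamped<T>> emitted() {
            return buffer;
        }
    }

    public static void main(String[] args) {
        ListCollector<String> out = new ListCollector<>();
        out.setCurrentInputTimestamp(100L);
        out.collect("kept");
        out.collectAndOverwriteTimestamp("overwritten", 5L);
        for (Stamped<String> s : out.emitted()) {
            System.out.println(s.value + " @ " + s.timestamp);
        }
    }
}
```

The longer method name makes the overwrite an explicit opt-in, which is
the point of the renaming suggestion: most functions would only ever call
`collect`.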