Just trying to understand.

> Is there a particular reason we do not support a
> `TwoInputProcessFunction` to combine a KeyedStream with a
> BroadcastStream to result in a KeyedStream? There seems to be a valid
> use case where a KeyedStream is enriched with a BroadcastStream and
> returns a Stream that is partitioned in the same way.


> The key point here is that if the returned stream is a KeyedStream, we
> require that the partitioning of input and output be the same. As for the
> data on the broadcast edge, it will be broadcast to all parallel subtasks,
> so we cannot keep the data partitioning consistent. For example, if a
> specific record is sent to both SubTask1 and SubTask2, and after processing
> the partition index calculated by the new KeySelector is `1`, then the data
> distribution of SubTask2 has obviously changed.


Does this mean that, to support (KeyedStream, BroadcastStream) ->
(KeyedStream), we must make sure that no data can be output when processing
records from the BroadcastStream input? That's probably a reasonable
limitation. The problem is whether this limitation would be too implicit for
users to understand.
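To make the constraint concrete, here is a minimal sketch in plain Java (not the actual Flink API; `subtaskFor` is an invented stand-in for key-group assignment) of why output emitted while processing a broadcast record cannot stay consistently partitioned:

```java
// Hypothetical sketch (plain Java, not Flink API): why records emitted while
// processing a broadcast input cannot keep the output consistently partitioned.
public class BroadcastPartitionDemo {

    // Stand-in for key-to-subtask assignment: hash the key modulo parallelism.
    static int subtaskFor(String key, int parallelism) {
        return Math.abs(key.hashCode()) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 2;
        // A broadcast record arrives at every subtask, so every subtask may
        // emit output for the same key.
        String outputKey = "user-42"; // key chosen by the downstream KeySelector
        int owner = subtaskFor(outputKey, parallelism);
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Only the owning subtask satisfies "input partition == output
            // partition"; the other subtask's emission changes the distribution.
            boolean consistent = (subtask == owner);
            System.out.println("subtask " + subtask + " emits key '" + outputKey
                    + "' -> partition-consistent: " + consistent);
        }
    }
}
```

Since every parallel subtask sees the broadcast record, any keyed output is owned by exactly one subtask, so emitting from the broadcast side on all of them necessarily breaks the partitioning invariant.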


> I noticed that there are two `collect` methods in the Collector,
>
> one with a timestamp and one without. Could you elaborate on the
> differences between them? Additionally, in what use case would one use
> the method that includes the timestamp?
>
>
> That's a good question, and it's mostly used with time-related operators
> such as Window. First, we want to give the process function the ability to
> reset timestamps, which makes it more flexible than the original
> API. Second, we don't want to take the timestamp extraction
> operator/function as a base primitive, it's more like a high-level
> extension. Therefore, the framework must provide this functionality.
>
>
1. I'd suggest renaming the method with a timestamp to something like
`collectAndOverwriteTimestamp`. That might help users understand that they
only need to call this method when they explicitly want to overwrite the
timestamp.

2. While this method provides a way to set timestamps, how would users read
timestamps from the records?
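For illustration, here is a minimal sketch of what the renamed pair of methods might look like, with a toy in-memory implementation; the interface shape and class names are hypothetical, not the actual FLIP-409 API:

```java
// Hypothetical sketch of the suggested renaming; not the actual FLIP-409 API.
import java.util.ArrayList;
import java.util.List;

public class CollectorSketch {

    interface Collector<OUT> {
        /** Emit a record, keeping whatever timestamp it already carries. */
        void collect(OUT record);

        /** Emit a record and explicitly overwrite its event timestamp. */
        void collectAndOverwriteTimestamp(OUT record, long timestamp);
    }

    // Minimal in-memory implementation just to demonstrate the distinction.
    static class RecordingCollector<OUT> implements Collector<OUT> {
        final List<String> emitted = new ArrayList<>();

        public void collect(OUT record) {
            emitted.add(record + "@inherited");
        }

        public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
            emitted.add(record + "@" + timestamp);
        }
    }

    public static void main(String[] args) {
        RecordingCollector<String> out = new RecordingCollector<>();
        out.collect("a");                           // timestamp passes through
        out.collectAndOverwriteTimestamp("b", 42L); // timestamp reset to 42
        System.out.println(out.emitted);            // prints [a@inherited, b@42]
    }
}
```

With this naming, the default `collect` remains the common path, and question 2 above (how users read a record's timestamp) remains the open part of the design.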


Best,

Xintong



On Tue, Jan 30, 2024 at 12:45 PM weijie guo <guoweijieres...@gmail.com>
wrote:

> Hi Xuannan,
>
> Thank you for your attention.
>
> > In the partitioning section, it says that "broadcast can only be
> used as a side-input of other Inputs." Could you clarify what is meant
> by "side-input"? If I understand correctly, it refers to one of the
> inputs of the `TwoInputStreamProcessFunction`. If that is the case,
> the term "side-input" may not be accurate.
>
> Yes, you got it right! I have rewritten this sentence to avoid
> misunderstanding.
>
> > Is there a particular reason we do not support a
> `TwoInputProcessFunction` to combine a KeyedStream with a
> BroadcastStream to result in a KeyedStream? There seems to be a valid
> use case where a KeyedStream is enriched with a BroadcastStream and
> returns a Stream that is partitioned in the same way.
>
> The key point here is that if the returned stream is a KeyedStream, we
> require that the partitioning of input and output be the same. As for the
> data on the broadcast edge, it will be broadcast to all parallel subtasks,
> so we cannot keep the data partitioning consistent. For example, if a
> specific record is sent to both SubTask1 and SubTask2, and after processing
> the partition index calculated by the new KeySelector is `1`, then the data
> distribution of SubTask2 has obviously changed.
>
> > 3. There appears to be a typo in the example code. The
> `SingleStreamProcessFunction` should probably be
> `OneInputStreamProcessFunction`.
>
> Yes, good catch. I have updated this FLIP.
>
> > 4. How do we set the global configuration for the
> ExecutionEnvironment? Currently, we have the
> StreamExecutionEnvironment.getExecutionEnvironment(Configuration)
> method to provide the global configuration in the API.
>
> This is because we don't want to allow setting config programmatically in
> the new API; ideally, everything comes from configuration files. However,
> this may be too idealistic, the specific details need further discussion,
> and I propose to devote a new sub-FLIP to this issue later. We can easily
> provide a `getExecutionEnvironment(Configuration)` or
> `withConfiguration(Configuration)` method later.
>
> > I noticed that there are two `collect` methods in the Collector,
> one with a timestamp and one without. Could you elaborate on the
> differences between them? Additionally, in what use case would one use
> the method that includes the timestamp?
>
> That's a good question, and it's mostly used with time-related operators
> such as Window. First, we want to give the process function the ability to
> reset timestamps, which makes it more flexible than the original
> API. Second, we don't want to take the timestamp extraction
> operator/function as a base primitive, it's more like a high-level
> extension. Therefore, the framework must provide this functionality.
>
>
> Best regards,
>
> Weijie
>
>
> On Tue, Jan 30, 2024 at 11:45 AM, weijie guo <guoweijieres...@gmail.com> wrote:
>
> > Hi Yunfeng,
> >
> > Thank you for your attention
> >
> > > 1. Will we provide any API to support choosing which input to consume
> > between the two inputs of TwoInputStreamProcessFunction? It would be
> > helpful in online machine learning cases, where a process function
> > needs to receive the first machine learning model before it can start
> > predictions on input data. Similar requirements might also exist in
> > Flink CEP, where a rule set needs to be consumed by the process
> > function first before it can start matching the event stream against
> > CEP patterns.
> >
> > Good point! I think we can provide a `nextInputSelection()` method for
> > `TwoInputStreamProcessFunction`. It returns a `First/Second` enum that
> > determines which input the mailbox thread will read next. But I'm
> > considering putting it in the sub-FLIP related to Join, since features like
> > HashJoin have a more specific need for this.
> >
> > > A typo might exist in the current FLIP describing the API to
> > generate a global stream, as I can see either global() or coalesce()
> > in different places of the FLIP. These two methods might need to be
> > unified into one method.
> >
> > Good catch! I have updated this FLIP to fix this typo.
> >
> > > The order of parameters in the current ProcessFunction is (record,
> > context, output), while this FLIP proposes to change the order into
> > (record, output, context). Is there any reason to make this change?
> >
> > No, it's just the order we decided on. But please note that there is no
> > relationship between the two ProcessFunctions anyway. I think it's okay to
> > use our own order of parameters in the new API.
> >
> > > 4. Why does this FLIP propose to use connectAndProcess() instead of
> > > connect() (+ keyBy()) + process()? The latter looks simpler to me.
> >
> > I actually also considered this way at first, but it would have to
> > introduce some concepts like ConnectedStreams. But we hope that streams
> > will be more clearly defined in the new DataStream API; otherwise we will
> > end up going the same way as the original API, where you have to understand
> > `JoinedStreams/ConnectedStreams` and so on.
> >
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > On Tue, Jan 30, 2024 at 11:20 AM, weijie guo <guoweijieres...@gmail.com> wrote:
> >
> >> Hi Wencong:
> >>
> >> Thank you for your attention
> >>
> >> > Q1. Other DataStream types are converted into
> >> Non-Keyed DataStreams by using a "shuffle" operation
> >> to convert Input into output. Does this "shuffle" include the
> >> various repartition operations (rebalance/rescale/shuffle)
> >> from DataStream V1?
> >>
> >> Yes, the name `shuffle` is used only to represent the transformation of
> >> an arbitrary stream into a non-keyed partitioned stream and does not
> >> restrict how the data is partitioned.
> >>
> >>
> >> > Q2. Why is the design for TwoOutputStreamProcessFunction,
> >> when dealing with a KeyedStream, only outputting combinations
> >> of (Keyed + Keyed) and (Non-Keyed + Non-Keyed)?
> >>
> >> In theory, we could only provide functions that return Non-Keyed streams.
> >> If you do want a KeyedStream, you explicitly convert it to a KeyedStream
> >> via keyBy. However, because sometimes data is processed without changing
> >> the partition, we choose to provide an additional KeyedStream counterpart
> >> to reduce the shuffle overhead. We didn't introduce the non-keyed +
> >> keyed combo here simply because it's not very common, and if we really see
> >> a lot of users asking for it later on, it's easy to support it then.
> >>
> >>
> >> Best regards,
> >>
> >> Weijie
> >>
> >>
> >> On Mon, Jan 29, 2024 at 6:28 PM, Xuannan Su <suxuanna...@gmail.com> wrote:
> >>
> >>> Hi Weijie,
> >>>
> >>> Thank you for driving the design of the new DataStream API. I have a
> >>> few questions regarding the FLIP:
> >>>
> >>> 1. In the partitioning section, it says that "broadcast can only be
> >>> used as a side-input of other Inputs." Could you clarify what is meant
> >>> by "side-input"? If I understand correctly, it refers to one of the
> >>> inputs of the `TwoInputStreamProcessFunction`. If that is the case,
> >>> the term "side-input" may not be accurate.
> >>>
> >>> 2. Is there a particular reason we do not support a
> >>> `TwoInputProcessFunction` to combine a KeyedStream with a
> >>> BroadcastStream to result in a KeyedStream? There seems to be a valid
> >>> use case where a KeyedStream is enriched with a BroadcastStream and
> >>> returns a Stream that is partitioned in the same way.
> >>>
> >>> 3. There appears to be a typo in the example code. The
> >>> `SingleStreamProcessFunction` should probably be
> >>> `OneInputStreamProcessFunction`.
> >>>
> >>> 4. How do we set the global configuration for the
> >>> ExecutionEnvironment? Currently, we have the
> >>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration)
> >>> method to provide the global configuration in the API.
> >>>
> >>> 5. I noticed that there are two `collect` methods in the Collector,
> >>> one with a timestamp and one without. Could you elaborate on the
> >>> differences between them? Additionally, in what use case would one use
> >>> the method that includes the timestamp?
> >>>
> >>> Best regards,
> >>> Xuannan
> >>>
> >>>
> >>>
> >>> On Fri, Jan 26, 2024 at 2:21 PM Yunfeng Zhou
> >>> <flink.zhouyunf...@gmail.com> wrote:
> >>> >
> >>> > Hi Weijie,
> >>> >
> >>> > Thanks for raising discussions about the new DataStream API. I have a
> >>> > few questions about the content of the FLIP.
> >>> >
> >>> > 1. Will we provide any API to support choosing which input to consume
> >>> > between the two inputs of TwoInputStreamProcessFunction? It would be
> >>> > helpful in online machine learning cases, where a process function
> >>> > needs to receive the first machine learning model before it can start
> >>> > predictions on input data. Similar requirements might also exist in
> >>> > Flink CEP, where a rule set needs to be consumed by the process
> >>> > function first before it can start matching the event stream against
> >>> > CEP patterns.
> >>> >
> >>> > 2. A typo might exist in the current FLIP describing the API to
> >>> > generate a global stream, as I can see either global() or coalesce()
> >>> > in different places of the FLIP. These two methods might need to be
> >>> > unified into one method.
> >>> >
> >>> > 3. The order of parameters in the current ProcessFunction is (record,
> >>> > context, output), while this FLIP proposes to change the order into
> >>> > (record, output, context). Is there any reason to make this change?
> >>> >
> >>> > 4. Why does this FLIP propose to use connectAndProcess() instead of
> >>> > connect() (+ keyBy()) + process()? The latter looks simpler to me.
> >>> >
> >>> > Looking forward to discussing these questions with you.
> >>> >
> >>> > Best regards,
> >>> > Yunfeng Zhou
> >>> >
> >>> > On Tue, Dec 26, 2023 at 2:44 PM weijie guo <guoweijieres...@gmail.com>
> >>> > wrote:
> >>> > >
> >>> > > Hi devs,
> >>> > >
> >>> > >
> >>> > > I'd like to start a discussion about FLIP-409: DataStream V2 Building
> >>> > > Blocks: DataStream, Partitioning and ProcessFunction [1].
> >>> > >
> >>> > >
> >>> > > As the first sub-FLIP for DataStream API V2, we'd like to discuss and
> >>> > > try to answer some of the most fundamental questions in stream
> >>> > > processing:
> >>> > >
> >>> > >    1. What kinds of data streams do we have?
> >>> > >    2. How to partition data over the streams?
> >>> > >    3. How to define a processing on the data stream?
> >>> > >
> >>> > > The answer to these questions involves three core concepts: DataStream,
> >>> > > Partitioning and ProcessFunction. In this FLIP, we will discuss the
> >>> > > definitions and related API primitives of these concepts in detail.
> >>> > >
> >>> > >
> >>> > > You can find more details in FLIP-409 [1]. This sub-FLIP is at the
> >>> > > heart of the entire DataStream API V2, and its relationship with other
> >>> > > sub-FLIPs can be found in the umbrella FLIP [2].
> >>> > >
> >>> > >
> >>> > > Looking forward to hearing from you, thanks!
> >>> > >
> >>> > >
> >>> > > Best regards,
> >>> > >
> >>> > > Weijie
> >>> > >
> >>> > >
> >>> > >
> >>> > > [1]
> >>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
> >>> > >
> >>> > > [2]
> >>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
> >>>
> >>
>
