Hi All,

Thanks for all the feedback.

If there are no more comments, I would like to start the vote thread.
Thanks again!

Best regards,

Weijie


Xintong Song <tonysong...@gmail.com> wrote on Tue, Feb 20, 2024 at 14:17:

> Thanks for the updates. LGTM.
>
> Best,
>
> Xintong
>
>
>
> On Mon, Feb 19, 2024 at 10:51 AM weijie guo <guoweijieres...@gmail.com>
> wrote:
>
> > Thanks for the reply, Xintong.
> >
> > Based on your comments, I made the following changes to this FLIP:
> >
> > 1. Renaming `TwoInputStreamProcessFunction` and
> > `BroadcastTwoInputStreamProcessFunction` to
> > `TwoInputNonBroadcastStreamProcessFunction` and
> > `TwoInputBroadcastStreamProcessFunction`, respectively.
> >
> > 2. Making `NonPartitionedContext` extend `RuntimeContext`.
> >
> > > Some of these changes also affect FLIP-410. I noticed that FLIP-410 is
> > also updated accordingly. It would be nice to also mention those changes
> > in the FLIP-410 discussion thread.
> >
> > Yes, I've now mentioned those updates in the FLIP-410 discussion thread.
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Xintong Song <tonysong...@gmail.com> wrote on Mon, Feb 5, 2024 at 10:58:
> >
> > > Thanks for updating the FLIP, Weijie.
> > >
> > > I think separating the TwoInputProcessFunction according to whether the
> > > input stream contains BroadcastStream makes sense.
> > >
> > > I have a few more comments.
> > > 1. I'd suggest the names `TwoInputNonBroadcastStreamProcessFunction` and
> > > `TwoInputBroadcastStreamProcessFunction` for the separated functions.
> > > 2. I'd suggest making `NonPartitionedContext` extend `RuntimeContext`.
> > > Otherwise, for all the functionalities that `RuntimeContext` provides, we
> > > need to duplicate them for `NonPartitionedContext`.
> > > 3. Some of these changes also affect FLIP-410. I noticed that FLIP-410 is
> > > also updated accordingly. It would be nice to also mention those changes
> > > in the FLIP-410 discussion thread.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Sun, Feb 4, 2024 at 11:23 AM weijie guo <guoweijieres...@gmail.com>
> > > wrote:
> > >
> > > > Hi Xuannan and Xintong,
> > > >
> > > > Good point! After further consideration, I feel that we should make the
> > > > Broadcast + NonKeyed/Keyed process function different from the normal
> > > > TwoInputProcessFunction, because a record from the broadcast input
> > > > indeed corresponds to all partitions, while a record from the
> > > > non-broadcast edge has an explicit partition.
> > > >
> > > > When we consider the data of the broadcast input, it is only valid to do
> > > > something on all the partitions at once, such as `applyToKeyedState`.
> > > > Similarly, other operations (e.g., endOfInput) that do not determine the
> > > > current partition should also only be allowed to act on all partitions.
> > > > This FLIP has been updated.
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > >
> > > > Xintong Song <tonysong...@gmail.com> wrote on Thu, Feb 1, 2024 at 11:31:
> > > >
> > > > > OK, I see your point.
> > > > >
> > > > > I think the demand for updating states and emitting outputs upon
> > > > > receiving a broadcast record makes sense. However, the way
> > > > > `KeyedBroadcastProcessFunction` supports this may not be optimal. E.g.,
> > > > > if `Collector#collect` is called in `processBroadcastElement` but
> > > > > outside of `Context#applyToKeyedState`, the result can be undefined.
> > > > >
> > > > > Currently in this FLIP, a `TwoInputStreamProcessFunction` is not aware
> > > > > of which input is KeyedStream and which is BroadcastStream, which makes
> > > > > supporting things like `applyToKeyedState` difficult. I think we can
> > > > > provide a built-in function similar to `KeyedBroadcastProcessFunction`
> > > > > on top of `TwoInputStreamProcessFunction` to address this demand.
> > > > >
> > > > > WDYT?
> > > > >
> > > > >
> > > > > Best,
> > > > >
> > > > > Xintong
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Feb 1, 2024 at 10:41 AM Xuannan Su <suxuanna...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > Hi Weijie and Xintong,
> > > > > >
> > > > > > Thanks for the reply! Please see my comments below.
> > > > > >
> > > > > > > Does this mean if we want to support (KeyedStream,
> > BroadcastStream)
> > > > ->
> > > > > > > (KeyedStream), we must make sure that no data can be output
> upon
> > > > > > processing
> > > > > > > records from the input BroadcastStream? That's probably a
> > > reasonable
> > > > > > > limitation.
> > > > > >
> > > > > > I don't think that the requirement for supporting (KeyedStream,
> > > > > > BroadcastStream) -> (KeyedStream) is that no data can be output upon
> > > > > > processing the BroadcastStream. For instance, in the current
> > > > > > `KeyedBroadcastProcessFunction`, we use Context#applyToKeyedState to
> > > > > > produce output results, which can be keyed in the same manner as the
> > > > > > keyed input stream, upon processing data from the BroadcastStream.
> > > > > > Therefore, I believe it only requires that the user must ensure that
> > > > > > the output is keyed in the same way as the input, in this case, the
> > > > > > same way as the keyed input stream. I think this requirement is
> > > > > > consistent with that of (KeyedStream, KeyedStream) -> (KeyedStream).
> > > > > > Thus, I believe that supporting (KeyedStream, BroadcastStream) ->
> > > > > > (KeyedStream) will not introduce complexity for the users. WDYT?
> > > > > >
> > > > > > Best regards,
> > > > > > Xuannan
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 30, 2024 at 3:12 PM weijie guo <guoweijieres...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi Xintong,
> > > > > > >
> > > > > > > Thanks for your reply.
> > > > > > >
> > > > > > > > Does this mean if we want to support (KeyedStream,
> > > BroadcastStream)
> > > > > ->
> > > > > > > (KeyedStream), we must make sure that no data can be output
> upon
> > > > > > processing
> > > > > > > records from the input BroadcastStream? That's probably a
> > > reasonable
> > > > > > > limitation.
> > > > > > >
> > > > > > > I think so. This is the restriction that has to be imposed in
> > > > > > > order to avoid re-partitioning (i.e. shuffle).
> > > > > > > If one just wants to get a keyed stream and doesn't care about the
> > > > > > > data distribution, then explicit KeyBy partitioning works as
> > > > > > > expected.
> > > > > > >
> > > > > > > > The problem is would this limitation be too implicit for the
> > > users
> > > > to
> > > > > > > understand.
> > > > > > >
> > > > > > > Since we can't check for this limitation at compile time, if we
> > > > > > > were to add support for this case, we would have to introduce
> > > > > > > additional runtime checks to ensure program correctness. For now,
> > > > > > > I'm inclined not to support it, as it's hard for users to
> > > > > > > understand this restriction unless we have something better. And
> > > > > > > we can always add it later if we do realize there's a strong
> > > > > > > demand for it.
> > > > > > >
> > > > > > > > 1. I'd suggest renaming the method with timestamp to
> something
> > > like
> > > > > > > `collectAndOverwriteTimestamp`. That might help users
> understand
> > > that
> > > > > > they
> > > > > > > don't always need to call this method, unless they explicitly
> > want
> > > to
> > > > > > > overwrite the timestamp.
> > > > > > >
> > > > > > > Makes sense. I have updated this FLIP to use this new method name.
> > > > > > >
> > > > > > > > 2. While this method provides a way to set timestamps, how
> > would
> > > > > users
> > > > > > > read
> > > > > > > timestamps from the records?
> > > > > > >
> > > > > > > Ah, good point. I will introduce a new method to get the timestamp
> > > > > > > of the current record in RuntimeContext.
> > > > > > >
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Weijie
> > > > > > >
> > > > > > >
> > > > > > > Xintong Song <tonysong...@gmail.com> wrote on Tue, Jan 30, 2024 at 14:04:
> > > > > > >
> > > > > > > > Just trying to understand.
> > > > > > > >
> > > > > > > > > Is there a particular reason we do not support a
> > > > > > > > > `TwoInputProcessFunction` to combine a KeyedStream with a
> > > > > > > > > BroadcastStream to result in a KeyedStream? There seems to
> > be a
> > > > > valid
> > > > > > > > > use case where a KeyedStream is enriched with a
> > BroadcastStream
> > > > and
> > > > > > > > > returns a Stream that is partitioned in the same way.
> > > > > > > >
> > > > > > > >
> > > > > > > > > The key point here is that if the returned stream is a
> > > > KeyedStream,
> > > > > > we
> > > > > > > > > require that the partition of  input and output be the
> same.
> > As
> > > > for
> > > > > > the
> > > > > > > > > data on the broadcast edge, it will be broadcast to all
> > > > > parallelism,
> > > > > > we
> > > > > > > > > cannot keep the data partition consistent. For example, if
> a
> > > > > specific
> > > > > > > > > record is sent to both SubTask1 and SubTask2, after
> > processing,
> > > > the
> > > > > > > > > partition index calculated by the new KeySelector is `1`,
> > then
> > > > the
> > > > > > data
> > > > > > > > > distribution of SubTask2 has obviously changed.
> > > > > > > >
> > > > > > > >
> > > > > > > > Does this mean if we want to support (KeyedStream,
> > > BroadcastStream)
> > > > > ->
> > > > > > > > (KeyedStream), we must make sure that no data can be output
> > upon
> > > > > > processing
> > > > > > > > records from the input BroadcastStream? That's probably a
> > > > reasonable
> > > > > > > > limitation. The problem is would this limitation be too
> > implicit
> > > > for
> > > > > > the
> > > > > > > > users to understand.
> > > > > > > >
> > > > > > > >
> > > > > > > > > I noticed that there are two `collect` methods in the
> > > Collector,
> > > > > > > > >
> > > > > > > > > one with a timestamp and one without. Could you elaborate
> on
> > > the
> > > > > > > > > differences between them? Additionally, in what use case
> > would
> > > > one
> > > > > > use
> > > > > > > > > the method that includes the timestamp?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > That's a good question, and it's mostly used with
> > time-related
> > > > > > operators
> > > > > > > > > such as Window. First, we want to give the process function
> > the
> > > > > > ability
> > > > > > > > to
> > > > > > > > > reset timestamps, which makes it more flexible than the
> > > original
> > > > > > > > > API. Second, we don't want to take the timestamp extraction
> > > > > > > > > operator/function as a base primitive, it's more like a
> > > > high-level
> > > > > > > > > extension. Therefore, the framework must provide this
> > > > > functionality.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > 1. I'd suggest renaming the method with timestamp to
> something
> > > like
> > > > > > > > `collectAndOverwriteTimestamp`. That might help users
> > understand
> > > > that
> > > > > > they
> > > > > > > > don't always need to call this method, unless they explicitly
> > > want
> > > > to
> > > > > > > > overwrite the timestamp.
> > > > > > > >
> > > > > > > > 2. While this method provides a way to set timestamps, how
> > would
> > > > > users
> > > > > > read
> > > > > > > > timestamps from the records?
> > > > > > > >
> > > > > > > >
> > > > > > > > Best,
> > > > > > > >
> > > > > > > > Xintong
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Jan 30, 2024 at 12:45 PM weijie guo <guoweijieres...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Xuannan,
> > > > > > > > >
> > > > > > > > > Thank you for your attention.
> > > > > > > > >
> > > > > > > > > > In the partitioning section, it says that "broadcast can
> > only
> > > > be
> > > > > > > > > used as a side-input of other Inputs." Could you clarify
> what
> > > is
> > > > > > meant
> > > > > > > > > by "side-input"? If I understand correctly, it refer to one
> > of
> > > > the
> > > > > > > > > inputs of the `TwoInputStreamProcessFunction`. If that is
> the
> > > > case,
> > > > > > > > > the term "side-input" may not be accurate.
> > > > > > > > >
> > > > > > > > > Yes, you got it right! I have rewritten this sentence to avoid
> > > > > > > > > misunderstanding.
> > > > > > > > >
> > > > > > > > > > Is there a particular reason we do not support a
> > > > > > > > > `TwoInputProcessFunction` to combine a KeyedStream with a
> > > > > > > > > BroadcastStream to result in a KeyedStream? There seems to
> > be a
> > > > > valid
> > > > > > > > > use case where a KeyedStream is enriched with a
> > BroadcastStream
> > > > and
> > > > > > > > > returns a Stream that is partitioned in the same way.
> > > > > > > > >
> > > > > > > > > The key point here is that if the returned stream is a
> > > > > > > > > KeyedStream, we require that the partitioning of input and
> > > > > > > > > output be the same. As for the data on the broadcast edge, it
> > > > > > > > > will be broadcast to all parallel subtasks, so we cannot keep
> > > > > > > > > the data partition consistent. For example, if a specific
> > > > > > > > > record is sent to both SubTask1 and SubTask2 and, after
> > > > > > > > > processing, the partition index calculated by the new
> > > > > > > > > KeySelector is `1`, then the data distribution of SubTask2 has
> > > > > > > > > obviously changed.
> > > > > > > > >
> > > > > > > > > > 3. There appears to be a typo in the example code. The
> > > > > > > > > `SingleStreamProcessFunction` should probably be
> > > > > > > > > `OneInputStreamProcessFunction`.
> > > > > > > > >
> > > > > > > > > Yes, good catch. I have updated this FLIP.
> > > > > > > > >
> > > > > > > > > > 4. How do we set the global configuration for the
> > > > > > > > > ExecutionEnvironment? Currently, we have the
> > > > > > > > >
> > > StreamExecutionEnvironment.getExecutionEnvironment(Configuration)
> > > > > > > > > method to provide the global configuration in the API.
> > > > > > > > >
> > > > > > > > > This is because we don't want to allow setting config
> > > > > > > > > programmatically in the new API; ideally, everything comes from
> > > > > > > > > configuration files. However, this may be too idealistic, and
> > > > > > > > > the specific details need to be considered and discussed
> > > > > > > > > further. I propose to devote a new sub-FLIP to this issue
> > > > > > > > > later. We can easily provide the
> > > > > > > > > `getExecutionEnvironment(Configuration)` or
> > > > > > > > > `withConfiguration(Configuration)` method later.
> > > > > > > > >
> > > > > > > > > > I noticed that there are two `collect` methods in the
> > > > Collector,
> > > > > > > > > one with a timestamp and one without. Could you elaborate
> on
> > > the
> > > > > > > > > differences between them? Additionally, in what use case
> > would
> > > > one
> > > > > > use
> > > > > > > > > the method that includes the timestamp?
> > > > > > > > >
> > > > > > > > > That's a good question, and it's mostly used with time-related
> > > > > > > > > operators such as Window. First, we want to give the process
> > > > > > > > > function the ability to reset timestamps, which makes it more
> > > > > > > > > flexible than the original API. Second, we don't want to take
> > > > > > > > > the timestamp extraction operator/function as a base primitive;
> > > > > > > > > it's more like a high-level extension. Therefore, the framework
> > > > > > > > > must provide this functionality.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > >
> > > > > > > > > Weijie
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 11:45:
> > > > > > > > >
> > > > > > > > > > Hi Yunfeng,
> > > > > > > > > >
> > > > > > > > > > Thank you for your attention
> > > > > > > > > >
> > > > > > > > > > > 1. Will we provide any API to support choosing which
> > input
> > > to
> > > > > > consume
> > > > > > > > > > between the two inputs of TwoInputStreamProcessFunction?
> It
> > > > would
> > > > > > be
> > > > > > > > > > helpful in online machine learning cases, where a process
> > > > > function
> > > > > > > > > > needs to receive the first machine learning model before
> it
> > > can
> > > > > > start
> > > > > > > > > > predictions on input data. Similar requirements might
> also
> > > > exist
> > > > > in
> > > > > > > > > > Flink CEP, where a rule set needs to be consumed by the
> > > process
> > > > > > > > > > function first before it can start matching the event
> > stream
> > > > > > against
> > > > > > > > > > CEP patterns.
> > > > > > > > > >
> > > > > > > > > > Good point! I think we can provide a `nextInputSelection()`
> > > > > > > > > > method for `TwoInputStreamProcessFunction`. It returns a
> > > > > > > > > > `First/Second` enum that determines which Input the mailbox
> > > > > > > > > > thread will read next. But I'm considering putting it in the
> > > > > > > > > > sub-FLIP related to Join, since features like HashJoin have a
> > > > > > > > > > more specific need for this.
> > > > > > > > > >
> > > > > > > > > > > A typo might exist in the current FLIP describing the
> API
> > > to
> > > > > > > > > > generate a global stream, as I can see either global() or
> > > > > > coalesce()
> > > > > > > > > > in different places of the FLIP. These two methods might
> > need
> > > > to
> > > > > be
> > > > > > > > > > unified into one method.
> > > > > > > > > >
> > > > > > > > > > Good catch! I have updated this FLIP to fix this typo.
> > > > > > > > > >
> > > > > > > > > > > The order of parameters in the current ProcessFunction
> is
> > > > > > (record,
> > > > > > > > > > context, output), while this FLIP proposes to change the
> > > order
> > > > > into
> > > > > > > > > > (record, output, context). Is there any reason to make
> this
> > > > > change?
> > > > > > > > > >
> > > > > > > > > > No, it's just the order we decided on. But please note that
> > > > > > > > > > there is no relationship between the two ProcessFunctions
> > > > > > > > > > anyway. I think it's okay to use our own order of parameters
> > > > > > > > > > in the new API.
> > > > > > > > > >
> > > > > > > > > > > 4. Why does this FLIP propose to use connectAndProcess()
> > > > > > > > > > instead of connect() (+ keyBy()) + process()? The latter looks
> > > > > > > > > > simpler to me.
> > > > > > > > > >
> > > > > > > > > > I actually also considered this approach at first, but it
> > > > > > > > > > would have to introduce concepts like ConnectedStreams. We
> > > > > > > > > > hope that streams will be more clearly defined in the new
> > > > > > > > > > DataStream API; otherwise we will end up going the same way as
> > > > > > > > > > the original API, where you have to understand
> > > > > > > > > > `JoinedStreams/ConnectedStreams` and so on.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > >
> > > > > > > > > > Weijie
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > weijie guo <guoweijieres...@gmail.com> wrote on Tue, Jan 30, 2024 at 11:20:
> > > > > > > > > >
> > > > > > > > > >> Hi Wencong:
> > > > > > > > > >>
> > > > > > > > > >> Thank you for your attention
> > > > > > > > > >>
> > > > > > > > > >> > Q1. Other DataStream types are converted into
> > > > > > > > > >> Non-Keyed DataStreams by using a "shuffle" operation
> > > > > > > > > >> to convert Input into output. Does this "shuffle"
> include
> > > the
> > > > > > > > > >> various repartition operations
> (rebalance/rescale/shuffle)
> > > > > > > > > >> from DataStream V1?
> > > > > > > > > >>
> > > > > > > > > > >> Yes. The name `shuffle` is used only to represent the
> > > > > > > > > > >> transformation of an arbitrary stream into a non-keyed
> > > > > > > > > > >> partitioned stream and does not restrict how the data is
> > > > > > > > > > >> partitioned.
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> > Q2. Why is the design for
> > TwoOutputStreamProcessFunction,
> > > > > > > > > >> when dealing with a KeyedStream, only outputting
> > > combinations
> > > > > > > > > >> of (Keyed + Keyed) and (Non-Keyed + Non-Keyed)?
> > > > > > > > > >>
> > > > > > > > > > >> In theory, we could only provide functions that return
> > > > > > > > > > >> Non-Keyed streams. If you do want a KeyedStream, you
> > > > > > > > > > >> explicitly convert it to a KeyedStream via keyBy. However,
> > > > > > > > > > >> because sometimes data is processed without changing the
> > > > > > > > > > >> partition, we choose to provide an additional KeyedStream
> > > > > > > > > > >> counterpart to reduce the shuffle overhead. We didn't
> > > > > > > > > > >> introduce the non-keyed + keyed combo here simply because
> > > > > > > > > > >> it's not very common, and if we really see a lot of users
> > > > > > > > > > >> asking for it later on, it's easy to support it then.
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> Best regards,
> > > > > > > > > >>
> > > > > > > > > >> Weijie
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > > >> Xuannan Su <suxuanna...@gmail.com> wrote on Mon, Jan 29, 2024 at 18:28:
> > > > > > > > > >>
> > > > > > > > > >>> Hi Weijie,
> > > > > > > > > >>>
> > > > > > > > > >>> Thank you for driving the design of the new DataStream
> > > API. I
> > > > > > have a
> > > > > > > > > >>> few questions regarding the FLIP:
> > > > > > > > > >>>
> > > > > > > > > >>> 1. In the partitioning section, it says that "broadcast
> > can
> > > > > only
> > > > > > be
> > > > > > > > > >>> used as a side-input of other Inputs." Could you
> clarify
> > > what
> > > > > is
> > > > > > > > meant
> > > > > > > > > >>> by "side-input"? If I understand correctly, it refer to
> > one
> > > > of
> > > > > > the
> > > > > > > > > >>> inputs of the `TwoInputStreamProcessFunction`. If that
> is
> > > the
> > > > > > case,
> > > > > > > > > >>> the term "side-input" may not be accurate.
> > > > > > > > > >>>
> > > > > > > > > >>> 2. Is there a particular reason we do not support a
> > > > > > > > > >>> `TwoInputProcessFunction` to combine a KeyedStream
> with a
> > > > > > > > > >>> BroadcastStream to result in a KeyedStream? There seems
> > to
> > > > be a
> > > > > > valid
> > > > > > > > > >>> use case where a KeyedStream is enriched with a
> > > > BroadcastStream
> > > > > > and
> > > > > > > > > >>> returns a Stream that is partitioned in the same way.
> > > > > > > > > >>>
> > > > > > > > > >>> 3. There appears to be a typo in the example code. The
> > > > > > > > > >>> `SingleStreamProcessFunction` should probably be
> > > > > > > > > >>> `OneInputStreamProcessFunction`.
> > > > > > > > > >>>
> > > > > > > > > >>> 4. How do we set the global configuration for the
> > > > > > > > > >>> ExecutionEnvironment? Currently, we have the
> > > > > > > > > >>>
> > > > > StreamExecutionEnvironment.getExecutionEnvironment(Configuration)
> > > > > > > > > >>> method to provide the global configuration in the API.
> > > > > > > > > >>>
> > > > > > > > > >>> 5. I noticed that there are two `collect` methods in
> the
> > > > > > Collector,
> > > > > > > > > >>> one with a timestamp and one without. Could you
> elaborate
> > > on
> > > > > the
> > > > > > > > > >>> differences between them? Additionally, in what use
> case
> > > > would
> > > > > > one
> > > > > > > > use
> > > > > > > > > >>> the method that includes the timestamp?
> > > > > > > > > >>>
> > > > > > > > > >>> Best regards,
> > > > > > > > > >>> Xuannan
> > > > > > > > > >>>
> > > > > > > > > >>>
> > > > > > > > > >>>
> > > > > > > > > >>> On Fri, Jan 26, 2024 at 2:21 PM Yunfeng Zhou
> > > > > > > > > >>> <flink.zhouyunf...@gmail.com> wrote:
> > > > > > > > > >>> >
> > > > > > > > > >>> > Hi Weijie,
> > > > > > > > > >>> >
> > > > > > > > > >>> > Thanks for raising discussions about the new
> DataStream
> > > > API.
> > > > > I
> > > > > > > > have a
> > > > > > > > > >>> > few questions about the content of the FLIP.
> > > > > > > > > >>> >
> > > > > > > > > >>> > 1. Will we provide any API to support choosing which
> > > input
> > > > to
> > > > > > > > consume
> > > > > > > > > >>> > between the two inputs of
> > TwoInputStreamProcessFunction?
> > > It
> > > > > > would
> > > > > > > > be
> > > > > > > > > >>> > helpful in online machine learning cases, where a
> > process
> > > > > > function
> > > > > > > > > >>> > needs to receive the first machine learning model
> > before
> > > it
> > > > > can
> > > > > > > > start
> > > > > > > > > >>> > predictions on input data. Similar requirements might
> > > also
> > > > > > exist in
> > > > > > > > > >>> > Flink CEP, where a rule set needs to be consumed by
> the
> > > > > process
> > > > > > > > > >>> > function first before it can start matching the event
> > > > stream
> > > > > > > > against
> > > > > > > > > >>> > CEP patterns.
> > > > > > > > > >>> >
> > > > > > > > > >>> > 2. A typo might exist in the current FLIP describing
> > the
> > > > API
> > > > > to
> > > > > > > > > >>> > generate a global stream, as I can see either
> global()
> > or
> > > > > > > > coalesce()
> > > > > > > > > >>> > in different places of the FLIP. These two methods
> > might
> > > > need
> > > > > > to be
> > > > > > > > > >>> > unified into one method.
> > > > > > > > > >>> >
> > > > > > > > > >>> > 3. The order of parameters in the current
> > ProcessFunction
> > > > is
> > > > > > > > (record,
> > > > > > > > > >>> > context, output), while this FLIP proposes to change
> > the
> > > > > order
> > > > > > into
> > > > > > > > > >>> > (record, output, context). Is there any reason to
> make
> > > this
> > > > > > change?
> > > > > > > > > >>> >
> > > > > > > > > >>> > 4. Why does this FLIP propose to use
> > connectAndProcess()
> > > > > > instead of
> > > > > > > > > >>> > connect() (+ keyBy()) + process()? The latter looks
> > > simpler
> > > > > to
> > > > > > me.
> > > > > > > > > >>> >
> > > > > > > > > >>> > Looking forward to discussing these questions with
> you.
> > > > > > > > > >>> >
> > > > > > > > > >>> > Best regards,
> > > > > > > > > >>> > Yunfeng Zhou
> > > > > > > > > >>> >
> > > > > > > > > > >>> > On Tue, Dec 26, 2023 at 2:44 PM weijie guo <guoweijieres...@gmail.com>
> > > > > > > > > > >>> > wrote:
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > Hi devs,
> > > > > > > > > >>> > >
> > > > > > > > > >>> > >
> > > > > > > > > > >>> > > I'd like to start a discussion about FLIP-409: DataStream V2
> > > > > > > > > > >>> > > Building Blocks: DataStream, Partitioning and ProcessFunction [1].
> > > > > > > > > >>> > >
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > As the first sub-FLIP for DataStream API V2, we'd
> > like
> > > to
> > > > > > discuss
> > > > > > > > > and
> > > > > > > > > >>> > > try to answer some of the most fundamental
> questions
> > in
> > > > > > stream
> > > > > > > > > >>> > > processing:
> > > > > > > > > >>> > >
> > > > > > > > > >>> > >    1. What kinds of data streams do we have?
> > > > > > > > > >>> > >    2. How to partition data over the streams?
> > > > > > > > > >>> > >    3. How to define a processing on the data
> stream?
> > > > > > > > > >>> > >
> > > > > > > > > > >>> > > The answers to these questions involve three core concepts:
> > > > > > > > > > >>> > > DataStream, Partitioning and ProcessFunction. In this FLIP, we
> > > > > > > > > > >>> > > will discuss the definitions and related API primitives of
> > > > > > > > > > >>> > > these concepts in detail.
> > > > > > > > > >>> > >
> > > > > > > > > >>> > >
> > > > > > > > > > >>> > > You can find more details in FLIP-409 [1]. This sub-FLIP is at
> > > > > > > > > > >>> > > the heart of the entire DataStream API V2, and its relationship
> > > > > > > > > > >>> > > with other sub-FLIPs can be found in the umbrella FLIP [2].
> > > > > > > > > >>> > >
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > Looking forward to hearing from you, thanks!
> > > > > > > > > >>> > >
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > Best regards,
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > Weijie
> > > > > > > > > >>> > >
> > > > > > > > > >>> > >
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > [1]
> > > > > > > > > >>> > >
> > > > > > > > > >>>
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > [2]
> > > > > > > > > >>> > >
> > > > > > > > > >>>
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
> > > > > > > > > >>>
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
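
The partitioning argument made earlier in the thread (a broadcast record
reaches every subtask, so output keyed by a new KeySelector cannot land in
the right partition on all of them) can be sketched in plain Java. This is
only an illustration under stated assumptions: `ownerSubtask` and
`subtasksBreakingPartitioning` are hypothetical names, not part of FLIP-409
or any Flink API, and hash-modulo is an assumed stand-in for Flink's real
key-to-subtask assignment.

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastPartitionSketch {

    // Assumption: a key is owned by exactly one subtask, chosen here by
    // hash-modulo. Flink's actual assignment (key groups) differs in detail.
    static int ownerSubtask(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // A broadcast record is delivered to every subtask. If each subtask then
    // emits a record whose key is outputKey, only the owning subtask keeps
    // the keyed partitioning intact. This returns all the other subtasks.
    static List<Integer> subtasksBreakingPartitioning(String outputKey, int parallelism) {
        int owner = ownerSubtask(outputKey, parallelism);
        List<Integer> violating = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (subtask != owner) {
                violating.add(subtask);
            }
        }
        return violating;
    }

    public static void main(String[] args) {
        int parallelism = 2;
        String outputKey = "user-42"; // hypothetical key produced by the new KeySelector
        System.out.println("owner subtask: " + ownerSubtask(outputKey, parallelism));
        System.out.println("subtasks emitting into the wrong partition: "
                + subtasksBreakingPartitioning(outputKey, parallelism));
    }
}
```

With any parallelism greater than one, at least one subtask ends up emitting
a record it does not own, which is why output on the broadcast side is only
well-defined when applied to all partitions at once (as with
`applyToKeyedState`).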
