Hi Yunfeng,

Thank you for your attention.

> 1. Will we provide any API to support choosing which input to consume
between the two inputs of TwoInputStreamProcessFunction? It would be
helpful in online machine learning cases, where a process function
needs to receive the first machine learning model before it can start
predictions on input data. Similar requirements might also exist in
Flink CEP, where a rule set needs to be consumed by the process
function first before it can start matching the event stream against
CEP patterns.

Good point! I think we can provide a `nextInputSelection()` method for
`TwoInputStreamProcessFunction`. It returns a `First/Second` enum that
determines which input the mailbox thread will read next. But I'm
considering putting it in the sub-FLIP related to Join, since features like
HashJoin have a more specific need for this.
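
To make the idea concrete, here is a rough, self-contained sketch of how
such a method might look. Everything in it is an assumption for
illustration only: `InputSelection`, `SelectableTwoInputFunction`,
`ModelThenPredict` and the `run` loop are hypothetical names, a plain
`List` stands in for the output collector, and the loop is a toy stand-in
for the mailbox thread, not the actual runtime or the final API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical names throughout; none of this is the actual FLIP-409 API.
enum InputSelection { FIRST, SECOND }

interface SelectableTwoInputFunction<IN1, IN2, OUT> {
    void processRecordFromFirstInput(IN1 record, List<OUT> output);

    void processRecordFromSecondInput(IN2 record, List<OUT> output);

    // Tells the (simulated) mailbox thread which input to read next.
    InputSelection nextInputSelection();
}

// Consumes a model from the first input before predicting on the second.
final class ModelThenPredict implements SelectableTwoInputFunction<Double, Double, Double> {
    private Double model; // null until the first model has arrived

    @Override
    public void processRecordFromFirstInput(Double record, List<Double> output) {
        model = record;
    }

    @Override
    public void processRecordFromSecondInput(Double record, List<Double> output) {
        output.add(record * model); // toy "prediction": scale the event by the model
    }

    @Override
    public InputSelection nextInputSelection() {
        return model == null ? InputSelection.FIRST : InputSelection.SECOND;
    }
}

final class InputSelectionSketch {
    // Toy read loop: always reads from the input the function asks for.
    // A real runtime would wait for data; this sketch simply stops when
    // the selected input is exhausted.
    static <A, B, O> List<O> run(
            SelectableTwoInputFunction<A, B, O> fn, Queue<A> first, Queue<B> second) {
        List<O> out = new ArrayList<>();
        while (true) {
            if (fn.nextInputSelection() == InputSelection.FIRST) {
                if (first.isEmpty()) break;
                fn.processRecordFromFirstInput(first.poll(), out);
            } else {
                if (second.isEmpty()) break;
                fn.processRecordFromSecondInput(second.poll(), out);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Queue<Double> models = new ArrayDeque<Double>(List.of(2.0));
        Queue<Double> events = new ArrayDeque<Double>(List.of(1.0, 3.0));
        System.out.println(run(new ModelThenPredict(), models, events)); // prints [2.0, 6.0]
    }
}
```

In this sketch the function keeps asking for the first input until a model
is present, so no event is processed without a model. Whether the real
method would live on the function itself or elsewhere is still open.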

> 2. A typo might exist in the current FLIP describing the API to
generate a global stream, as I can see either global() or coalesce()
in different places of the FLIP. These two methods might need to be
unified into one method.

Good catch! I have updated this FLIP to fix this typo.

> 3. The order of parameters in the current ProcessFunction is (record,
context, output), while this FLIP proposes to change the order into
(record, output, context). Is there any reason to make this change?

No, it's just the order we decided on. But please note that there is no
relationship between the two ProcessFunctions anyway. I think it's okay to
use our own parameter order in the new API.

> 4. Why does this FLIP propose to use connectAndProcess() instead of
connect() (+ keyBy()) + process()? The latter looks simpler to me.

I actually considered this approach at first, but it would have to
introduce concepts like ConnectedStreams. We hope that streams will be
more clearly defined in the new DataStream API; otherwise we will end up
going the same way as the original API, where you have to understand
`JoinedStreams`, `ConnectedStreams` and so on.
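
A minimal sketch of the shape I mean, under loud assumptions: `SketchStream`
and `SketchTwoInputFunction` are hypothetical toy types invented for this
mail, not the FLIP-409 classes, and the toy simply processes all records of
the first input before the second, which a real runtime would not
guarantee. The only point is that a single call takes a stream and returns
a stream, with no intermediate type in between.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-input function: one callback per input, one shared output.
interface SketchTwoInputFunction<A, B, O> {
    void processFirst(A record, List<O> output);

    void processSecond(B record, List<O> output);
}

// Hypothetical, heavily simplified stand-in for a stream; not the FLIP-409 API.
final class SketchStream<T> {
    final List<T> records;

    SketchStream(List<T> records) {
        this.records = records;
    }

    // Stream in, stream out: no ConnectedStreams-like intermediate type.
    <U, O> SketchStream<O> connectAndProcess(
            SketchStream<U> other, SketchTwoInputFunction<T, U, O> fn) {
        List<O> out = new ArrayList<>();
        for (T r : records) fn.processFirst(r, out);        // toy ordering only
        for (U r : other.records) fn.processSecond(r, out);
        return new SketchStream<>(out);
    }
}

final class ConnectAndProcessSketch {
    static SketchStream<String> demo() {
        SketchStream<Integer> numbers = new SketchStream<>(List.of(1, 2));
        SketchStream<String> words = new SketchStream<>(List.of("a"));
        return numbers.connectAndProcess(
                words,
                new SketchTwoInputFunction<Integer, String, String>() {
                    @Override
                    public void processFirst(Integer record, List<String> output) {
                        output.add("n:" + record);
                    }

                    @Override
                    public void processSecond(String record, List<String> output) {
                        output.add("s:" + record);
                    }
                });
    }

    public static void main(String[] args) {
        System.out.println(demo().records); // prints [n:1, n:2, s:a]
    }
}
```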



Best regards,

Weijie


On Tue, Jan 30, 2024 at 11:20 AM weijie guo <guoweijieres...@gmail.com> wrote:

> Hi Wencong:
>
> Thank you for your attention.
>
> > Q1. Other DataStream types are converted into
> Non-Keyed DataStreams by using a "shuffle" operation
> to convert Input into output. Does this "shuffle" include the
> various repartition operations (rebalance/rescale/shuffle)
> from DataStream V1?
>
> Yes. The name `shuffle` is used only to represent the transformation of an
> arbitrary stream into a non-keyed partitioned stream and does not restrict
> how the data is partitioned.
>
>
> > Q2. Why is the design for TwoOutputStreamProcessFunction,
> when dealing with a KeyedStream, only outputting combinations
> of (Keyed + Keyed) and (Non-Keyed + Non-Keyed)?
>
> In theory, we could only provide functions that return Non-Keyed streams.
> If you do want a KeyedStream, you explicitly convert it to a KeyedStream
> via keyBy. However, because sometimes data is processed without changing
> the partition, we choose to provide an additional KeyedStream counterpart
> to reduce the shuffle overhead. We didn't introduce the non-keyed + keyed
> combo here simply because it's not very common, and if we really see a lot
> of users asking for it later on, it's easy to support it then.
>
>
> Best regards,
>
> Weijie
>
>
> On Mon, Jan 29, 2024 at 6:28 PM Xuannan Su <suxuanna...@gmail.com> wrote:
>
>> Hi Weijie,
>>
>> Thank you for driving the design of the new DataStream API. I have a
>> few questions regarding the FLIP:
>>
>> 1. In the partitioning section, it says that "broadcast can only be
>> used as a side-input of other Inputs." Could you clarify what is meant
>> by "side-input"? If I understand correctly, it refers to one of the
>> inputs of the `TwoInputStreamProcessFunction`. If that is the case,
>> the term "side-input" may not be accurate.
>>
>> 2. Is there a particular reason we do not support a
>> `TwoInputProcessFunction` to combine a KeyedStream with a
>> BroadcastStream to result in a KeyedStream? There seems to be a valid
>> use case where a KeyedStream is enriched with a BroadcastStream and
>> returns a Stream that is partitioned in the same way.
>>
>> 3. There appears to be a typo in the example code. The
>> `SingleStreamProcessFunction` should probably be
>> `OneInputStreamProcessFunction`.
>>
>> 4. How do we set the global configuration for the
>> ExecutionEnvironment? Currently, we have the
>> StreamExecutionEnvironment.getExecutionEnvironment(Configuration)
>> method to provide the global configuration in the API.
>>
>> 5. I noticed that there are two `collect` methods in the Collector,
>> one with a timestamp and one without. Could you elaborate on the
>> differences between them? Additionally, in what use case would one use
>> the method that includes the timestamp?
>>
>> Best regards,
>> Xuannan
>>
>>
>>
>> On Fri, Jan 26, 2024 at 2:21 PM Yunfeng Zhou
>> <flink.zhouyunf...@gmail.com> wrote:
>> >
>> > Hi Weijie,
>> >
>> > Thanks for raising discussions about the new DataStream API. I have a
>> > few questions about the content of the FLIP.
>> >
>> > 1. Will we provide any API to support choosing which input to consume
>> > between the two inputs of TwoInputStreamProcessFunction? It would be
>> > helpful in online machine learning cases, where a process function
>> > needs to receive the first machine learning model before it can start
>> > predictions on input data. Similar requirements might also exist in
>> > Flink CEP, where a rule set needs to be consumed by the process
>> > function first before it can start matching the event stream against
>> > CEP patterns.
>> >
>> > 2. A typo might exist in the current FLIP describing the API to
>> > generate a global stream, as I can see either global() or coalesce()
>> > in different places of the FLIP. These two methods might need to be
>> > unified into one method.
>> >
>> > 3. The order of parameters in the current ProcessFunction is (record,
>> > context, output), while this FLIP proposes to change the order into
>> > (record, output, context). Is there any reason to make this change?
>> >
>> > 4. Why does this FLIP propose to use connectAndProcess() instead of
>> > connect() (+ keyBy()) + process()? The latter looks simpler to me.
>> >
>> > Looking forward to discussing these questions with you.
>> >
>> > Best regards,
>> > Yunfeng Zhou
>> >
>> > On Tue, Dec 26, 2023 at 2:44 PM weijie guo <guoweijieres...@gmail.com>
>> wrote:
>> > >
>> > > Hi devs,
>> > >
>> > >
>> > > I'd like to start a discussion about FLIP-409: DataStream V2 Building
>> > > Blocks: DataStream, Partitioning and ProcessFunction [1].
>> > >
>> > >
>> > > As the first sub-FLIP for DataStream API V2, we'd like to discuss and
>> > > try to answer some of the most fundamental questions in stream
>> > > processing:
>> > >
>> > >    1. What kinds of data streams do we have?
>> > >    2. How to partition data over the streams?
>> > >    3. How to define a processing on the data stream?
>> > >
>> > > The answers to these questions involve three core concepts: DataStream,
>> > > Partitioning and ProcessFunction. In this FLIP, we will discuss the
>> > > definitions and related API primitives of these concepts in detail.
>> > >
>> > >
>> > > You can find more details in FLIP-409 [1]. This sub-FLIP is at the
>> > > heart of the entire DataStream API V2, and its relationship with other
>> > > sub-FLIPs can be found in the umbrella FLIP [2].
>> > >
>> > >
>> > > Looking forward to hearing from you, thanks!
>> > >
>> > >
>> > > Best regards,
>> > >
>> > > Weijie
>> > >
>> > >
>> > >
>> > > [1]
>> > >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
>> > >
>> > > [2]
>> > >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
>>
>
