+1 for the FLIP.

From my understanding, the fundamental difference between SQL, Table API and DataStream API is the programming model.
- SQL is declarative and relational. It describes what data is needed rather than how the data should be processed, leaving room for the planner to optimize the execution plan. Data is organized as relational tables (rows and columns).
- Table API is similar to SQL in that it organizes data as relational tables. In theory it could also be declarative, but AFAIK it currently is not. Compared to SQL, it has better support for imperative UDFs implemented in Java / Scala.
- DataStream API, on the other hand, has a completely different programming model. It is completely imperative: users need to control every single step of how data is processed. Data is organized as streams of records, where records can be of arbitrary types.

To decide which API to use, users should first think about the programming model. Can their business logic be easily expressed with the relational table model? Do they want to define the data processing declaratively or imperatively? From that perspective, I don't think there's any blur or confusion. And even if there is, that probably indicates that we should work on better documentation to guide users through this.

No matter which programming model the user chooses, Flink should provide the best programming experience possible. There might be some functionality overlap, such as Join, which is commonly needed in all of the above models, and that is totally fine. Preventing the DataStream API from providing easy-to-use Join support just because Table API / SQL already has similar support doesn't really make sense to me.

In fact, as Xu has already mentioned, we have seen many users using Join in DataStream workloads. They choose DataStream over Table because their business logic is not suitable for a relational table model, e.g., event processing where different types of events may have different data structures that cannot be efficiently represented by a unified table schema.
Meanwhile, they also need to join events with other information based on, e.g., user-id / device-id. Such use cases are very common in my experience supporting our users. Those use cases would benefit from built-in Join support, so that users don't need to worry about details like dealing with state, and can focus on the logic of handling the matched data.

Best,

Xintong

On Tue, Jan 7, 2025 at 10:41 PM Xu Huang <huangxu.wal...@gmail.com> wrote:

> Hi, Timo
>
> I agree that the DataStream API should provide primitives for stream
> processing, which is why we have divided the functionality into
> fundamental primitives and high-level extensions in the umbrella
> FLIP [1]. I do not agree that providing a Join on DataStream V1 is
> useless. Not all users are willing to rewrite their jobs in SQL/Table,
> and some jobs are difficult to rewrite. Additionally, we have seen many
> DataStream jobs on Alibaba Cloud that use Join to varying degrees.
> It's important to note that the functionality provided by extensions
> can all be implemented through the Process Function in DataStream V2.
> The purpose of providing extensions is to spare users the repetitive
> work of writing complex logic. If users were to implement similar logic
> using only the fundamental primitives, it would waste a lot of their
> time. Over time, they would extract their own common code, which would
> eventually converge with the extensions we provide. Therefore, we
> believe that providing such extensions is beneficial and harmless to
> users.
>
> Furthermore, if extensions are placed in different modules, users would
> need to import multiple extensions in their pom files when writing
> jobs, which indirectly increases the complexity of writing jobs. I
> believe it is sufficient to encapsulate all extensions in Built-In
> Functions rather than in the fundamental primitives of the stream.
> Users will not be affected by these extensions if they do not
> explicitly use them.
>
> Best,
>
> Xu Huang
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789682#FLIP408:[Umbrella]IntroduceDataStreamAPIV2-Fundamentalprimitives
>
> Timo Walther <twal...@apache.org> wrote on Tue, Jan 7, 2025 at 18:38:
>
> > Hi Huang,
> >
> > I agree with Fabian that we should be careful with adding features to
> > DataStream API V2. Providing it as an extension rather than a base API
> > is definitely a good direction. Prepared built-in functions and
> > algorithms should rather go into a lib module.
> >
> > Nevertheless, the DataStream API should focus on the primitives for
> > stream processing. I'm not sure if the join implementation in
> > DataStream API V1 was ever useful. Instead of digging into details of
> > the join implementation or custom triggers, the past has shown that it
> > is often easier for users to just understand ProcessFunction and
> > configure it the way they like. Streaming joins are usually highly
> > customized and use-case specific.
> >
> > Regards,
> > Timo
> >
> > On 06.01.25 12:35, Xu Huang wrote:
> > > Hi, Fabian.
> > >
> > > Thanks for your question, let me elaborate:
> > >
> > > First of all, what we provide is not the Join operation of relational
> > > algebra, but the ability to join/combine two data streams. There will
> > > always be cases where the user's inputs cannot be represented simply
> > > by the table abstraction, but we can join them by some field of the
> > > records on the streams.
> > >
> > > Secondly, what we provide is just an extension and not the
> > > base/primitive semantics of DataStream V2 (see the sections
> > > "Fundamental primitives" and "High-Level Extensions" in
> > > FLIP-408 [1]). I don't think this will confuse users. Providing such
> > > an optional extension is more like syntactic sugar around the base
> > > API; even if we don't provide it, users will find a way to do
> > > something similar.
> > >
> > > Finally, the deprecated Flink DataSet API does support Join, and
> > > DataStream V2 carries the abilities of both DataStream V1 and
> > > DataSet. So we thought it would make sense to provide this
> > > capability in the form of an extension rather than a base API.
> > >
> > > Best
> > >
> > > Xu Huang
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789682#FLIP408:[Umbrella]IntroduceDataStreamAPIV2-Fundamentalprimitives
> > >
> > > Fabian Paul <fp...@apache.org> wrote on Mon, Jan 6, 2025 at 16:59:
> > >
> > >> Hi Xu,
> > >>
> > >> Thanks for drafting the FLIP. I have a question regarding the
> > >> motivation for the change.
> > >>
> > >> So far, the DataStream API doesn't support any relational
> > >> operations, and if users want to use joins/groupBy etc., they
> > >> usually use SQL or the Table API.
> > >> With this FLIP, the difference between the APIs becomes more blurry,
> > >> and users might be confused about the different join semantics of
> > >> the API layers.
> > >>
> > >> I would like to know the arguments for introducing the statements
> > >> to the DataStream API as opposed to using the Table API. It should
> > >> also be part of the FLIP, to better understand how this change
> > >> helps Flink users.
> > >>
> > >> Best,
> > >> Fabian
> > >>
> > >> On Mon, Jan 6, 2025 at 9:25 AM Junrui Lee <jrlee....@gmail.com> wrote:
> > >>>
> > >>> Hi Xu,
> > >>>
> > >>> Thanks for your work. I have a small question: In JoinExtension,
> > >>> when a record is received, is it immediately joined with all the
> > >>> data received from the other side, or does it wait until both
> > >>> streams are finished before joining? Also, does it work on both
> > >>> bounded and unbounded streams?
> > >>>
> > >>> Xu Huang <huangxu.wal...@gmail.com> wrote on Sat, Jan 4, 2025 at 11:50:
> > >>>
> > >>>> Hi Devs,
> > >>>>
> > >>>> Weijie Guo and I would like to initiate a discussion about
> > >>>> FLIP-500: Support Join Extension in DataStream V2 API [1].
> > >>>>
> > >>>> In relational algebra, Join is used to co-group two datasets and
> > >>>> combine the data based on specific conditions. For stream
> > >>>> computing systems, the data of the two streams is cached (usually
> > >>>> through State) when the Join operation is performed. When data
> > >>>> from either stream arrives, it can be matched with data from the
> > >>>> other stream. Therefore, Join has been widely used in multi-stream
> > >>>> aggregation scenarios.
> > >>>>
> > >>>> To make it easy for users to use Join in DataStream V2, this FLIP
> > >>>> will implement the Join extension in DataStream V2.
> > >>>>
> > >>>> For more details, please refer to FLIP-500 [1]. We look forward to
> > >>>> your feedback.
> > >>>>
> > >>>> Best,
> > >>>>
> > >>>> Xu Huang
> > >>>>
> > >>>> [1] https://cwiki.apache.org/confluence/x/ywz0Ew
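
For readers who want to see what "cache both streams in state and match on arrival" amounts to when written by hand, here is a minimal plain-Java sketch. It is only an illustration under stated assumptions: the class and method names (SymmetricJoinSketch, onLeft, onRight) are hypothetical, in-memory maps stand in for Flink keyed state, and it emits matches as soon as a record arrives, which is one possible behaviour and not necessarily what FLIP-500's JoinExtension does. It also never cleans up buffered records, which a real job would have to handle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of a hand-written keyed inner join over two streams.
 * Not the FLIP-500 API; plain maps stand in for per-key state.
 */
public class SymmetricJoinSketch {
    // Records seen so far on each side, grouped by join key.
    private final Map<String, List<String>> leftBuffer = new HashMap<>();
    private final Map<String, List<String>> rightBuffer = new HashMap<>();

    /** Buffers a left record and returns its matches with buffered right records. */
    public List<String> onLeft(String key, String value) {
        leftBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String right : rightBuffer.getOrDefault(key, Collections.emptyList())) {
            out.add(key + ":" + value + "+" + right);
        }
        return out;
    }

    /** Buffers a right record and returns its matches with buffered left records. */
    public List<String> onRight(String key, String value) {
        rightBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String left : leftBuffer.getOrDefault(key, Collections.emptyList())) {
            out.add(key + ":" + left + "+" + value);
        }
        return out;
    }

    public static void main(String[] args) {
        SymmetricJoinSketch join = new SymmetricJoinSketch();
        System.out.println(join.onLeft("user-1", "click"));    // nothing buffered on the right yet
        System.out.println(join.onRight("user-1", "profile")); // matches the buffered click
        System.out.println(join.onRight("user-2", "profile")); // no left record for this key
    }
}
```

Even this toy version has to manage two buffers and keep the two handlers symmetric; state cleanup, windows and bounded inputs would add more on top, which is the repetitive work the thread argues a built-in extension could absorb.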