Hi zoucao,

Thanks for your proposal. I believe this discussion will take us one step
further on Iceberg/Hudi integration.
## `MERGE` for streaming

I feel that `MERGE` is a very good fit for stream computing, and this is
where Flink can have an advantage over other computation systems.

MERGE INTO target_table USING source_table;

Under the semantics of stream computation, the incremental data of
source_table is read in real time, and this incremental data is merged
into target_table in real time. I think this is a great capability, and
further research and discussion are needed for detailed `MERGE INTO`
streaming support.

## Calcite SQL extensions

I'm not sure that Calcite is good at extending its SQL syntax; the
worst-case scenario is that Flink needs to maintain its own version of
Calcite. I would prefer that we try to push the syntax to the Calcite
community as much as possible, unless we have a clear solution for SQL
syntax extensions.

Best,
Jingsong

On Thu, Feb 17, 2022 at 12:53 AM Francesco Guardiani
<france...@ververica.com> wrote:
>
> > In the theory aspect, incremental data should be carefully considered
> > for streaming data. In this situation, the data flow from target_table
> > to target_table will be a loop, and the incremental data with one key
> > will keep going through the loop. It looks very strange.
>
> This is the same concern I have here: I don't see how MERGE can work in
> a streaming scenario without modifying its preliminary assumptions and
> semantics.
>
> Even assuming we put some hard constraint on the state size, for example
> requiring a window definition (like in interval joins), I still think
> that the fundamental assumption of MERGE here is a problem: the target
> table is both a sink and a source. And I think this is a big issue, as
> we cannot reasonably assume that a sink and a source are available for
> the same table definition, or that they behave similarly.
>
> Also, talking about the batch implementation, I don't understand how you
> would implement this: from what I see in the "*validator*" paragraph of
> your document, you convert the merge statement into a bunch of other SQL
> statements, but you omit the initial join, which is fundamental for the
> semantics of MERGE. Could you perhaps provide more details about it?
>
> On another note, I think we can take inspiration from MERGE and its
> "event driven" semantics, in order to have something that works both for
> batch and streaming, say a "Flink-ified" version of MERGE.
>
> For example, something that I can think of could be:
>
> PUSH TO target_table
> FROM source_table
> ON [window TVF]
> [when_clause [...]]
>
> Where when_clause looks like the ones from MERGE (looking at the pgsql
> one). This has the window TVF constraint, so the state doesn't grow
> indefinitely, and source_table is effectively any select you can think
> of, removing the assumption that the target is both a sink and a source.
> This statement at the end produces a changelog stream, pushed to the
> output table. A statement like this could then allow you to have
> something similar to MERGE, just by replacing source_table with a select
> performing the join. Of course this is an example and might not make
> much sense, but I hope it gives you the idea.
>
> FG
>
> On Mon, Feb 14, 2022 at 4:28 AM OpenInx <open...@gmail.com> wrote:
>
> > I'm currently maintaining the Iceberg Flink modules in the Apache
> > Iceberg community.
> >
> > Currently, Spark has a great integration experience with Iceberg
> > format v2 in batch mode. In this document [1], the MERGE INTO syntax
> > from the Spark SQL extensions really helps a lot when people want to
> > change row-level data.
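> >
> > For illustration, a MERGE INTO against an Iceberg v2 table in Spark
> > roughly takes this shape (the table and column names here are invented,
> > not taken from [1]):
> >
> > MERGE INTO prod.db.target t        -- the Iceberg v2 table to change
> > USING updates s                    -- a batch of row-level changes
> > ON t.id = s.id
> > WHEN MATCHED AND s.op = 'delete' THEN DELETE
> > WHEN MATCHED THEN UPDATE SET t.count = s.count
> > WHEN NOT MATCHED THEN INSERT *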
> >
> > Flink currently has a good integration with Iceberg format v2 in
> > streaming mode: people can export their changelog data into an Iceberg
> > table directly by writing a few lines of SQL. This[2] is good material
> > to read if anybody wants to create a simple demo.
> >
> > But I'd say that in batch scenarios, Flink SQL currently lacks a few
> > pieces of critical SQL syntax (for integrating Iceberg format v2 in
> > batch mode better):
> > 1. ALTER TABLE to change columns.
> > 2. UPDATE/DELETE to change unexpected rows in a given table.
> > 3. MERGE INTO to merge a batch of changed rows (a mix of
> > inserts/deletes/updates) into a given table.
> >
> > In short, if we want to provide better integration and a better user
> > experience with Iceberg v2 in batch mode, then I think support for the
> > above syntax is very important (from the Iceberg perspective).
> >
> > > I think it's better to make that time investment at Calcite's
> > > implementation before bringing this to Flink.
> >
> > I find that there is some SQL syntax that is critical for Flink SQL but
> > not for a generic SQL parser. Is it possible to implement Flink SQL
> > plugins/extensions that extend the core Calcite SQL? Going a step
> > further, is it possible to achieve a better abstraction of the Flink
> > SQL framework, so that downstream components can implement their own
> > customized SQL plugins based on it? In this way, different components
> > could add their own SQL implementations on top of Flink SQL.
> >
> > [1] https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
> > [2] https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/build-real-time-data-lake-tutorial.html
> >
> > On Fri, Feb 11, 2022 at 4:28 PM zhou chao <zhouchao...@hotmail.com> wrote:
> >
> > > Hi Martijn, thanks for your reply. Firstly, I am sorry for posting
> > > the discussion twice. I sent the message to the dev mailing list from
> > > an unsubscribed account, but it did not show up for a while; I
> > > guessed that the list would not post an email from an unsubscribed
> > > account, so I sent it again from a subscribed account.
> > >
> > > Q: How would you see merge work for streaming data?
> > > I think this is an interesting topic, especially for Flink, which
> > > aims to unify streaming & batch processing. Back to the merge
> > > statement: there are two inputs, target_table and source_table (a
> > > query). In the merge statement, source_table is used to correct
> > > target_table's results, and every row in target_table only needs to
> > > be corrected once; that is what a batch job does. In theory,
> > > incremental data would have to be considered carefully for streaming.
> > > In that situation, the data flow from target_table back to
> > > target_table forms a loop, and the incremental data for a given key
> > > would keep going through the loop, which looks very strange. So far,
> > > we have not received any user needs matching the merge statement for
> > > streaming data. I think the streaming topic should be backed by user
> > > needs and use cases before we discuss it further.
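> > >
> > > To make the loop concrete, here is a sketch under a hypothetical
> > > streaming reading of MERGE (all names are illustrative): target_table
> > > is both the sink and one side of the join, so every correction it
> > > emits would re-enter the statement as new incremental data.
> > >
> > > MERGE INTO target_table t     -- sink: receives the corrections
> > > USING source_table s          -- read as an unbounded changelog
> > > ON t.id = s.id                -- the join also reads target_table,
> > >                               -- which closes the feedback loop
> > > WHEN MATCHED THEN UPDATE SET t.val = s.val
> > > WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val);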
> > >
> > > I really agree that we should leverage Calcite and push Calcite to
> > > invest in this, but right now this feature does not get enough
> > > attention in the Calcite community. I found that some features for
> > > Flink were also limited by Calcite, such as FLINK-21714[1], which was
> > > finally fixed on the Flink side. Could you tell me how much effort we
> > > can usually afford in a situation like this?
> > >
> > > best,
> > > zoucao
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-21714
> > >
> > > On Feb 10, 2022, at 4:09 PM, Martijn Visser <mart...@ververica.com> wrote:
> > >
> > > Hi zoucao,
> > >
> > > I see that this message was posted twice, so I chose to reply only to
> > > the latest one (this one). Thanks for bringing this up for discussion.
> > >
> > > I agree that support for a merge statement would be a welcome
> > > addition to Flink SQL for those who are using it for bounded jobs.
> > > How would you see merge work for streaming data?
> > >
> > > I do think that in order for Flink to properly support this, we
> > > should leverage Calcite. If there's no proper/full support for merge
> > > in Calcite, I don't think we should add this ourselves. The time
> > > investment and increase in technical debt don't outweigh the benefits
> > > that this would bring to Flink. If it's really that important, I
> > > think it's better to make that time investment at Calcite's
> > > implementation before bringing this to Flink.
> > >
> > > Best regards,
> > >
> > > Martijn Visser
> > > https://twitter.com/MartijnVisser82
> > >
> > > On Wed, 9 Feb 2022 at 08:40, zhou chao <zhouchao...@hotmail.com> wrote:
> > >
> > > Hi, devs!
> > > Jingfeng and I would like to start a discussion about the MERGE
> > > statement, and the discussion consists of two parts. In the first
> > > part, we want to explore and collect use cases and motivations from
> > > MERGE statement users. In the second part, we want to find out
> > > whether it is possible for Flink SQL to support the merge statement.
> > >
> > > Before diving into the first topic, we want to introduce the
> > > definition and benefits of the merge statement. The MERGE statement
> > > in SQL is a very popular clause: it can handle inserts, updates, and
> > > deletes all in a single transaction, without having to write separate
> > > logic for each of them, and conditions can be specified separately
> > > for each insert, update, or delete. Many engines/DBs already support
> > > this feature, for example SQL Server[1], Spark[2], Hive[3], and
> > > pgSQL[4].
> > >
> > > Our use case:
> > > Order analysis & processing is one of our most important scenarios,
> > > but sometimes an updated order arrives a long time after the last
> > > record with the same primary key; in the meantime, the state for that
> > > key has expired, so a wrong aggregation result is produced. In this
> > > situation, we use the merge statement in a batch job to correct the
> > > results, and we currently use Spark + Iceberg internally for this. In
> > > the future, we want to unify batch & streaming with Flink SQL
> > > internally, so it would be better if Flink could support the merge
> > > statement. If you have other use cases and opinions, please share
> > > them here.
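> > >
> > > As a rough sketch of that correction job (all table and column names
> > > are invented for illustration), the batch MERGE overwrites the stale
> > > aggregates with recomputed ones:
> > >
> > > MERGE INTO order_agg t      -- aggregates hit by expired state
> > > USING corrected_agg s       -- results recomputed over late orders
> > > ON t.order_key = s.order_key
> > > WHEN MATCHED THEN UPDATE SET t.total = s.total
> > > WHEN NOT MATCHED THEN INSERT (order_key, total)
> > >   VALUES (s.order_key, s.total);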
> > >
> > > Now, Calcite does not have good support for the merge statement;
> > > there is a Jira issue, CALCITE-4338[5], tracking it. Could we support
> > > the merge statement relying on the limited support in calcite-1.26.0?
> > > I wrote a simple doc[6] to drive this, just to find out whether it is
> > > possible for Flink SQL to support the merge statement.
> > >
> > > Looking forward to your feedback, thanks.
> > >
> > > best,
> > > zoucao
> > >
> > > [1] https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
> > > [2] https://issues.apache.org/jira/browse/SPARK-28893
> > > [3] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
> > > [4] https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
> > > [5] https://issues.apache.org/jira/browse/CALCITE-4338
> > > [6] https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing