> In theory, incremental data should be carefully considered for streaming data. In this situation, the data flow from target_table to target_table would form a loop, and the incremental data for a given key would keep going through the loop. It looks very strange.

This is the same concern I have here: I don't see how MERGE can work in a streaming scenario without modifying its underlying assumptions and semantics. Even assuming we put some hard constraint on the state size, for example by requiring a window definition to be specified (as in interval joins), I still think that the fundamental assumption of MERGE here is a problem: the target table is both a sink and a source. And I think this is a big issue, as we cannot reasonably assume that sinks and sources are available for the same table definition or that they behave similarly.

Also, talking about the batch implementation, I don't understand how you would implement this: from what I see in the "validator" paragraph of your document, you convert the merge statement into a bunch of other SQL statements, but you omit the initial join, which is fundamental for the semantics of MERGE. Could you perhaps provide more details about it?

On another note, I think we can take inspiration from MERGE and its "event-driven" semantics in order to have something that works both for batch and streaming, say a "Flink-ified" version of MERGE. For example, something I can think of could be:

    PUSH TO target_table FROM source_table ON [window TVF] [when_clause [...]]

where when_clause looks like the ones from MERGE (along the lines of the pgSQL proposal). This has the window TVF constraint, so the state doesn't grow indefinitely, and source_table is effectively any SELECT you can think of, removing the assumption that the target is both a sink and a source. At the end, this statement produces a changelog stream, pushed to the output table.
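To make the shape a bit more concrete, a rough sketch (every name here is invented, the window size is arbitrary, the source is assumed to carry a change-flag column op, and matching is assumed to happen on the target's primary key):

    -- hypothetical syntax, nothing Flink supports today
    PUSH TO target_table
    FROM source_table
    ON TUMBLE(TABLE source_table, DESCRIPTOR(event_time), INTERVAL '10' MINUTES)
    WHEN MATCHED AND source_table.op = 'D' THEN
        DELETE
    WHEN MATCHED THEN
        UPDATE SET amount = source_table.amount
    WHEN NOT MATCHED THEN
        INSERT (id, amount) VALUES (source_table.id, source_table.amount);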
A statement like this could then allow you to have something similar to MERGE, just by replacing source_table with a SELECT that performs the join. Of course this is just an example and might not make much sense, but I hope it gives you the idea.

FG

On Mon, Feb 14, 2022 at 4:28 AM OpenInx <open...@gmail.com> wrote:

> I'm currently maintaining the iceberg flink modules in the apache iceberg community.
>
> Currently, Spark has a great integration experience with iceberg format v2 in batch mode. As shown in this document [1], the MERGE INTO syntax from the Spark SQL extensions really helps a lot when people want to change row-level data.
>
> Flink currently has a good integration with iceberg format v2 in streaming mode: people can export their changelog data into an iceberg table directly by writing a few SQL statements. This [2] is good material to read if anybody wants to create a simple demo.
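(For anyone who hasn't gone through [2]: the "few SQL statements" mentioned here boil down to roughly the sketch below. Connector options are trimmed to the essentials, every name and credential is a placeholder, and the exact properties depend on the connector versions in use.)

    -- changelog source, captured with the flink-cdc mysql connector
    CREATE TABLE orders_cdc (
      order_id BIGINT,
      amount   DECIMAL(10, 2),
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector'     = 'mysql-cdc',
      'hostname'      = 'mysql-host',
      'port'          = '3306',
      'username'      = 'flink',
      'password'      = '******',
      'database-name' = 'shop',
      'table-name'    = 'orders'
    );

    -- iceberg format v2 table used as the sink
    CREATE CATALOG iceberg_catalog WITH (
      'type'         = 'iceberg',
      'catalog-type' = 'hadoop',
      'warehouse'    = 'hdfs:///warehouse/path'
    );
    CREATE DATABASE IF NOT EXISTS iceberg_catalog.db;
    CREATE TABLE iceberg_catalog.db.orders (
      order_id BIGINT,
      amount   DECIMAL(10, 2),
      PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'format-version'       = '2',
      'write.upsert.enabled' = 'true'
    );

    -- continuously push the changelog into the iceberg table
    INSERT INTO iceberg_catalog.db.orders SELECT * FROM orders_cdc;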
> But I'd say that in batch scenarios, Flink SQL currently lacks a few critical pieces of SQL syntax (for better integration with iceberg format v2 in batch mode):
> 1. ALTER TABLE to change columns.
> 2. UPDATE/DELETE SQL to change unexpected rows in a given table.
> 3. MERGE INTO to merge a batch of changed rows (a mix of inserts/deletes/updates) into a given table.
>
> In short, if we want to provide better integration and a better user experience with iceberg v2 in batch mode, then I think support for the above syntax is very important (from the iceberg perspective).
>
> > I think it's better to make that time investment at Calcite's implementation before bringing this to Flink.
>
> I find that there is some SQL syntax which is critical for Flink SQL but not for a generic SQL parser. Is it possible to implement our own Flink SQL plugins/extensions which extend the core Calcite SQL? Going a step further, is it possible for us to achieve a better abstraction of the Flink SQL framework, so that downstream components can implement their own customized SQL plugins based on this framework? In this way, different components could add their own SQL implementations on top of Flink SQL.
>
> [1]. https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
> [2]. https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/build-real-time-data-lake-tutorial.html
>
> On Fri, Feb 11, 2022 at 4:28 PM zhou chao <zhouchao...@hotmail.com> wrote:
>
> > Hi, Martijn Visser, thanks for your reply. Firstly, I am sorry for posting the discussion twice. I sent the message to the dev mail group from an unsubscribed account, but the message was not shown for a while, and I guessed that the dev mail group would not post an email coming from an unsubscribed account, so I sent it again from a subscribed account.
> >
> > Q: How would you see merge work for streaming data?
> > I think this is an interesting topic, especially for Flink, which wants to unify streaming & batch processing. Back to the merge statement: there are two inputs, target_table and source_table (a query). In the merge statement, source_table is used to correct target_table's results, and all rows in target_table only need to be corrected once; that's what the batch job does.
> >
> > In theory, incremental data should be carefully considered for streaming data. In this situation, the data flow from target_table to target_table would form a loop, and the incremental data for a given key would keep going through the loop. It looks very strange. So far, we have not received any user needs matching the merge statement for streaming data. I think the streaming topic should be backed by user needs and use cases before we talk about it.
> >
> > I really agree that we should leverage Calcite, and push Calcite to invest in it, but right now this feature does not get enough attention in the Calcite community. I found that some features for Flink were also limited by Calcite, such as FLINK-21714 [1], but that one was finally fixed on the Flink side. Could you tell me how much effort we can usually afford in a situation like this?
> >
> > best,
> > zoucao
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21714
> >
> > On Feb 10, 2022, at 4:09 PM, Martijn Visser <mart...@ververica.com> wrote:
> >
> > Hi zoucao,
> >
> > I see that this message was posted twice, so I chose to only reply to the latest one (this one). Thanks for bringing this up for discussion.
> >
> > I agree that support for a merge statement would be a welcome addition to Flink SQL for those that are using it for bounded jobs. How would you see merge work for streaming data?
> >
> > I do think that in order for Flink to properly support this, we should leverage Calcite. If there's no proper/full support for merge in Calcite, I don't think we should add this ourselves. I think the time investment and the increase in technical debt don't outweigh the benefits that this would bring to Flink. If it's really that important, I think it's better to make that time investment at Calcite's implementation before bringing this to Flink.
> >
> > Best regards,
> >
> > Martijn Visser
> > https://twitter.com/MartijnVisser82
> >
> > On Wed, 9 Feb 2022 at 08:40, zhou chao <zhouchao...@hotmail.com> wrote:
> >
> > Hi, devs!
> > Jingfeng and I would like to start a discussion about the MERGE statement, and the discussion consists of two parts. In the first part, we want to explore and collect the use cases and motivations of MERGE statement users. In the second part, we want to find out whether it is possible for Flink SQL to support the merge statement.
> >
> > Before diving into the first topic, we want to introduce the definition and benefits of the merge statement. The MERGE statement in SQL is a very popular clause: it can handle inserts, updates, and deletes all in a single transaction, without having to write separate logic for each of these. For each insert, update, or delete action, we can specify conditions separately. Many engines/DBs already support this feature, for example SQL Server [1], Spark [2], Hive [3], and pgSQL [4].
> >
> > Our use case: order analysis & processing is one of our most important scenarios, but sometimes an updated order arrives a long time after the last one with the same primary key; in the meantime, the state for that key has expired, so a wrong aggregation result is produced. In this situation, we use the merge statement in a batch job to correct the results, and Spark + Iceberg is what we currently use internally. In the future, we want to unify batch & streaming internally with Flink SQL, so it would be better if Flink could support the merge statement. If you have other use cases and opinions, please share them here.
> >
> > Calcite does not have good support for the merge statement yet, and there is a Jira, CALCITE-4338 [5], tracking it. Could we support the merge statement relying on the limited support in calcite-1.26.0? I wrote a simple doc [6] to drive this, just to find out whether it is possible for Flink SQL to support the merge statement.
> >
> > Looking forward to your feedback, thanks.
> >
> > best,
> > zoucao
> >
> > [1] https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
> > [2] https://issues.apache.org/jira/browse/SPARK-28893
> > [3] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
> > [4] https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
> > [5] https://issues.apache.org/jira/browse/CALCITE-4338
> > [6] https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing
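(For reference, this is roughly what the statement under discussion looks like in the Spark SQL dialect ([2] above). Table and column names are invented; order_corrections stands in for the late-arriving corrections from zoucao's use case, with an op flag marking deletions.)

    -- orders:            the target table whose rows need correcting
    -- order_corrections: the batch of late inserts/updates/deletes
    MERGE INTO orders t
    USING order_corrections s
    ON t.order_id = s.order_id
    WHEN MATCHED AND s.op = 'D' THEN DELETE
    WHEN MATCHED THEN UPDATE SET t.amount = s.amount
    WHEN NOT MATCHED THEN INSERT (order_id, amount) VALUES (s.order_id, s.amount);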