> Could you list some needs from your point of view?

I know the typical requirements for extending the native Flink SQL for Iceberg tables are:
1. There are many table management operations in Apache Iceberg that would be much friendlier for end users if they could be expressed in SQL, such as expiring table snapshots, rewriting data files, rewriting metadata files, removing orphan files, etc. In the Iceberg Spark extension we provide a CALL <procedure-name> (arg, ...) extension syntax to execute those operations in Spark SQL. See more details in [1].
2. ALTER TABLE syntax. There is some partition evolution syntax that engines usually don't provide natively. See [2].
3. MERGE INTO syntax, as discussed above.

[1] https://iceberg.apache.org/docs/latest/spark-procedures/
[2] https://github.com/apache/iceberg/blob/f5a753791f4dc6aca78569a14f731feda9edf462/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4#L70

On Mon, Feb 21, 2022 at 4:39 PM zhou chao <zhouchao...@hotmail.com> wrote:

Hi OpenInx, FG and Jingsong, thanks for your attention to this issue. I am sorry for the late reply; I'm working on the follow-up work for the implementation of the merge statement, and we now support the basic SQL syntax internally.

About: implementing our own Flink SQL plugins/extensions which extend the core Calcite SQL and, going a step further, achieving a better abstraction of the Flink SQL framework.

AFAIK, some extensions to Calcite have already been made for Flink SQL, but if we want more functionality and flexibility, greater risk and a bigger workload will follow. Someone who knows Flink and Calcite well enough could give us some advice. I think it would be great if Flink had a better abstraction of the SQL framework. I know that Iceberg implements the merge statement by extending logical rules in Spark. Hi openInx, could you list some needs from your point of view?

About: the implementation of the merge statement. I will explain how and why we need to rewrite the matched or not-matched clauses; I hope this helps you know more about it, FG.

Assume that there exist a target table and a source table, both with the following schema:

(a int, b bigint, c string)

If the merge statement is

| merge into target t
| using source s
| on t.a = s.a
| when matched and s.c = 'Flink' then update set c = s.c
| when matched and s.c <> 'Flink' then update set c = 'Flink'
| when not matched then insert values(s.a, s.b, 'Flink')

then this is how and why we resolve it before validation:

1. Convert the merge statement to
   select * from source s left outer join target t on t.a = s.a
   We choose the left outer join as the default; an inner or anti join can be treated as an optimization and applied in a logical rule.
2. Convert the update statement to
   insert into target select t.a, t.b, s.c from (source s left outer join target t on t.a = s.a) where s.c = 'Flink'
   We convert the update statement to an insert statement because we need two inputs to update the target table, but the update statement only has one input, which cannot meet the requirement. The same situation occurs for the delete statement. Although it is tricky, we only need to rewrite the methods rewriteMerge() and validateMerge() in SqlValidatorImpl, a small change on the Calcite side (see the sketch below).
3. Then we define a RelNode called MergeAction, which is the collection of all matched or not-matched actions.
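To make the rewrite above concrete, here is a rough, illustrative sketch of the per-action statements the example merge could be decomposed into. It is only a sketch: it assumes the join key a is non-nullable in the target, so an IS NULL test can distinguish matched from not-matched rows, and it ignores the changelog ops (+U/+I) that the MergeAction node carries, which plain inserts do not capture.

-- matched-0: when matched and s.c = 'Flink' then update set c = s.c
insert into target
select t.a, t.b, s.c
from source s left outer join target t on t.a = s.a
where t.a is not null and s.c = 'Flink';

-- matched-1: when matched and s.c <> 'Flink' then update set c = 'Flink'
insert into target
select t.a, t.b, 'Flink'
from source s left outer join target t on t.a = s.a
where t.a is not null and s.c <> 'Flink';

-- not-matched-0: when not matched then insert values(s.a, s.b, 'Flink')
insert into target
select s.a, s.b, 'Flink'
from source s left outer join target t on t.a = s.a
where t.a is null;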
Here is the AST:

LogicalLegacySink(name=[`default_catalog`.`default_database`.`target`], fields=[a, b, c])
+- LogicalMergeAction(
     action=[matched-0], op=[+U], expr#0..5=[{inputs}], expr#6=[_UTF-16LE'Flink':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], expr#7=[=($t2, $t6)], a=[$t3], b=[$t4], c=[$t2], $condition=[$t7],
     action=[matched-1], op=[+U], expr#0..5=[{inputs}], expr#6=[_UTF-16LE'Flink'], expr#7=[_UTF-16LE'Flink':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], expr#8=[<>($t2, $t7)], a=[$t3], b=[$t4], EXPR$2=[$t6], $condition=[$t8],
     action=[not-matched-0], op=[+I], expr#0..5=[{inputs}], expr#6=[_UTF-16LE'Flink'], expr#7=[true], proj#0..1=[{exprs}], EXPR$2=[$t6], $condition=[$t7])
   +- LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
      +- LogicalJoin(condition=[=($3, $0)], joinType=[left])
         :- LogicalTableScan(table=[[default_catalog, default_database, source, source: [CollectionTableSource(a, b, c)]]])
         +- LogicalTableScan(table=[[default_catalog, default_database, target, source: [CollectionTableSource(a, b, c)]]])

I am sorry that the doc is out of date; I will improve it as soon as possible.

About: the merge statement for unbounded data. I think it is very meaningful to support a streaming 'merge into' in Flink, but right now the biggest problem is the target table, which is used as both the source table and the sink table. Maybe there are two ways we can go: one is introducing a third table as the sink table, the other is breaking the data loop. WDYS?

On Feb 18, 2022 at 2:03 PM, Jingsong Li <jingsongl...@gmail.com> wrote:

Hi zoucao,

Thanks for your proposal. I believe this discussion will take us one step further for iceberg/hudi integration.

## `MERGE` for streaming

I feel that `MERGE` is very good for stream computing, and this is where Flink can have an advantage over other computation systems.

MERGE INTO target_table USING source_table;

Under the semantics of stream computation, the incremental data of source_table is read in real time, and this incremental data is merged into the target_table in real time.

I think this is a great capability, and further research and discussion are needed for detailed `MERGE INTO` streaming support.

## Calcite sql extensions

I'm not sure that Calcite is good at extending its SQL syntax; a worst-case scenario is that Flink would need to maintain its own version of Calcite.

I would prefer that we try to push the syntax to the Calcite community as much as possible, unless we have a clear solution for SQL syntax extensions.

Best,
Jingsong

On Thu, Feb 17, 2022 at 12:53 AM Francesco Guardiani <france...@ververica.com> wrote:

> In theory, incremental data should be carefully considered for streaming data. In this situation, the data flow from target_table to target_table will be a loop, and the incremental data with one key will keep going through the loop. It looks very strange.

This is the same concern I have here: I don't see how MERGE can work in a streaming scenario without modifying its preliminary assumptions and semantics.

Even assuming we put some hard constraint on the state size, for example requiring a window definition to be specified (as in interval joins), I still think that the fundamental assumption of MERGE here is a problem: the target table is both a sink and a source.
And I think this is a big issue, as we cannot reasonably assume that sinks and sources are available for the same table definition, or that they behave similarly.

Also, talking about the batch implementation, I don't understand how you would implement this: from what I see in the "validator" paragraph of your document, you convert the merge statement into a bunch of other SQL statements, but you omit the initial join, which is fundamental for the semantics of MERGE. Could you perhaps provide more details about it?

On another note, I think we can take inspiration from MERGE and its "event driven" semantics in order to have something that works both for batch and streaming, say a "Flink-ified" version of MERGE. For example, something that I can think of could be:

PUSH TO target_table
FROM source_table
ON [window TVF]
[when_clause [...]]

where when_clause looks like the ones from MERGE (looking at the pgSQL version). This has the window TVF constraint, so the state doesn't grow indefinitely, and the source_table is effectively any select you can think of, removing the assumption that the target is both a sink and a source. This statement in the end produces a changelog stream, pushed to the output table. A statement like this could then allow you to have something similar to MERGE, just by replacing source_table with a select performing the join. Of course this is an example and might not make much sense, but I hope it gives you the idea.

FG

On Mon, Feb 14, 2022 at 4:28 AM OpenInx <open...@gmail.com> wrote:

I'm currently maintaining the iceberg-flink modules for the Apache Iceberg community.

Currently, Spark has a great integration experience with Iceberg format v2 in batch mode. As described in this document [1], the MERGE INTO syntax from the Spark SQL extensions really helps a lot when people want to change row-level data.

Flink currently has a good integration with Iceberg format v2 in streaming mode; I mean people can export their changelog data into an Iceberg table directly by writing a few SQL statements. This [2] is good material to read if anybody wants to create a simple demo.

But I'd say that in batch scenarios, Flink SQL currently lacks a few critical pieces of SQL syntax (needed for better integration with Iceberg format v2 in batch mode):
1. ALTER TABLE to change columns.
2. UPDATE/DELETE SQL to change unexpected rows in a given table.
3. MERGE INTO to merge a batch of changed rows (mixed insert/delete/update) into a given table.

In short, if we want to provide better integration and user experience with Iceberg v2 in batch, then I think support for the above syntax is very important (from the Iceberg perspective).

> I think it's better to make that time investment in Calcite's implementation before bringing this to Flink.

I find that there is some SQL syntax which is critical for Flink SQL but not for other generic SQL parsers. Is it possible to implement our own Flink SQL plugins/extensions which extend the core Calcite SQL? Going a step further, is it possible for us to achieve a better abstraction of the Flink SQL framework, so that downstream components can implement their own customized SQL plugins on top of it? In this way, different components could add their own SQL implementations on top of Flink SQL.

[1] https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
[2] https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/build-real-time-data-lake-tutorial.html
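For readers who have not used the Spark extensions referenced in [1], here is a rough, hedged sketch of the shape of the row-level syntax listed in the message above. The table, column, and source names (db.sample, db.changes, id, data, region, op, event_date) are purely illustrative, and none of these statements are valid Flink SQL today:

-- Illustrative Spark/Iceberg-style statements (names are made up for the example)
ALTER TABLE db.sample ADD COLUMNS (region string);

UPDATE db.sample SET region = 'unknown' WHERE region IS NULL;

DELETE FROM db.sample WHERE event_date < DATE '2021-01-01';

MERGE INTO db.sample t
USING db.changes s            -- a batch of mixed insert/update/delete rows
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.data = s.data
WHEN NOT MATCHED THEN INSERT (id, data, region) VALUES (s.id, s.data, s.region);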
On Fri, Feb 11, 2022 at 4:28 PM zhou chao <zhouchao...@hotmail.com> wrote:

Hi Martijn Visser, thanks for your reply. Firstly, I am sorry for posting the discussion twice. I sent the message to the dev mail group from an unsubscribed account, but the message was not shown for a while, and I guessed that the dev mail group would not post an email coming from an unsubscribed account, so I sent it again from a subscribed account.

Q: How would you see merge work for streaming data?
I think this is an interesting topic, especially for Flink, which aims to unify streaming & batch processing. Back to the merge statement: there are two inputs, target_table and source_table (a query). In the merge statement, source_table is used to correct the results in target_table, and all rows in target_table only need to be corrected once; that is what the batch job does. In theory, incremental data should be carefully considered for streaming data. In this situation, the data flow from target_table to target_table will be a loop, and the incremental data with one key will keep going through the loop. It looks very strange. So far, we have not received any user needs matching the merge statement for streaming data. I think the streaming topic should be backed by user needs and use cases before we discuss it further.

I really agree that we should leverage Calcite and push Calcite to invest in it, but right now this feature does not get enough attention in the Calcite community. I found that some features for Flink were also limited by Calcite, such as FLINK-21714 [1], which was finally fixed on the Flink side. Could you tell me how much effort we can usually afford in a situation like this?

best,
zoucao

[1] https://issues.apache.org/jira/browse/FLINK-21714

On Feb 10, 2022 at 4:09 PM, Martijn Visser <mart...@ververica.com> wrote:

Hi zoucao,

I see that this message was posted twice, so I chose to reply only to the latest one (this one). Thanks for bringing this up for discussion.

I agree that support for a merge statement would be a welcome addition to Flink SQL for those who are using it for bounded jobs. How would you see merge work for streaming data?

I do think that in order for Flink to properly support this, we should leverage Calcite. If there's no proper/full support for merge in Calcite, I don't think we should add it ourselves. The time investment and increase in technical debt don't outweigh the benefits this would bring to Flink. If it's really that important, I think it's better to make that time investment in Calcite's implementation before bringing this to Flink.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82

On Wed, 9 Feb 2022 at 08:40, zhou chao <zhouchao...@hotmail.com> wrote:

Hi devs!

Jingfeng and I would like to start a discussion about the MERGE statement, and the discussion consists of two parts. In the first part, we want to explore and collect the use cases and motivations of MERGE statement users. In the second part, we want to find out whether it is possible for Flink SQL to support the merge statement.

Before driving the first topic, we want to introduce the definition and benefits of the merge statement.
The MERGE statement in SQL is a very popular clause: it can handle inserts, updates, and deletes all in a single transaction without having to write separate logic for each of them, and for each insert, update, or delete action we can specify conditions separately. Many engines/DBs already support this feature, for example SQL Server [1], Spark [2], Hive [3], and pgSQL [4].

Our use case: order analysis & processing is one of the most important scenarios for us, but sometimes an updated order arrives a long time after the last record with the same primary key; by then the state for this key has already expired, so a wrong aggregation result is produced. In this situation we use the merge statement in a batch job to correct the results, and currently Spark + Iceberg is what we use internally. In the future we want to unify batch & streaming with Flink SQL internally, so it would be better if Flink could support the merge statement. If you have other use cases and opinions, please share them here.

Right now Calcite does not have good support for the merge statement, and there is a Jira issue, CALCITE-4338 [5], tracking it. Could we support the merge statement relying on the limited support in calcite-1.26.0? I wrote a simple doc [6] to drive this; it just aims to explore the possibility of Flink SQL supporting the merge statement.

Looking forward to your feedback, thanks.

best,
zoucao

[1] https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
[2] https://issues.apache.org/jira/browse/SPARK-28893
[3] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
[4] https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
[5] https://issues.apache.org/jira/browse/CALCITE-4338
[6] https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing
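To ground the order-correction use case described in the message above, here is a rough, hedged sketch of what such a batch correction could look like once a merge statement is available. The table and column names (order_result, late_order_updates, order_id, amount, update_time) are purely illustrative and not taken from the thread:

-- Illustrative only: a periodic batch job folding late order updates into the result table
MERGE INTO order_result t
USING late_order_updates s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET t.amount = s.amount, t.update_time = s.update_time
WHEN NOT MATCHED THEN INSERT (order_id, amount, update_time)
    VALUES (s.order_id, s.amount, s.update_time);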