> Could you list some needs from your point of view?

The typical requirements I know of for extending native Flink SQL for
Iceberg tables are:

1.  Apache Iceberg has many table management operations that would be
friendlier to end users if they could be expressed in SQL, such as: expire
table snapshots, rewrite data files, rewrite metadata files, remove orphan
files, etc.  In the Iceberg Spark extensions, we provide a CALL
<procedure-name>(arg, ...) extension syntax to execute those operations
in Spark SQL. See more details here [1];
2.  ALTER TABLE syntax.  Partition evolution needs some syntax that engines
usually don't provide natively.  See [2];
3.  MERGE INTO syntax, as discussed above.
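
For anyone who has not used the Spark side, here is a rough sketch of what
these three kinds of syntax look like in Spark SQL with the Iceberg
extensions enabled. The catalog name (my_catalog), the tables (db.sample
with columns id and cnt, db.updates with columns id, cnt and op) and the
timestamp are placeholders made up for illustration; the full argument list
of each procedure is in [1].

```sql
-- 1. Table maintenance through stored procedures (placeholder names)
CALL my_catalog.system.expire_snapshots(
  table => 'db.sample',
  older_than => TIMESTAMP '2022-01-01 00:00:00.000');
CALL my_catalog.system.rewrite_data_files(table => 'db.sample');
CALL my_catalog.system.rewrite_manifests(table => 'db.sample');
CALL my_catalog.system.remove_orphan_files(table => 'db.sample');

-- 2. Partition evolution
ALTER TABLE my_catalog.db.sample ADD PARTITION FIELD bucket(16, id);
ALTER TABLE my_catalog.db.sample DROP PARTITION FIELD bucket(16, id);

-- 3. Row-level merge: delete, update, or insert depending on the source row
MERGE INTO my_catalog.db.sample t
USING my_catalog.db.updates s
ON t.id = s.id
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.cnt = s.cnt
WHEN NOT MATCHED THEN INSERT (id, cnt) VALUES (s.id, s.cnt);
```

None of these statements parse in Flink SQL today, which is why I list them
as requirements.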

[1] https://iceberg.apache.org/docs/latest/spark-procedures/
[2]
https://github.com/apache/iceberg/blob/f5a753791f4dc6aca78569a14f731feda9edf462/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4#L70


On Mon, Feb 21, 2022 at 4:39 PM zhou chao <zhouchao...@hotmail.com> wrote:

> Hi, OpenInx, FG and Jingsong, thanks for your attention to this issue.
> I am sorry for the late reply. I have been working further on the
> implementation of the merge statement, and we now support the basic SQL
> syntax internally.
>
> About: Implement our flink sql plugin/extensions which extends the core
> calcite sql.
> Going a step further, is it possible for us to achieve a better
> abstraction of the
> flink sql framework.
>
> AFAIK, some extensions to Calcite have already been made for Flink SQL,
> but more functionality and flexibility would bring greater risk and a
> heavier workload. Someone who knows Flink and Calcite well enough could
> give us some advice.
> I think it would be great if Flink had a better abstraction of the SQL
> framework. I know that Iceberg implements the merge statement by
> extending logical rules from Spark. Hi, OpenInx, could you list some
> needs from your point of view?
>
> About: the implementation of the merge statement
> I will explain how and why we need to rewrite the matched and not-matched
> clauses. I hope this helps you understand it better, FG.
> Assume that a target table and a source table both have the following
> schema:
> (a int, b bigint, c string)
> and that the merge statement is:
>
> |  merge into target t
> |  using source s
> |  on t.a = s.a
> |  when matched and s.c = 'Flink' then update set c = s.c
> |  when matched and s.c <> 'Flink' then update set c = 'Flink'
> |  when not matched then insert values(s.a, s.b, 'Flink')
>
> How and why we resolve the statement before validation:
> 1. Convert the merge statement to
> select * from source s left outer join target t on t.a = s.a
> We choose left outer join as the default; an inner or anti join can be
> treated as an optimization and applied in a logical rule.
> 2. Convert the update statement to
> insert into target select t.a, t.b, s.c from (source s left outer join
> target t on t.a = s.a) where s.c = 'Flink'
> We convert the update statement to an insert statement because we need
> two inputs to update the target table, but an update statement has only
> one input, which does not meet the requirement. The same situation occurs
> for the delete statement.
> Although it is tricky, we only need to rewrite the methods rewriteMerge()
> and validateMerge() in SqlValidatorImpl, which is a small change on the
> Calcite side.
> 3. Then, we define a RelNode called MergeAction, which is the collection
> of all matched and not-matched actions.
> Here is the AST:
> LogicalLegacySink(name=[`default_catalog`.`default_database`.`target`],
> fields=[a, b, c])
> +- LogicalMergeAction(
> action=[matched-0], op=[+U], expr#0..5=[{inputs}],
> expr#6=[_UTF-16LE'Flink':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
> expr#7=[=($t2, $t6)], a=[$t3], b=[$t4], c=[$t2], $condition=[$t7],
> action=[matched-1], op=[+U], expr#0..5=[{inputs}],
> expr#6=[_UTF-16LE'Flink'], expr#7=[_UTF-16LE'Flink':VARCHAR(2147483647)
> CHARACTER SET "UTF-16LE"], expr#8=[<>($t2, $t7)], a=[$t3], b=[$t4],
> EXPR$2=[$t6], $condition=[$t8],
> action=[not-matched-0], op=[+I], expr#0..5=[{inputs}],
> expr#6=[_UTF-16LE'Flink'], expr#7=[true], proj#0..1=[{exprs}],
> EXPR$2=[$t6], $condition=[$t7])
>    +- LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
>       +- LogicalJoin(condition=[=($3, $0)], joinType=[left])
>          :- LogicalTableScan(table=[[default_catalog, default_database,
> source, source: [CollectionTableSource(a, b, c)]]])
>          +- LogicalTableScan(table=[[default_catalog, default_database,
> target, source: [CollectionTableSource(a, b, c)]]])
>
> I am sorry that the doc is out of date. I will update it as soon as
> possible.
>
> About: the merge statement for unbounded data
> I think it is very meaningful to support streaming 'merge into' in Flink,
> but the biggest problem right now is the target table, which is used as
> both the source and the sink. Maybe there are two ways we can go: one is
> introducing a third table as the sink table, the other is breaking the
> data loop. WDYS?
>
>
>
> On Feb 18, 2022, at 2:03 PM, Jingsong Li <jingsongl...@gmail.com> wrote:
>
> Hi zoucao,
>
> Thanks for your proposal. I believe this discussion will take us one
> step further for iceberg/hudi integration.
>
> ## `MERGE` for streaming
>
> I feel that `MERGE` is very good for stream computing. And, this is
> where our Flink can have an advantage over other computation systems.
>
> MERGE INTO target_table USING source_table;
>
> Under the semantics of stream computation, the incremental data of
> source_table is read in real time, and these incremental data are
> merged into the target_table in real time.
>
> I think this is a great capability, and further research and
> discussion is needed for detailed `MERGE INTO` streaming support.
>
> ## Calcite sql extensions
>
> I'm not sure that Calcite is good at having its SQL syntax extended. A
> worst-case scenario is that Flink needs to maintain its own version of
> Calcite.
>
> I would prefer that we try to push the syntax to the calcite community
> as much as possible unless we have a clear solution for sql syntax
> extensions.
>
> Best,
> Jingsong
>
> On Thu, Feb 17, 2022 at 12:53 AM Francesco Guardiani
> <france...@ververica.com> wrote:
>
> In the theory aspect, incremental data should be carefully considered for
> streaming data. In this situation,  the data flow from target_table to
> target_table
> will be a loop, and the incremental data with one key will keep going
> through
> the loop. It looks very strange.
>
> This is the same concern I have here, I don't see how MERGE can work in a
> streaming scenario without modifying its preliminary assumptions and
> semantics.
>
> Even assuming we put some hard constraint on the state size, for example
> by requiring a window definition (like in interval joins), I still
> think that the fundamental assumption of MERGE here is a problem: the
> target table is both a sink and a source. And I think this is a big issue,
> as we cannot reasonably assume that a sink and a source are both available
> for the same table definition or that they behave similarly.
>
> Also, talking about the batch implementation, I don't understand how you
> would implement this: from what I see in the "*validator*" paragraph of
> your document, you convert the merge statement to a bunch of other SQL
> statements, but you omit the initial join, which is fundamental to the
> semantics of MERGE. Could you provide more details about it?
>
> On another note, I think we can take inspiration from MERGE and its "event
> driven" semantics, in order to have something that works both for batch and
> streaming, say a "Flink-ified" version of MERGE.
>
> For example, something that I can think of could be:
>
> PUSH TO target_table
> FROM source_table
> ON [window TVF]
> [when_clause [...]]
>
> Where when_clause looks like the ones from MERGE (looking at the pgsql).
> This has the window TVF constraint, so the state doesn't grow indefinitely,
> and the source_table is effectively any select you can think of, removing
> the assumption that the target is both a sink and a source. This statement
> at the end produces a changelog stream, pushed to the output table. A
> statement like this could then allow you to have something similar to the
> MERGE, just by replacing source_table with a select performing the join. Of
> course this is an example, and might not make much sense, but I hope it
> gives you the idea.
>
> FG
>
>
> On Mon, Feb 14, 2022 at 4:28 AM OpenInx <open...@gmail.com> wrote:
>
> I'm currently maintaining the Iceberg Flink modules in the Apache Iceberg
> community.
>
> Currently, Spark has a great integration experience with Iceberg format
> v2 in batch mode.  As this document [1] shows, the MERGE INTO syntax from
> the Spark SQL extensions really helps a lot when people want to change
> row-level data.
>
> Flink currently has a good integration with Iceberg format v2 in
> streaming mode; I mean people can export their change log data into an
> Iceberg table directly by writing a few SQL statements. This [2] is good
> material to read if anybody wants to create a simple demo.
>
> But I'd say that in batch scenarios, Flink SQL currently lacks a few
> critical pieces of SQL syntax (for integrating better with Iceberg format
> v2 in batch mode):
> 1.  ALTER TABLE to change columns.
> 2.  UPDATE/DELETE to change the unexpected rows in a given table.
> 3.  MERGE INTO to merge a batch of changed rows (mixed
> insert/delete/update) into the given table.
>
> In short, if we want to provide better integration and user experience with
> Iceberg v2 in batch, then I think support for the above syntax
> is very important (from the Iceberg perspective).
>
> I think it's better to make that time investment at Calcite's
> implementation before bringing this to Flink.
>
> I find that some SQL syntax is critical for Flink SQL but not for other
> generic SQL parsers.  Is it possible to implement our own Flink SQL
> plugin/extensions which extend the core Calcite SQL? Going a step further,
> is it possible for us to achieve a better abstraction of the Flink SQL
> framework, so that downstream components can implement their own
> customized SQL plugins based on this SQL framework? In this way, it is
> possible to meet the needs of different components to add their own
> SQL implementation on top of Flink SQL.
>
> [1]. https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
> [2]. https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/build-real-time-data-lake-tutorial.html
>
>
> On Fri, Feb 11, 2022 at 4:28 PM zhou chao <zhouchao...@hotmail.com> wrote:
>
> Hi, Martijn Visser, thanks for your reply. Firstly, I am sorry for
> posting the discussion twice. I sent the message to the dev mailing list
> from an unsubscribed account, but the message did not show up for a
> while, and I guessed that the dev mailing list would not post an email
> coming from an unsubscribed account, so I sent it again from a
> subscribed account.
>
> Q: How would you see merge work for streaming data?
> I think this is an interesting topic, especially for Flink, which
> wants to unify streaming & batch processing. Back to the merge
> statement: there are two inputs, target_table and source_table (query).
> In the merge statement, source_table is used to correct the
> target_table's results, and every row in target_table only needs to be
> corrected once; that's what the batch job does.
> In theory, incremental data should be carefully considered for
> streaming data. In this situation, the data flow from target_table to
> target_table will be a loop, and the incremental data with one key will
> keep going through the loop. It looks very strange. So far, we have not
> received any user needs matching the merge statement for streaming data.
> I think that streaming support should be backed by user needs and use
> cases before we discuss it further.
>
> I really agree that we should leverage Calcite and push the Calcite
> community to invest in it, but this feature does not currently get enough
> attention in the Calcite community. I found that some features for Flink
> were also limited by Calcite, such as FLINK-21714[1], but it was finally
> fixed on the Flink side. Could you tell me how much effort we can usually
> afford in a situation like this?
>
>
> best,
> zoucao
>
> [1] https://issues.apache.org/jira/browse/FLINK-21714
>
>
> On Feb 10, 2022, at 4:09 PM, Martijn Visser <mart...@ververica.com> wrote:
>
> Hi zoucao,
>
> I see that this message was posted twice, so I choose to only reply to
> the latest one (this one). Thanks for bringing this up for discussion.
>
> I agree that support for a merge statement would be a welcome addition to
> Flink SQL for those that are using it for bounded jobs. How would you see
> merge work for streaming data?
>
> I do think that in order for Flink to properly support this, we should
> leverage Calcite for this. If there's no proper/full support for merge in
> Calcite, I don't think we should add this ourselves. I think the time
> investment and increase in technical debt doesn't outweigh the benefits
> that this would bring to Flink. If it's really that important, I think
> it's better to make that time investment at Calcite's implementation
> before bringing this to Flink.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
>
> On Wed, 9 Feb 2022 at 08:40, zhou chao <zhouchao...@hotmail.com> wrote:
>
> Hi, devs!
> Jingfeng and I would like to start a discussion about the MERGE
> statement. The discussion consists of two parts. In the first part, we
> want to explore and collect the use cases and motivations of MERGE
> statement users. In the second part, we want to find out whether it is
> possible for Flink SQL to support the merge statement.
>
> Before driving the first topic, we want to introduce the definition and
> benefits of the merge statement. The MERGE statement in SQL is very
> popular, and it can handle inserts, updates, and deletes all in a
> single transaction without having to write separate logic for each of
> these. For each insert, update, or delete action, we can specify
> conditions separately. Many engines/DBs already support this feature,
> for example, SQL Server[1], Spark[2], Hive[3], pgSQL[4].
>
> Our use case:
> Order analysis & processing is one of the most important scenarios, but
> sometimes an updated order arrives a long time after the previous one
> with the same primary key. In the meantime, the state for this key has
> expired, so a wrong aggregation result is produced. In this
> situation, we use the merge statement in a batch job to correct the
> results, and we currently use Spark + Iceberg for this internally. In the
> future, we want to unify batch & streaming by using Flink SQL internally,
> so it would be better if Flink could support the merge statement. If you
> have other use cases and opinions, please share them here.
>
> Currently, Calcite does not have good support for the merge statement, and
> there is a Jira issue, CALCITE-4338[5], tracking it. Could we support the
> merge statement relying on the limited support in calcite-1.26.0? I wrote
> a simple doc[6] to drive this, just to find out whether it is possible
> for Flink SQL to support the merge statement.
>
> Looking forward to your feedback, thanks.
>
> best,
> zoucao
>
>
> [1] https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
> [2] https://issues.apache.org/jira/browse/SPARK-28893
> [3] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
> [4] https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
> [5] https://issues.apache.org/jira/browse/CALCITE-4338
> [6] https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing
>
>
>
>
>
