Hi zoucao,

Thanks for your proposal. I believe this discussion will take us one
step further toward iceberg/hudi integration.

## `MERGE` for streaming

I feel that `MERGE` is a very good fit for stream computing, and this
is where Flink can have an advantage over other computation systems.

MERGE INTO target_table USING source_table;

Under the semantics of stream computation, the incremental data of
source_table is read in real time, and this incremental data is
merged into target_table in real time.
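
For illustration, a streaming MERGE might look like the following
(purely a sketch with made-up table and column names; Flink does not
support this syntax today):

MERGE INTO target_table t
USING source_table s
ON t.id = s.id
WHEN MATCHED AND s.op = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.amount = s.amount
WHEN NOT MATCHED THEN INSERT (id, amount) VALUES (s.id, s.amount);

Under streaming semantics, each incremental row from source_table
would trigger exactly one of these clauses against the current state
of target_table.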

I think this is a great capability, and further research and
discussion are needed for detailed `MERGE INTO` streaming support.

## Calcite sql extensions

I'm not sure that Calcite is good at extending its SQL syntax; the
worst-case scenario is that Flink needs to maintain its own version
of Calcite.

I would prefer that we try to push the syntax into the Calcite
community as much as possible unless we have a clear solution for SQL
syntax extensions.

Best,
Jingsong

On Thu, Feb 17, 2022 at 12:53 AM Francesco Guardiani
<france...@ververica.com> wrote:
>
> > In theory, incremental data should be carefully considered for
> > streaming data. In this situation, the data flow from target_table to
> > target_table will be a loop, and the incremental data with one key
> > will keep going through the loop. It looks very strange.
>
> This is the same concern I have: I don't see how MERGE can work in a
> streaming scenario without modifying its preliminary assumptions and
> semantics.
>
> Even assuming we put some hard constraint on the state size, for example
> requiring a window definition to be specified (like in interval joins), I
> still think that the fundamental assumption of MERGE here is a problem:
> the target table is both a sink and a source. And I think this is a big
> issue, as we cannot reasonably assume that sinks and sources are
> available for the same table definition or that they behave similarly.
>
> Also, talking about the batch implementation, I don't understand how you
> would implement this: from what I see in the "*validator*" paragraph of
> your document, you convert the merge statement into a bunch of other SQL
> statements, but you omit the initial join, which is fundamental for the
> semantics of MERGE. Could you provide more details about it?
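>
> For reference, MERGE semantics are usually described in terms of an
> outer join from the source to the target, along these lines (a sketch
> with made-up column names, not taken from your document):
>
> SELECT s.*, t.*,
>        CASE WHEN t.id IS NULL THEN 'NOT MATCHED'  -- insert branch
>             ELSE 'MATCHED'                        -- update/delete branch
>        END AS clause
> FROM source_table s
> LEFT OUTER JOIN target_table t ON s.id = t.id;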
>
> On another note, I think we can take inspiration from MERGE and its
> "event-driven" semantics, in order to have something that works for both
> batch and streaming, say a "Flink-ified" version of MERGE.
>
> For example, something that I can think of could be:
>
> PUSH TO target_table
> FROM source_table
> ON [window TVF]
> [when_clause [...]]
>
> Where when_clause looks like the ones from MERGE (modeled on the pgSQL
> proposal).
> This has the window TVF constraint, so the state doesn't grow indefinitely,
> and the source_table is effectively any select you can think of, removing
> the assumption that the target is both a sink and a source. This statement
> at the end produces a changelog stream, pushed to the output table. A
> statement like this could then allow you to have something similar to the
> MERGE, just by replacing source_table with a select performing the join. Of
> course this is an example, and might not make much sense, but I hope it
> gives you the idea.
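>
> To make it concrete, a usage sketch (reusing Flink's TUMBLE window TVF
> syntax; all table and column names here are made up) could be:
>
> PUSH TO target_orders
> FROM source_orders
> ON TUMBLE(TABLE source_orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)
> WHEN MATCHED THEN UPDATE SET amount = source_orders.amount
> WHEN NOT MATCHED THEN INSERT (id, amount)
>   VALUES (source_orders.id, source_orders.amount);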
>
> FG
>
>
> On Mon, Feb 14, 2022 at 4:28 AM OpenInx <open...@gmail.com> wrote:
>
> > I'm currently maintaining the iceberg flink modules in the Apache
> > Iceberg community.
> >
> > Currently, Spark has a great integration experience with iceberg format
> > v2 in batch mode. As this document [1] shows, the MERGE INTO syntax from
> > the Spark SQL extensions really helps a lot when people want to change
> > row-level data.
> >
> > Flink currently has a good integration with iceberg format v2 in
> > streaming mode; I mean, people can export their changelog data into an
> > iceberg table directly by writing a few lines of SQL. This [2] is good
> > material to read if anybody wants to create a simple demo, along the
> > lines of the sketch below.
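> >
> > A minimal sketch in the spirit of [2] (connector options abbreviated,
> > all table and column names made up):
> >
> > CREATE TABLE orders_cdc (
> >   id BIGINT, amount DECIMAL(10, 2), PRIMARY KEY (id) NOT ENFORCED)
> >   WITH ('connector' = 'mysql-cdc', ...);
> > CREATE TABLE orders_iceberg (
> >   id BIGINT, amount DECIMAL(10, 2), PRIMARY KEY (id) NOT ENFORCED)
> >   WITH ('connector' = 'iceberg', ...);
> > INSERT INTO orders_iceberg SELECT * FROM orders_cdc;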
> >
> > But I'd say that in batch scenarios, Flink SQL currently lacks a few
> > critical pieces of SQL syntax (for better integration with iceberg
> > format v2 in batch mode), each sketched below:
> > 1.  ALTER TABLE to change columns.
> > 2.  UPDATE/DELETE to change unexpected rows in a given table.
> > 3.  MERGE INTO to merge a batch of changed rows (mixed
> > insert/delete/update) into a given table.
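> >
> > For example (made-up tables and columns, using the Spark-style syntax
> > that iceberg documents today):
> >
> > ALTER TABLE orders ADD COLUMN region STRING;
> > UPDATE orders SET region = 'unknown' WHERE region IS NULL;
> > DELETE FROM orders WHERE id < 0;
> > MERGE INTO orders t USING changes c ON t.id = c.id
> >   WHEN MATCHED THEN UPDATE SET *
> >   WHEN NOT MATCHED THEN INSERT *;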
> >
> > In short, if we want to provide better integration and a better user
> > experience with iceberg v2 in batch mode, then I think support for the
> > above syntax is very important (from the iceberg perspective).
> >
> > > I think it's better to make that time investment in Calcite's
> > > implementation before bringing this to Flink.
> >
> > I find that there is some SQL syntax which is critical for Flink SQL
> > but not for a generic SQL parser. Is it possible to implement Flink SQL
> > plugins/extensions which extend the core Calcite SQL? Going a step
> > further, is it possible for us to achieve a better abstraction of the
> > Flink SQL framework, so that downstream components can implement their
> > own customized SQL plugins based on this framework? In this way,
> > different components could add their own SQL implementations on top of
> > Flink SQL.
> >
> > [1] https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
> > [2] https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/build-real-time-data-lake-tutorial.html
> >
> >
> > On Fri, Feb 11, 2022 at 4:28 PM zhou chao <zhouchao...@hotmail.com> wrote:
> >
> > > Hi, Martijn Visser, thanks for your reply. Firstly, I am sorry for
> > > posting the discussion twice. I sent the message to the dev mail group
> > > from an unsubscribed account, but the message was not shown for a
> > > while, and I guessed that the dev mail group would not post an email
> > > coming from an unsubscribed account, so I sent it again from a
> > > subscribed account.
> > >
> > > Q: How would you see merge work for streaming data?
> > > I think this is an interesting topic, especially for Flink, which aims
> > > to unify streaming & batch processing. Back to the merge statement:
> > > there are two inputs, target_table and source_table (a query). In the
> > > merge statement, source_table is used to correct target_table's
> > > results, and all rows in target_table only need to be corrected once;
> > > that's what the batch job does.
> > > In theory, incremental data should be carefully considered for
> > > streaming data. In this situation, the data flow from target_table to
> > > target_table will be a loop, and the incremental data with one key
> > > will keep going through the loop. It looks very strange. So far, we
> > > have not received any user needs matching the merge statement for
> > > streaming data. I think that the streaming topic should be backed by
> > > user needs and use cases before we talk about it further.
> > >
> > > I really agree that we should leverage Calcite and push the Calcite
> > > community to invest in this, but right now this feature does not get
> > > enough attention in the Calcite community. I found that some features
> > > for Flink were also limited by Calcite, such as FLINK-21714 [1], but
> > > they were finally fixed on the Flink side. Could you tell me how much
> > > effort we can usually afford in a situation like this?
> > >
> > >
> > > best,
> > > zoucao
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-21714
> > >
> > >
> > > On 10 Feb 2022 at 16:09, Martijn Visser <mart...@ververica.com> wrote:
> > >
> > > Hi zoucao,
> > >
> > > I see that this message was posted twice, so I chose to reply only to
> > > the latest one (this one). Thanks for bringing this up for discussion.
> > >
> > > I agree that support for a merge statement would be a welcome addition to
> > > Flink SQL for those that are using it for bounded jobs. How would you see
> > > merge work for streaming data?
> > >
> > > I do think that in order for Flink to properly support this, we should
> > > leverage Calcite. If there's no proper/full support for merge in
> > > Calcite, I don't think we should add this ourselves. I think the time
> > > investment and increase in technical debt don't outweigh the benefits
> > > that this would bring to Flink. If it's really that important, I think
> > > it's better to make that time investment in Calcite's implementation
> > > before bringing this to Flink.
> > >
> > > Best regards,
> > >
> > > Martijn Visser
> > > https://twitter.com/MartijnVisser82
> > >
> > >
> > > On Wed, 9 Feb 2022 at 08:40, zhou chao <zhouchao...@hotmail.com> wrote:
> > >
> > > Hi, devs!
> > > Jingfeng and I would like to start a discussion about the MERGE
> > > statement, and the discussion consists of two parts. In the first
> > > part, we want to explore and collect the use cases and motivations of
> > > MERGE statement users. In the second part, we want to find out whether
> > > it is possible for Flink SQL to support the merge statement.
> > >
> > > Before diving into the first topic, we want to introduce the
> > > definition and benefits of the merge statement. The MERGE statement in
> > > SQL is a very popular clause, and it can handle inserts, updates, and
> > > deletes all in a single transaction without having to write separate
> > > logic for each of these. For each insert, update, or delete action, we
> > > can specify conditions separately. Many engines/DBs already support
> > > this feature, for example SQL Server [1], Spark [2], Hive [3], and
> > > pgSQL [4].
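> > >
> > > As a sketch (made-up names, roughly following the SQL Server/pgSQL
> > > form):
> > >
> > > MERGE INTO target t USING source s ON t.id = s.id
> > >   WHEN MATCHED AND s.deleted THEN DELETE
> > >   WHEN MATCHED THEN UPDATE SET val = s.val
> > >   WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val);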
> > >
> > > Our use case:
> > > Order analysis & processing is one of the most important scenarios,
> > > but sometimes an updated order arrives a long time after the last
> > > record with the same primary key; meanwhile, the state for that key
> > > has expired, so a wrong aggregation result is produced. In this
> > > situation, we use the merge statement in a batch job to correct the
> > > results, and Spark + iceberg is what we have chosen internally for
> > > now. In the future, we want to unify batch & streaming internally by
> > > using Flink SQL, and it would be better if Flink could support the
> > > merge statement. If you have other use cases and opinions, please
> > > share them here.
> > >
> > > Now, Calcite does not have good support for the merge statement, and
> > > there is a Jira, CALCITE-4338 [5], to track it. Could we support the
> > > merge statement relying on the limited support in calcite-1.26.0? I
> > > wrote a simple doc [6] to drive this; I just want to find out whether
> > > it is possible for Flink SQL to support the merge statement.
> > >
> > > Looking forward to your feedback, thanks.
> > >
> > > best,
> > > zoucao
> > >
> > >
> > > [1] https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
> > > [2] https://issues.apache.org/jira/browse/SPARK-28893
> > > [3] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
> > > [4] https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
> > > [5] https://issues.apache.org/jira/browse/CALCITE-4338
> > > [6] https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing
> > >
> > >
> >
