> In the theory aspect, incremental data should be carefully considered for
> streaming data. In this situation, the data flow from target_table to
> target_table will be a loop, and the incremental data with one key will
> keep going through the loop. It looks very strange.

I have the same concern here: I don't see how MERGE can work in a
streaming scenario without modifying its preliminary assumptions and
semantics.

Even assuming we put some hard constraint on the state size, for example by
requiring a window definition (like in interval joins), I still think that
the fundamental assumption of MERGE is a problem here: the target table is
both a sink and a source. I think this is a big issue, as we cannot
reasonably assume that both a sink and a source are available for the same
table definition, or that they behave similarly.

Also, regarding the batch implementation, I don't understand how you
would implement this: from what I see in the "*validator*" paragraph of
your document, you convert the merge statement into a set of other SQL
statements, but you omit the initial join, which is fundamental to the
semantics of MERGE. Could you provide more details about it?
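
To illustrate why the initial join matters, here is a minimal Python sketch
of batch MERGE semantics. It is only an illustration under simplifying
assumptions, not how Flink or Calcite would plan it: the function name
`merge`, the tuple shape of the when-clauses, and the single-key join are
all hypothetical, and each source key is assumed to appear at most once.

```python
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, object]
# Hypothetical when-clause shape: (applies_when_matched, condition, action),
# where action is one of "INSERT", "UPDATE", "DELETE".
WhenClause = Tuple[bool, Callable[[Row, Optional[Row]], bool], str]


def merge(target: Dict[object, Row], source: List[Row], key: str,
          when_clauses: List[WhenClause]) -> Dict[object, Row]:
    """Return a new target table; the input target is left untouched."""
    result = dict(target)
    for src in source:
        # The initial join: look up the target row with the same key.
        # The lookup uses the original target snapshot, not `result`.
        tgt = target.get(src[key])
        matched = tgt is not None
        for clause_matched, condition, action in when_clauses:
            if clause_matched != matched or not condition(src, tgt):
                continue
            if action == "INSERT":
                result[src[key]] = dict(src)
            elif action == "UPDATE":
                result[src[key]] = {**tgt, **src}
            elif action == "DELETE":
                result.pop(src[key], None)
            break  # only the first applicable clause fires for a row
    return result
```

Without the join there is no way to tell whether a source row is in the
"matched" or "not matched" case, so the when-clauses cannot be evaluated.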

On another note, I think we can take inspiration from MERGE and its "event
driven" semantics, in order to have something that works both for batch and
streaming, say a "Flink-ified" version of MERGE.

For example, something that I can think of could be:

PUSH TO target_table
FROM source_table
ON [window TVF]
[when_clause [...]]

Here, when_clause looks like the ones from MERGE (I'm looking at the
PostgreSQL syntax). This has the window TVF constraint, so the state doesn't
grow indefinitely, and source_table is effectively any select you can think
of, removing the assumption that the target is both a sink and a source. In
the end, this statement produces a changelog stream, which is pushed to the
output table. A statement like this could then give you something similar
to MERGE, just by replacing source_table with a select performing the join.
Of course this is just an example, and it might not make much sense, but I
hope it gives you the idea.
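
One possible reading of this idea, simulated in Python over a finite list of
rows. Everything here is an assumption made for illustration: the function
name `push_to`, the `ts` field, the tumbling window standing in for the
window TVF, and the (condition, op) shape of the when-clauses. The op
strings follow the short forms of Flink's RowKind (+I, +U, -D).

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, object]


def push_to(source_rows: Iterable[Row], window_size: int,
            when_clauses: List[Tuple[Callable[[Row], bool], str]]
            ) -> Iterator[Tuple[int, str, Row]]:
    """Yield (window_start, op, row) changelog records, window by window."""
    # Assign each row to a tumbling window based on its "ts" field.
    windows: Dict[int, List[Row]] = defaultdict(list)
    for row in source_rows:
        start = row["ts"] - row["ts"] % window_size
        windows[start].append(row)
    # This simulation buffers all windows because the input is finite; a
    # streaming runtime would instead drop a window's state once it closes.
    for start in sorted(windows):
        for row in windows[start]:
            for condition, op in when_clauses:
                if condition(row):
                    yield (start, op, row)
                    break  # first applicable clause wins, as in MERGE
```

For example, with the clauses `[(lambda r: r["deleted"], "-D"),
(lambda r: True, "+I")]`, rows flagged as deleted become -D records and all
other rows become +I records, grouped by the window they fall into.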

FG


On Mon, Feb 14, 2022 at 4:28 AM OpenInx <open...@gmail.com> wrote:

> I currently maintain the Iceberg Flink modules in the Apache Iceberg
> community.
>
> Currently, Spark has a great integration experience with Iceberg format
> v2 in batch mode. As this document [1] shows, the MERGE INTO syntax from
> the Spark SQL extensions really helps a lot when people want to change
> row-level data.
>
> Flink currently has good integration with Iceberg format v2 in streaming
> mode; I mean that people can export their change log data into an Iceberg
> table directly by writing a few SQL statements. This [2] is good material
> to read if anybody wants to create a simple demo.
>
> But I'd say that in batch scenarios, Flink SQL currently lacks a few
> critical pieces of SQL syntax (for integrating better with Iceberg format
> v2 in batch mode):
> 1.  ALTER TABLE to change columns.
> 2.  UPDATE/DELETE SQL to change the unexpected rows in a given table.
> 3.  MERGE INTO to merge a batch of changed rows (mixed
> insert/delete/update) into the given table.
>
> In short, if we want to provide better integration and user experience with
> iceberg v2 in batch, then I think the support of the above syntax
> is very important (from iceberg perspective).
>
> > I think it's better to make that time investment at Calcite's
> > implementation before bringing this to Flink.
>
> I find that some SQL syntax is critical for Flink SQL but not for other,
> generic SQL parsers. Is it possible to implement our own Flink SQL
> plugins/extensions that extend the core Calcite SQL? Going a step further,
> is it possible for us to achieve a better abstraction of the Flink SQL
> framework, so that downstream components can implement their own
> customized SQL plugins based on this framework? In this way, it would be
> possible to meet the needs of different components to add their own SQL
> implementations on top of Flink SQL.
>
> [1]. https://iceberg.apache.org/docs/latest/spark-writes/#merge-into
> [2]. https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/build-real-time-data-lake-tutorial.html
>
>
> On Fri, Feb 11, 2022 at 4:28 PM zhou chao <zhouchao...@hotmail.com> wrote:
>
> > Hi, Martijn Visser, thanks for your reply. Firstly, I am sorry for posting
> > the discussion twice. I sent the message to the dev mailing list from an
> > unsubscribed account, but the message did not show up for a while, and I
> > guessed that the list would not accept an email coming from an
> > unsubscribed account, so I sent it again from a subscribed account.
> >
> > Q: How would you see merge work for streaming data?
> > I think this is an interesting topic, especially for Flink, which aims to
> > unify streaming & batch processing. Back to the merge statement: there are
> > two inputs, target_table and source_table (query). In the merge statement,
> > source_table is used to correct target_table's results, and every row in
> > target_table only needs to be corrected once; that's what the batch job
> > does.
> > In the theory aspect, incremental data should be carefully considered for
> > streaming data. In this situation, the data flow from target_table to
> > target_table will be a loop, and the incremental data with one key will
> > keep going through the loop. It looks very strange. So far, we have not
> > received any user needs matching the merge statement for streaming data.
> > I think the streaming topic should be backed by user needs and use cases
> > before we discuss it.
> >
> > I really agree that we should leverage Calcite and push Calcite to invest
> > in it, but for now this feature does not get enough attention in the
> > Calcite community. I found that some Flink features were also limited by
> > Calcite, such as FLINK-21714[1], which was eventually fixed on the Flink
> > side. Could you tell me how much effort we can usually afford in a
> > situation like this?
> >
> >
> > best,
> > zoucao
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21714
> >
> >
> > On Feb 10, 2022, at 4:09 PM, Martijn Visser <mart...@ververica.com> wrote:
> >
> > Hi zoucao,
> >
> > I see that this message was posted twice, so I choose to only reply to the
> > latest one (this one). Thanks for bringing this up for discussion.
> >
> > I agree that support for a merge statement would be a welcome addition to
> > Flink SQL for those that are using it for bounded jobs. How would you see
> > merge work for streaming data?
> >
> > I do think that in order for Flink to properly support this, we should
> > leverage Calcite for this. If there's no proper/full support for merge in
> > Calcite, I don't think we should add this ourselves. I don't think the
> > benefits that this would bring to Flink outweigh the time investment and
> > the increase in technical debt. If it's really that important, I think
> > it's better to make that time investment at Calcite's implementation
> > before bringing this to Flink.
> >
> > Best regards,
> >
> > Martijn Visser
> > https://twitter.com/MartijnVisser82
> >
> >
> > On Wed, 9 Feb 2022 at 08:40, zhou chao <zhouchao...@hotmail.com> wrote:
> >
> > Hi, devs!
> > Jingfeng and I would like to start a discussion about the MERGE statement,
> > and the discussion consists of two parts. In the first part, we want to
> > explore and collect the use cases and motivations of MERGE statement
> > users. In the second part, we want to find out the possibility for Flink
> > SQL to support the merge statement.
> >
> > Before driving the first topic, we want to introduce the definition and
> > benefits of the merge statement. The MERGE statement in SQL is a very
> > popular statement, and it can handle inserts, updates, and deletes all in
> > a single transaction without having to write separate logic for each of
> > these. For each insert, update, or delete action, we can specify
> > conditions separately. Many engines/DBs now support this feature, for
> > example, SQL Server[1], Spark[2], Hive[3], and PostgreSQL[4].
> >
> > Our use case:
> > Order analysis & processing is one of the most important scenarios, but
> > sometimes an updated order arrives a long time after the previous one
> > with the same primary key. In the meantime, the state for this key has
> > expired, so a wrong aggregation result is produced. In this situation,
> > we use the merge statement in a batch job to correct the results, and
> > Spark + Iceberg is what we currently use internally. In the future, we
> > want to unify batch & streaming by using Flink SQL internally, so it
> > would be better if Flink could support the merge statement. If you have
> > other use cases and opinions, please share them here.
> >
> > Currently, Calcite does not have good support for the merge statement,
> > and there is a Jira issue, CALCITE-4338[5], to track this. Could we
> > support the merge statement relying on the limited support from
> > calcite-1.26.0? I wrote a simple doc[6] to drive this; I just want to
> > find out the possibility for Flink SQL to support the merge statement.
> >
> > Looking forward to your feedback, thanks.
> >
> > best,
> > zoucao
> >
> >
> > [1] https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
> > [2] https://issues.apache.org/jira/browse/SPARK-28893
> > [3] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
> > [4] https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
> > [5] https://issues.apache.org/jira/browse/CALCITE-4338
> > [6] https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing
> >
> >
>
