Jiabao Sun created FLINK-29625:
----------------------------------
Summary: Optimize changelog normalize
Key: FLINK-29625
URL: https://issues.apache.org/jira/browse/FLINK-29625
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.15.2
Reporter: Jiabao Sun
Currently, Flink adds an expensive _changelog normalize_ operator to
sources in upsert changelog mode to fill in the _update_before_ value.
Even a job that inserts directly from an upsert-kafka source into an
upsert-kafka sink still gets this operator, plus an extra operator that drops
the _update_before_ messages again, which is obviously redundant.
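To illustrate why completing _update_before_ is expensive, here is a minimal Python sketch of the idea. It is a simplified model and not Flink's actual implementation: the function name `normalize`, the `(key, value)` event shape, and the use of `None` as a delete marker are assumptions made for this example. Only the row-kind short strings (+I, -U, +U, -D) follow Flink's RowKind.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

# Row-kind short strings as used by Flink's RowKind.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"

UpsertEvent = Tuple[str, Optional[str]]   # (key, value); None value = delete (assumption)
ChangelogRow = Tuple[str, str, str]       # (row kind, key, value)


def normalize(upserts: Iterable[UpsertEvent]) -> Iterator[ChangelogRow]:
    """Turn an upsert stream into a full changelog (hypothetical model).

    The last value of every key must be remembered so that the
    update_before image can be emitted; this per-key state is the cost.
    """
    state: Dict[str, str] = {}
    for key, value in upserts:
        previous = state.get(key)
        if value is None:
            # Delete: emit the old image only if the key is known.
            if previous is not None:
                del state[key]
                yield (DELETE, key, previous)
        elif previous is None:
            # First value seen for this key.
            state[key] = value
            yield (INSERT, key, value)
        else:
            # Update: the before image comes from state.
            state[key] = value
            yield (UPDATE_BEFORE, key, previous)
            yield (UPDATE_AFTER, key, value)


if __name__ == "__main__":
    for row in normalize([("k1", "a"), ("k1", "b"), ("k1", None)]):
        print(row)
```

In this model the state grows with the number of distinct keys, and a sink that only needs upserts would discard every -U row that was produced.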
In CDC scenarios, some databases do not provide update-before images, such as
Cassandra, MongoDB, TiDB (when _Old Value_ is not turned on), and Postgres
(when _REPLICA IDENTITY_ is not set to _FULL_). Using Flink SQL to process
these changelogs incurs a lot of state overhead.
I don't know much about why this operator is needed, so I take the liberty of
asking whether we can get rid of changelog normalize completely, or optimize it
by adding it only when a downstream operator requires a normalized changelog.
If this optimization is worthwhile, I'm happy to help with it.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)