Dear Flink Community,

I'm starting my Flink journey, and I have stumbled upon a behavior of
Dynamic Tables / Changelog Streams which I cannot quite understand. So far
I haven't found an answer in the mailing list archives, articles, or
conference talk recordings. Could you please help me understand the
situation?

TLDR:
Many-to-one JOIN converts `+U` events to `-D`/`+I` pairs. Is there a way to
keep `+U` flowing through the system?

More detailed:

My use case seems to be quite standard: maintaining an asynchronous,
incrementally updated, denormalized materialized view of my PostgreSQL
database.

In simple terms, I have a bunch of tables that I need to join together in
Flink and send the result to multiple destinations, including ElasticSearch.

Debezium and Kafka are already present in the setup I'm working with, so it
seemed natural to subscribe to the Kafka topics in Flink instead of reading
from PostgreSQL directly.

The end-to-end setup looks like this:
PostgreSQL -> Debezium -> Kafka -> Flink -> ElasticSearch

In the Flink application I did the following:
1. Create a `KafkaSource` for each table to subscribe to the Debezium
topics, deserializing from AWS Glue Avro schemas into generated POJOs.
2. Manually map each POJO stream to a `DataStream<Row>` with upsert
semantics.
3. Define two tables A and B using `.fromChangelogStream(source, schema,
ChangelogMode.upsert())`, where the schema includes a primary key.
4. Use `SELECT A.id as a_id, B.id as b_id, A.code as a_code, B.code as
b_code FROM A JOIN B ON A.b_id = B.id` to join the two tables together.
5. Convert back to a stream with `.toChangelogStream(joinedTable, schema,
ChangelogMode.upsert())`, where `schema` includes `a_id` as the primary key.
6. Sink that stream to ElasticSearch.

In Step 2, I generate a stream of `+U` row kinds. However, the JOIN in Step
4 converts them into `-D`/`+I` pairs, even when the source emits only a
`+U` that changes just the `A.code` field. The primary key information
seems to have been lost.
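To make the observed behavior concrete, here is a toy model (plain Java, not Flink internals — the class and row encoding are made up for illustration) of a retract-mode join state: when the planner cannot propagate a primary key through the join, an update to A is emitted downstream as a `-D` for the previously joined row followed by a `+I` for the new one.

```java
import java.util.*;

// Toy model of a join whose output has no inferred primary key:
// updates on A surface as -D/+I pairs instead of a single +U.
public class RetractJoinModel {
    // joined rows keyed by a_id (the key Flink has "forgotten")
    private final Map<Integer, String> joined = new HashMap<>();

    // Apply an upsert (+U) on table A; returns the changelog events
    // that would be emitted downstream.
    public List<String> upsertA(int aId, String aCode, String bCode) {
        List<String> out = new ArrayList<>();
        String newRow = aId + "," + aCode + "," + bCode;
        String oldRow = joined.put(aId, newRow);
        if (oldRow == null) {
            out.add("+I " + newRow);   // first time this key is seen: insert
        } else {
            out.add("-D " + oldRow);   // retract the previously joined row
            out.add("+I " + newRow);   // re-insert the updated row
        }
        return out;
    }

    public static void main(String[] args) {
        RetractJoinModel m = new RetractJoinModel();
        System.out.println(m.upsertA(1, "x", "b1")); // [+I 1,x,b1]
        System.out.println(m.upsertA(1, "y", "b1")); // [-D 1,x,b1, +I 1,y,b1]
    }
}
```

The second call is the situation described above: a single-field change on A produces two events instead of one.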

This leads to a stream of delete/insert events going to Elasticsearch,
which isn't optimal.

I understand that in the general case a JOIN is many-to-many, but in my
case I'm joining many-to-one, so I expect `A.id` to be preserved as the
primary key of the join result.

I can work around this limitation by buffering before the final sink and
aggregating `-D`/`+I` pairs back into `+U`. However, I wonder if there is a
way to preserve the `+U` through the join, as that would reduce the number
of events going through the system?
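For reference, the buffering workaround I have in mind looks roughly like this (again a plain-Java sketch, not a Flink operator — the event encoding and class name are made up): within one buffer flush, a `-D` immediately followed by a `+I` for the same key is collapsed back into a single `+U`.

```java
import java.util.*;

// Sketch of the pre-sink buffering workaround: collapse -D/+I pairs
// that share a key back into a single +U before flushing.
public class UpsertCollapser {
    // events: each entry is {kind, key, payload}
    public static List<String> collapse(List<String[]> events) {
        LinkedHashMap<String, String[]> buffer = new LinkedHashMap<>();
        for (String[] e : events) {
            String kind = e[0], key = e[1];
            String[] prev = buffer.get(key);
            if ("+I".equals(kind) && prev != null && "-D".equals(prev[0])) {
                // a buffered delete followed by an insert is really an update
                buffer.put(key, new String[]{"+U", key, e[2]});
            } else {
                buffer.put(key, e);
            }
        }
        List<String> out = new ArrayList<>();
        for (String[] e : buffer.values()) out.add(e[0] + " " + e[1] + " " + e[2]);
        return out;
    }

    public static void main(String[] args) {
        List<String[]> events = List.of(
                new String[]{"-D", "1", "x"},
                new String[]{"+I", "1", "y"},
                new String[]{"+I", "2", "z"});
        System.out.println(collapse(events)); // [+U 1 y, +I 2 z]
    }
}
```

This recovers the `+U` at the sink, but only after the extra events have already traveled through the pipeline, which is why I'd prefer to preserve them at the join itself.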

Thank you in advance for taking the time to read and answer this question,
I really appreciate the help.

Kind regards,
Andrey Starostin
