Dear Flink Community,

I'm starting my Flink journey, and I have stumbled upon a behavior of Dynamic Tables / Changelog Streams which I cannot quite understand. So far I haven't found an answer in the mailing list archives, articles, or conference talk recordings. Could you please help me understand the situation?
TL;DR: A many-to-one JOIN converts `+U` events into `-D`/`+I` pairs. Is there a way to keep `+U` flowing through the system?

In more detail: my use case seems fairly standard: maintaining an asynchronous, incrementally updated, denormalized materialized view of my PostgreSQL database. In simple terms, I have a bunch of tables that I need to join together in Flink and send the result to multiple destinations, including Elasticsearch. The setup I'm working with already has Debezium and Kafka in place, so it seemed natural to subscribe to the Kafka topics in Flink rather than connecting to PostgreSQL directly. The end-to-end setup looks like this:

PostgreSQL -> Debezium -> Kafka -> Flink -> Elasticsearch

In the Flink application I did the following (a trimmed-down code sketch of these steps is attached at the end of this email):

1. Create a `KafkaSource` for each table to subscribe to the Debezium topics, deserializing from AWS Glue Avro schemas into the generated POJOs.
2. Manually map each POJO stream to a `DataStream<Row>` with upsert semantics.
3. Define two tables, A and B, using `.fromChangelogStream(source, schema, ChangelogMode.upsert())`, where the schema includes a primary key.
4. Join the two tables with `SELECT A.id AS a_id, B.id AS b_id, A.code AS a_code, B.code AS b_code FROM A JOIN B ON A.b_id = B.id`.
5. Convert back to a stream with `.toChangelogStream(joinedTable, schema, ChangelogMode.upsert())`, where `schema` includes `a_id` as the primary key.
6. Sink that stream to Elasticsearch.

In step 2 I generate a stream of `+U` row kinds. However, the JOIN in step 4 converts them into `-D`/`+I` pairs, even when the source emits only a `+U` that changes just the `A.code` field. The primary key information seems to get lost, which leads to a stream of delete/insert events going to Elasticsearch, and that isn't optimal.

I understand that in the general case a JOIN is many-to-many, but here I'm joining many-to-one, so I would expect `A.id` to be preserved as the primary key of the join result. I can work around this limitation by buffering before the final sink and aggregating the `-D`/`+I` pairs back into `+U`. Still, I wonder whether there is a way to preserve the `+U` through the join, since that would reduce the number of events going through the system.

Thank you in advance for taking the time to read and answer this question, I really appreciate the help.

Kind regards,
Andrey Starostin
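P.S. For reference, here is a minimal sketch of steps 3-5 in the Java Table API, roughly as I have them. The column names and types (BIGINT/STRING) are simplified placeholders, the Kafka/Avro deserialization from steps 1-2 and the Elasticsearch sink are left out, and `sourceA`/`sourceB` stand in for the upsert `DataStream<Row>` produced in step 2.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;

public class JoinPipelineSketch {

    public static void build(StreamExecutionEnvironment env,
                             DataStream<Row> sourceA,   // +U rows: (id, b_id, code)
                             DataStream<Row> sourceB) { // +U rows: (id, code)
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Step 3: register both changelog streams as upsert tables with primary keys.
        Table a = tEnv.fromChangelogStream(
                sourceA,
                Schema.newBuilder()
                        .column("id", DataTypes.BIGINT().notNull())
                        .column("b_id", DataTypes.BIGINT())
                        .column("code", DataTypes.STRING())
                        .primaryKey("id")
                        .build(),
                ChangelogMode.upsert());
        Table b = tEnv.fromChangelogStream(
                sourceB,
                Schema.newBuilder()
                        .column("id", DataTypes.BIGINT().notNull())
                        .column("code", DataTypes.STRING())
                        .primaryKey("id")
                        .build(),
                ChangelogMode.upsert());
        tEnv.createTemporaryView("A", a);
        tEnv.createTemporaryView("B", b);

        // Step 4: the many-to-one join.
        Table joined = tEnv.sqlQuery(
                "SELECT A.id AS a_id, B.id AS b_id, A.code AS a_code, B.code AS b_code "
                        + "FROM A JOIN B ON A.b_id = B.id");

        // Step 5: convert back to an upsert changelog stream keyed by a_id.
        // This is where I observe -D/+I pairs at the sink instead of the +U I expected.
        DataStream<Row> result = tEnv.toChangelogStream(
                joined,
                Schema.newBuilder()
                        .column("a_id", DataTypes.BIGINT().notNull())
                        .column("b_id", DataTypes.BIGINT())
                        .column("a_code", DataTypes.STRING())
                        .column("b_code", DataTypes.STRING())
                        .primaryKey("a_id")
                        .build(),
                ChangelogMode.upsert());

        // Step 6: `result` is then handed to the Elasticsearch sink (omitted here).
    }
}
```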