Hi Xu,

Thank you for providing more context.
I was using inner and left outer joins in my stream processing. After
digging further, I have understood that both D/UB and I/UA records are
needed to handle many-to-one joins correctly in the general case where
parallelism can be greater than 1 and Flink needs to rekey those CDC
records to different subtasks. E.g. when changing the "parentId" of an
item from A to B, and joining the parents within a Flink job, the D/UB
CDC record can go to a subtask on task manager 1, and the I/UA CDC
record to a subtask on task manager 2 of the join operator. Hence, we
need both records to keep the state up to date correctly.

I have also followed a suggestion found online to rerank items after a
join, to get the unique key back and optimize downstream joins. This
worked well.

Thank you for your support.

Best regards,
Andrey Starostin

On Wed, Apr 16, 2025 at 7:54 AM shuai xu <xushuai...@gmail.com> wrote:

> Hi Andrey,
>
> Regarding the streaming join operator, it handles RowKind differently
> based on the type of join:
>
> For inner joins, it directly forwards the input RowKind.
> For other types of joins (e.g., left, right, or full outer joins), it
> simplifies processing by always emitting INSERT and DELETE messages.
>
> This behavior is documented in FLINK-17337. As shown in the code
> snippet for step 4, this is an inner join scenario.
>
> If you were using a left, right, or full outer join, the observed
> behavior would be expected due to the simplified handling of RowKind.
> Could you confirm which type of join you are working with?
>
> Best,
> Xu Shuai
>
>
> > On March 29, 2025, at 18:19, Andrey <fsstar....@gmail.com> wrote:
> >
> > Dear Flink Community,
> >
> > I'm starting my Flink journey, and I have stumbled upon a behavior of
> > Dynamic Tables / Changelog Streams which I cannot quite understand. So far
> > I haven't found an answer across the mailing list, articles, and conference
> > talk recordings. Could you please help me understand the situation?
> >
> > TLDR:
> > A many-to-one JOIN converts `+U` events to `-D`/`+I` pairs. Is there a way
> > to keep `+U` flowing through the system?
> >
> > More detailed:
> >
> > My use case seems to be quite standard: maintaining an asynchronous,
> > incrementally updated, denormalized materialized view of my PostgreSQL
> > database.
> >
> > In simple terms, I have a bunch of tables that I need to join together
> > in Flink and send the result to multiple destinations, including
> > ElasticSearch.
> >
> > In the setup I'm working with, Debezium and Kafka are already present,
> > so it seemed natural to me to subscribe to the Kafka topics in Flink
> > instead of reading from PostgreSQL directly.
> >
> > The end-to-end setup looks like this:
> > PostgreSQL -> Debezium -> Kafka -> Flink -> ElasticSearch
> >
> > In the Flink application I did the following:
> > 1. Create a `KafkaSource` for each table to subscribe to the Debezium
> > topics, deserializing from AWS Glue Avro schemas to the generated POJOs.
> > 2. Manually map each POJO stream to a `DataStream<Row>` with upsert
> > semantics.
> > 3. Define two tables A and B using `.fromChangelogStream(source, schema,
> > ChangelogMode.upsert())`, where the schema includes a primary key.
> > 4. Use `SELECT A.id AS a_id, B.id AS b_id, A.code AS a_code, B.code AS
> > b_code FROM A JOIN B ON A.b_id = B.id` to join the two tables together.
> > 5. Convert back to a stream with `.toChangelogStream(joinedTable,
> > schema, ChangelogMode.upsert())`, where `schema` includes `a_id` as the
> > primary key.
> > 6. Sink that stream to ElasticSearch.
> >
> > On step 2, I generate a stream of `+U` row kinds. However, the JOIN on
> > step 4 converts them into `-D`/`+I` pairs even if in the source I only
> > have a `+U` changing just the `A.code` field. The primary key information
> > seems to have been lost.
> >
> > This leads to a stream of delete/insert events going to Elastic, which
> > isn't optimal.
> >
> > I understand that in a general scenario JOINs are many-to-many, but in
> > my case I'm joining many-to-one, so I expect `A.id` to be preserved as the
> > primary key of the join result.
> >
> > I can work around this limitation by buffering before the final sink
> > to aggregate `-D`/`+I` pairs back into `+U`. However, I wonder if there is
> > a way to somehow preserve the `+U` through the join, as that would reduce
> > the number of events going through the system.
> >
> > Thank you in advance for taking the time to read and answer this
> > question; I really appreciate the help.
> >
> > Kind regards,
> > Andrey Starostin
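To make the rekeying argument from the reply above concrete, here is a minimal, Flink-free sketch in plain Java (all names are illustrative, not Flink API). Records are routed to simulated subtasks by hashing the join key, so the retraction (D/UB) and the new image (I/UA) of a single "parentId" change from A to B can land on different subtasks, and both are needed to keep keyed state correct:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RekeyDemo {
    static final int PARALLELISM = 2;

    // Returns one state map per simulated subtask: joinKey -> itemId.
    public static List<Map<String, String>> runUpdate() {
        List<Map<String, String>> state = new ArrayList<>();
        for (int i = 0; i < PARALLELISM; i++) state.add(new HashMap<>());

        route(state, "+I", "A", "x");   // initial insert: item "x" under parent A
        // parentId changes A -> B: the retraction and the new image are
        // hashed by join key, so they may go to different subtasks.
        route(state, "-UB", "A", "x");
        route(state, "+UA", "B", "x");
        return state;
    }

    static void route(List<Map<String, String>> state, String kind,
                      String joinKey, String itemId) {
        int subtask = Math.abs(joinKey.hashCode()) % PARALLELISM;
        if (kind.startsWith("+")) {
            state.get(subtask).put(joinKey, itemId);   // insert / update-after builds state
        } else {
            state.get(subtask).remove(joinKey);        // delete / update-before retracts it
        }
    }
}
```

Dropping the `route(state, "-UB", "A", "x")` call leaves stale state for key "A" on one subtask, which is exactly why both records must flow through the rekeyed exchange.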
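As background for the upsert discussion in the thread, a small sketch of upsert-changelog semantics (plain Java, illustrative names; RowKind strings follow Flink's `+I`/`-U`/`+U`/`-D` printing convention). A view materialized by primary key ends up identical whether a change arrives as a single `+U` or as a `-D`/`+I` pair, which is what makes collapsing the pairs back safe per key:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertApply {
    // events: each element is {rowKind, primaryKey, value}
    public static Map<String, String> apply(List<String[]> events) {
        Map<String, String> view = new HashMap<>();
        for (String[] e : events) {
            switch (e[0]) {
                case "+I":
                case "+U":
                    view.put(e[1], e[2]);   // insert or update-after sets the row
                    break;
                case "-U":
                case "-D":
                    view.remove(e[1]);      // update-before or delete retracts it
                    break;
            }
        }
        return view;
    }
}
```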
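The buffering workaround mentioned above (aggregating `-D`/`+I` pairs back into `+U` before the sink) could look roughly like this. `ChangeBuffer` and `collapse` are hypothetical names, and a real implementation would buffer per key within a bounded window rather than over a whole list:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangeBuffer {
    // events: each element is {rowKind, primaryKey, payload}
    public static List<String[]> collapse(List<String[]> events) {
        Map<String, String[]> pendingDeletes = new LinkedHashMap<>();
        List<String[]> out = new ArrayList<>();
        for (String[] e : events) {
            if ("-D".equals(e[0])) {
                pendingDeletes.put(e[1], e);              // hold the delete back
            } else if ("+I".equals(e[0]) && pendingDeletes.containsKey(e[1])) {
                pendingDeletes.remove(e[1]);
                out.add(new String[]{"+U", e[1], e[2]});  // collapse -D/+I into one +U
            } else {
                out.add(e);                               // pass everything else through
            }
        }
        out.addAll(pendingDeletes.values());              // genuine deletes survive
        return out;
    }
}
```

Note that holding deletes back reorders events relative to the input, which is only safe because the sink is keyed by the primary key.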