Hi Xu,

Thank you for providing more context.

I was using inner and left outer joins in my stream processing.
After digging further I have understood that both the D/UB and I/UA records
are needed to handle many-to-one joins correctly in the general case: when
the parallelism is greater than 1, Flink has to rekey those CDC records, and
the two halves of an update may go to different subtasks.
E.g. when changing the "parentId" of an item from A to B and joining the
parents within a Flink job, the D/UB CDC record can go to a subtask on task
manager 1 and the I/UA CDC record to a subtask on task manager 2 of the join
operator. Hence both records are needed to keep the join state correct.
I have also followed a suggestion found online to rerank items after the
join, which restores a unique key on the result and optimizes downstream
joins. This worked well.
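
For reference, the rerank is essentially Flink SQL's ROW_NUMBER()
deduplication applied to the join result (a sketch, not my exact production
code; `tEnv` is the StreamTableEnvironment, and `JoinedView` with its
processing-time attribute column `proc` are illustrative names):

// Re-derive a_id as the unique key of the join result: the planner
// recognizes this ROW_NUMBER() pattern as a deduplication and keys the
// resulting changelog by a_id, which downstream joins and sinks can exploit.
// Assumes the join result is registered as JoinedView and exposes a
// processing-time attribute column `proc`.
Table reranked = tEnv.sqlQuery(
    "SELECT a_id, b_id, a_code, b_code FROM (" +
    "  SELECT *, ROW_NUMBER() OVER (" +
    "    PARTITION BY a_id ORDER BY proc DESC) AS rn" +  // keep latest row per a_id
    "  FROM JoinedView" +
    ") WHERE rn = 1");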

Thank you for your support.

Best regards,
Andrey Starostin


On Wed, Apr 16, 2025 at 7:54 AM shuai xu <xushuai...@gmail.com> wrote:

> Hi Andrey,
>
> Regarding the streaming join operator, it handles RowKind differently
> based on the type of join:
>
> For inner joins, it directly forwards the input RowKind.
> For other types of joins (e.g., left, right, or full outer joins), it
> simplifies processing by always emitting INSERT and DELETE messages.
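>
> For instance, the difference can be observed with a minimal, self-contained
> job like the sketch below (the tables and values are illustrative, not
> taken from your pipeline; swap JOIN for LEFT JOIN in the query to compare
> the emitted RowKinds):
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> import org.apache.flink.types.RowKind;
>
> public class JoinRowKindDemo {
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>     // Table A: an insert for id=1 followed by an update of its "code".
>     Table a = tEnv.fromChangelogStream(env.fromElements(
>         Row.ofKind(RowKind.INSERT, 1, "a1"),
>         Row.ofKind(RowKind.UPDATE_BEFORE, 1, "a1"),
>         Row.ofKind(RowKind.UPDATE_AFTER, 1, "a2"))).as("id", "code");
>
>     // Table B: a single row to join against.
>     Table b = tEnv.fromChangelogStream(env.fromElements(
>         Row.ofKind(RowKind.INSERT, 1, "b1"))).as("id", "code");
>
>     tEnv.createTemporaryView("A", a);
>     tEnv.createTemporaryView("B", b);
>
>     // Inner join: A's -U/+U pair is forwarded as-is.
>     // A LEFT JOIN emits -D/+I for the same input instead.
>     tEnv.toChangelogStream(tEnv.sqlQuery(
>         "SELECT A.id, A.code, B.code AS b_code FROM A JOIN B ON A.id = B.id"))
>         .print();
>
>     env.execute();
>   }
> }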
>
> This behavior is documented in FLINK-17337. As shown in the code
> snippet for step 4, this is an inner join scenario.
>
> If you were using a left, right, or full outer join, the observed
> behavior would be expected due to the simplified handling of RowKind.
> Could you confirm which type of join you are working with?
>
> Best,
> Xu Shuai
>
>
> > On Mar 29, 2025, at 18:19, Andrey <fsstar....@gmail.com> wrote:
> >
> > Dear Flink Community,
> >
> > I'm starting my Flink journey, and I have stumbled upon a behavior of
> > Dynamic Tables / Changelog Streams which I cannot quite understand. So far
> > I haven't found an answer in the mailing list archives, articles, or
> > conference talk recordings. Could you please help me understand the
> > situation?
> >
> > TLDR:
> > Many-to-one JOIN converts `+U` events to `-D`/`+I` pairs. Is there a way
> to keep `+U` flowing through the system?
> >
> > More detailed:
> >
> > My use case seems quite standard: maintaining an asynchronous,
> > incrementally updated, denormalized materialized view of my PostgreSQL
> > database.
> >
> > In simple terms, I have a bunch of tables that I need to join together
> in Flink and send the result to multiple destinations, including
> Elasticsearch.
> >
> > Debezium and Kafka are already present in the setup I'm working with, so
> > it seemed natural to subscribe to the Kafka topics from Flink instead of
> > reading from PostgreSQL directly.
> >
> > The end-to-end setup looks like this:
> PostgreSQL -> Debezium -> Kafka -> Flink -> Elasticsearch
> >
> > In the Flink application I did the following (a condensed sketch of
> > steps 2-5 follows the list):
> > 1. Create a `KafkaSource` for each table to subscribe to the Debezium
> > topics, deserializing from AWS Glue Avro schemas to the generated POJOs.
> > 2. Manually map the POJO stream to a `DataStream<Row>` with upsert
> > semantics.
> > 3. Define two tables A and B using `.fromChangelogStream(source, schema,
> ChangelogMode.upsert())` where schema includes a primary key.
> > 4. Use `SELECT A.id as a_id, B.id as b_id, A.code as a_code, B.code as
> b_code FROM A JOIN B ON A.b_id = B.id` to join the two tables together
> > 5. Convert back to a stream with `.toChangelogStream(joinedTable,
> schema, ChangelogMode.upsert())` where `schema` includes `a_id` as the
> primary key
> > 6. Sink that stream to Elasticsearch.
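> >
> > Roughly, steps 2-5 in code (a condensed sketch; `tEnv` is the
> > StreamTableEnvironment, `aRows`/`bRows` stand for the upsert
> > `DataStream<Row>` produced in step 2, and the BIGINT column types are
> > illustrative, not necessarily the real schema):
> >
> > // Steps 2/3: interpret each upsert DataStream<Row> as a table with a PK.
> > Table a = tEnv.fromChangelogStream(aRows,
> >     Schema.newBuilder()
> >         .column("id", DataTypes.BIGINT().notNull())
> >         .column("b_id", DataTypes.BIGINT())
> >         .column("code", DataTypes.STRING())
> >         .primaryKey("id")
> >         .build(),
> >     ChangelogMode.upsert());
> > Table b = tEnv.fromChangelogStream(bRows,
> >     Schema.newBuilder()
> >         .column("id", DataTypes.BIGINT().notNull())
> >         .column("code", DataTypes.STRING())
> >         .primaryKey("id")
> >         .build(),
> >     ChangelogMode.upsert());
> > tEnv.createTemporaryView("A", a);
> > tEnv.createTemporaryView("B", b);
> >
> > // Step 4: the join.
> > Table joined = tEnv.sqlQuery(
> >     "SELECT A.id AS a_id, B.id AS b_id, A.code AS a_code, B.code AS b_code "
> >         + "FROM A JOIN B ON A.b_id = B.id");
> >
> > // Step 5: back to an upsert stream keyed by a_id.
> > DataStream<Row> out = tEnv.toChangelogStream(joined,
> >     Schema.newBuilder()
> >         .column("a_id", DataTypes.BIGINT().notNull())
> >         .column("b_id", DataTypes.BIGINT().notNull())
> >         .column("a_code", DataTypes.STRING())
> >         .column("b_code", DataTypes.STRING())
> >         .primaryKey("a_id")
> >         .build(),
> >     ChangelogMode.upsert());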
> >
> > On Step 2, I generate a stream of `+U` row kinds. However, the JOIN on
> > Step 4 converts them into `-D`/`+I` pairs, even when the source contains
> > only a `+U` changing just the `A.code` field. The primary key information
> > seems to be lost.
> >
> > This leads to a stream of delete/insert events going to Elasticsearch,
> > which isn't optimal.
> >
> > I understand that in a general scenario, JOINs are many-to-many, but in
> my case I'm joining many-to-one, so I expect `A.id` to be preserved as the
> primary key for the join result.
> >
> > I can work around this limitation by buffering before the final sink to
> > merge `-D`/`+I` pairs back into `+U` (a rough sketch follows below).
> > However, I wonder if there is a way to preserve the `+U` through the join,
> > as that would reduce the number of events going through the system?
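> >
> > The buffering I have in mind is roughly this (a minimal sketch, not
> > production code; keying by `a_id` and the 100 ms hold interval are
> > illustrative choices):
> >
> > import org.apache.flink.api.common.state.ValueState;
> > import org.apache.flink.api.common.state.ValueStateDescriptor;
> > import org.apache.flink.configuration.Configuration;
> > import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> > import org.apache.flink.types.Row;
> > import org.apache.flink.types.RowKind;
> > import org.apache.flink.util.Collector;
> >
> > // Collapses a -D that is quickly followed by a +I on the same key into a
> > // single +U; a -D with no matching +I within the hold interval is a real
> > // delete and is emitted as-is.
> > public class CollapseDeleteInsert extends KeyedProcessFunction<String, Row, Row> {
> >   private transient ValueState<Row> pendingDelete;
> >
> >   @Override
> >   public void open(Configuration parameters) {
> >     pendingDelete = getRuntimeContext().getState(
> >         new ValueStateDescriptor<>("pending-delete", Row.class));
> >   }
> >
> >   @Override
> >   public void processElement(Row row, Context ctx, Collector<Row> out)
> >       throws Exception {
> >     if (row.getKind() == RowKind.DELETE) {
> >       // Hold the delete briefly; the matching insert may still arrive.
> >       pendingDelete.update(row);
> >       ctx.timerService().registerProcessingTimeTimer(
> >           ctx.timerService().currentProcessingTime() + 100); // illustrative
> >     } else if (row.getKind() == RowKind.INSERT && pendingDelete.value() != null) {
> >       // -D immediately followed by +I on the same key: emit one upsert.
> >       pendingDelete.clear();
> >       row.setKind(RowKind.UPDATE_AFTER);
> >       out.collect(row);
> >     } else {
> >       out.collect(row);
> >     }
> >   }
> >
> >   @Override
> >   public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out)
> >       throws Exception {
> >     Row held = pendingDelete.value();
> >     if (held != null) {
> >       // No insert arrived in time: the delete was genuine.
> >       pendingDelete.clear();
> >       out.collect(held);
> >     }
> >   }
> > }
> >
> > It would be applied before the sink, e.g.
> > `joined.keyBy(r -> String.valueOf(r.getField("a_id"))).process(new CollapseDeleteInsert())`.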
> >
> > Thank you in advance for taking the time to read and answer this
> > question; I really appreciate the help.
> >
> > Kind regards,
> > Andrey Starostin
> >
>
>
