Hi, Maj.
> 1. Does the materializer support jobs containing different types of joins
> (more specifically regular and temporal joins)?
> 2. Does the materializer support different types of input connectors: kafka
> with both debezium-avro-confluent and avro-confluent formats and upsert-kafka
> with avro-confluent format? All with well defined primary key (PK)

The common answer to both questions is "no": the upsert materializer is only related to the sink and the node directly before the sink (usually a join, an aggregation, etc.).

By default (with table.exec.sink.upsert-materialize = AUTO), the upsert materializer appears when the upsert key of the upstream node before the sink and the pk of the sink do not match. Usually, we do not need to manually set this parameter to FORCE.

Suppose we have a source table T1 with the schema "a", "b", "c", where "a" is the pk. Downstream, "b" is used as the join key to join with table T2, and the result is written into table T3, where "a" is also the pk. The global parallelism is set to 2.

The source will issue:

(+I, a1, b1, c1), (-U, a1, b1, c1), (+U, a1, b2, c2)

Because the join key is "b", the upsert key for the join becomes "b", which does not match the sink's pk "a", hence a sink materializer is produced.

Since the join key is "b", (+I, a1, b1, c1) and (-U, a1, b1, c1) will be sent to the first parallel instance of the join, "join1", and (+U, a1, b2, c2) will be sent to the second parallel instance of the join, "join2". At the same time, since the sink's pk is "a", these three records are actually related in sequence at the sink.

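To make the routing above concrete, here is a minimal Python sketch. It is a toy simulation, not Flink code. The fixed mapping of b1 to join1 and b2 to join2 is an assumption standing in for Flink's hash partitioning, the naive sink treats -U as a delete as described in this thread, and materialized_sink is only a simplified stand-in for the real operator. All function names are hypothetical.

```python
from itertools import permutations

# Changelog records from T1 as (op, a, b, c): "a" is the pk, "b" the join key.
RECORDS = [("+I", "a1", "b1", "c1"), ("-U", "a1", "b1", "c1"), ("+U", "a1", "b2", "c2")]


def route(records, parallelism=2):
    """Send each record to a join instance chosen by its join key "b".
    Assumption: a fixed toy mapping replaces Flink's real hash partitioning."""
    toy_partition = {"b1": 0, "b2": 1}
    lanes = [[] for _ in range(parallelism)]
    for rec in records:
        lanes[toy_partition[rec[2]]].append(rec)
    return lanes


def interleavings(lanes):
    """All arrival orders at the sink that preserve each lane's own order."""
    flat = [rec for lane in lanes for rec in lane]
    return [
        list(perm)
        for perm in permutations(flat)
        if all([r for r in perm if r in lane] == lane for lane in lanes)
    ]


def naive_sink(seq):
    """Upsert by pk "a"; a retraction (-U) simply deletes the key."""
    table = {}
    for op, a, b, c in seq:
        if op in ("+I", "+U"):
            table[a] = (b, c)
        else:
            table.pop(a, None)
    return table


def materialized_sink(seq):
    """Simplified materializer: remember every live row per pk, so a late
    retraction removes only the row it matches and the newest row survives."""
    live, table = {}, {}
    for op, a, b, c in seq:
        rows = live.setdefault(a, [])
        if op in ("+I", "+U"):
            rows.append((b, c))
            table[a] = (b, c)
        elif (b, c) in rows:
            rows.remove((b, c))
            if rows:
                table[a] = rows[-1]
            else:
                table.pop(a, None)
    return table


if __name__ == "__main__":
    for seq in interleavings(route(RECORDS)):
        print(seq, "naive:", naive_sink(seq), "materialized:", materialized_sink(seq))
```

Running the script prints every arrival order the sink could observe, together with the final content of T3 under each strategy, so the effect of a late -U can be compared directly.
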
In practice, due to the different processing speeds of join1 and join2, the sink may receive any of the following three sequences:

1. (+I, a1, b1, c1), (-U, a1, b1, c1), (+U, a1, b2, c2)
2. (+U, a1, b2, c2), (+I, a1, b1, c1), (-U, a1, b1, c1)
3. (+I, a1, b1, c1), (+U, a1, b2, c2), (-U, a1, b1, c1)

(Note that the two records (+I, a1, b1, c1) and (-U, a1, b1, c1) must stay in order because they are processed by the single parallel instance "join1".)

Without an upsert materializer, in cases 2 and 3 the last record the sink receives would be -U, leading to data deletion. The upsert materializer ensures that the final result issued in cases 2 and 3 is still (+U, a1, b2, c2).

Regrettably, Flink currently does not have a means of online debugging. To confirm the logic related to the upsert materializer, you may need to clone the Flink repository, build it, and then run the SinkUpsertMaterializerTest test class to observe and test it yourself.

Regarding the upsert keys, you can use EXPLAIN CHANGELOG_MODE ... to view them in the plan.

If there are any issues with the above, please correct me.

--
Best!
Xuyang

At 2024-01-31 20:24:57, "Marek Maj" <marekm...@gmail.com> wrote:

Hello Flink Community,

In our Flink SQL job we are experiencing undesirable behavior that is related to event reordering (more below in the background section).

I have a few questions related to the sink upsert materializer; the answers should help me understand its capabilities:

1. Does the materializer support jobs containing different types of joins (more specifically regular and temporal joins)?
2. Does the materializer support different types of input connectors: kafka with both debezium-avro-confluent and avro-confluent formats and upsert-kafka with avro-confluent format? All with well defined primary key (PK)
3. What is the recommended way to debug sink materializer?
I output the compiled plan for the SQL job and I can see that the upsert materializer is on, but I am not sure if I can extract more information about its behavior.

Flink version we are using: 1.16.1

best regards
Marek

Background:

We have deployed a Flink SQL job that uses multiple joins to enrich data coming from the main table. Altogether we have 11 different joins used for enrichment: temporal joins as well as regular joins (both left and inner). All source tables and the output table use Kafka topics under the hood. The grain of the main table does not change: the main table and the output table use the same non-nullable column for their PK. Job parallelism is 16.

We are experiencing the data reordering presented below.

Data in kafka input topic for main table (correct order, all in the same partition):

Data in kafka output topic after enrichment (events reordered, all in the same partition):

The highlighted event in the source becomes the last in the output and, as a consequence, incorrectly overrides enriched values with nulls. The reordering probably happens because of one slow join with significant data skew, where the output from one parallel task of this operator is delayed compared to the other parallel tasks. This join uses as its key a value that is null in the first event and non-null in all other 4 events.

However, my understanding is that event reordering should be corrected by the SinkUpsertMaterializer operator. Our configuration contains:

table.exec.sink.upsert-materialize: FORCE
table.exec.state.ttl (not configured, default = 0)

The sink upsert materializer is visible in the compiled plan info and in the Flink dashboard job graph after deployment. It looks like it is not behaving as I would expect, and I would like to understand the reason.