Hello Flink Community,
In our Flink SQL job we are experiencing undesirable behavior related to
event reordering (more details in the Background section below).
I have a few questions about the sink upsert materializer; the answers
should help me understand its capabilities:

1. Does the materializer support jobs containing different types of joins
(more specifically regular and temporal joins)?
2. Does the materializer support different types of input connectors: Kafka
with both debezium-avro-confluent and avro-confluent formats, and
upsert-kafka with the avro-confluent format? All of them have a
well-defined primary key (PK).
3. What is the recommended way to debug the sink materializer? I output the
compiled plan for the SQL job and I can see that the upsert materializer is
on, but I am not sure whether I can extract more information about its
behavior.

Flink version we are using: 1.16.1

Best regards,
Marek


Background:
We have deployed a Flink SQL job that uses multiple joins to enrich data
coming from the main table. Altogether we use 11 different joins for
enrichment: temporal joins as well as regular joins (both left and inner).
All source tables and the output table use Kafka topics under the hood. The
grain of the main table does not change: the main table and the output
table use the same non-nullable column as their PK. Job parallelism is 16.

We are experiencing the data reordering presented below.
Data in the Kafka input topic for the main table (correct order, all in the
same partition):
[image: source.png]

Data in the Kafka output topic after enrichment (events reordered, all in
the same partition):
[image: output.png]

The highlighted event in the source becomes the last one in the output and,
as a consequence, incorrectly overrides the enriched values with nulls.
The reordering is probably caused by one slow join with significant data
skew: the output from one parallel task of this operator is delayed
compared to the other parallel tasks. This join uses as its key a value
that is null in the first event and non-null in the other 4 events.
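
To make the suspected mechanism concrete, here is a toy Python simulation (not Flink code). Each event is routed to a parallel task by its join key, all NULL keys land on one task, and that task is given an extra delay. The `task_for` routing function, the 10 ms emission interval and the delay values are hypothetical and chosen only for illustration; Flink's real key-group hashing works differently.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Event:
    seq: int                  # position of the event in the source partition
    join_key: Optional[str]   # key of the skewed join; None (NULL) for the first event


def task_for(join_key: Optional[str], parallelism: int) -> int:
    # Hypothetical routing: every NULL key goes to task 0 (the skewed task),
    # other keys are spread over tasks 1..parallelism-1.
    if join_key is None:
        return 0
    return 1 + sum(map(ord, join_key)) % (parallelism - 1)


def simulate(events: List[Event], parallelism: int, slow_task: int,
             slow_delay_ms: int, base_delay_ms: int = 1) -> List[int]:
    arrivals = []
    for e in events:
        task = task_for(e.join_key, parallelism)
        delay = slow_delay_ms if task == slow_task else base_delay_ms
        # Assumption: the source emits one event every 10 ms.
        arrivals.append((e.seq * 10 + delay, e.seq))
    # The downstream operator sees events in arrival order, not source order.
    return [seq for _, seq in sorted(arrivals)]


events = [Event(1, None)] + [Event(i, "k") for i in range(2, 6)]
print(simulate(events, parallelism=16, slow_task=0, slow_delay_ms=100))
# [2, 3, 4, 5, 1]
```

With no extra delay on the skewed task the source order is preserved; with it, the first event arrives last, which matches the pattern we see in the output topic.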

However, my understanding is that event reordering should be corrected by
the SinkUpsertMaterializer operator. Our configuration contains:
table.exec.sink.upsert-materialize: FORCE
table.exec.state.ttl (not configured, default = 0)
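
For reference, this is how I understand the materializer from reading the 1.16 source of SinkUpsertMaterializer (my reading may be incomplete or wrong): it does not sort events by source order. For each sink PK it keeps a list of rows that were added and not yet retracted, and emits the last one; a retraction is applied only if it is equal to a stored row. Below is a minimal Python model of that logic for a single sink key, assuming rows can be reduced to plain strings (a simplification):

```python
from typing import List, Tuple

Change = Tuple[str, str]  # (row kind, row value), e.g. ("+I", "v1")


def materialize(changes: List[Change]) -> List[Change]:
    """Toy model of SinkUpsertMaterializer for a single sink primary key."""
    values: List[str] = []  # rows added but not yet retracted, in arrival order
    out: List[Change] = []
    for kind, row in changes:
        if kind in ("+I", "+U"):
            # The first live row is emitted as an insert, later ones as updates.
            out.append(("+I" if not values else "+U", row))
            values.append(row)
        elif kind in ("-U", "-D"):
            if row not in values:
                continue  # retraction matches no stored row: it is dropped
            last_index = len(values) - 1
            index = values.index(row)  # remove the first equal row
            values.pop(index)
            if not values:
                out.append(("-D", row))
            elif index == last_index:
                # The visible row was retracted: fall back to the previous one.
                out.append(("+U", values[-1]))
    return out


print(materialize([("+I", "v2"), ("+I", "v1"), ("-U", "v1")]))
# [('+I', 'v2'), ('+U', 'v1'), ('+U', 'v2')]
print(materialize([("+I", "v2"), ("+I", "v1"), ("-U", "v1-changed")]))
# [('+I', 'v2'), ('+U', 'v1')]
```

If this reading is correct, a late row is corrected only when a matching retraction also reaches the sink; a retraction that does not equal any stored row is ignored and the stale row stays visible, as in the second example.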

The sink upsert materializer is visible in the compiled plan and in the
Flink dashboard job graph after deployment. However, it does not seem to
behave as I expect, and I would like to understand why.
