Hi all,

I have a question about how to migrate state in a temporal left join. Let
me first describe what we are doing:

- We have an input stream (Kinesis stream) of device data and we are
performing a left temporal join on this stream to enrich it with metadata
about the device as the data arrives.
- The Kinesis stream is watermarked using the arrival time, and the
enrichment table (in this case a MySQL CDC connector from Ververica) is
watermarked with a constant, far-future timestamp (following the
recommendation here: https://stackoverflow.com/a/69397954/533501) to
ensure that it doesn't stall and emits each row more or less immediately.
The join is on the serial number, so:

        SELECT * FROM input_stream st
        LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF arrival_ts et
          ON st.device_serial_number = et.sn;

- We are using KDA, so Flink 1.15.2.

Our issue is as follows. When we update the input schema of the Kinesis
stream (we are using the Kinesis Stream Table API connector with the
protobuf format, which I backported from 1.16), the state cannot be
migrated because the new type is incompatible, e.g.:


TemporalJoin[10] -> Calc[11] -> (... -> IcebergStreamWriter) (1/1)#10 (...) switched from INITIALIZING to FAILED with failure cause:
java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
    at org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator.open(TemporalRowTimeJoinOperator.java:159)
    ...
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@6621650b) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@97ed2886).

In this particular case, the schema evolution was an added field, so
something I would expect to be supported (or at least somehow supportable).

Having a look at the code (
https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L160),
the exception is thrown when the left state of the join is obtained, which,
I suppose, is expected, since the type of the left input has changed.

At the moment, the only way to fix this is to restart the application
without state. This is, of course, not ideal, since it can result in data
loss. My question is whether there is any way around this. I'm not sure the
state of the temporal join is actually required for our case, because we
really just need to ensure that all records from the upstream Kinesis
stream are processed. It is not absolutely critical that the enrichment is
"deterministic" or "exactly right at the point in time"; it just needs to
be eventually consistent, which is, e.g., why we use the far-future
watermarking in the enrichment table. Here are some options I was
considering, though I'm not sure they all make sense:

- Just remove the state for this operator on restart somehow, and then start
the application allowing non-restored state.
- Somehow use a custom serializer to manage this.
- If it is not possible to migrate the state at all, then we would somehow
try to get the state of the Kinesis iterators (or maybe just a timestamp)
and restart the job starting from that point in the Kinesis stream.
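
To make the last option a bit more concrete, here is a rough, untested
sketch of what I have in mind: re-creating the source table so that it
starts reading at a given timestamp instead of from restored state. The
stream name, region, message class, and timestamp are placeholders; I'm
assuming arrival_ts comes from the connector's 'timestamp' metadata, that
the scan.stream.initpos options of the Kinesis table connector are
available in 1.15, and that the timestamp matches the connector's default
format:

        -- Placeholder names throughout; only the scan.stream.initpos*
        -- options are the point of this sketch.
        CREATE TABLE input_stream (
          device_serial_number STRING,
          -- assumed: approximate arrival time exposed as connector metadata
          arrival_ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
          WATERMARK FOR arrival_ts AS arrival_ts
        ) WITH (
          'connector' = 'kinesis',
          'stream' = 'device-data',        -- placeholder
          'aws.region' = 'eu-west-1',      -- placeholder
          'format' = 'protobuf',
          'protobuf.message-class-name' = 'com.example.DeviceData',  -- placeholder
          -- start shortly before the last successful checkpoint
          'scan.stream.initpos' = 'AT_TIMESTAMP',
          'scan.stream.initpos-timestamp' = '2022-11-01T00:00:00.000+00:00'
        );

As far as I understand, any records after that timestamp that had already
been written downstream would be processed again, so this would trade
duplicates for not losing data.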

I'm looking forward to your input, thanks for your help!

Cheers,
Mike



-- 

Michael Marino

Principal Data Science & Analytics
