Hi Michael,

In the current Table API/SQL, there's no guarantee that a change to either
the query or the Flink version won't lead to state incompatibility. That's
also documented at
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/upgrading/#table-api--sql

Best regards,

Martijn

On Fri, Jul 7, 2023 at 9:23 AM Michael Marino <michael.mar...@tado.com>
wrote:

> Hi all,
>
> I had a question about how to allow a migration of state in a temporal
> left join. Let me first describe what we are doing:
>
> - We have an input stream (Kinesis stream) of device data and we are
> performing a left temporal join on this stream to enrich it with metadata
> about the device as the data arrives.
> - The Kinesis stream is watermarked using the arrival time, and the
> enrichment table (in this case a MySQL CDC connector from Ververica) is
> watermarked with a constant, far-future timestamp (following the
> recommendation here: https://stackoverflow.com/a/69397954/533501), so that
> the join doesn't stall and emits each row more or less immediately. The
> join is on the serial number, so:
>
>         SELECT * FROM input_stream st
>         LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF st.arrival_ts AS et
>           ON st.device_serial_number = et.sn;
>
> - We are using KDA, so Flink 1.15.2.
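>
> For reference, here is a sketch of how an enrichment table with such a
> constant watermark might be declared. Everything except `sn` and the
> `mysql-cdc` connector name is a hypothetical placeholder (the other
> columns, the connection values, the exact timestamp):
>
> ```sql
> -- Hypothetical declaration of the versioned enrichment table; the extra
> -- columns, the time column and all connection values are placeholders.
> CREATE TABLE enrichment_table (
>   sn STRING,
>   device_model STRING,
>   update_ts TIMESTAMP(3),
>   -- Constant, far-future watermark: the versioned side never holds back
>   -- the join, so the join's combined watermark is effectively driven by
>   -- the Kinesis side alone.
>   WATERMARK FOR update_ts AS CAST('2999-12-31 00:00:00' AS TIMESTAMP(3)),
>   -- An event-time temporal join needs a primary key on the versioned table.
>   PRIMARY KEY (sn) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'mysql.example.internal',
>   'port' = '3306',
>   'username' = 'flink',
>   'password' = 'secret',
>   'database-name' = 'devices',
>   'table-name' = 'device_metadata'
> );
> ```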
>
> Our issue is as follows. When we update the input schema of the Kinesis
> stream (we are using the Kinesis Table API connector with the protobuf
> format, which I backported from 1.16), the state cannot be migrated because
> the new type is incompatible, e.g.:
>
>
> TemporalJoin[10] -> Calc[11] -> (... -> IcebergStreamWriter) (1/1)#10
> (...) switched from INITIALIZING to FAILED with failure cause:
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
> at
> org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator.open(TemporalRowTimeJoinOperator.java:159)
> ...
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer
> (org.apache.flink.api.common.typeutils.base.MapSerializer@6621650b) must
> not be incompatible with the old state serializer
> (org.apache.flink.api.common.typeutils.base.MapSerializer@97ed2886).
>
> In this particular case, the schema evolution was an added field, which is
> something I would expect to be supported (or at least somehow supportable).
>
> Looking at the code (
> https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L160),
> the failure happens when the left-side state of the join is obtained,
> which, I suppose, is expected since the type of that state has changed.
>
> At the moment, the only way to fix this is to restart the application
> without state. This is, of course, not ideal, since it can result in data
> loss. My question is whether there is any way around this. I'm not sure the
> state of the temporal join is actually required in our case, because we
> really just need to ensure that all records from the upstream Kinesis
> stream are processed. It is not absolutely critical that the enrichment is
> "deterministic" or "exactly right at the point in time"; it just needs to
> be eventually consistent, which is, e.g., why we use the far-future
> watermark on the enrichment table. Here are some options I was
> considering; I'm not sure they all make sense:
>
> - Somehow remove the state for this operator on restart, and then start
> the application with non-restored state allowed.
> - Somehow use a custom serializer to manage this.
> - If it is not possible to migrate the state at all, then we would try to
> get the state of the Kinesis iterators (or maybe just a timestamp) and
> restart the job from that point in the Kinesis stream.
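>
> For the last option, here is a sketch of how the restart position could be
> set on the Kinesis source, assuming the `scan.stream.initpos` options of
> the Flink 1.15 Kinesis SQL connector. The stream name, region, message
> class and timestamp are hypothetical placeholders:
>
> ```sql
> -- Hypothetical source definition for a restart without state; only the
> -- two scan.stream.* options differ from a normal definition.
> CREATE TABLE input_stream (
>   device_serial_number STRING
>   -- remaining columns, arrival_ts and its WATERMARK omitted here
> ) WITH (
>   'connector' = 'kinesis',
>   'stream' = 'device-data',
>   'aws.region' = 'eu-central-1',
>   'format' = 'protobuf',
>   'protobuf.message-class-name' = 'com.example.DeviceData',
>   -- Start every shard at this point in time instead of the default LATEST.
>   'scan.stream.initpos' = 'AT_TIMESTAMP',
>   'scan.stream.initpos-timestamp' = '2023-07-07T09:00:00.000+02:00'
> );
> ```
>
> As far as I understand, the initial position only applies to shards with
> no restored state, which is the case for a start without state. Choosing a
> timestamp slightly before the last processed record would trade possible
> duplicates for no data loss.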
>
> I'm looking forward to your input; thanks for your help!
>
> Cheers,
> Mike
>
>
>
> --
>
> Michael Marino
>
> Principal Data Science & Analytics
>
> www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany
>
>  Managing Directors: Toon Bouten | Christian Deilmann | Johannes Schwarz |
> Josef Wenzl
>
> Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE
> 280012558
>
