Hi all, I have a question about how to migrate state in a temporal left join. Let me first describe what we are doing:
- We have an input stream (Kinesis stream) of device data and we are performing a left temporal join on this stream to enrich it with metadata about the device as the data arrives.
- The Kinesis stream is watermarked using the arrival time, and the enrichment table (in this case a MySQL CDC connector from ververica) is watermarked with a constant, far-future timestamp (following the recommendation here: https://stackoverflow.com/a/69397954/533501), to ensure it doesn't stall and more or less immediately emits the row. The join is on the serial number, so:

    SELECT * FROM input_stream st
    LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF arrival_ts et
    ON st.device_serial_number = et.sn;

- We are using KDA, so Flink 1.15.2.

Our issue is as follows. When we update the input schema of the Kinesis Stream (we are using the Kinesis Stream Table API connector, with protobuf format which I backported from 1.16), the state cannot be migrated since the new type is incompatible, e.g.:

    TemporalJoin[10] -> Calc[11] -> (... -> IcebergStreamWriter) (1/1)#10 (...) switched from INITIALIZING to FAILED with failure cause: java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
        at org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator.open(TemporalRowTimeJoinOperator.java:159)
        ...
    Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@6621650b) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@97ed2886).

In this particular case, the schema evolution was an added field, so something I would expect to be supported (or at least somehow supportable).
Having a look at the code (https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L160), the error is raised when the left state of the join is obtained, which, I suppose, is expected since the type there has changed.

At the moment, the only way to fix this is to restart the application without state. This is, of course, not ideal, since it can result in data loss. My question is whether there is any way around this. I'm not sure the state of the temporal join is actually required for our case, because we really just need to ensure that all records from the upstream Kinesis stream are processed. It is not absolutely critical that the enrichment is "deterministic" or "exactly right at the point in time"; it just needs to be eventually consistent, which is, for example, why we use the far-future watermarking in the enrichment table.

Here are some options I was considering. I'm not sure if they all make sense:
- Somehow remove the state for this operator on restart, and then start the application allowing non-restored state.
- Somehow use a custom serializer to manage this.
- If it is not possible to migrate the state at all, then we would somehow try to get the state of the Kinesis iterators (or maybe just a timestamp) and restart the job starting from that point in the Kinesis stream.

I'm looking forward to your input, thanks for your help!
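For reference, the last option (restarting without state from a known point in the Kinesis stream) might look roughly like the sketch below. It assumes the Kinesis table connector's `scan.stream.initpos` and `scan.stream.initpos-timestamp` options behave as documented. The column list, stream name, region, protobuf message class and timestamp are hypothetical placeholders, not our real configuration.

```sql
-- Sketch only: columns, stream name, region, message class and timestamp
-- are hypothetical placeholders for illustration.
CREATE TABLE input_stream (
  device_serial_number STRING,
  -- Kinesis approximate arrival time, read from connector metadata
  arrival_ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR arrival_ts AS arrival_ts
) WITH (
  'connector' = 'kinesis',
  'stream' = 'device-data',
  'aws.region' = 'eu-west-1',
  -- The start position is only used when the source has no restored state
  'scan.stream.initpos' = 'AT_TIMESTAMP',
  'scan.stream.initpos-timestamp' = '2022-10-01T00:00:00.000+00:00',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.DeviceData'
);
```

With this approach the timestamp would have to be chosen at or before the last successfully processed record, so some records would probably be read a second time and the sink would need to tolerate duplicates.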
Cheers,
Mike

--
Michael Marino
Principal Data Science & Analytics, tado GmbH