Hi Michael,

In the current Table API/SQL, there's no guarantee that a change to either the query or the Flink version won't lead to state incompatibility. That's also documented at https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/upgrading/#table-api--sql

Best regards,
Martijn

On Fri, Jul 7, 2023 at 9:23 AM Michael Marino <michael.mar...@tado.com> wrote:

> Hi all,
>
> I had a question about how to allow a migration of state in a temporal
> left join. Let me first describe what we are doing:
>
> - We have an input stream (Kinesis stream) of device data, and we are
>   performing a left temporal join on this stream to enrich it with metadata
>   about the device as the data arrives.
> - The Kinesis stream is watermarked using the arrival time, and the
>   enrichment table (in this case a MySQL CDC connector from Ververica) is
>   watermarked with a constant, far-future timestamp (following the
>   recommendation here: https://stackoverflow.com/a/69397954/533501), to
>   ensure it doesn't stall and emits the row more or less immediately. The
>   join is on the serial number, so:
>
>     SELECT * FROM input_stream st
>     LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF arrival_ts et
>     ON st.device_serial_number = et.sn;
>
> - We are using KDA (Kinesis Data Analytics), so Flink 1.15.2.
>
> Our issue is as follows. When we update the input schema of the Kinesis
> Stream (we are using the Kinesis Stream Table API connector, with the protobuf
> format, which I backported from 1.16), the state cannot be migrated because
> the new type is incompatible, e.g.:
>
> > TemporalJoin[10] -> Calc[11] -> (... -> IcebergStreamWriter) (1/1)#10
> > (...) switched from INITIALIZING to FAILED with failure cause:
> > java.lang.RuntimeException: Error while getting state
> >     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
> >     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
> >     at org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator.open(TemporalRowTimeJoinOperator.java:159)
> >     ...
> > Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@6621650b) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@97ed2886).
>
> In this particular case, the schema evolution was an added field, which is
> something I would expect to be supported (or at least somehow supportable).
>
> Looking at the code
> (https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L160),
> the failure happens when the left state of the join is obtained, which, I
> suppose, is expected since the type there has changed.
>
> At the moment, the only way to fix this is to restart the application
> without state. This is, of course, not ideal, since it can result in data
> loss. My question is whether there is any way around this. I'm not sure the
> state of the temporal join is actually required in our case, because we
> really just need to ensure that all records from the upstream Kinesis stream
> are processed. It is not absolutely critical that the enrichment is
> "deterministic" or "exactly right at the point in time"; it just needs to be
> eventually consistent, which is, e.g., why we use the far-future
> watermarking in the enrichment table. Here are some options I was
> considering (I'm not sure they all make sense):
>
> - Just remove the state for this operator on restart somehow, and then
>   start the application allowing non-restored state.
> - Somehow use a custom serializer to manage this.
> - If it is not possible to migrate the state at all, then we would somehow
>   try to get the state of the Kinesis iterators (or maybe just a timestamp)
>   and restart the job from that point in the Kinesis stream.
>
> I'm looking forward to your input. Thanks for your help!
>
> Cheers,
> Mike
>
> --
> Michael Marino
> Principal Data Science & Analytics
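For the third option in the quoted message (restarting the job without state from a known point in the Kinesis stream), the Flink Kinesis SQL connector can be configured to start reading at a timestamp instead of at its default LATEST position. The DDL below is a minimal sketch under stated assumptions, not a confirmed fix from this thread: the stream name, region, payload columns, protobuf message class, and the timestamp value are all hypothetical placeholders. The timestamp would have to come from wherever the last safely processed position is tracked (for example, the latest arrival_ts already written downstream).

```sql
-- Sketch only: stream name, region, message class and timestamp are hypothetical placeholders.
CREATE TABLE input_stream (
  device_serial_number STRING,
  -- ... remaining payload columns matching the protobuf message ...
  -- Approximate arrival time of the record in Kinesis, exposed as a metadata column.
  arrival_ts TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR arrival_ts AS arrival_ts
) WITH (
  'connector' = 'kinesis',
  'stream' = 'device-data',                                  -- hypothetical stream name
  'aws.region' = 'eu-central-1',                             -- hypothetical region
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.DeviceData',  -- hypothetical class
  -- Start from a point in time rather than the default LATEST position.
  'scan.stream.initpos' = 'AT_TIMESTAMP',
  -- Placeholder value in the default yyyy-MM-dd'T'HH:mm:ss.SSSXXX format.
  'scan.stream.initpos-timestamp' = '2023-07-07T09:00:00.000+02:00'
);
```

Restarting this way is at-least-once around the restart point: records between the chosen timestamp and the failure may be processed twice, and records read while the CDC source is still re-reading the enrichment table may be emitted before their matching enrichment row arrives. On KDA, starting without state corresponds to the SKIP_RESTORE_FROM_SNAPSHOT application restore type.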