Hi Martijn,

Thanks for the reply.
I understand that this is generally the case for the Table/SQL API, though I didn't expect that a change in the schema would effect a change in the compiled plan, especially since we are not changing the join keys, etc. I was wondering if there is a way to deal with this for this particular case by, e.g.:

- doing something to enable a custom migration of the type
- removing the state for the given operator when loading from the savepoint
- making the row data "opaque" to the operator (e.g. keeping the data as *bytes* + the join key, and then doing the protobuf deserialization after the join)
- something else entirely?

We don't expect to change the input schema very often, but often enough (roughly once a month) that we need to be able to do it without data loss.

If nothing is possible at the moment, my backup plan is to make use of our committed data (our final sink is an Iceberg table, which also stores the Kinesis metadata) and restart the application using the latest info as our starting point. I'd like to avoid this, though, because it would certainly add some complexity.

Thanks,
Mike

On Fri, Jul 7, 2023 at 10:58 AM Martijn Visser <martijnvis...@apache.org> wrote:

> Hi Michael,
>
> In the current Table API/SQL, there's no guarantee that a change to either the query or the Flink version won't lead to state incompatibility. That's also documented at
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/upgrading/#table-api--sql
>
> Best regards,
>
> Martijn
>
> On Fri, Jul 7, 2023 at 9:23 AM Michael Marino <michael.mar...@tado.com> wrote:
>
>> Hi all,
>>
>> I had a question about how to allow a migration of state in a temporal left join. Let me first describe what we are doing:
>>
>> - We have an input stream (Kinesis stream) of device data and we are performing a left temporal join on this stream to enrich it with metadata about the device as the data arrives.
>> - The Kinesis stream is watermarked using the arrival time, and the enrichment table (in this case a MySQL CDC connector from Ververica) is watermarked with a constant, far-future timestamp (following the recommendation here: https://stackoverflow.com/a/69397954/533501), to ensure it doesn't stall and more or less immediately emits the row. The join is on the serial number, so:
>>
>>     SELECT * FROM input_stream st
>>     LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF st.arrival_ts AS et
>>     ON st.device_serial_number = et.sn;
>>
>> - We are using KDA, so Flink 1.15.2.
>>
>> Our issue is as follows. When we update the input schema of the Kinesis stream (we are using the Kinesis Stream Table API connector with the protobuf format, which I backported from 1.16), the state cannot be migrated since the new type is incompatible, e.g.:
>>
>>     TemporalJoin[10] -> Calc[11] -> (... -> IcebergStreamWriter) (1/1)#10
>>     (...) switched from INITIALIZING to FAILED with failure cause:
>>     java.lang.RuntimeException: Error while getting state
>>         at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
>>         at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
>>         at org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator.open(TemporalRowTimeJoinOperator.java:159)
>>         ...
>>     Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@6621650b) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@97ed2886).
>>
>> In this particular case, the schema evolution was an added field, which is something I would expect to be supported (or at least somehow supportable).
>>
>> Having a look at the code (https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L160), this comes from where the left state of the join is obtained, which, I suppose, is expected since the type there has changed.
>>
>> At the moment, the only way to fix this is to restart the application without state. This is, of course, not ideal, since it can result in data loss. My question is whether there is any way around this. I'm not sure the state of the temporal join is actually required for our case, because we really just need to ensure that all records from the upstream Kinesis stream are processed. It is not absolutely critical that the enrichment is "deterministic" or "exactly right at the point in time"; it just needs to be eventually consistent, which is, e.g., why we use the far-future watermarking in the enrichment table. Here are some options I was considering; I'm not sure they all make sense:
>>
>> - just remove the state for this operator on reboot somehow, and then start the application allowing non-restored state.
>> - somehow use a custom serializer to manage this.
>> - if it is not possible to migrate the state at all, then we would somehow try to get the state of the Kinesis iterators (or maybe just a timestamp) and restart the job starting from that point in the Kinesis stream.
>>
>> I'm looking forward to your input, thanks for your help!
>>
>> Cheers,
>> Mike
>>
>> --
>>
>> Michael Marino
>>
>> Principal Data Science & Analytics
>>
>> Phone: +49 89 7167786 - 14
>>
>> linkedin.com/company/tadogmbh <https://www.linkedin.com/company/tadogmbh> | facebook.com/tado <http://www.facebook.com/tado> | twitter.com/tado <http://www.twitter.com/tado> | youtube.com/tado <http://www.youtube.com/tado>
>>
>> www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany
>>
>> Managing Directors: Toon Bouten | Christian Deilmann | Johannes Schwarz | Josef Wenzl
>>
>> Registered with the Commercial Register Munich as HRB 194769 | VAT-No: DE 280012558
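The backup plan in the reply above (restarting without state and re-reading Kinesis from the last position committed to Iceberg) could be expressed in the source DDL, since the Kinesis SQL connector can start reading at a timestamp instead of at LATEST or TRIM_HORIZON. A minimal sketch, assuming a hypothetical stream name, region, column list, protobuf message class and start timestamp; in practice the timestamp would be derived from the Kinesis metadata stored in the Iceberg table. Starting from a timestamp will usually re-read some records that were already committed, so the sink would need to tolerate or deduplicate them.

```sql
-- Sketch only: stream name, region, columns, message class and the
-- start timestamp are placeholders, not values from the original setup.
CREATE TABLE input_stream (
  device_serial_number STRING,
  -- ... remaining fields of the (hypothetical) protobuf message ...
  arrival_ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR arrival_ts AS arrival_ts
) WITH (
  'connector' = 'kinesis',
  'stream' = 'device-data',
  'aws.region' = 'eu-central-1',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.DeviceData',
  -- begin at a point in time instead of restoring offsets from state
  'scan.stream.initpos' = 'AT_TIMESTAMP',
  -- default format is yyyy-MM-dd'T'HH:mm:ss.SSSXXX
  'scan.stream.initpos-timestamp' = '2023-07-07T00:00:00.000+02:00'
);
```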
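The "opaque row data" option from the reply above might look roughly like this: the Kinesis table is read with the `raw` format so each record stays a single BYTES column, only the join key is extracted before the temporal join, and the protobuf decoding happens after it. This is a sketch under assumptions: `extract_serial` and `decode_device_data` are hypothetical user-defined functions that would have to be written and registered, and the table options are placeholders. The intent is that the join's left-side state then only holds BYTES, a STRING key and the timestamp, so adding a protobuf field should not change that state's type; as Martijn notes, Flink still gives no guarantee that the rest of the plan stays state-compatible.

```sql
-- Sketch only: extract_serial and decode_device_data are hypothetical UDFs;
-- stream name and region are placeholders.
CREATE TABLE input_stream_raw (
  payload BYTES,  -- the whole Kinesis record, undecoded
  arrival_ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR arrival_ts AS arrival_ts
) WITH (
  'connector' = 'kinesis',
  'stream' = 'device-data',
  'aws.region' = 'eu-central-1',
  'format' = 'raw'
);

SELECT
  decode_device_data(st.payload) AS device,  -- protobuf parsing after the join
  et.*
FROM (
  -- only the key is extracted before the join; the payload stays opaque
  SELECT payload, extract_serial(payload) AS device_serial_number, arrival_ts
  FROM input_stream_raw
) AS st
LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF st.arrival_ts AS et
  ON st.device_serial_number = et.sn;
```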