Hi Michael,

A change in your schema would also result in a change in the columns that you are selecting with your SELECT * query, would it not? Given that your compiled plan contains the fields you're selecting, a different schema means a different compiled plan. After all, if you have a different schema, you would also need to alter the definition of your table in the catalog.
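To make that concrete, here is a rough, untested sketch (the sink table my_sink is made up and the DDL is omitted; compilePlanSql is the experimental plan-compilation API from FLIP-190 that shipped with 1.15):

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShowCompiledPlan {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // (DDL for input_stream, enrichment_table and my_sink omitted here.)

        // SELECT * is expanded at compile time: the resolved columns of
        // input_stream, and with them the state types of the temporal join,
        // are baked into the plan JSON.
        CompiledPlan plan = tEnv.compilePlanSql(
            "INSERT INTO my_sink "
                + "SELECT * FROM input_stream st "
                + "LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF st.arrival_ts AS et "
                + "ON st.device_serial_number = et.sn");
        plan.writeToFile("/tmp/join-plan.json");
    }
}

If you diff that JSON before and after adding a column to input_stream, you should see the operator schemas change, which is exactly why the restored state no longer matches.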
I don't think we can, and I also don't think we should, try to fix just one case: this should get a proper solution, something that has been brought up in our conversations for Flink 2.0.

Best regards,

Martijn

On Fri, Jul 7, 2023 at 11:40 AM Michael Marino <michael.mar...@tado.com> wrote:

> Hi Martijn,
>
> Thanks for the reply.
>
> I understand that this is generally the case for the Table/SQL API, though I didn't expect a change in the schema to effect a change in the compiled plan, especially since we are not changing the join keys, etc. I was wondering if there is a way to deal with this particular case by, e.g.:
>
> - doing something to enable a custom migration of the type
> - removing the state for the given operator when loading from the savepoint
> - making the row data "opaque" to the operator (e.g. keeping the data as *bytes* plus the join key, and then doing the protobuf deserialization after the join)
> - something else entirely?
>
> We don't expect to be changing the input schema very often, but often enough (~1/month) that we need to be able to do this without data loss.
>
> If nothing is possible at the moment, my backup plan is to make use of our committed data (our final sink is an Iceberg table, which also stores the Kinesis metadata) and restart the application using the latest committed position as our starting point. I'd like to avoid this, though, because it would certainly add a bit of complexity.
>
> Thanks,
> Mike
>
> On Fri, Jul 7, 2023 at 10:58 AM Martijn Visser <martijnvis...@apache.org> wrote:
>
>> Hi Michael,
>>
>> In the current Table API/SQL, there's no guarantee that a change to either the query or the Flink version won't lead to state incompatibility. That's also documented at https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/upgrading/#table-api--sql
>>
>> Best regards,
>>
>> Martijn
>>
>> On Fri, Jul 7, 2023 at 9:23 AM Michael Marino <michael.mar...@tado.com> wrote:
>>
>>> Hi all,
>>>
>>> I had a question about how to allow a migration of state in a temporal left join. Let me first describe what we are doing:
>>>
>>> - We have an input stream (a Kinesis stream) of device data, and we perform a temporal left join on this stream to enrich it with metadata about the device as the data arrives.
>>> - The Kinesis stream is watermarked using the arrival time, and the enrichment table (in this case a MySQL CDC connector from Ververica) is watermarked with a constant, far-future timestamp (following the recommendation here: https://stackoverflow.com/a/69397954/533501) to ensure it doesn't stall and more or less immediately emits the row. The join is on the serial number, so:
>>>
>>> SELECT * FROM input_stream st
>>> LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF arrival_ts et
>>> ON st.device_serial_number = et.sn;
>>>
>>> - We are using KDA, so Flink 1.15.2.
>>>
>>> Our issue is as follows: when we update the input schema of the Kinesis stream (we are using the Kinesis table API connector with the protobuf format, which I backported from 1.16), the state cannot be migrated since the new type is incompatible, e.g.:
>>>
>>> TemporalJoin[10] -> Calc[11] -> (... -> IcebergStreamWriter) (1/1)#10 (...)
>>> switched from INITIALIZING to FAILED with failure cause:
>>> java.lang.RuntimeException: Error while getting state
>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
>>>     at org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator.open(TemporalRowTimeJoinOperator.java:159)
>>>     ...
>>> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@6621650b) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@97ed2886).
>>>
>>> In this particular case, the schema evolution was an added field, so something I would expect to be supported (or at least somehow supportable).
>>>
>>> Having a look at the code (https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L160), the exception is thrown when the left state of the join is obtained, which, I suppose, is expected, since the type there has changed.
>>>
>>> At the moment, the only way to fix this is to restart the application without state. This is, of course, not ideal, since it can result in data loss. My question is whether there is any way around this. I'm not sure the state of the temporal join is actually required in our case, because we really just need to ensure that all records from the upstream Kinesis stream are processed. It is not absolutely critical that the enrichment be "deterministic" or "exactly right at the point in time"; it must just be eventually consistent, which is, e.g., why we use the far-future watermarking in the enrichment table. Here are some options I was considering; I'm not sure they all make sense:
>>>
>>> - Just remove the state for this operator on reboot somehow, and then start the application allowing non-restored state (see the rough sketch after this list).
>>> - Somehow use a custom serializer to manage this.
>>> - If it is not possible to migrate the state at all, somehow get the state of the Kinesis iterators (or maybe just a timestamp) and restart the job from that point in the Kinesis stream.
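>>>
>>> For the first option, here is a rough, untested sketch of what I had in mind, based on my reading of the 1.15 State Processor API docs (the savepoint paths and the operator UID are made up; since SQL jobs don't let you set UIDs, I'm not even sure the temporal join can be addressed by a uid string, and I'd first have to dig the operator's identifier out of the savepoint metadata):
>>>
>>> import org.apache.flink.state.api.SavepointWriter;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>> public class PruneTemporalJoinState {
>>>     public static void main(String[] args) throws Exception {
>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         // Copy the old savepoint to a new path, dropping only the temporal
>>>         // join operator's state; all other state is carried over unchanged.
>>>         SavepointWriter
>>>             .fromExistingSavepoint("s3://my-bucket/savepoints/sp-old") // placeholder path
>>>             .removeOperator("temporal-join-uid")                       // hypothetical UID
>>>             .write("s3://my-bucket/savepoints/sp-pruned");             // placeholder path
>>>
>>>         // write() only builds the job; it still has to be executed.
>>>         env.execute("prune-temporal-join-state");
>>>     }
>>> }
>>>
>>> I'm also not sure write() works with removeOperator alone, i.e. without bootstrapping at least one operator via withOperator. If it does, we would restart the job from the pruned savepoint, with --allowNonRestoredState as a safety net. For the last option, I guess we could alternatively re-create the Kinesis source table with 'scan.stream.initpos' = 'AT_TIMESTAMP' and 'scan.stream.initpos-timestamp' set to the last committed position.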
>>>
>>> I'm looking forward to your input. Thanks for your help!
>>>
>>> Cheers,
>>> Mike
>>>
>>> --
>>> Michael Marino
>>> Principal Data Science & Analytics
>>> www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany