Hi Michael,

A change in your schema would also change the columns that your SELECT *
query selects, would it not? Given that the compiled plan contains the
fields you're selecting, a different schema means a different compiled
plan. After all, with a different schema you would also need to alter the
definition of your table in the catalog.
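
To make that concrete (with hypothetical column names): SELECT * is expanded
into the full column list when the plan is compiled, so the plan pins exactly
the same fields as if you had written:

        SELECT st.device_serial_number, st.arrival_ts, st.payload,
               et.sn, et.device_name
        FROM input_stream st
        LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF st.arrival_ts AS et
          ON st.device_serial_number = et.sn;

As soon as a column is added to input_stream, that expanded list, and with it
the compiled plan, changes.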

I don't think we can fix just one case, and I also don't think we should
try: this needs a proper solution, something that has been brought up in
our conversations about Flink 2.0.

Best regards,

Martijn



On Fri, Jul 7, 2023 at 11:40 AM Michael Marino <michael.mar...@tado.com>
wrote:

> Hi Martijn,
>
> Thanks for the reply.
>
> I understand that this is generally the case for the Table/SQL API, though
> I didn't expect that a change in the schema would cause a change in the
> compiled plan, especially since we are not changing the join keys, etc. I
> was wondering if there is a way to deal with this particular case by, e.g.:
>
> - doing something to enable a custom migration of the type
> - removing the state for the given operator when loading from the savepoint
> - making the row data "opaque" to the operator (e.g. keeping the data as
> *bytes* plus the join key, and doing the protobuf deserialization after the
> join; see the sketch below)
> - something else entirely?
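>
> As a rough sketch of that third option (all names are hypothetical:
> ExtractSerial and DecodePayload would be user-defined functions we'd have
> to write, and the 'raw' format plus the computed join key are assumptions,
> not something we have tried):
>
>         CREATE TABLE input_stream_raw (
>           payload BYTES,
>           device_serial_number AS ExtractSerial(payload),
>           arrival_ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
>           WATERMARK FOR arrival_ts AS arrival_ts
>         ) WITH (
>           'connector' = 'kinesis',
>           'stream' = 'device-events',
>           'aws.region' = 'eu-central-1',
>           'format' = 'raw'
>         );
>
>         SELECT DecodePayload(st.payload) AS event, et.*
>         FROM input_stream_raw st
>         LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF st.arrival_ts AS et
>           ON st.device_serial_number = et.sn;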
>
> We don't expect to change the input schema very often, but often enough
> (~ once a month) that we need to be able to do this without data loss.
>
> If nothing is possible at the moment, my backup plan is to make use of our
> committed data (our final sink is an Iceberg table, which also stores the
> Kinesis metadata) and restart the application from the latest committed
> position. I'd like to avoid this, though, because it would certainly add a
> bit of complexity.
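>
> Concretely, that would mean querying the sink for the last committed
> position, e.g. (kinesis_arrival_ts is a hypothetical name for the Kinesis
> metadata column we write):
>
>         SELECT MAX(kinesis_arrival_ts) AS restart_from
>         FROM iceberg_sink_table;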
>
> Thanks,
> Mike
>
> On Fri, Jul 7, 2023 at 10:58 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi Michael,
>>
>> In the current Table API/SQL, there's no guarantee that a change to
>> either the query or the Flink version won't lead to state incompatibility.
>> That's also documented at
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/upgrading/#table-api--sql
>>
>> Best regards,
>>
>> Martijn
>>
>> On Fri, Jul 7, 2023 at 9:23 AM Michael Marino <michael.mar...@tado.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a question about how to migrate state in a temporal left join.
>>> Let me first describe what we are doing:
>>>
>>> - We have an input stream (Kinesis stream) of device data and we are
>>> performing a left temporal join on this stream to enrich it with metadata
>>> about the device as the data arrives.
>>> - The Kinesis stream is watermarked using the arrival time, and the
>>> enrichment table (in this case a MySQL CDC connector from Ververica) is
>>> watermarked with a constant, far-future timestamp (following the
>>> recommendation here: https://stackoverflow.com/a/69397954/533501; see the
>>> DDL sketch after this list) to ensure it doesn't stall and more or less
>>> immediately emits each row. The join is on the serial number:
>>>
>>>         SELECT * FROM input_stream st
>>>         LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF st.arrival_ts AS et
>>>           ON st.device_serial_number = et.sn;
>>>
>>> - We are using KDA, so Flink 1.15.2.
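>>>
>>> For reference, the enrichment table with the far-future watermark is
>>> declared roughly like the following (column names simplified and the
>>> connection options are placeholders; the constant computed column is the
>>> trick from the Stack Overflow answer above):
>>>
>>>         CREATE TABLE enrichment_table (
>>>           sn STRING,
>>>           device_name STRING,
>>>           update_ts AS CAST(TIMESTAMP '2999-01-01 00:00:00' AS TIMESTAMP(3)),
>>>           WATERMARK FOR update_ts AS update_ts,
>>>           PRIMARY KEY (sn) NOT ENFORCED
>>>         ) WITH (
>>>           'connector' = 'mysql-cdc',
>>>           'hostname' = 'mysql.internal',
>>>           'database-name' = 'devices',
>>>           'table-name' = 'device_metadata',
>>>           'username' = 'flink',
>>>           'password' = 'secret'
>>>         );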
>>>
>>> Our issue is as follows. When we update the input schema of the Kinesis
>>> stream (we are using the Kinesis Table API connector with the protobuf
>>> format, which I backported from 1.16), the state cannot be migrated since
>>> the new type is incompatible, e.g.:
>>>
>>>
>>> TemporalJoin[10] -> Calc[11] -> (... -> IcebergStreamWriter) (1/1)#10
>>> (...) switched from INITIALIZING to FAILED with failure cause:
>>> java.lang.RuntimeException: Error while getting state
>>> at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
>>> at
>>> org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator.open(TemporalRowTimeJoinOperator.java:159)
>>> ...
>>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>>> serializer
>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@6621650b)
>>> must not be incompatible with the old state serializer
>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@97ed2886).
>>>
>>> In this particular case, the schema evolution was an added field, so
>>> something I would expect to be supported (or at least somehow supportable).
>>>
>>> Having a look at the code (
>>> https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L160),
>>> the failure occurs when the left state of the join is obtained, which, I
>>> suppose, is expected since the type there has changed.
>>>
>>> At the moment, the only way to fix this is to restart the application
>>> without state. This is, of course, not ideal, since it can result in data
>>> loss. My question is whether there is any way around this. I'm not sure the
>>> state of the temporal join is actually required for our case, because we
>>> really just need to ensure that all records from the upstream Kinesis
>>> stream are processed. It is not absolutely critical that the enrichment is
>>> "deterministic" or "exactly right at the point in time"; it must just be
>>> eventually consistent, which is, e.g., why we use the far-future
>>> watermarking in the enrichment table. Here are some options I was
>>> considering; I'm not sure if they all make sense:
>>>
>>> - Just remove the state for this operator on reboot somehow, and then
>>> start the application allowing non-restored state.
>>> - Somehow use a custom serializer to manage this.
>>> - If it is not possible to migrate the state at all, somehow get the
>>> positions of the Kinesis iterators (or maybe just a timestamp) and restart
>>> the job from that point in the Kinesis stream (sketched below).
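>>>
>>> For that last option, the start-position options of the Kinesis table
>>> connector might already be enough when resubmitting the job, e.g. via a
>>> dynamic table options hint (the timestamp is a placeholder, and this may
>>> require table.dynamic-table-options.enabled to be set):
>>>
>>>         SELECT * FROM input_stream
>>>         /*+ OPTIONS('scan.stream.initpos' = 'AT_TIMESTAMP',
>>>                     'scan.stream.initpos-timestamp' = '2023-07-07T00:00:00.000+02:00') */ st
>>>         LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF st.arrival_ts AS et
>>>           ON st.device_serial_number = et.sn;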
>>>
>>> I'm looking forward to your input, thanks for your help!
>>>
>>> Cheers,
>>> Mike
>>>
>>>
>>
>
