Hi Martijn,

Indeed, I was thinking along the lines of the underlying operators
changing, which does not happen in this case. (The docs here:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
specify that it depends on the underlying operator topology, which
suggested to me that a simple schema change might work, though perhaps
with some custom serialization adaptations.)
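
A toy model (plain Python, not Flink code) of why an unchanged operator
topology is not sufficient on its own. It assumes that the compatibility
check for row state is a strict comparison of field types, which matches
the failure reported in this thread but is a simplification; the class and
function names are made up for illustration.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class RowSerializerSnapshot:
    """Hypothetical stand-in for the field types recorded in a savepoint."""
    field_types: Tuple[str, ...]


def is_compatible(old: RowSerializerSnapshot, new: RowSerializerSnapshot) -> bool:
    # Assumption: restored state is accepted only if the field types match
    # exactly; this simplified model has no migration path for added fields.
    return old.field_types == new.field_types


old = RowSerializerSnapshot(("STRING", "TIMESTAMP", "DOUBLE"))
new = RowSerializerSnapshot(("STRING", "TIMESTAMP", "DOUBLE", "INT"))  # one added field

same_schema_ok = is_compatible(old, old)   # same operator, same row type
added_field_ok = is_compatible(old, new)   # same operator, row type gained a field
```

Under that assumption, the operator graph is identical in both cases and
only the row type stored in the join state differs, which is enough for
the restore to be rejected.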

Ok, thanks. I was thinking there might be a custom way, since the
documentation does mention the State Processor API, custom serializers,
etc., and these sounded promising, especially since only one operator's
state was incompatible.
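
One of the options raised earlier in the thread, keeping the row opaque to
the join (bytes plus the join key) and decoding only after the join, can be
sketched as follows. This is plain Python rather than Flink SQL, it is an
ordinary key lookup rather than a temporal join, and JSON stands in for
protobuf; all names are hypothetical.

```python
import json
from typing import Dict, Iterable, Iterator, Optional, Tuple

# Hypothetical record shape seen by the join: (join key, opaque payload bytes).
OpaqueRecord = Tuple[str, bytes]


def join_opaque(
    records: Iterable[OpaqueRecord], enrichment: Dict[str, dict]
) -> Iterator[Tuple[str, bytes, Optional[dict]]]:
    """Left join on the key only; the payload is never decoded here."""
    for serial, payload in records:
        yield serial, payload, enrichment.get(serial)


def decode(payload: bytes) -> dict:
    # Stand-in for the protobuf deserialization that would run after the join.
    return json.loads(payload.decode("utf-8"))


enrichment = {"SN1": {"model": "v3"}}
records = [
    ("SN1", json.dumps({"temp": 21.5}).encode("utf-8")),                  # old schema
    ("SN1", json.dumps({"temp": 22.0, "humidity": 40}).encode("utf-8")),  # added field
    ("SN2", json.dumps({"temp": 19.0}).encode("utf-8")),                  # no enrichment row
]

joined = [(sn, decode(p), meta) for sn, p, meta in join_opaque(records, enrichment)]
```

The point of the design is that the type held by the join stays
(key, bytes) regardless of how the payload schema evolves, so an added
field only affects the decoding step that runs after the join.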

Is there an open issue I could watch for regarding these sorts of upcoming
changes? I've seen the other issue for enabling Table API to migrate across
versions.

I suppose I will then try my backup plan.
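
A minimal sketch of how a restart point could be derived from the Kinesis
metadata committed to the sink, assuming each committed row carries a shard
id and an arrival timestamp in epoch milliseconds (hypothetical field
layout). It picks the earliest of the per-shard latest timestamps, which
trades duplicates for not skipping records; shards with no committed rows
are not covered by this sketch.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical shape of the Kinesis metadata stored with each committed row:
# (shard id, arrival timestamp in epoch milliseconds).
CommittedRow = Tuple[str, int]


def latest_per_shard(rows: Iterable[CommittedRow]) -> Dict[str, int]:
    """Latest committed arrival timestamp for every shard seen in the sink."""
    latest: Dict[str, int] = {}
    for shard_id, arrival_ms in rows:
        if arrival_ms > latest.get(shard_id, -1):
            latest[shard_id] = arrival_ms
    return latest


def restart_timestamp(rows: Iterable[CommittedRow]) -> int:
    """Earliest of the per-shard latest timestamps.

    Restarting from here re-reads some rows on shards that were further
    ahead, so the sink has to tolerate or deduplicate those rows.
    """
    latest = latest_per_shard(rows)
    if not latest:
        raise ValueError("no committed rows to derive a restart point from")
    return min(latest.values())


committed = [("shard-0", 1000), ("shard-1", 900), ("shard-0", 1200), ("shard-1", 950)]
restart_ms = restart_timestamp(committed)
```

The resulting timestamp would then be used as the initial read position of
the Kinesis source when the job is started without state.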

Cheers,
Mike



On Fri, Jul 7, 2023 at 1:35 PM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi Michael,
>
> A change in your schema would also result in a change in the columns that
> you are selecting with your SELECT * query, would it not? Given that your
> compiled plan contains the fields you're selecting, it would mean that your
> compiled plan changes when you have a different schema. After all, if you
> have a different schema, you would also need to alter the definition for
> your table in the catalog.
>
> I don't think we can and I also don't think we should try to fix just one
> case: this should have a proper solution, something that has been brought
> up in our conversation for Flink 2.0.
>
> Best regards,
>
> Martijn
>
>
>
> On Fri, Jul 7, 2023 at 11:40 AM Michael Marino <michael.mar...@tado.com>
> wrote:
>
>> Hi Martijn,
>>
>> Thanks for the reply.
>>
>> I understand that this is generally the case for the Table/SQL API,
>> though I didn't expect that a change in the schema would cause a change
>> in the compiled plan, especially since we are not changing the join keys,
>> etc. I was wondering if there is a way to deal with this for this
>> particular case by, e.g.:
>>
>> - doing something to enable a custom migration of the type
>> - removing the state for the given operator when loading from savepoint
>> - making the row data "opaque" to the operator (e.g. keeping the data as
>> *bytes* + the join key, and then doing a protobuf deserialization after the
>> join)
>> - something else entirely?
>>
>> We don't expect to change the input schema very often, but often enough
>> (~1/month) that we need to be able to do this without data loss.
>>
>> If nothing is possible at the moment, my backup plan is to make use of
>> our committed data (our final sink is an Iceberg table, also with the
>> kinesis metadata) and restart the application using the latest info as our
>> starting point. I'd like to avoid this, though, because it would certainly
>> create a bit of complexity.
>>
>> Thanks,
>> Mike
>>
>> On Fri, Jul 7, 2023 at 10:58 AM Martijn Visser <martijnvis...@apache.org>
>> wrote:
>>
>>> Hi Michael,
>>>
>>> In the current Table API/SQL, there's no guarantee that a change to
>>> either the query or the Flink version won't lead to state incompatibility.
>>> That's also documented at
>>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/upgrading/#table-api--sql
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Fri, Jul 7, 2023 at 9:23 AM Michael Marino <michael.mar...@tado.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I had a question about how to allow a migration of state in a temporal
>>>> left join. Let me first describe what we are doing:
>>>>
>>>> - We have an input stream (Kinesis stream) of device data and we are
>>>> performing a left temporal join on this stream to enrich it with metadata
>>>> about the device as the data arrives.
>>>> - The Kinesis stream is watermarked using the arrival time, and the
>>>> enrichment table (in this case a MySQL CDC connector from ververica) is
>>>> watermarked with a constant, far-future timestamp (following the
>>>> recommendation here: https://stackoverflow.com/a/69397954/533501), to
>>>> ensure it doesn't stall and more or less immediately emits the row. The
>>>> join is on the serial number, so:
>>>>
>>>>         SELECT * FROM input_stream st
>>>>         LEFT JOIN enrichment_table FOR SYSTEM_TIME AS OF arrival_ts et
>>>>           ON st.device_serial_number = et.sn;
>>>>
>>>> - We are using KDA, so Flink 1.15.2.
>>>>
>>>> Our issue is as follows. When we update the input schema of the Kinesis
>>>> Stream (we are using the Kinesis Stream Table API connector, with protobuf
>>>> format which I backported from 1.16), the state cannot be migrated since
>>>> the new type is incompatible, e.g.:
>>>>
>>>>
>>>> TemporalJoin[10] -> Calc[11] -> (... -> IcebergStreamWriter) (1/1)#10
>>>> (...) switched from INITIALIZING to FAILED with failure cause:
>>>> java.lang.RuntimeException: Error while getting state
>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
>>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
>>>>     at org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator.open(TemporalRowTimeJoinOperator.java:159)
>>>>     ...
>>>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>>>> serializer
>>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@6621650b)
>>>> must not be incompatible with the old state serializer
>>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@97ed2886).
>>>>
>>>> In this particular case, the schema evolution was an added field, so
>>>> something I would expect to be supported (or at least somehow supportable).
>>>>
>>>> Having a look at the code (
>>>> https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L160),
>>>> this occurs when the left state of the join is obtained, which, I
>>>> suppose, is expected since the type there has changed.
>>>>
>>>> At the moment, the only way to fix this is to restart the application
>>>> without state. This is, of course, not ideal, since it can result in data
>>>> loss. My question is whether there is any way around this. I'm not sure the
>>>> state of the temporal join is actually required for our case, because we
>>>> really just need to ensure that all records from the upstream Kinesis
>>>> stream are processed. It is not absolutely critical that the enrichment is
>>>> "deterministic" or "exactly right at the point in time"; it must just be
>>>> eventually consistent, which is, e.g., why we use the far-future
>>>> watermarking in the enrichment table. Here are some options I was
>>>> considering, though I'm not sure they all make sense:
>>>>
>>>> - Just remove the state for this operator on restart somehow, and then
>>>> start the application allowing non-restored state.
>>>> - Somehow use a custom serializer to manage this.
>>>> - If it is not possible to migrate the state at all, then we would
>>>> somehow try to get the state of the Kinesis iterators (or maybe just a
>>>> timestamp) and restart the job starting from that point in the Kinesis
>>>> stream.
>>>>
>>>> I'm looking forward to your input, thanks for your help!
>>>>
>>>> Cheers,
>>>> Mike
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Michael Marino
>>>>
>>>> Principal Data Science & Analytics
>>>>
>>>> Phone:  +49 89 7167786 - 14
>>>>
>>>> linkedin.com/company/tadogmbh
>>>> <https://www.linkedin.com/company/tadogmbh> | facebook.com/tado
>>>> <http://www.facebook.com/tado> | twitter.com/tado
>>>> <http://www.twitter.com/tado> | youtube.com/tado
>>>> <http://www.youtube.com/tado>
>>>>
>>>> www.tado.com | tado GmbH | Sapporobogen 6-8 | 80637 Munich | Germany
>>>>
>>>>  Managing Directors: Toon Bouten | Christian Deilmann | Johannes
>>>> Schwarz | Josef Wenzl
>>>>
>>>> Registered with the Commercial Register Munich as HRB 194769 | VAT-No:
>>>> DE 280012558
>>>>
>>>
>>
>
