Hi,

I think this would work.
However, you should be aware that all keys are kept forever in state
(unless you configure idle state retention time [1]).
This includes previous versions of keys.
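
To illustrate what idle state retention changes, here is a minimal Python sketch of keyed state that forgets a key after a period without updates. It is a toy model, not Flink code: the class IdleExpiringSums and its methods are hypothetical names, and the real Flink setting described in [1] takes a minimum and a maximum retention time rather than the single threshold assumed here.

```python
class IdleExpiringSums:
    """Toy keyed state: one running sum per key, dropped once idle too long.

    Hypothetical illustration only; this is not how Flink is implemented.
    """

    def __init__(self, retention_ms):
        self.retention_ms = retention_ms
        # key -> (running sum, timestamp of the last update in ms)
        self._state = {}

    def expire(self, now_ms):
        """Remove every key that has not been updated for retention_ms or longer."""
        stale = [k for k, (_, last) in self._state.items()
                 if now_ms - last >= self.retention_ms]
        for k in stale:
            del self._state[k]

    def add(self, key, value, now_ms):
        """Clean up idle keys, then add value to the key's sum and return the sum."""
        self.expire(now_ms)
        total, _ = self._state.get(key, (0.0, now_ms))
        total += value
        self._state[key] = (total, now_ms)
        return total

    def keys(self):
        return sorted(self._state)


state = IdleExpiringSums(retention_ms=1000)
state.add("a1", 2.0, now_ms=0)
state.add("a2", 5.0, now_ms=900)
state.add("a2", 1.0, now_ms=1500)          # "a1" has been idle for 1500 ms and is dropped
print(state.keys())                         # ['a2']
print(state.add("a1", 3.0, now_ms=1600))    # 3.0, the sum for "a1" restarts from scratch
```

Without the expire step the dictionary only grows, which corresponds to the "kept forever" behaviour. With it, a key that returns after being dropped starts again from an empty aggregate, so its results can differ from a run that never clears state.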

Also note that we are not guaranteeing savepoint compatibility across Flink
versions yet.
If the state of the aggregation operator changes in a later version (say
Flink 1.9.x), it might not be possible to migrate to a later Flink version.
Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time

On Tue, Mar 19, 2019 at 10:27 PM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> My bad, it actually did work with
> Select a, map['sumB', sum(b), 'sumC', sum(c)] as metric_map
> group by a
>
> Do you think that's OK as a workaround? That way the main schema should
> not need to change, only the keys in the map.
>
> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
>> Thanks Fabian,
>>
>> I'm thinking about how to work around that issue, and one thing that came
>> to mind is to create a map that holds keys & values that can be edited
>> without changing the schema, though I'm still working out how to implement
>> it in Calcite.
>> Consider the following original SQL, in which "metrics" can be
>> added/deleted/renamed:
>> Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c
>> Group by a
>>
>> I'm looking at both json_objectagg and map to change it, but it seems that
>> json_objectagg is only in a later Calcite version and map doesn't work for
>> me. Trying something like
>> Select a, map(sum(b) as metric_sum_b, sum(c) as metric_sum_c) as metric_map
>> group by a
>>
>> results in "Non-query expression encountered in illegal context".
>> Is my train of thought the right one? If so, do I have a mistake in the
>> way I'm trying to implement it?
>>
>> Thanks!
>>
>> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Restarting a changed query from a savepoint is currently not supported.
>>> In general this is a very difficult problem as new queries might result
>>> in completely different execution plans.
>>> The special case of adding and removing aggregates is easier to solve,
>>> but the schema of the stored state changes and we would need to analyze the
>>> previous and current query and generate compatible serializers.
>>> So far we did not explore this rabbit hole.
>>>
>>> Also, starting a different query from a savepoint can lead to weird
>>> result semantics.
>>> I'd recommend bootstrapping the state of the new query from scratch.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> On Mon, Mar 18, 2019 at 8:02 PM Shahar Cizer Kobrinsky <
>>> shahar.kobrin...@gmail.com> wrote:
>>>
>>>> Or is it the SQL state that is incompatible?
>>>>
>>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>>>> shahar.kobrin...@gmail.com> wrote:
>>>>
>>>>> Thanks Guys,
>>>>>
>>>>> I actually got an error now, after adding some fields to the select
>>>>> statement:
>>>>>
>>>>> java.lang.RuntimeException: Error while getting state
>>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>>>     at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
>>>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>>>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>>     ... 9 more
>>>>>
>>>>> Does that mean I should move from having a Pojo store the result of
>>>>> the SQL retracted stream to Avro? I'm trying to understand how to
>>>>> mitigate it.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
>>>>>
>>>>>> Hi Shahar,
>>>>>>
>>>>>> From my understanding, if you use "groupby" with AggregateFunctions,
>>>>>> they save the accumulators to SQL internal state, which is independent
>>>>>> of your input schema. Based on what you described, I think that's why
>>>>>> recovering from existing state works.
>>>>>> I think one point of confusion might be the "toRetractStream" syntax.
>>>>>> It actually passes the "retracting" flag to the Flink planner to
>>>>>> indicate how the DataStream operator is generated from your SQL.
>>>>>>
>>>>>> So in my understanding, there's really no "state" associated with the
>>>>>> "retracting stream"; the state is associated with the generated operators.
>>>>>> However, I am not an expert in Table/SQL state recovery. I recall there
>>>>>> was an open JIRA [1] that might be related to your question regarding
>>>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>>>>> insight here?
>>>>>>
>>>>>> Regarding the rest of the pipeline, both "filter" and "map" operators
>>>>>> are stateless; and sink state recovery depends on what you do.
>>>>>>
>>>>>> --
>>>>>> Rong
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>>>
>>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Rong,
>>>>>>>
>>>>>>> I ran a quick test, changing the SQL select (adding a select field
>>>>>>> in the middle), and reran the job from a savepoint; it worked without
>>>>>>> any errors. I want to make sure I understand at what point the state
>>>>>>> is stored and how it works.
>>>>>>>
>>>>>>> Let's simplify the scenario and forget my specific case of a
>>>>>>> dynamically generated pojo. Let's focus on the generic steps:
>>>>>>> Source -> register table -> SQL select and group by session ->
>>>>>>> retracted stream (Row) -> transformToPojo (custom Map function) ->
>>>>>>> pushToSink
>>>>>>>
>>>>>>> And let's assume the SQL select is changed (a field is added
>>>>>>> somewhere in the middle of the select fields).
>>>>>>> So:
>>>>>>> We had intermediate results in the old format that are loaded from
>>>>>>> state into the new Row object in the retracted stream. Is that an
>>>>>>> accurate statement? At what operator, and in what format, is the state
>>>>>>> stored in this case? Is it the SQL result/Row? Is it the Pojo? As this
>>>>>>> scenario does not fail for me, I'm trying to understand how and where
>>>>>>> it is handled in Flink.
>>>>>>>
>>>>>>>
>>>>>>
