Thanks Fabian,

I'm thinking about how to work around that issue, and one thing that came to
my mind is to create a map that holds keys and values, which can be edited
without changing the schema, though I'm still thinking about how to implement
it in Calcite.
Consider the following original SQL, in which "metrics" can be
added/deleted/renamed:
Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c
Group by a

I'm looking at both json_objectagg and map to change it, but it seems that
json_objectagg is only available in a later Calcite version and map doesn't
work for me.
Trying something like
Select a, map(sum(b) as metric_sum_b, sum(c) as metric_sum_c) as metric_map
group by a

results in "Non-query expression encountered in illegal context".
Is my train of thought the right one? If so, do I have a mistake in the way
I'm trying to implement it?
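
For reference, the MAP value constructor in Calcite (and in Flink SQL) is
written with square brackets, MAP[key1, value1, key2, value2, ...], while
map(...) with parentheses is parsed as a constructor over a sub-query. That is
likely what triggers the error above. Below is a minimal sketch, in plain Java
with no Flink dependency, that builds such a query from a configurable set of
metrics. The table name "events", the class name and the helper name are
hypothetical, and it assumes all map values share a common type. It only keeps
the output column stable; whether the aggregation state stays compatible when
metrics change is not something this sketch addresses.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class MetricMapQuery {

    // Builds a GROUP BY query whose metrics are packed into a single MAP column.
    // `table` and the metric names are hypothetical; the point is the MAP[...] syntax.
    public static String buildQuery(String groupKey, String table,
                                    Map<String, String> metrics) {
        StringJoiner entries = new StringJoiner(", ", "MAP[", "]");
        for (Map.Entry<String, String> m : metrics.entrySet()) {
            // Each entry contributes a quoted string key followed by its aggregate.
            entries.add("'" + m.getKey() + "', " + m.getValue());
        }
        return "SELECT " + groupKey + ", " + entries + " AS metric_map"
                + " FROM " + table + " GROUP BY " + groupKey;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the metrics in insertion order.
        Map<String, String> metrics = new LinkedHashMap<>();
        metrics.put("metric_sum_b", "SUM(b)");
        metrics.put("metric_sum_c", "SUM(c)");
        System.out.println(buildQuery("a", "events", metrics));
    }
}
```

Adding, removing or renaming a metric then only changes the entries inside
MAP[...], while the selected columns remain "a" and "metric_map".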

Thanks!

On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Restarting a changed query from a savepoint is currently not supported.
> In general this is a very difficult problem as new queries might result in
> completely different execution plans.
> The special case of adding and removing aggregates is easier to solve, but
> the schema of the stored state changes and we would need to analyze the
> previous and current query and generate compatible serializers.
> So far we did not explore this rabbit hole.
>
> Also, starting a different query from a savepoint can lead to weird
> result semantics.
> I'd recommend bootstrapping the state of the new query from scratch.
>
> Best, Fabian
>
>
>
> On Mon, Mar 18, 2019 at 8:02 PM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
>> Or is it the SQL state that is incompatible?
>>
>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>> shahar.kobrin...@gmail.com> wrote:
>>
>>> Thanks Guys,
>>>
>>> I actually got an error now after adding some fields to the select statement:
>>>
>>> java.lang.RuntimeException: Error while getting state
>>> at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>> at
>>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>> at
>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>> at
>>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>>> backends, the new state serializer must not be incompatible.
>>> at
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>> at
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>> at
>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>> at
>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>> at
>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>> at
>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>> at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>> at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>> ... 9 more
>>>
>>> Does that mean I should move from having a POJO storing the result of
>>> the SQL retract stream to Avro? I'm trying to understand how to mitigate it.
>>>
>>> Thanks
>>>
>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> Hi Shahar,
>>>>
>>>> From my understanding, if you use "groupby" with AggregateFunctions,
>>>> they save the accumulators to SQL internal state, which is independent of
>>>> your input schema. Based on what you described, I think that's why
>>>> recovering from existing state works.
>>>> I think one confusion you might have is the "toRetractStream" syntax.
>>>> This actually passes the "retracting" flag to the Flink planner to indicate
>>>> how the DataStream operator gets generated based on your SQL.
>>>>
>>>> So in my understanding, there's really no "state" associated with the
>>>> "retracting stream", but rather associated with the generated operators.
>>>> However, I am not an expert in Table/SQL state recovery. I recall there
>>>> was an open JIRA[1] that might be related to your question regarding
>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>>> insight here?
>>>>
>>>> Regarding the rest of the pipeline, both "filter" and "map" operators
>>>> are stateless; and sink state recovery depends on what you do.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>
>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Rong,
>>>>>
>>>>> I ran a quick test, changing the SQL select (adding a select field in
>>>>> the middle) and rerunning the job from a savepoint, and it worked
>>>>> without any errors. I want to make sure I understand at what point the
>>>>> state is stored and how it works.
>>>>>
>>>>> Let's simplify the scenario and forget my specific case of a dynamically
>>>>> generated POJO. Let's focus on the generic steps of:
>>>>> Source->register table->SQL select and group by session->retracted stream
>>>>> (Row)->transformToPojo (Custom Map function)->pushToSink
>>>>>
>>>>> And let's assume the SQL select is changed (a field is added somewhere
>>>>> in the middle of the select fields).
>>>>> So:
>>>>> We had intermediate results in the old format that are loaded from
>>>>> state into the new Row object in the retracted stream. Is that an
>>>>> accurate statement? At what operator, and in what format, is the state
>>>>> stored in this case? Is it the SQL result/Row? Is it the POJO? As this
>>>>> scenario does not fail for me, I'm trying to understand how and where
>>>>> it is handled in Flink.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>
>>>>
