Hi,

Restarting a changed query from a savepoint is currently not supported. In
general, this is a very difficult problem, as new queries might result in
completely different execution plans. The special case of adding and
removing aggregates is easier to solve, but the schema of the stored state
changes, so we would need to analyze the previous and current query and
generate compatible serializers. So far we have not explored this rabbit
hole.
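To make the aggregate case concrete, below is a minimal sketch of such a
query change, assuming the Flink 1.8 Java Table API; the "clicks" table and
its fields are made up for illustration:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class QueryChangeSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple2<String, Double>> clicks = env.fromElements(
            Tuple2.of("alice", 2.0), Tuple2.of("bob", 3.5));
        tEnv.registerDataStream("clicks", clicks, "userId, amount");

        // v1: the generated group-by operator stores one COUNT accumulator
        // per key in keyed state.
        Table v1 = tEnv.sqlQuery(
            "SELECT userId, COUNT(*) AS cnt FROM clicks GROUP BY userId");

        // v2: adding SUM(amount) changes the accumulator row layout, so the
        // serializer recorded in a v1 savepoint no longer matches the state.
        Table v2 = tEnv.sqlQuery(
            "SELECT userId, COUNT(*) AS cnt, SUM(amount) AS total " +
            "FROM clicks GROUP BY userId");
    }
}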
Starting a different query from a savepoint can also lead to weird result
semantics. I'd recommend bootstrapping the state of the new query from
scratch.

Best, Fabian

On Mon, Mar 18, 2019 at 8:02 PM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> Or is it the SQL state that is incompatible?
>
> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
>> Thanks guys,
>>
>> I actually got an error now when adding some fields to the select
>> statement:
>>
>> java.lang.RuntimeException: Error while getting state
>>   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>   at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>   at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>   at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>   at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>> backends, the new state serializer must not be incompatible.
>>   at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>   at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>   at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>   at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>   at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>   at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>   ... 9 more
>>
>> Does that mean I should move from a POJO storing the result of the SQL
>> retracted stream to Avro? I'm trying to understand how to mitigate this.
>>
>> Thanks
>>
>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
>>
>>> Hi Shahar,
>>>
>>> From my understanding, if you use GROUP BY with AggregateFunctions,
>>> they save the accumulators to SQL-internal state, which is independent
>>> of your input schema. Based on what you described, I think that's why
>>> recovering from existing state works for you.
>>> I think one confusion you might have is the "toRetractStream" syntax.
>>> This actually passes the "retracting" flag to the Flink planner to
>>> indicate how the DataStream operators get generated from your SQL.
>>>
>>> So in my understanding, there's really no "state" associated with the
>>> "retracting stream", but rather with the generated operators.
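A minimal sketch of the toRetractStream conversion described above,
assuming the Flink 1.8 Java Table API and reusing the made-up "clicks"
example:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple2<String, Double>> clicks = env.fromElements(
            Tuple2.of("alice", 2.0), Tuple2.of("alice", 3.0));
        tEnv.registerDataStream("clicks", clicks, "userId, amount");

        Table result = tEnv.sqlQuery(
            "SELECT userId, COUNT(*) AS cnt FROM clicks GROUP BY userId");

        // Each element carries a Boolean flag: true = add/update a result
        // row, false = retract a previously emitted row. The flag shapes how
        // the operators are generated; the accumulator state itself lives in
        // the group-by operator, not in this stream.
        DataStream<Tuple2<Boolean, Row>> retractStream =
            tEnv.toRetractStream(result, Row.class);

        retractStream.print();
        env.execute("retract-stream-sketch");
    }
}

Running this should print pairs like (true,alice,1), then (false,alice,1)
followed by (true,alice,2) as the count for "alice" is updated.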
>>> However, I am not an expert in Table/SQL state recovery: I recall there
>>> was an open JIRA issue [1] that might be related to your question
>>> regarding the recovery of SQL/Table generated operators. Maybe @Fabian
>>> can provide more insight here?
>>>
>>> Regarding the rest of the pipeline, both the "filter" and "map"
>>> operators are stateless; sink state recovery depends on what your sink
>>> does.
>>>
>>> --
>>> Rong
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>
>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Rong,
>>>>
>>>> I made a quick test changing the SQL select (adding a field in the
>>>> middle of the select list), reran the job from a savepoint, and it
>>>> worked without any errors. I want to make sure I understand at what
>>>> point the state is stored and how it works.
>>>>
>>>> Let's simplify the scenario and forget my specific case of a
>>>> dynamically generated POJO. Let's focus on the generic steps of:
>>>> Source -> register table -> SQL select and group by session ->
>>>> retracted stream (Row) -> transformToPojo (custom map function) ->
>>>> pushToSink
>>>>
>>>> And let's assume the SQL select is changed (a field is added somewhere
>>>> in the middle of the select list). So: we have intermediate results in
>>>> the old format that are loaded from state into the new Row object in
>>>> the retracted stream. Is that an accurate statement? At which operator,
>>>> and in which format, is the state stored in this case? Is it the SQL
>>>> result/Row? Is it the POJO? As this scenario does not fail for me, I'm
>>>> trying to understand how and where it is handled in Flink.
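To connect the question above with Rong's explanation, here is a hedged
sketch of that generic pipeline, with comments marking where state lives;
MyPojo and the conversion logic are hypothetical stand-ins for the
dynamically generated POJO and the custom map function, and the Flink 1.8
Java API is assumed:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class PipelineStateSketch {

    // Hypothetical stand-in for the dynamically generated POJO.
    public static class MyPojo {
        public String userId;
        public long cnt;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple2<String, Double>> source = env.fromElements(
            Tuple2.of("alice", 2.0), Tuple2.of("alice", 3.0));
        tEnv.registerDataStream("clicks", source, "userId, amount");

        // Stateful: the generated group-by operator (the
        // GroupAggProcessFunction from the stack trace above) keeps one
        // accumulator row per key in keyed state. Its serializer must still
        // match after the query changes.
        Table agg = tEnv.sqlQuery(
            "SELECT userId, COUNT(*) AS cnt FROM clicks GROUP BY userId");

        // Stateless from here on: the retract flag, the filter, and the map
        // to a POJO hold no keyed state of their own.
        DataStream<Tuple2<Boolean, Row>> retracted =
            tEnv.toRetractStream(agg, Row.class);

        DataStream<MyPojo> pojos = retracted
            .filter(t -> t.f0) // keep accumulate messages, drop retractions
            .map(new MapFunction<Tuple2<Boolean, Row>, MyPojo>() {
                @Override
                public MyPojo map(Tuple2<Boolean, Row> t) {
                    // Hypothetical conversion from Row to the POJO.
                    MyPojo p = new MyPojo();
                    p.userId = (String) t.f1.getField(0);
                    p.cnt = (Long) t.f1.getField(1);
                    return p;
                }
            });

        pojos.print(); // stand-in for pushToSink
        env.execute("pipeline-state-sketch");
    }
}

In this reading, the serializer mismatch in the stack trace above concerns
the accumulator state of the group-by operator (the schema of the SQL
result), not the POJO produced downstream.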