Thanks Fabian,

I'm thinking about how to work around that issue, and one thing that came to mind is to create a map that holds keys & values which can be edited without changing the schema, though I'm still figuring out how to implement that in Calcite. Consider the following original SQL, in which "metrics" can be added/deleted/renamed:

Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c
Group by a
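
For reference, the shape I'm going for is a single schema-stable column that holds all the metrics. If I read the Calcite grammar right, the MAP value constructor is MAP[key, value, key, value, ...] - square brackets, alternating keys and values, no aliases inside - so presumably something like the sketch below, though I haven't managed to get it working yet ("tableEnv" is the StreamTableEnvironment and the table name "events" is just a placeholder):

    // hypothetical target query: one schema-stable MAP column for all metrics
    Table result = tableEnv.sqlQuery(
        "SELECT a, " +
        "       MAP['metric_sum_b', SUM(b), 'metric_sum_c', SUM(c)] AS metric_map " +
        "FROM events " +
        "GROUP BY a");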

I'm looking at both JSON_OBJECTAGG and MAP to change it, but it seems that JSON_OBJECTAGG is only available in a later Calcite version, and MAP doesn't work for me. Trying something like

Select a, map(sum(b) as metric_sum_b, sum(c) as metric_sum_c) as metric_map
Group by a

results in "Non-query expression encountered in illegal context". Is my train of thought the right one? If so, do I have a mistake in the way I'm trying to implement it?

Thanks!

On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Restarting a changed query from a savepoint is currently not supported.
> In general this is a very difficult problem, as new queries might result
> in completely different execution plans.
> The special case of adding and removing aggregates is easier to solve,
> but the schema of the stored state changes, and we would need to analyze
> the previous and current query and generate compatible serializers.
> So far we have not explored this rabbit hole.
>
> Also, starting a different query from a savepoint can lead to weird
> result semantics.
> I'd recommend bootstrapping the state of the new query from scratch.
>
> Best,
> Fabian
>
> On Mon, Mar 18, 2019 at 8:02 PM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
>> Or is it the SQL state that is incompatible?
>>
>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>> shahar.kobrin...@gmail.com> wrote:
>>
>>> Thanks guys,
>>>
>>> I actually got an error now when adding some fields to the select
>>> statement:
>>>
>>> java.lang.RuntimeException: Error while getting state
>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>     at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>>> backends, the new state serializer must not be incompatible.
>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>     ... 9 more
>>>
>>> Does that mean I should move from a Pojo storing the result of the SQL
>>> retracted stream to Avro? I'm trying to understand how to mitigate this.
>>>
>>> Thanks
>>>
>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> Hi Shahar,
>>>>
>>>> From my understanding, if you use "groupBy" with AggregateFunctions,
>>>> they save the accumulators in SQL-internal state, which is invariant to
>>>> your input schema. Based on what you described, I think that's why
>>>> recovering from existing state works for you.
>>>> I think one confusion you might have is the "toRetractStream" syntax.
>>>> This actually passes the "retracting" flag to the Flink planner to
>>>> indicate how the DataStream operators are generated from your SQL.
>>>>
>>>> So in my understanding, there's really no "state" associated with the
>>>> "retracting stream" itself, but rather with the generated operators.
>>>> However, I am not an expert in Table/SQL state recovery: I recall there
>>>> was an open JIRA [1] that might be related to your question regarding
>>>> recovery of SQL/Table-generated operators. Maybe @Fabian can provide
>>>> more insight here?
>>>>
>>>> Regarding the rest of the pipeline: both the "filter" and "map"
>>>> operators are stateless, and sink state recovery depends on what you
>>>> do in the sink.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>
>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Rong,
>>>>>
>>>>> I made a quick test changing the SQL select (adding a field in the
>>>>> middle of the select list) and reran the job from a savepoint, and it
>>>>> worked without any errors. I want to make sure I understand at what
>>>>> point the state is stored and how that works.
>>>>>
>>>>> Let's simplify the scenario and forget my specific case of a
>>>>> dynamically generated Pojo. Let's focus on the generic steps of:
>>>>> Source -> register table -> SQL select and group by session ->
>>>>> retracted stream (Row) -> transformToPojo (custom Map function) ->
>>>>> pushToSink (a rough code sketch follows below)
>>>>>
>>>>> And let's assume the SQL select is changed (a field is added somewhere
>>>>> in the middle of the select list). So: we had intermediate results in
>>>>> the old format that are loaded from state into the new Row object in
>>>>> the retracted stream. Is that an accurate statement? At what
>>>>> operator/format is the state stored in this case? Is it the SQL
>>>>> result/Row? Is it the Pojo?
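>>>>>
>>>>> In code terms, that pipeline is roughly the sketch below - just an
>>>>> illustration, assuming an existing StreamExecutionEnvironment "env"
>>>>> and StreamTableEnvironment "tableEnv"; "Event", "events", "toPojo",
>>>>> "kafkaConsumer" and "sink" are hypothetical placeholders:
>>>>>
>>>>> DataStream<Event> source = env.addSource(kafkaConsumer);
>>>>> // register the stream as a table so it can be queried with SQL
>>>>> tableEnv.registerDataStream("events", source, "a, b, c");
>>>>> // the SQL select whose field list may change between runs
>>>>> Table result = tableEnv.sqlQuery(
>>>>>     "SELECT a, SUM(b) AS metric_sum_b, SUM(c) AS metric_sum_c " +
>>>>>     "FROM events GROUP BY a");
>>>>> // retracted stream of Rows; f0 is the accumulate/retract flag
>>>>> DataStream<Tuple2<Boolean, Row>> retracted =
>>>>>     tableEnv.toRetractStream(result, Row.class);
>>>>> // keep accumulate messages, convert each Row to the Pojo, push to sink
>>>>> retracted
>>>>>     .filter(change -> change.f0)
>>>>>     .map(change -> toPojo(change.f1))
>>>>>     .addSink(sink);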
>>>>>
>>>>> Since this scenario does not fail for me, I'm trying to understand
>>>>> how/where it is handled in Flink.
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/