Hi, I think this would work. However, be aware that all keys are kept in state forever (unless you configure an idle state retention time [1]). This includes previous versions of keys.
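For example, via the query config of the Table API (a minimal sketch, assuming the Flink 1.7 Java API):

  StreamQueryConfig qConfig = tableEnv.queryConfig();
  // state for a key is kept for at least 12 hours and dropped after 24 hours of inactivity
  qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
  DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(table, Row.class, qConfig);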
Also note that we do not guarantee savepoint compatibility across Flink versions yet. If the state of the aggregation operator changes in a later version (say Flink 1.9.x), it might not be possible to migrate to that later version. Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time

On Tue, Mar 19, 2019 at 10:27 PM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:

> My bad, it actually did work with
> Select a, map['sumB', sum(b), 'sumC', sum(c)] as metric_map
> group by a
>
> Do you think that's OK as a workaround? This way the main schema doesn't change - only the keys in the map do.
>
> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:
>
>> Thanks Fabian,
>>
>> I'm thinking about how to work around that issue, and one idea that came to mind is to create a map that holds keys & values that can be edited without changing the schema, though I'm still figuring out how to implement it in Calcite.
>> Consider the following original SQL, in which "metrics" can be added/deleted/renamed:
>> Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c
>> Group by a
>>
>> I'm looking at both json_objectagg & map to change it, but it seems that json_objectagg is only available in a later Calcite version, and map doesn't work for me. Trying something like
>> Select a, map(sum(b) as metric_sum_b, sum(c) as metric_sum_c) as metric_map
>> group by a
>>
>> results in "Non-query expression encountered in illegal context".
>> Is my train of thought the right one? If so, do I have a mistake in the way I'm trying to implement it?
>>
>> Thanks!
>>
>> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Restarting a changed query from a savepoint is currently not supported.
>>> In general this is a very difficult problem, as new queries might result in completely different execution plans.
>>> The special case of adding and removing aggregates is easier to solve, but the schema of the stored state changes, and we would need to analyze the previous and current query and generate compatible serializers.
>>> So far we have not explored this rabbit hole.
>>>
>>> Starting a different query from a savepoint can also lead to weird result semantics.
>>> I'd recommend bootstrapping the state of the new query from scratch.
>>>
>>> Best, Fabian
>>>
>>> On Mon, Mar 18, 2019 at 8:02 PM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:
>>>
>>>> Or is it the SQL state that is incompatible?
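>>>> (To make the question concrete - a sketch, reusing the query from below, with assumed internals: for
>>>> Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c Group by a
>>>> the group-by operator keeps one accumulator row per key, roughly Row(sum_b, sum_c).
>>>> Is it that accumulator row's serializer that becomes incompatible once a field is added to the select?)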
>>>>
>>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:
>>>>
>>>>> Thanks guys,
>>>>>
>>>>> I actually got an error now when adding some fields to the select statement:
>>>>>
>>>>> java.lang.RuntimeException: Error while getting state
>>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>>> at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>>> at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>> at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
>>>>> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>>> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>>> at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>> at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>>> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>>> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>> ... 9 more
>>>>>
>>>>> Does that mean I should move from a POJO storing the result of the SQL retracted stream to Avro? I'm trying to understand how to mitigate it.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
>>>>>
>>>>>> Hi Shahar,
>>>>>>
>>>>>> From my understanding, if you use "group by" with aggregate functions, they save the accumulators to SQL-internal state, which is invariant to your input schema. Based on what you described, I think that's why recovering from the existing state works.
>>>>>> I think one confusion you might have is the "toRetractStream" syntax. This actually passes the "retracting" flag to the Flink planner to indicate how the DataStream operators get generated from your SQL.
>>>>>>
>>>>>> So in my understanding, there's really no "state" associated with the "retracting stream" itself; the state is associated with the generated operators.
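>>>>>> For example (a minimal sketch, assuming the 1.7 Java Table API; "src" and the field names are made up):
>>>>>>
>>>>>>   Table result = tEnv.sqlQuery("SELECT a, SUM(b) AS sum_b FROM src GROUP BY a");
>>>>>>   // the keyed state (the SUM accumulators) lives in the generated group-aggregation operator
>>>>>>   DataStream<Tuple2<Boolean, Row>> out = tEnv.toRetractStream(result, Row.class);
>>>>>>   // an element with f0 == false retracts a previously emitted row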
>>>>>> However, I am not an expert in Table/SQL state recovery: I recall there was an open JIRA [1] that might be related to your question about the recovery of SQL/Table-generated operators. Maybe @Fabian can provide more insight here?
>>>>>>
>>>>>> Regarding the rest of the pipeline: both "filter" and "map" operators are stateless, and sink state recovery depends on what you do.
>>>>>>
>>>>>> --
>>>>>> Rong
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>>>
>>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Rong,
>>>>>>>
>>>>>>> I made a quick test, changing the SQL select (adding a select field in the middle) and rerunning the job from a savepoint, and it worked without any errors. I want to make sure I understand at what point the state is stored and how it works.
>>>>>>>
>>>>>>> Let's simplify the scenario and forget my specific case of a dynamically generated POJO. Let's focus on the generic steps of:
>>>>>>> Source -> register table -> SQL select and group by session -> retracted stream (Row) -> transformToPojo (custom map function) -> pushToSink
>>>>>>>
>>>>>>> And let's assume the SQL select is changed (a field is added somewhere in the middle of the select).
>>>>>>> So: we had intermediate results in the old format that are loaded from state into the new Row object in the retracted stream. Is that an accurate statement? At what operator, and in what format, is the state stored in this case? Is it the SQL result/Row? Is it the POJO? As this scenario does not fail for me, I'm trying to understand how/where it is handled in Flink.
>>>>>>>
>>>>>>> --
>>>>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/