Hi Shahar, Sorry for the late response.
The problem is not with the type of the retract stream, but with the GROUP BY aggregation operator. The optimizer is converting the plan into an aggregation operator that computes all aggregates followed by a projection that inserts the aggregation results into a MAP type. The problem is the state of the aggregation operator. By adding a new field to the map, the state of the operator changes and you cannot restore it. The only workaround that I can think of would be to implement a user-defined aggregation function [1] that performs all aggregations internally and manually maintain state compatibility for the accumulator of the UDAGG. Best, Fabian [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#aggregation-functions Am Do., 28. März 2019 um 22:28 Uhr schrieb Shahar Cizer Kobrinsky < shahar.kobrin...@gmail.com>: > Hmm kinda stuck here. Seems like SQL Group by is translated to a > *GroupAggProcessFunction* which stores a state for every aggregation > element (thus flattening the map items for state store). Seems like there's > no way around it. Am i wrong? is there any way to evolve the map elements > when doing *SELECT map['a', sum(a), 'b', sum(b).. ] FROM.. group by .. *? > > On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <fhue...@gmail.com> wrote: > >> Hi, >> >> I think this would work. >> However, you should be aware that all keys are kept forever in state >> (unless you configure idle state retention time [1]). >> This includes previous versions of keys. >> >> Also note that we are not guaranteeing savepoint compatibility across >> Flink versions yet. >> If the state of the aggregation operator changes in a later version (say >> Flink 1.9.x), it might not be possible to migrate to a later Flink version. >> Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided. >> >> Best, >> Fabian >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time >> >> Am Di., 19. März 2019 um 22:27 Uhr schrieb Shahar Cizer Kobrinsky < >> shahar.kobrin...@gmail.com>: >> >>> My bad. it actually did work with >>> Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map >>> group by a >>> >>> do you think thats OK as a workaround? main schema should be changed >>> that way - only keys in the map >>> >>> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky < >>> shahar.kobrin...@gmail.com> wrote: >>> >>>> Thanks Fabian, >>>> >>>> Im thinking about how to work around that issue and one thing that came >>>> to my mind is to create a map that holds keys & values that can be edited >>>> without changing the schema, though im thinking how to implement it in >>>> Calcite. >>>> Considering the following original SQL in which "metrics" can be >>>> added/deleted/renamed >>>> Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c >>>> Group by a >>>> >>>> im looking both at json_objectagg & map to change it but it seems that >>>> json_objectagg is on a later calcite version and map doesnt work for me. >>>> Trying something like >>>> Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as >>>> metric_map >>>> group by a >>>> >>>> results with "Non-query expression encountered in illegal context" >>>> is my train of thought the right one? if so, do i have a mistake in the >>>> way im trying to implement it? >>>> >>>> Thanks! >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhue...@gmail.com> >>>> wrote: >>>> >>>>> Hi, >>>>> >>>>> Restarting a changed query from a savepoint is currently not supported. >>>>> In general this is a very difficult problem as new queries might >>>>> result in completely different execution plans. >>>>> The special case of adding and removing aggregates is easier to solve, >>>>> but the schema of the stored state changes and we would need to analyze >>>>> the >>>>> previous and current query and generate compatible serializers. >>>>> So far we did not explore this rabbit hole. >>>>> >>>>> Also, starting a different query from a savepoint can also lead to >>>>> weird result semantics. >>>>> I'd recommend to bootstrap the state of the new query from scatch. >>>>> >>>>> Best, Fabian >>>>> >>>>> >>>>> >>>>> Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky < >>>>> shahar.kobrin...@gmail.com>: >>>>> >>>>>> Or is it the SQL state that is incompatible.. ? >>>>>> >>>>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky < >>>>>> shahar.kobrin...@gmail.com> wrote: >>>>>> >>>>>>> Thanks Guys, >>>>>>> >>>>>>> I actually got an error now adding some fields into the select >>>>>>> statement: >>>>>>> >>>>>>> java.lang.RuntimeException: Error while getting state >>>>>>> at >>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) >>>>>>> at >>>>>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135) >>>>>>> at >>>>>>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74) >>>>>>> at >>>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) >>>>>>> at >>>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) >>>>>>> at >>>>>>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60) >>>>>>> at >>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) >>>>>>> at >>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) >>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) >>>>>>> at java.lang.Thread.run(Thread.java:748) >>>>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap >>>>>>> backends, the new state serializer must not be incompatible. >>>>>>> at >>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301) >>>>>>> at >>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341) >>>>>>> at >>>>>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) >>>>>>> at >>>>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63) >>>>>>> at >>>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241) >>>>>>> at >>>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290) >>>>>>> at >>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) >>>>>>> at >>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) >>>>>>> ... 9 more >>>>>>> >>>>>>> Does that mean i should move from having a Pojo storing the result >>>>>>> of the SQL retracted stream to Avro? trying to understand how to >>>>>>> mitigate >>>>>>> it. >>>>>>> >>>>>>> Thanks >>>>>>> >>>>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Shahar, >>>>>>>> >>>>>>>> From my understanding, if you use "groupby" withAggregateFunctions, >>>>>>>> they save the accumulators to SQL internal states: which are invariant >>>>>>>> from >>>>>>>> your input schema. Based on what you described I think that's why it is >>>>>>>> fine for recovering from existing state. >>>>>>>> I think one confusion you might have is the "toRetractStream" >>>>>>>> syntax. This actually passes the "retracting" flag to the Flink >>>>>>>> planner to >>>>>>>> indicate how the DataStream operator gets generated based on your SQL. >>>>>>>> >>>>>>>> So in my understanding, there's really no "state" associated with >>>>>>>> the "retracting stream", but rather associated with the generated >>>>>>>> operators. >>>>>>>> However, I am not expert in Table/SQL state recovery: I recall >>>>>>>> there were an open JIRA[1] that might be related to your question >>>>>>>> regarding >>>>>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more >>>>>>>> insight here? >>>>>>>> >>>>>>>> Regarding the rest of the pipeline, both "filter" and "map" >>>>>>>> operators are stateless; and sink state recovery depends on what you >>>>>>>> do. >>>>>>>> >>>>>>>> -- >>>>>>>> Rong >>>>>>>> >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966 >>>>>>>> >>>>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Thanks Rong, >>>>>>>>> >>>>>>>>> I have made some quick test changing the SQL select (adding a >>>>>>>>> select field >>>>>>>>> in the middle) and reran the job from a savepoint and it worked >>>>>>>>> without any >>>>>>>>> errors. I want to make sure i understand how at what point the >>>>>>>>> state is >>>>>>>>> stored and how does it work. >>>>>>>>> >>>>>>>>> Let's simplify the scenario and forget my specific case of >>>>>>>>> dynamically >>>>>>>>> generated pojo. let's focus on generic steps of: >>>>>>>>> Source->register table->SQL select and group by session->retracted >>>>>>>>> stream >>>>>>>>> (Row)->transformToPojo (Custom Map function) ->pushToSink >>>>>>>>> >>>>>>>>> And let's assume the SQL select is changed (a field is added >>>>>>>>> somewhere in >>>>>>>>> the middle of the select field). >>>>>>>>> So: >>>>>>>>> We had intermediate results that are in the old format that are >>>>>>>>> loaded from >>>>>>>>> state to the new Row object in the retracted stream. is that an >>>>>>>>> accurate >>>>>>>>> statement? at what operator/format is the state stored in this >>>>>>>>> case? is it >>>>>>>>> the SQL result/Row? is it the Pojo? as this scenario does not fail >>>>>>>>> for me im >>>>>>>>> trying to understand how/where it is handled in Flink? >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Sent from: >>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>>>>>>>> >>>>>>>>