My bad. It actually did work with: Select a, map['sumB', sum(b), 'sumC', sum(c)] as metric_map group by a
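
Spelled out in full (the "events" table name below is only a placeholder for
the real source, which I haven't shown here):

    SELECT a,
           MAP['sumB', SUM(b), 'sumC', SUM(c)] AS metric_map
    FROM events
    GROUP BY a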
do you think that's OK as a workaround? That way the main schema never
changes - only the keys in the map do.

On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> Thanks Fabian,
>
> I'm thinking about how to work around that issue, and one thing that came
> to my mind is to create a map that holds keys & values which can be edited
> without changing the schema, though I'm still figuring out how to
> implement it in Calcite.
> Consider the following original SQL, in which "metrics" can be
> added/deleted/renamed:
>
> Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c
> Group by a
>
> I'm looking at both json_objectagg and map to change it, but it seems that
> json_objectagg is only in a later Calcite version, and map doesn't work
> for me. Trying something like
>
> Select a, map(sum(b) as metric_sum_b, sum(c) as metric_sum_c) as metric_map
> group by a
>
> results in "Non-query expression encountered in illegal context".
> Is my train of thought the right one? If so, do I have a mistake in the
> way I'm trying to implement it?
>
> Thanks!
>
> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Restarting a changed query from a savepoint is currently not supported.
>> In general this is a very difficult problem, as new queries might result
>> in completely different execution plans.
>> The special case of adding and removing aggregates is easier to solve,
>> but the schema of the stored state changes, and we would need to analyze
>> the previous and current query and generate compatible serializers.
>> So far we have not explored this rabbit hole.
>>
>> Starting a different query from a savepoint can also lead to weird
>> result semantics.
>> I'd recommend bootstrapping the state of the new query from scratch.
>>
>> Best, Fabian
>>
>> On Mon, Mar 18, 2019 at 8:02 PM, Shahar Cizer Kobrinsky <
>> shahar.kobrin...@gmail.com> wrote:
>>
>>> Or is it the SQL state that is incompatible?
>>>
>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>>> shahar.kobrin...@gmail.com> wrote:
>>>
>>>> Thanks guys,
>>>>
>>>> I actually got an error now when adding some fields to the select
>>>> statement:
>>>>
>>>> java.lang.RuntimeException: Error while getting state
>>>>   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>>   at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>>   at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>>   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>   at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>   at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
>>>>   at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>>   at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>>   at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>   at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>>   at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>>   at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>>   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>>   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>   ... 9 more
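>>>>
>>>> Schematically, the change that triggered this had the following shape
>>>> (a sketch - table and column names are placeholders, not my real
>>>> schema):
>>>>
>>>>     -- query the savepoint was taken with
>>>>     SELECT a, SUM(b) AS metric_sum_b, SUM(c) AS metric_sum_c
>>>>     FROM events
>>>>     GROUP BY a
>>>>
>>>>     -- changed query: the added aggregate changes the schema of the
>>>>     -- stored accumulator state, so the restored state no longer
>>>>     -- matches the newly generated serializer
>>>>     SELECT a, SUM(b) AS metric_sum_b, SUM(d) AS metric_sum_d,
>>>>            SUM(c) AS metric_sum_c
>>>>     FROM events
>>>>     GROUP BY a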
>>>>
>>>> Does that mean I should move from having a Pojo storing the result of
>>>> the SQL retracted stream to Avro? Trying to understand how to mitigate
>>>> it.
>>>>
>>>> Thanks
>>>>
>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> Hi Shahar,
>>>>>
>>>>> From my understanding, if you use "group by" with AggregateFunctions,
>>>>> they save the accumulators to SQL internal state, which is independent
>>>>> of your input schema. Based on what you described, I think that's why
>>>>> recovering from existing state works for you.
>>>>> One confusion you might have is about the "toRetractStream" syntax.
>>>>> This actually passes the "retracting" flag to the Flink planner to
>>>>> indicate how the DataStream operator gets generated based on your SQL.
>>>>>
>>>>> So in my understanding, there's really no "state" associated with the
>>>>> "retracting stream", but rather with the generated operators.
>>>>> However, I am no expert in Table/SQL state recovery: I recall there
>>>>> was an open JIRA [1] that might be related to your question regarding
>>>>> the recovery of SQL/Table-generated operators. Maybe @Fabian can
>>>>> provide more insight here?
>>>>>
>>>>> Regarding the rest of the pipeline, both "filter" and "map" operators
>>>>> are stateless; sink state recovery depends on what the sink does.
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>>
>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Rong,
>>>>>>
>>>>>> I made a quick test changing the SQL select (adding a select field in
>>>>>> the middle), reran the job from a savepoint, and it worked without
>>>>>> any errors. I want to make sure I understand at what point the state
>>>>>> is stored and how this works.
>>>>>>
>>>>>> Let's simplify the scenario and forget my specific case of a
>>>>>> dynamically generated Pojo. Let's focus on the generic steps of:
>>>>>> Source -> register table -> SQL select and group by session ->
>>>>>> retracted stream (Row) -> transformToPojo (custom Map function) ->
>>>>>> pushToSink
>>>>>>
>>>>>> And let's assume the SQL select is changed (a field is added
>>>>>> somewhere in the middle of the select list). So: we had intermediate
>>>>>> results in the old format that are loaded from state into the new Row
>>>>>> object in the retracted stream. Is that an accurate statement? At
>>>>>> what operator, and in what format, is the state stored in this case?
>>>>>> Is it the SQL result/Row? Is it the Pojo? As this scenario does not
>>>>>> fail for me, I'm trying to understand how/where it is handled in
>>>>>> Flink.
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
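
PS - to make the intent of the map workaround concrete: once the metrics are
keyed inside the map, adding or renaming a metric only touches the map keys,
e.g. (a sketch with placeholder names):

    -- v1 of the query
    SELECT a, MAP['sumB', SUM(b), 'sumC', SUM(c)] AS metric_map
    FROM events
    GROUP BY a

    -- v2: one metric renamed ('sumB' -> 'totalB') and one added ('sumD');
    -- the output columns are still (a, metric_map), only the map keys differ
    SELECT a, MAP['totalB', SUM(b), 'sumC', SUM(c), 'sumD', SUM(d)] AS metric_map
    FROM events
    GROUP BY a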