That makes sense, Fabian! So I want to make sure I fully understand how this should look. Would the same expression look like custom_groupby(my_group_fields, map['a', sum(a), ...])? Will I be able to use the built-in aggregation functions internally, such as sum/avg, or would I need to reimplement all such functions?
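Just to make sure we are talking about the same thing, here is a rough, untested sketch of what I have in mind (the class, method, and table names are all made up, and I don't know whether a MAP-typed argument is even accepted by the planner):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.functions.AggregateFunction;

    // Accumulator: a single map from metric name to running sum, so adding
    // a metric only adds a key, not a new accumulator field.
    public static class MultiSumAcc {
        public Map<String, Long> sums = new HashMap<>();
    }

    public static class MultiSum
            extends AggregateFunction<Map<String, Long>, MultiSumAcc> {

        @Override
        public MultiSumAcc createAccumulator() {
            return new MultiSumAcc();
        }

        // Re-implements SUM by hand; is there a way to delegate to the
        // built-in aggregation functions from here instead?
        public void accumulate(MultiSumAcc acc, Map<String, Long> values) {
            for (Map.Entry<String, Long> e : values.entrySet()) {
                if (e.getValue() != null) {
                    acc.sums.merge(e.getKey(), e.getValue(), Long::sum);
                }
            }
        }

        @Override
        public Map<String, Long> getValue(MultiSumAcc acc) {
            return acc.sums;
        }
    }

    // Registration plus intended use (sketch):
    //   tableEnv.registerFunction("multi_sum", new MultiSum());
    //   SELECT a, multi_sum(map['sumB', b, 'sumC', c]) AS metric_map
    //   FROM events GROUP BY a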
In terms of schema evolution, if these are implemented as map state, will I be OK as new items are added to that map?

Thanks again, and congrats on an awesome conference, I learned a lot.

Shahar

From: Fabian Hueske
Sent: Monday, April 8, 02:54
Subject: Re: Schema Evolution on Dynamic Schema
To: Shahar Cizer Kobrinsky
Cc: Rong Rong, user

Hi Shahar,

Sorry for the late response.

The problem is not with the type of the retract stream, but with the GROUP BY aggregation operator. The optimizer converts the plan into an aggregation operator that computes all aggregates, followed by a projection that inserts the aggregation results into a MAP type.

The problem is the state of the aggregation operator. By adding a new field to the map, the state of the operator changes and you cannot restore it.

The only workaround I can think of would be to implement a user-defined aggregation function [1] that performs all aggregations internally and to manually maintain state compatibility for the accumulator of the UDAGG.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#aggregation-functions

On Thu, Mar 28, 2019 at 10:28 PM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:

Hmm, kind of stuck here. It seems the SQL GROUP BY is translated to a GroupAggProcessFunction, which stores state for every aggregation element (thus flattening the map items in the state store). It seems there's no way around it. Am I wrong? Is there any way to evolve the map elements when doing SELECT map['a', sum(a), 'b', sum(b), ...] FROM ... GROUP BY ...?

On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

I think this would work. However, you should be aware that all keys are kept in state forever (unless you configure an idle state retention time [1]). This includes previous versions of keys.

Also note that we do not guarantee savepoint compatibility across Flink versions yet. If the state of the aggregation operator changes in a later version (say Flink 1.9.x), it might not be possible to migrate to that later Flink version. Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time
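(For reference, setting the idle state retention time from [1] looks roughly like this with the 1.7-era Java Table API; this is a sketch, and tableEnv, the table t, and the column names are placeholders:)

    // Uses StreamQueryConfig, Time, Table, Row, and Tuple2 from the usual
    // Flink packages. Keeps state for idle keys at least 12 and at most
    // 24 hours; the gap lets Flink batch the cleanup work.
    StreamQueryConfig qConfig = tableEnv.queryConfig();
    qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

    Table result = tableEnv.sqlQuery(
        "SELECT a, SUM(b) AS metric_sum_b FROM t GROUP BY a");

    // The config must be passed when the Table is translated to a DataStream.
    DataStream<Tuple2<Boolean, Row>> stream =
        tableEnv.toRetractStream(result, Row.class, qConfig);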
On Tue, Mar 19, 2019 at 10:27 PM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:

My bad, it actually did work with

    SELECT a, map['sumB', sum(b), 'sumC', sum(c)] AS metric_map
    GROUP BY a

Do you think that's OK as a workaround? That way the main schema shouldn't change; only the keys in the map would.
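(Side note on the syntax, since the error in the mail below looks confusing: map[...] with square brackets is the MAP value constructor, while map(...) with parentheses is, as far as I can tell, parsed by Calcite as a sub-query constructor, hence "Non-query expression encountered in illegal context". A sketch against a hypothetical events table:)

    // Works: MAP value constructor with square brackets.
    Table ok = tableEnv.sqlQuery(
        "SELECT a, map['sumB', SUM(b), 'sumC', SUM(c)] AS metric_map "
            + "FROM events GROUP BY a");

    // Fails with "Non-query expression encountered in illegal context",
    // presumably because map(...) expects a sub-query, not expressions:
    //   SELECT a, map(SUM(b), SUM(c)) AS metric_map FROM events GROUP BY a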
On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:

Thanks Fabian,

I'm thinking about how to work around that issue, and one thing that came to mind is to create a map that holds keys and values that can be edited without changing the schema, though I'm still thinking about how to implement it in Calcite. Consider the following original SQL, in which "metrics" can be added/deleted/renamed:

    SELECT a, sum(b) AS metric_sum_b, sum(c) AS metric_sum_c
    GROUP BY a

I'm looking at both json_objectagg and map to change it, but it seems json_objectagg is only in a later Calcite version, and map doesn't work for me. Trying something like

    SELECT a, map(sum(b) AS metric_sum_b, sum(c) AS metric_sum_c) AS metric_map
    GROUP BY a

results in "Non-query expression encountered in illegal context". Is my train of thought the right one? If so, do I have a mistake in the way I'm trying to implement it?

Thanks!

On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

Restarting a changed query from a savepoint is currently not supported. In general this is a very difficult problem, as new queries might result in completely different execution plans. The special case of adding and removing aggregates is easier to solve, but the schema of the stored state changes and we would need to analyze the previous and current query and generate compatible serializers. So far we have not explored this rabbit hole.

Also, starting a different query from a savepoint can lead to weird result semantics. I'd recommend bootstrapping the state of the new query from scratch.

Best, Fabian

On Mon, Mar 18, 2019 at 8:02 PM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:

Or is it the SQL state that is incompatible...?

On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:

Thanks guys, I actually got an error now when adding some fields to the select statement:

    java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
        at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
        ... 9 more

Does that mean I should move from a POJO storing the result of the SQL retracted stream to Avro? I'm trying to understand how to mitigate it.

Thanks

On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:

Hi Shahar,

From my understanding, if you use GROUP BY with aggregate functions, they save the accumulators to SQL-internal state, which is invariant to your input schema. Based on what you described, I think that's why it is fine to recover from existing state.

I think one confusion you might have is the "toRetractStream" syntax. This actually passes the "retracting" flag to the Flink planner to indicate how the DataStream operators get generated from your SQL. So in my understanding, there's really no "state" associated with the "retracting stream" itself, but rather with the generated operators.

However, I am no expert in Table/SQL state recovery: I recall there was an open JIRA [1] that might be related to your question about recovery of SQL/Table-generated operators. Maybe @Fabian can provide more insight here?

Regarding the rest of the pipeline: both "filter" and "map" operators are stateless, and sink state recovery depends on what you do.

-- Rong

[1] https://issues.apache.org/jira/browse/FLINK-6966

On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com> wrote:

Thanks Rong,

I made a quick test changing the SQL SELECT (adding a field in the middle), reran the job from a savepoint, and it worked without any errors. I want to make sure I understand at what point the state is stored and how it works.

Let's simplify the scenario and forget my specific case of a dynamically generated POJO. Let's focus on the generic steps of:

    Source -> register table -> SQL select and group by session -> retracted stream (Row) -> transformToPojo (custom map function) -> pushToSink

(a rough code sketch of this pipeline follows at the end of this mail). And let's assume the SQL select is changed (a field is added somewhere in the middle of the select list). So: we had intermediate results in the old format that are loaded from state into the new Row object in the retracted stream. Is that an accurate statement? At what operator, and in what format, is the state stored in this case? Is it the SQL result/Row? Is it the POJO? As this scenario does not fail for me, I'm trying to understand how/where it is handled in Flink.
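For concreteness, here is the simplified pipeline sketched in code (everything named MySource/MySink/Event/toPojo is a placeholder, and a plain GROUP BY stands in for the session grouping):

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv =
        StreamTableEnvironment.getTableEnvironment(env);

    // Source -> register table
    DataStream<Event> source = env.addSource(new MySource());
    tableEnv.registerDataStream("events", source);

    // SQL select and group by; per the replies above, the restorable state
    // (the aggregation accumulators) lives in the operator generated for
    // this GROUP BY, not in the downstream Row or POJO operators.
    Table result = tableEnv.sqlQuery(
        "SELECT a, SUM(b) AS metric_sum_b FROM events GROUP BY a");

    // retracted stream (Row) -> transformToPojo -> pushToSink
    tableEnv.toRetractStream(result, Row.class)
        .filter(change -> change.f0)           // keep only "add" messages
        .map(change -> toPojo(change.f1))      // stateless custom map
        .addSink(new MySink());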