Thanks guys! I actually got an error now when adding some fields to the select statement:
java.lang.RuntimeException: Error while getting state
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
	at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
	at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
	... 9 more

Does that mean I should move from having a POJO storing the result of the SQL retract stream to Avro? I'm trying to understand how to mitigate it.
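For context, here is a minimal sketch of what the job does, assuming the Flink 1.7 Java Table API (Event, MyEventSource, toPojo and MySink below are placeholders for my dynamically generated classes, not the real ones):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SessionAggregationJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Source -> register table
        DataStream<Event> events = env.addSource(new MyEventSource());
        tableEnv.registerDataStream("events", events, "sessionId, price, quantity");

        // SQL select + group by session. Adding a field in the middle of the
        // select list (e.g. the SUM(quantity) below) is the kind of change that
        // makes restoring from the savepoint fail with the exception above.
        Table result = tableEnv.sqlQuery(
            "SELECT sessionId, SUM(price) AS totalPrice, SUM(quantity) AS totalQuantity " +
            "FROM events GROUP BY sessionId");

        // retract stream (Row) -> filter retractions -> transform to POJO -> sink
        DataStream<Tuple2<Boolean, Row>> retractStream =
            tableEnv.toRetractStream(result, Row.class);
        retractStream
            .filter(change -> change.f0)          // keep accumulate records, drop retractions
            .map(change -> toPojo(change.f1))     // custom map function: Row -> POJO (placeholder)
            .addSink(new MySink());

        env.execute("session-aggregation");
    }
}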
Thanks

On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:

> Hi Shahar,
>
> From my understanding, if you use "groupBy" with AggregateFunctions, they
> save the accumulators to SQL internal states, which are invariant to your
> input schema. Based on what you described, I think that's why recovering
> from the existing state works for you.
> I think one confusion you might have is the "toRetractStream" syntax. This
> actually passes the "retracting" flag to the Flink planner to indicate how
> the DataStream operators get generated from your SQL.
>
> So in my understanding, there's really no "state" associated with the
> "retracting stream", but rather with the generated operators. However, I
> am not an expert in Table/SQL state recovery: I recall there was an open
> JIRA [1] that might be related to your question about recovery of
> SQL/Table-generated operators. Maybe @Fabian can provide more insight
> here?
>
> Regarding the rest of the pipeline: both "filter" and "map" operators are
> stateless, and sink state recovery depends on what you do.
>
> --
> Rong
>
> [1] https://issues.apache.org/jira/browse/FLINK-6966
>
> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com> wrote:
>
>> Thanks Rong,
>>
>> I ran a quick test changing the SQL select (adding a select field in the
>> middle) and reran the job from a savepoint, and it worked without any
>> errors. I want to make sure I understand at what point the state is
>> stored and how this works.
>>
>> Let's simplify the scenario and forget my specific case of a dynamically
>> generated POJO. Let's focus on the generic steps of:
>> Source -> register table -> SQL select and group by session -> retract
>> stream (Row) -> transformToPojo (custom map function) -> pushToSink
>>
>> And let's assume the SQL select is changed (a field is added somewhere
>> in the middle of the select list). So: we had intermediate results in
>> the old format that are loaded from state into the new Row object in the
>> retract stream. Is that an accurate statement? At what operator, and in
>> what format, is the state stored in this case? Is it the SQL result/Row?
>> Is it the POJO? As this scenario does not fail for me, I'm trying to
>> understand how/where it is handled in Flink.
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/