Or is it the SQL state that is incompatible?
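To make the question concrete, here is a hypothetical sketch (the field types are invented) of why I'd expect the Row type behind the group-aggregate state to change when a field is added in the middle of the select:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;

    public class RowTypeSketch {
        public static void main(String[] args) {
            // Row type behind e.g. SELECT sessionId, SUM(price) ...
            RowTypeInfo before = new RowTypeInfo(Types.STRING, Types.DOUBLE);
            // ... after adding a field in the middle of the select list,
            // e.g. SELECT sessionId, COUNT(*), SUM(price) ...
            RowTypeInfo after = new RowTypeInfo(Types.STRING, Types.LONG, Types.DOUBLE);
            // Different arity and field types mean a different Row serializer;
            // on restore, the heap backend compares the old and new serializers
            // and throws StateMigrationException when they are incompatible.
            System.out.println(before.equals(after)); // prints false
        }
    }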
On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:

> Thanks guys,
>
> I actually got an error now after adding some fields to the select statement:
>
> java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>     at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>     ... 9 more
>
> Does that mean I should move from having a Pojo store the result of the SQL retracted stream to using Avro? I'm trying to understand how to mitigate this.
>
> Thanks
>
> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
>
>> Hi Shahar,
>>
>> From my understanding, if you use "groupby" with AggregateFunctions, they save their accumulators to SQL-internal state, which is invariant to your input schema. Based on what you described, I think that is why recovering from the existing state worked for you.
>> I think one confusion you might have is the "toRetractStream" syntax: this actually passes the "retracting" flag to the Flink planner to indicate how the DataStream operators get generated from your SQL.
>>
>> So in my understanding, there is really no "state" associated with the "retracting stream" itself, but rather with the generated operators. However, I am no expert in Table/SQL state recovery: I recall there is an open JIRA [1] that might be related to your question about recovery of the SQL/Table-generated operators. Maybe @Fabian can provide more insight here?
>>
>> Regarding the rest of the pipeline: both the "filter" and "map" operators are stateless, and sink state recovery depends on what your sink does.
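>>
>> To make that concrete, here is a minimal sketch of such a pipeline (a sketch only, using the 1.7-era Java Table API; the table name, fields, and query are made up). The GROUP BY is what produces the stateful group-aggregate operator; toRetractStream merely tags each record with an accumulate/retract flag:
>>
>>     import org.apache.flink.api.java.tuple.Tuple2;
>>     import org.apache.flink.streaming.api.datastream.DataStream;
>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>     import org.apache.flink.table.api.Table;
>>     import org.apache.flink.table.api.TableEnvironment;
>>     import org.apache.flink.table.api.java.StreamTableEnvironment;
>>     import org.apache.flink.types.Row;
>>
>>     public class RetractSketch {
>>         public static void main(String[] args) throws Exception {
>>             StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>             StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
>>
>>             // Hypothetical input: (sessionId, price) pairs.
>>             DataStream<Tuple2<String, Double>> events =
>>                 env.fromElements(Tuple2.of("s1", 1.0), Tuple2.of("s1", 2.5));
>>             tEnv.registerDataStream("events", events, "sessionId, price");
>>
>>             // The GROUP BY generates a keyed group-aggregate operator; the
>>             // SUM accumulator lives in SQL-internal keyed state.
>>             Table result = tEnv.sqlQuery(
>>                 "SELECT sessionId, SUM(price) AS total FROM events GROUP BY sessionId");
>>
>>             // Each element is (flag, row): true = accumulate, false = retract.
>>             DataStream<Tuple2<Boolean, Row>> retracted =
>>                 tEnv.toRetractStream(result, Row.class);
>>             retracted.print();
>>
>>             env.execute("retract-sketch");
>>         }
>>     }
>>
>> Changing the select list changes the Row type of "result" and the layout of that aggregate state, which is where a restore can run into a serializer mismatch.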
>>
>> --
>> Rong
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>
>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com> wrote:
>>
>>> Thanks Rong,
>>>
>>> I made a quick test, changing the SQL select (adding a select field in the middle) and rerunning the job from a savepoint, and it worked without any errors. I want to make sure I understand at what point the state is stored and how this works.
>>>
>>> Let's simplify the scenario and forget my specific case of a dynamically generated Pojo. Let's focus on the generic steps:
>>>
>>> Source -> register table -> SQL select and group by session -> retracted stream (Row) -> transformToPojo (custom map function) -> pushToSink
>>>
>>> And let's assume the SQL select is changed (a field is added somewhere in the middle of the select list). So: we had intermediate results in the old format that get loaded from state into the new Row object in the retracted stream. Is that an accurate statement? At what operator, and in what format, is the state stored in this case? Is it the SQL result/Row? Is it the Pojo? Since this scenario does not fail for me, I'm trying to understand how and where it is handled in Flink (a rough sketch of the transformToPojo step follows below).
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
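>>>
>>> In case it helps, here is a rough sketch of what I mean by "transformToPojo" (the SessionResult Pojo and the field positions are made up; my real Pojo is generated dynamically). It assumes the Tuple2<Boolean, Row> retract stream produced by toRetractStream:
>>>
>>>     import org.apache.flink.api.java.tuple.Tuple2;
>>>     import org.apache.flink.streaming.api.datastream.DataStream;
>>>     import org.apache.flink.types.Row;
>>>
>>>     public class TransformToPojoSketch {
>>>         // Hypothetical result Pojo.
>>>         public static class SessionResult {
>>>             public String sessionId;
>>>             public Double total;
>>>         }
>>>
>>>         public static DataStream<SessionResult> transformToPojo(
>>>                 DataStream<Tuple2<Boolean, Row>> retracted) {
>>>             return retracted
>>>                 // keep accumulate records, drop retractions
>>>                 .filter(change -> change.f0)
>>>                 .map(change -> {
>>>                     Row row = change.f1;
>>>                     SessionResult out = new SessionResult();
>>>                     out.sessionId = (String) row.getField(0);
>>>                     out.total = (Double) row.getField(1);
>>>                     return out;
>>>                 })
>>>                 // type hint needed because of lambda type erasure
>>>                 .returns(SessionResult.class);
>>>         }
>>>     }
>>>
>>> The map itself holds no state, which is why I'm asking where the old-format intermediate results actually live.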