Thanks guys,

I actually got an error now after adding some fields to the select statement:

java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
    at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    ... 9 more

Does that mean I should move from a Pojo to Avro for storing the result of
the SQL retracted stream? I'm trying to understand how to mitigate it.
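
For reference, here is roughly what I mean by moving to Avro. This is just
a sketch for my own keyed state, with a made-up Avro-generated class
SessionResult (and I realize the state that fails above is Flink SQL's
internal accumulator state, so I'm not sure this even applies there):

    // Sketch only: SessionResult is a hypothetical Avro-generated
    // (SpecificRecord) class. AvroTypeInfo makes Flink use its Avro
    // serializer, which supports schema evolution when fields are added.
    // Requires the flink-avro dependency:
    // import org.apache.flink.api.common.state.ValueState;
    // import org.apache.flink.api.common.state.ValueStateDescriptor;
    // import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

    ValueStateDescriptor<SessionResult> descriptor =
        new ValueStateDescriptor<>(
            "session-result", new AvroTypeInfo<>(SessionResult.class));
    // Inside a keyed RichFunction's open():
    ValueState<SessionResult> state = getRuntimeContext().getState(descriptor);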

Thanks

On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:

> Hi Shahar,
>
> From my understanding, if you use "groupBy" with AggregateFunctions, they
> save the accumulators to SQL internal state, which is invariant to your
> input schema. Based on what you described, I think that's why recovering
> from the existing state works fine for you.
> I think one confusion you might have is the "toRetractStream" syntax. This
> actually passes the "retracting" flag to the Flink planner to indicate how
> the DataStream operators get generated from your SQL.
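> 
> For example, the flag just surfaces as the Boolean field in the converted
> stream. A sketch, assuming "result" is your grouped Table and "tEnv" your
> StreamTableEnvironment:
> 
>     DataStream<Tuple2<Boolean, Row>> retracted =
>         tEnv.toRetractStream(result, Row.class);
>     // f0 == true marks an accumulate (add) message; f0 == false marks a
>     // retract (delete) of a previously emitted result.
>     DataStream<Tuple2<Boolean, Row>> inserts =
>         retracted.filter(change -> change.f0);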
>
> So in my understanding, there's really no "state" associated with the
> "retracting stream", but rather with the generated operators. However, I
> am not an expert in Table/SQL state recovery: I recall there was an open
> JIRA [1] that might be related to your question about SQL/Table generated
> operator recovery. Maybe @Fabian can provide more insight here?
>
> Regarding the rest of the pipeline, both the "filter" and "map" operators
> are stateless, and sink state recovery depends on what your sink does.
>
> --
> Rong
>
> [1] https://issues.apache.org/jira/browse/FLINK-6966
>
> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com> wrote:
>
>> Thanks Rong,
>>
>> I made a quick test changing the SQL select (adding a select field in the
>> middle), reran the job from a savepoint, and it worked without any
>> errors. I want to make sure I understand at what point the state is
>> stored and how it works.
>>
>> Let's simplify the scenario and forget my specific case of a dynamically
>> generated Pojo. Let's focus on the generic steps of:
>> Source -> register table -> SQL select and group by session -> retracted
>> stream (Row) -> transformToPojo (custom Map function) -> pushToSink
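>>
>> In code, that pipeline would look roughly like this (names are
>> placeholders, just a sketch):
>>
>>     DataStream<Event> source = env.addSource(new MySource());
>>     tableEnv.registerDataStream("events", source, "sessionId, amount");
>>     Table grouped = tableEnv.sqlQuery(
>>         "SELECT sessionId, SUM(amount) AS total FROM events GROUP BY sessionId");
>>     DataStream<Tuple2<Boolean, Row>> retracted =
>>         tableEnv.toRetractStream(grouped, Row.class);
>>     retracted
>>         .map(new RowToPojoFn()) // custom MapFunction: Tuple2<Boolean, Row> -> Pojo
>>         .addSink(new MySink());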
>>
>> And let's assume the SQL select is changed (a field is added somewhere in
>> the middle of the select fields).
>> So:
>> We had intermediate results in the old format that are loaded from state
>> into the new Row object in the retracted stream. Is that an accurate
>> statement? At what operator, and in what format, is the state stored in
>> this case? Is it the SQL result/Row? Is it the Pojo? As this scenario
>> does not fail for me, I'm trying to understand how/where it is handled
>> in Flink.
>>
>
