Or is it the SQL state that is incompatible?
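
To make the question concrete: in the trace below, the failing getState call comes from GroupAggProcessFunction.open, i.e. from the operator generated for the SQL aggregation, not from the POJO map function. Here is a toy model of the kind of check I suspect is rejecting the restore. It is only a sketch: none of these names are Flink APIs, and the "field types must match exactly" rule is my assumption, not Flink's actual compatibility logic.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model only: every name here is hypothetical, not a Flink API. */
final class StateCompatSketch {

    /** Stand-in for a serializer snapshot: the ordered field types of a row. */
    static final class RowSnapshot {
        final List<String> fieldTypes;

        RowSnapshot(String... fieldTypes) {
            this.fieldTypes = Arrays.asList(fieldTypes);
        }
    }

    /** Assumed rule for this sketch: compatible only if the field types match exactly, in order. */
    static boolean isCompatible(RowSnapshot restored, RowSnapshot current) {
        return restored.fieldTypes.equals(current.fieldTypes);
    }

    /** Mimics registering state on restore: rejects a layout that differs from the saved one. */
    static void registerState(RowSnapshot restored, RowSnapshot current) {
        if (!isCompatible(restored, current)) {
            throw new IllegalStateException("new state serializer must not be incompatible");
        }
    }

    public static void main(String[] args) {
        RowSnapshot saved = new RowSnapshot("Long", "Double");

        // Same accumulator layout as in the savepoint: accepted.
        registerState(saved, new RowSnapshot("Long", "Double"));

        // One more aggregate in the select adds a field to the layout: rejected.
        try {
            registerState(saved, new RowSnapshot("Long", "Double", "Long"));
        } catch (IllegalStateException e) {
            System.out.println("restore rejected: " + e.getMessage());
        }
    }
}
```

If the real check behaves anything like this, changing the sink-side POJO to Avro would not help, because the rejected state belongs to the aggregation operator.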

On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> Thanks, guys,
>
> I actually got an error now after adding some fields to the select statement:
>
> java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>     at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>     ... 9 more
>
> Does that mean I should move from a POJO storing the result of the SQL
> retracted stream to Avro? I'm trying to understand how to mitigate it.
>
> Thanks
>
> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
>
>> Hi Shahar,
>>
>> From my understanding, if you use "group by" with AggregateFunctions, they
>> save the accumulators to SQL internal state, which is independent of your
>> input schema. Based on what you described, I think that's why recovering
>> from existing state works.
>> I think one confusion you might have is the "toRetractStream" syntax.
>> This actually passes the "retracting" flag to the Flink planner to indicate
>> how the DataStream operator gets generated based on your SQL.
>>
>> So in my understanding, there's really no "state" associated with the
>> "retracting stream"; the state is associated with the generated operators.
>> However, I am not an expert in Table/SQL state recovery. I recall there was
>> an open JIRA [1] that might be related to your question regarding SQL/Table
>> generated operator recovery. Maybe @Fabian can provide more insight here?
>>
>> Regarding the rest of the pipeline, both "filter" and "map" operators are
>> stateless; and sink state recovery depends on what you do.
>>
>> --
>> Rong
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>
>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com>
>> wrote:
>>
>>> Thanks Rong,
>>>
>>> I ran a quick test: I changed the SQL select (adding a field in the middle
>>> of the select list), reran the job from a savepoint, and it worked without
>>> any errors. I want to make sure I understand at what point the state is
>>> stored and how it works.
>>>
>>> Let's simplify the scenario and forget my specific case of a dynamically
>>> generated POJO. Let's focus on the generic steps:
>>> Source->register table->SQL select and group by session->retracted stream
>>> (Row)->transformToPojo (Custom Map function) ->pushToSink
>>>
>>> And let's assume the SQL select is changed (a field is added somewhere in
>>> the middle of the select list).
>>> So:
>>> We had intermediate results in the old format that are loaded from state
>>> into the new Row object in the retracted stream. Is that an accurate
>>> statement? At which operator, and in what format, is the state stored in
>>> this case? Is it the SQL result/Row? Is it the POJO? Since this scenario
>>> does not fail for me, I'm trying to understand how and where Flink handles
>>> it.
>>>
>>
