Antti Kaikkonen created FLINK-19692:
---------------------------------------
Summary: Can't restore feedback channel from savepoint
Key: FLINK-19692
URL: https://issues.apache.org/jira/browse/FLINK-19692
Project: Flink
Issue Type: Bug
Components: API / DataStream, API / State Processor, Stateful Functions
Affects Versions: 1.11.2
Reporter: Antti Kaikkonen
When using the new statefun-flink-datastream integration, the following error is
thrown by the *feedback -> union* task when trying to restore from a savepoint:
{code:java}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.IOException: position out of bounds
    at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
    ... 9 more
Caused by: java.io.IOException: position out of bounds
    at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
    ... 10 more
{code}
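The innermost cause in the trace is a failed seek on an in-memory state handle while iterating over key groups. The sketch below only illustrates what "position out of bounds" means for a byte-array-backed stream: a seek to an offset beyond the stored bytes is rejected. The class and method names (SeekBoundsSketch, ByteArraySeekableStream, seekFails) are hypothetical stand-ins and not Flink's actual code, and the sketch does not explain why the offset is wrong in this case.

```java
import java.io.IOException;

// Hypothetical, simplified stand-in; NOT Flink's ByteStreamStateHandle.
final class SeekBoundsSketch {

    // A seekable stream over a fixed byte array (assumed behaviour for illustration).
    static final class ByteArraySeekableStream {
        private final byte[] data;
        private int index;

        ByteArraySeekableStream(byte[] data) {
            this.data = data;
        }

        // Moves the read position; offsets outside [0, data.length] are rejected.
        void seek(long desired) throws IOException {
            if (desired < 0 || desired > data.length) {
                throw new IOException("position out of bounds");
            }
            index = (int) desired;
        }

        // Returns the next byte as an unsigned value, or -1 at the end of the data.
        int read() {
            return index < data.length ? (data[index++] & 0xFF) : -1;
        }
    }

    // Returns true if seeking to the given offset fails for the given data.
    static boolean seekFails(byte[] data, long offset) {
        try {
            new ByteArraySeekableStream(data).seek(offset);
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] snapshot = new byte[16];                 // pretend this is the restored state
        System.out.println(seekFails(snapshot, 8));     // offset inside the data
        System.out.println(seekFails(snapshot, 64));    // offset beyond the data
    }
}
```

In this sketch, an offset recorded for a key group that points past the end of the restored bytes fails in the same way as the innermost exception above.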
The error is only thrown when the feedback channel has been used.
I have tested with the [example application|https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java]
and the error is thrown only if it is modified to actually use the feedback
channel. I simply modified the invoke method so that, about half of the time, it
also forwards the input to a randomly chosen name:
{code:java}
@Override
public void invoke(Context context, Object input) {
  int seen = seenCount.updateAndGet(MyFunction::increment);
  context.send(GREETINGS, String.format("Hello %s at the %d-th time", input, seen));

  // Added: with probability 0.5, forward the input to the GREET function
  // addressed by a randomly chosen name.
  String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};
  ThreadLocalRandom random = ThreadLocalRandom.current();
  int index = random.nextInt(names.length);
  final String name2 = names[index];
  if (random.nextDouble() < 0.5) {
    context.send(new Address(GREET, name2), input);
  }
}
{code}