[ https://issues.apache.org/jira/browse/FLINK-19692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai updated FLINK-19692:
----------------------------------------
    Fix Version/s:     (was: statefun-2.3.0)

> Can't restore feedback channel from savepoint
> ---------------------------------------------
>
>                 Key: FLINK-19692
>                 URL: https://issues.apache.org/jira/browse/FLINK-19692
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: statefun-2.0.0, statefun-2.1.0, statefun-2.2.0
>            Reporter: Antti Kaikkonen
>            Assignee: Igal Shilman
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: statefun-2.2.1
>
>
> When using the new statefun-flink-datastream integration the following error
> is thrown by the *feedback -> union* task when trying to restore from a
> savepoint:
> {code:java}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.io.IOException: position out of bounds
>     at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
>     ... 9 more
> Caused by: java.io.IOException: position out of bounds
>     at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
>     ... 10 more
> {code}
> The error is only thrown when the feedback channel has been used.
> I have tested with the [example
> application|https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java]
> and the error is thrown only if it is modified to actually use the feedback
> channel. I simply modified the invoke method to sometimes forward the
> greeting to a random name:
> {code:java}
> @Override
> public void invoke(Context context, Object input) {
>   int seen = seenCount.updateAndGet(MyFunction::increment);
>   context.send(GREETINGS, String.format("Hello %s at the %d-th time", input, seen));
>   String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};
>   ThreadLocalRandom random = ThreadLocalRandom.current();
>   int index = random.nextInt(names.length);
>   final String name2 = names[index];
>   if (random.nextDouble() < 0.5) context.send(new Address(GREET, name2), input);
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)