Hi, I am running fling 1.3.1 on EMR. But I am getting this exception after running for a while.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Could not copy NFA. at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908) at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296) at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286) ... 7 more Caused by: java.io.StreamCorruptedException: invalid type code: 00 at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620) at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719) at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957) at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903) ... 17 more