Hi, Here is the code. But I am not sure if you can reproduce the problem without data source.
Best, Daiqing On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz < wysakowicz.da...@gmail.com> wrote: > As @Kostas asked in your previous thread would be possible for you to > share your code for that job or at least a minimal example to reproduce > this behaviour. I fear we won’t be able to help you without any further > info. > > Regards, > Dawid > > > On 10 Aug 2017, at 14:10, Daiqing Li <lidaiqing1...@gmail.com> wrote: > > > > Hi Flink user, > > > > I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this > exception after running for a while. Could anyone give me some help to > debug this? I try parallelism 1, and it has the same problem. I also try > reimplemented hashcode and equals method. I use UUID as hashcode right now. > > 2017-08-09 18:15:04,572 INFO > > org.apache.flink.runtime.executiongraph.ExecutionGraph > - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) ( > d4749a4c3469732a2a5edf40b83f88 > > d4) switched from RUNNING to FAILED. > > AsynchronousException{java. > > lang.Exception: Could not materialize checkpoint 946 for operator > KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).} > > at org.apache.flink.streaming. > > runtime.tasks.StreamTask$AsyncCheckpointRunnable.run( > > StreamTask.java:970) > > at java.util.concurrent. > > Executors$RunnableAdapter. > > call(Executors.java:511) > > at java.util.concurrent. > > FutureTask.run(FutureTask. > > java:266) > > at java.util.concurrent. > > ThreadPoolExecutor.runWorker( > > ThreadPoolExecutor.java:1149) > > at java.util.concurrent. > > ThreadPoolExecutor$Worker.run( > > ThreadPoolExecutor.java:624) > > at java.lang.Thread.run(Thread. > > java:748) > > Caused by: java.lang.Exception: Could not materialize checkpoint 946 for > operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4). > > ... 6 more > > Caused by: java.util.concurrent. > > ExecutionException: java.lang.IllegalStateException: Could not find id > for entry: SharedBufferEntry( > > ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1) > > at java.util.concurrent. > > FutureTask.report(FutureTask. > > java:122) > > at java.util.concurrent. > > FutureTask.get(FutureTask. > > java:192) > > at org.apache.flink.util. > > FutureUtil.runIfNotDoneAndGet( > > FutureUtil.java:43) > > at org.apache.flink.streaming. > > runtime.tasks.StreamTask$AsyncCheckpointRunnable.run( > > StreamTask.java:897) > > ... 5 more > > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > > at org.apache.flink.streaming. > > api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java: > > 90) > > at org.apache.flink.streaming. > > runtime.tasks.StreamTask$AsyncCheckpointRunnable. > > cleanup(StreamTask.java:1023) > > at org.apache.flink.streaming. > > runtime.tasks.StreamTask$AsyncCheckpointRunnable.run( > > StreamTask.java:961) > > ... 5 more > > > >
MilestoneEvent.java
Description: Binary data
example.java
Description: Binary data