Yes, with the information I have, my conclusion is the same: I think the problem is with the hashcode. Unfortunately, without some data to reproduce it, I won’t be able to help you further. All I can suggest is to debug the method SharedBuffer#serialize and pay attention to the entryID map.
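To make the hashcode theory a bit more concrete: as I understand it, the serializer assigns each buffer entry an id in a map keyed by the entry, then looks those ids up again. Assuming the entry's hash depends on your event's hashCode, a hash that changes between the put and the get makes the lookup miss, which would match the "Could not find id for entry" error. The sketch below uses a plain java.util.HashMap as a stand-in for that map. The Event class and the lookupAfterMutation helper are hypothetical and are not Flink code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MutableHashDemo {

    // Hypothetical event whose hashCode depends on a mutable field.
    static final class Event {
        String payload;

        Event(String payload) {
            this.payload = payload;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Event && Objects.equals(payload, ((Event) o).payload);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(payload);
        }
    }

    // Puts an event into an id map, changes its payload, then looks it up again.
    // Returns the stored id, or null if the lookup misses.
    static Integer lookupAfterMutation(String before, String after) {
        // Stand-in for an id map keyed by entries (an assumption, not Flink's code).
        Map<Event, Integer> entryIds = new HashMap<>();
        Event event = new Event(before);
        entryIds.put(event, 0);
        event.payload = after; // the key's hash changes while it sits in the map
        return entryIds.get(event);
    }

    public static void main(String[] args) {
        System.out.println(lookupAfterMutation("a", "a")); // 0
        System.out.println(lookupAfterMutation("a", "b")); // null: same object, but the lookup misses
    }
}
```

If a breakpoint in SharedBuffer#serialize shows your event's hashCode differing between when the id is stored and when it is looked up, that would confirm this theory.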
> On 10 Aug 2017, at 14:54, Daiqing Li <lidaiqing1...@gmail.com> wrote:
>
> Oh sorry, the data in {} is not actually empty; I hid private information
> about my model. Do you still reach the same conclusion?
>
>> On Aug 10, 2017, at 8:52 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>
>> You are right, I won’t be able to reproduce this problem without data. One
>> thing I can tell you, though, is that I think the problem is indeed with the
>> hashcode. Unfortunately I don’t know Gson, but one strange thing I noticed is
>> the exception message:
>> SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1).
>> The first {} is your event.toString, which seems odd, as if your event were empty.
>>
>> Generally speaking, as I understand it, this exception is thrown because the
>> hashcode of your event changes during serialization, which breaks access to
>> an internal temporary cache.
>>
>>> On 10 Aug 2017, at 14:29, Daiqing Li <lidaiqing1...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Here is the code. But I am not sure you can reproduce the problem
>>> without the data source.
>>>
>>> Best,
>>> Daiqing
>>>
>>> On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>> As @Kostas asked in your previous thread, would it be possible for you to share
>>> the code for that job, or at least a minimal example that reproduces this
>>> behaviour? I fear we won’t be able to help you without any further info.
>>>
>>> Regards,
>>> Dawid
>>>
>>>> On 10 Aug 2017, at 14:10, Daiqing Li <lidaiqing1...@gmail.com> wrote:
>>>>
>>>> Hi Flink users,
>>>>
>>>> I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this
>>>> exception after it had been running for a while. Could anyone help me debug
>>>> this? I tried parallelism 1 and got the same problem. I also tried
>>>> reimplementing the hashcode and equals methods. I use a UUID as the hashcode right now.
>>>>
>>>> 2017-08-09 18:15:04,572 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88d4) switched from RUNNING to FAILED.
>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>>>>     ... 5 more
>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>>>>         ... 5 more
>>>>
>>>
>>> <MilestoneEvent.java><example.java>
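Regarding the UUID-based hashcode mentioned in the thread: one way to rule out a changing hash is to base equals and hashCode only on a field that is assigned once and never changes, such as a UUID generated at construction time, rather than on mutable data or on a fresh UUID.randomUUID() inside hashCode() (which would return a different value on every call). The sketch below assumes a hypothetical Event class with an id field; the attached MilestoneEvent.java is not reproduced here, so treat this as an illustration of the idea, not a fix for that class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

public class StableIdentityDemo {

    // Hypothetical event: identity comes from an id fixed at construction.
    static final class Event {
        private final UUID id;   // never reassigned, so the hash never changes
        private String payload;  // mutable data, deliberately left out of equals/hashCode

        Event(UUID id, String payload) {
            this.id = Objects.requireNonNull(id, "id");
            this.payload = payload;
        }

        // Generate the UUID once, here, not inside hashCode().
        static Event create(String payload) {
            return new Event(UUID.randomUUID(), payload);
        }

        UUID id() {
            return id;
        }

        void setPayload(String payload) {
            this.payload = payload;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Event)) {
                return false;
            }
            return id.equals(((Event) o).id);
        }

        @Override
        public int hashCode() {
            return id.hashCode(); // same id, same hash, on every call
        }

        @Override
        public String toString() {
            return "Event(" + id + ", " + payload + ")";
        }
    }

    public static void main(String[] args) {
        Map<Event, Integer> ids = new HashMap<>();
        Event event = Event.create("a");
        ids.put(event, 0);

        event.setPayload("b");              // does not affect the hash
        System.out.println(ids.get(event)); // 0

        // A copy that carries the same id (for example, after a serializer
        // round trip that preserves the field) is still found.
        Event copy = new Event(event.id(), "b");
        System.out.println(ids.get(copy));  // 0
    }
}
```

This only helps if whatever serializer copies your events preserves the id field; if the field were transient or regenerated on deserialization, each copy would get a new hash and the same lookup failure could reappear.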