Yes, with the information I have, my conclusion would be the same: I think the 
reason is a problem with the hashcode. Unfortunately, without some data to 
reproduce it I won’t be able to help you further. I can only advise you to 
debug the method SharedBuffer#serialize and pay attention to the entryID map.
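Before stepping through SharedBuffer#serialize, one way to follow this advice is to check outside Flink that the event type's hashCode is repeatable and survives a serialization round trip. The sketch below assumes plain Java serialization; Flink itself may serialize the event with Kryo or its POJO serializer, so this is only an approximation. The `Event` and `IdentityEvent` classes are hypothetical stand-ins for the real event type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

class HashStabilityCheck {
    // Hypothetical event with value-based equals/hashCode over immutable fields.
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final long timestamp;

        Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Event)) return false;
            Event other = (Event) o;
            return timestamp == other.timestamp && Objects.equals(name, other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, timestamp);
        }
    }

    // Hypothetical event that keeps Object's identity-based equals/hashCode.
    static final class IdentityEvent implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Serializes and deserializes a value with plain Java serialization.
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    // True if hashCode is repeatable and the deserialized copy is equal with the same hash.
    static boolean isHashStable(Serializable value) {
        int first = value.hashCode();
        int second = value.hashCode();
        Object copy = roundTrip(value);
        return first == second && value.equals(copy) && first == copy.hashCode();
    }

    public static void main(String[] args) {
        System.out.println(isHashStable(new Event("milestone", 1502298303586L))); // true
        System.out.println(isHashStable(new IdentityEvent()));                    // false
    }
}
```

An identity-based type fails the check because the deserialized copy is a different object, so any lookup keyed on the original would miss it.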

> On 10 Aug 2017, at 14:54, Daiqing Li <lidaiqing1...@gmail.com> wrote:
> 
> Oh sorry, the data in {} is not actually empty; it looks empty because I hid 
> private information about my model. Do you have the same conclusion?
>> On Aug 10, 2017, at 8:52 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>> 
>> You are right, I won’t be able to reproduce this problem without data. One 
>> thing I can tell you, though, is that I think the problem is indeed with the 
>> hashcode. Unfortunately I don’t know Gson, but one strange thing I noticed is 
>> the exception message: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 
>> 0), [SharedBufferEdge(null, 1)], 1). The first {} is your event.toString, 
>> which is odd, as if your event were empty.
>> 
>> Generally speaking, as I understand it, this exception is thrown because the 
>> hashcode of your event changes during serialization, which breaks lookups in 
>> an internal temporary cache.
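The failure mode described here can be reproduced with an ordinary HashMap. This is a simplified, hypothetical stand-in for the id lookup, not Flink's actual SharedBuffer code: an object is stored as a key, a field its hashCode depends on is then mutated, and the later lookup returns null, which the sketch turns into an exception mirroring the message in the stack trace.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class MutableKeyDemo {
    // Hypothetical event whose hashCode depends on a mutable field.
    static final class Event {
        String payload;

        Event(String payload) {
            this.payload = payload;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Event && Objects.equals(payload, ((Event) o).payload);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(payload);
        }

        @Override
        public String toString() {
            return "Event(" + payload + ")";
        }
    }

    // Simplified id lookup: fails if the key can no longer be found.
    static int lookupId(Map<Event, Integer> ids, Event entry) {
        Integer id = ids.get(entry);
        if (id == null) {
            throw new IllegalStateException("Could not find id for entry: " + entry);
        }
        return id;
    }

    public static void main(String[] args) {
        Map<Event, Integer> ids = new HashMap<>();
        Event event = new Event("a");
        ids.put(event, 0);
        System.out.println(lookupId(ids, event)); // 0

        event.payload = "b"; // hashCode changes after the key was inserted
        try {
            lookupId(ids, event);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Could not find id for entry: Event(b)
        }
    }
}
```

The entry is still in the map after the mutation; it is just filed under the old hash, so `get` can no longer reach it.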
>> 
>>> On 10 Aug 2017, at 14:29, Daiqing Li <lidaiqing1...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> Here is the code, but I am not sure if you can reproduce the problem 
>>> without the data source.
>>> 
>>> Best,
>>> Daiqing
>>> 
>>> On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>> As @Kostas asked in your previous thread, would it be possible for you to 
>>> share the code for that job, or at least a minimal example that reproduces 
>>> this behaviour? I fear we won’t be able to help you without further info.
>>> 
>>> Regards,
>>> Dawid
>>> 
>>>> On 10 Aug 2017, at 14:10, Daiqing Li <lidaiqing1...@gmail.com> wrote:
>>>> 
>>>> Hi Flink user,
>>>> 
>>>> I am using FsStateBackend and YARN on AWS EMR to run a CEP job. I get this 
>>>> exception after the job has been running for a while. Could anyone help me 
>>>> debug this? I tried parallelism 1 and it has the same problem. I also tried 
>>>> reimplementing the hashcode and equals methods; I use a UUID as the 
>>>> hashcode right now.
>>>> 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88d4) switched from RUNNING to FAILED.
>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>>>>     ... 5 more
>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>>             at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>>>>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>>>>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>>>>             ... 5 more
>>>> 
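Using a UUID as the hashcode, as described above, is only safe if the UUID is fixed for the object's whole lifetime. The sketch below assumes that: the hypothetical `UuidKeyedEvent` class (not the attached MilestoneEvent.java) stores the UUID in a final field assigned once in the constructor and derives equals and hashCode from it alone. Whether Flink handles such a type with its POJO serializer or falls back to Kryo is not covered here.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical event type; not the attached MilestoneEvent.java.
final class UuidKeyedEvent {
    private final UUID id;      // assigned once in the constructor, never reassigned
    private final String type;  // hypothetical payload field, excluded from equals/hashCode

    UuidKeyedEvent(String type) {
        this(UUID.randomUUID(), type);
    }

    UuidKeyedEvent(UUID id, String type) {
        this.id = Objects.requireNonNull(id, "id");
        this.type = type;
    }

    UUID getId() {
        return id;
    }

    String getType() {
        return type;
    }

    // Identity is the UUID only, so equals/hashCode cannot change over the object's lifetime.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof UuidKeyedEvent)) return false;
        return id.equals(((UuidKeyedEvent) o).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return "UuidKeyedEvent(" + id + ", " + type + ")";
    }
}
```

By contrast, computing `UUID.randomUUID().hashCode()` inside hashCode() itself, or keeping the UUID in a transient field that is regenerated after deserialization, would give the same event a different hash at different times, which is the kind of instability that breaks a hash-based lookup.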
>>> 
>>> 
>>> <MilestoneEvent.java><example.java>
>> 
> 

