Hey JingsongLee!

Here are some findings...

   - flatSelect *without timeout* works normally:
   patternStream.flatSelect(PatternFlatSelectFunction) compiles fine.
   - Converted both the timeout and select selectors to *inner classes*
   (not static): same result, doesn't compile.
   - flatSelect *without* timeout, but with an inner class for
   PatternFlatSelectFunction: compiles (same as the first bullet).
   - Tried both selectors with an *empty* body, just a skeleton class:
   doesn't compile either. The empty-body example is in my first email.
   - Tried making both selectors *public static inner* classes: doesn't
   compile either.
   - Extracted both the timeout and flat selectors into their own
   *independent classes* in separate files: doesn't compile.
   - The *error stack* is below.
   - Without the timeout selector, flatSelect compiles fine in any class
   or lambda shape, with an empty or a full body.

Would these findings help? Any ideas?

Here is an error stack:

09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler     - org.apache.flink.api.common.InvalidProgramException: The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
    at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
    at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
    at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
    at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
    at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
    at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
    at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
    at com.motaword.ipm.kernel.Application.main(Application.java:63)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 9 more
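
One possible reading of the trace, offered as a guess rather than a diagnosis: the innermost cause names SingleOutputStreamOperator, and every failing variant has in common the OutputTag that the timeout overload requires. In the quoted getPending method that tag is created as an anonymous subclass inside an instance method, and in Java such a class can hold a hidden reference to its enclosing instance. If the enclosing object has a non-serializable field, serializing the tag drags that field in. The sketch below reproduces only that mechanism in plain Java, without Flink. CaptureDemo, Tag, NotSerializableOperator and firstUnserializable are hypothetical stand-ins, not Flink classes or APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a job class that is Serializable itself
// but owns a field that is not.
public class CaptureDemo implements Serializable {

    // Hypothetical stand-in for a non-serializable member such as a stream operator.
    public static class NotSerializableOperator { }

    // Hypothetical stand-in for a serializable tag type that callers subclass anonymously.
    public static class Tag implements Serializable {
        final String id;
        public Tag(String id) { this.id = id; }
        String describe() { return id; }
    }

    private final NotSerializableOperator operator = new NotSerializableOperator();

    // Created in a static context: there is no enclosing instance to capture.
    public static final Tag STATIC_TAG = new Tag("pending") { };

    // Created in an instance method: the anonymous subclass references this CaptureDemo.
    // The override touches the outer field so the capture is explicit here; Java 8 era
    // compilers kept the enclosing reference even when the body was empty.
    public Tag instanceTag() {
        return new Tag("pending") {
            @Override
            String describe() { return id + " owned by " + operator; }
        };
    }

    // Returns null if the object serializes, otherwise the message of the
    // NotSerializableException (by default the name of the offending class).
    public static String firstUnserializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("instance tag: " + firstUnserializable(new CaptureDemo().instanceTag()));
        System.out.println("static tag:   " + firstUnserializable(STATIC_TAG));
    }
}
```

If this is what happens in the real job, declaring the tag as a static final field of the enclosing class (or creating it in a static method) would be the thing to try; whether it actually resolves the Flink 1.8 error is an assumption that still needs to be verified.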






---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Fri, Apr 19, 2019 at 3:14 AM JingsongLee <lzljs3620...@aliyun.com> wrote:

> Hi @Oytun Tez
> It looks like your *PatternFlatSelectFunction* is not serializable.
> Because you use an anonymous inner class, it keeps a reference to the
> enclosing instance. Check the class to which getPending belongs; maybe
> that class is not serializable?
>
> Alternatively, avoid inner classes and use a static nested class
> instead.
>
> Best, JingsongLee
>
> ------------------------------------------------------------------
> From:Oytun Tez <oy...@motaword.com>
> Send Time: April 19, 2019 (Friday) 03:38
> To:user <user@flink.apache.org>
> Subject:PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade
>
> Hi all,
>
> We are just migrating from 1.6 to 1.8. I encountered a serialization error
> which, if memory serves, we didn't have before: The implementation of the
> *PatternFlatSelectAdapter* is not serializable. The object probably
> contains or references non serializable fields.
>
> The method below simply takes a PatternStream from CEP.pattern() and
> uses the side output for timed-out events. Can you see anything
> weird here (WorkerEvent is the input, but the collectors collect Project
> objects)?
>
> protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
>     OutputTag<Project> pendingProjectsTag =
>             new OutputTag<Project>("invitation-pending-projects"){};
>
>     return patternStream.flatSelect(
>             pendingProjectsTag,
>             new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
>                 @Override
>                 public void timeout(Map<String, List<WorkerEvent>> map, long l, Collector<Project> collector) {
>                 }
>             },
>             new PatternFlatSelectFunction<WorkerEvent, Project>() {
>                 @Override
>                 public void flatSelect(Map<String, List<WorkerEvent>> pattern, Collector<Project> collector) {
>                 }
>             }
>     ).name("Select pending projects for invitation").getSideOutput(pendingProjectsTag);
> }
>
