I think you could try StreamExecutionEnvironment.clean(pendingProjectsTag).
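
For context on why the tag might matter: `new OutputTag<Project>("...") {}` in the quoted getPending is itself an anonymous subclass created inside an instance method, just like the two selector functions. The plain-Java sketch below (no Flink involved; `CaptureDemo`, `Job`, `Tag` and `StaticTag` are hypothetical names made up for illustration) shows how such an object can drag its enclosing instance, and that instance's non-serializable fields, into serialization. This would be consistent with the `NotSerializableException` for `SingleOutputStreamOperator` in the stack trace, though it is an assumption about the job class, not something the thread confirms.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    /** Hypothetical stand-in for a serializable tag or user function. */
    public interface Tag extends Serializable {
        String id();
    }

    /** Hypothetical stand-in for a job class that holds a non-serializable field. */
    public static class Job implements Serializable {
        private final String name = "pending";
        // java.lang.Object is not Serializable; it plays the role of a stream operator field.
        private final Object stream = new Object();

        /** Anonymous class created in an instance method: holds a hidden reference to this Job. */
        public Tag anonymousTag() {
            return new Tag() {
                @Override
                public String id() {
                    // Reading an outer field guarantees the enclosing instance is captured;
                    // many javac versions keep the reference even when it is unused.
                    return name;
                }
            };
        }

        /** Static factory returning a static nested class: no reference to any Job instance. */
        public static Tag staticTag() {
            return new StaticTag();
        }
    }

    public static class StaticTag implements Tag {
        private static final long serialVersionUID = 1L;

        @Override
        public String id() {
            return "pending";
        }
    }

    /** Returns true if Java serialization of the object succeeds. */
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new Job().anonymousTag())); // prints false
        System.out.println(canSerialize(Job.staticTag()));          // prints true
    }
}
```

If this is what is happening, creating the tag and the selectors from a static context (a static field, a static nested class, or a static factory method) should keep the enclosing object out of the serialized closure.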

On Fri, Apr 19, 2019 at 9:58 PM, Oytun Tez <oy...@motaword.com> wrote:

> Forgot to answer one of your points: the parent class compiles well
> without this CEP selector (with timeout signature)...
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
> On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <oy...@motaword.com> wrote:
>
>> Hey JingsongLee!
>>
>> Here are some findings...
>>
>> - flatSelect *without timeout* works normally:
>>   patternStream.flatSelect(PatternFlatSelectFunction) compiles well.
>> - Converted both the timeout and select selectors to an *inner class*
>>   (not static); same result, doesn't compile.
>> - flatSelect *without* timeout, but with an inner class for
>>   PatternFlatSelectFunction, compiles (same as the first bullet).
>> - Tried both of these selectors with an *empty* body, just a skeleton
>>   class. Doesn't compile either. The empty-body example is in my first
>>   email.
>> - Tried making both selectors *static public inner* classes; doesn't
>>   compile either.
>> - Extracted both the timeout and flat selectors to their own
>>   *independent classes* in separate files. Doesn't compile.
>> - I am putting the *error stack* below.
>> - Without the timeout selector, in any class or lambda shape, with an
>>   empty or full body, flatSelect compiles well.
>>
>> Would these findings help? Any ideas?
>>
>> Here is an error stack:
>>
>> 09:36:51,925 ERROR
>> com.motaword.ipm.kernel.error.controller.ExceptionHandler -
>> org.apache.flink.api.common.InvalidProgramException: The implementation
>> of the PatternFlatSelectAdapter is not serializable. The object probably
>> contains or references non serializable fields.
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
>>     at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
>>     at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
>>     at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
>>     at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
>>     at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
>>     at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
>>     at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
>>     at com.motaword.ipm.kernel.Application.main(Application.java:63)
>> Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
>>     ... 9 more
>>
>> On Fri, Apr 19, 2019 at 3:14 AM JingsongLee <lzljs3620...@aliyun.com>
>> wrote:
>>
>>> Hi @Oytun Tez
>>> It looks like your *PatternFlatSelectFunction* is not serializable.
>>> Because you use an anonymous inner class, check the class to which
>>> getPending belongs; maybe that class is not serializable?
>>>
>>> Alternatively, avoid inner classes and use a static nested class
>>> instead.
>>>
>>> Best, JingsongLee
>>>
>>> ------------------------------------------------------------------
>>> From: Oytun Tez <oy...@motaword.com>
>>> Send Time: Fri, Apr 19, 2019, 03:38
>>> To: user <user@flink.apache.org>
>>> Subject: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade
>>>
>>> Hi all,
>>>
>>> We are just migrating from 1.6 to 1.8. I encountered a serialization
>>> error which we didn't have before, if memory serves: The implementation
>>> of the *PatternFlatSelectAdapter* is not serializable. The object
>>> probably contains or references non serializable fields.
>>>
>>> The method below simply takes a PatternStream from CEP.pattern() and
>>> makes use of the side output for timed-out events. Can you see anything
>>> weird here (WorkerEvent is the input, but the collectors collect Project
>>> objects)?
>>>
>>> protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
>>>     OutputTag<Project> pendingProjectsTag =
>>>             new OutputTag<Project>("invitation-pending-projects") {};
>>>
>>>     return patternStream.flatSelect(
>>>             pendingProjectsTag,
>>>             new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
>>>                 @Override
>>>                 public void timeout(Map<String, List<WorkerEvent>> map, long l, Collector<Project> collector) {
>>>                 }
>>>             },
>>>             new PatternFlatSelectFunction<WorkerEvent, Project>() {
>>>                 @Override
>>>                 public void flatSelect(Map<String, List<WorkerEvent>> pattern, Collector<Project> collector) {
>>>                 }
>>>             }
>>>     ).name("Select pending projects for invitation")
>>>      .getSideOutput(pendingProjectsTag);
>>> }

--
Best,
Guowei