Thank you, Guowei and Dawid! I am trying your suggestions today and will report back.
- I assume the cleaning only needs to be done once, because of the upgrade? Or should I run it every time the application starts?
- `static` sounds like a very simple fix for this. Are there any drawbacks?

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Tue, Apr 23, 2019 at 2:56 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Oytun,
>
> I think there is a regression introduced in 1.8 in how we handle output tags.
> The problem is that we do not call ClosureCleaner on OutputTag.
>
> There are two ways you can work around this issue:
>
> 1. Declare the OutputTag static
>
> 2. Clean the closure explicitly, as Guowei suggested:
> StreamExecutionEnvironment.clean(pendingProjectsTag)
>
> I also opened a Jira issue to fix this (FLINK-12297[1]).
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-12297
> On 22/04/2019 03:06, Guowei Ma wrote:
>
> I think you could try
> StreamExecutionEnvironment.clean(pendingProjectsTag).
>
>
> On Fri, Apr 19, 2019 at 9:58 PM Oytun Tez <oy...@motaword.com> wrote:
>
>> Forgot to answer one of your points: the parent class compiles well
>> without this CEP selector (with the timeout signature)...
>>
>>
>> On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <oy...@motaword.com> wrote:
>>
>>> Hey JingsongLee!
>>>
>>> Here are some findings...
>>>
>>> - flatSelect *without timeout* works normally:
>>>   patternStream.flatSelect(PatternFlatSelectFunction) compiles well.
>>> - Converted both the timeout and select selectors to an *inner class*
>>>   (not static): same result, doesn't compile.
>>> - flatSelect *without* timeout, but with an inner class for
>>>   PatternFlatSelectFunction, compiles (same as the first bullet).
>>> - Tried both of these selectors with an *empty* body. Just a skeleton
>>>   class.
>>>   Doesn't compile either. The empty-body example is in my first email.
>>> - Tried making both selectors *static public inner* classes. Doesn't
>>>   compile either.
>>> - Extracted both the timeout and flat selectors into their own
>>>   *independent classes* in separate files. Doesn't compile.
>>> - I am putting the *error stack* below.
>>> - Without the timeout selector, flatSelect compiles well in any class or
>>>   lambda shape, with an empty or full body.
>>>
>>> Would these findings help? Any ideas?
>>>
>>> Here is the error stack:
>>>
>>> 09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler -
>>> org.apache.flink.api.common.InvalidProgramException: The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
>>>     at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
>>>     at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
>>>     at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
>>>     at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
>>>     at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
>>>     at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
>>>     at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
>>>     at com.motaword.ipm.kernel.Application.main(Application.java:63)
>>> Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
>>>     ... 9 more
>>>
>>>
>>> On Fri, Apr 19, 2019 at 3:14 AM JingsongLee <lzljs3620...@aliyun.com>
>>> wrote:
>>>
>>>> Hi @Oytun Tez
>>>> It looks like your *PatternFlatSelectFunction* is not serializable,
>>>> because you use an anonymous inner class.
>>>> Check the class to which getPending belongs; maybe that class is not
>>>> serializable?
>>>>
>>>> Alternatively, avoid (non-static) inner classes and use a static
>>>> nested class instead.
>>>>
>>>> Best, JingsongLee
>>>>
>>>> ------------------------------------------------------------------
>>>> From: Oytun Tez <oy...@motaword.com>
>>>> Send Time: Friday, April 19, 2019 03:38
>>>> To: user <user@flink.apache.org>
>>>> Subject: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade
>>>>
>>>> Hi all,
>>>>
>>>> We are just migrating from 1.6 to 1.8. I encountered a serialization
>>>> error which, if memory serves, we didn't have before: The implementation
>>>> of the *PatternFlatSelectAdapter* is not serializable. The object
>>>> probably contains or references non serializable fields.
>>>>
>>>> The method below simply takes a PatternStream from CEP.pattern() and
>>>> uses the side output for timed-out events. Can you see anything
>>>> weird here (WorkerEvent is the input, but the collectors collect
>>>> Project objects)?
>>>>
>>>> protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
>>>>     OutputTag<Project> pendingProjectsTag = new OutputTag<Project>("invitation-pending-projects"){};
>>>>
>>>>     return patternStream.flatSelect(
>>>>             pendingProjectsTag,
>>>>             new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
>>>>                 @Override
>>>>                 public void timeout(Map<String, List<WorkerEvent>> map, long l, Collector<Project> collector) {
>>>>                 }
>>>>             },
>>>>             new PatternFlatSelectFunction<WorkerEvent, Project>() {
>>>>                 @Override
>>>>                 public void flatSelect(Map<String, List<WorkerEvent>> pattern, Collector<Project> collector) {
>>>>                 }
>>>>             }
>>>>     ).name("Select pending projects for invitation")
>>>>             .getSideOutput(pendingProjectsTag);
>>>> }
>
> --
> Best,
> Guowei
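A plain-Java sketch of why Dawid's first workaround (declaring the OutputTag static) can help. An object of a non-static inner class carries a hidden reference to the instance that created it, and default Java serialization writes that reference out too, so a harmless-looking tag can drag a non-serializable object graph along (the stack trace above shows serialization eventually reaching a SingleOutputStreamOperator). The anonymous `OutputTag` subclass in getPending is created inside an instance method, so it can carry such a reference to the enclosing controller. All names below (CaptureDemo, Tag, Controller, FakeOperator) are hypothetical stand-ins, not Flink classes: Tag plays the role of OutputTag and FakeOperator the role of the operator. In the job itself the corresponding change would presumably be a field along the lines of `private static final OutputTag<Project> PENDING_PROJECTS_TAG = new OutputTag<Project>("invitation-pending-projects") {};`. This sketch does not model Flink's ClosureCleaner, which is the second workaround.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CaptureDemo {

    // Hypothetical stand-in for an output tag: a small Serializable value class.
    static class Tag implements Serializable {
        final String id;

        Tag(String id) {
            this.id = id;
        }
    }

    // Hypothetical stand-in for a non-serializable stream operator.
    static class FakeOperator {
    }

    // Hypothetical stand-in for the enclosing controller. It is Serializable,
    // but it holds a field whose type is not, so serializing it fails.
    static class Controller implements Serializable {
        final FakeOperator operator = new FakeOperator();

        // Inner (non-static) class: each instance keeps a hidden reference to the
        // Controller that created it, and default serialization writes it out.
        class InnerTag extends Tag {
            InnerTag(String id) {
                super(id);
            }

            Controller owner() {
                return Controller.this; // explicit use of the enclosing instance
            }
        }

        Tag newInnerTag() {
            return new InnerTag("invitation-pending-projects");
        }
    }

    // Created in a static context: this anonymous subclass has no enclosing instance.
    static final Tag STATIC_TAG = new Tag("invitation-pending-projects") {};

    // Returns true if Java serialization succeeds, false on NotSerializableException.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Controller().newInnerTag())); // prints false
        System.out.println(serializes(STATIC_TAG));                    // prints true
    }
}
```

The inner tag fails because serialization follows its enclosing Controller into the FakeOperator field; the static tag has no such reference and serializes on its own.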