Hi all, Making the tag a static element worked out, thank you!
--- Oytun Tez *M O T A W O R D* The World's Fastest Human Translation Platform. oy...@motaword.com — www.motaword.com On Tue, Apr 23, 2019 at 10:37 AM Oytun Tez <oy...@motaword.com> wrote: > Thank you Guowei and Dawid! I am trying your suggestions today and will > report back. > > - I assume the cleaning operation should be done only once because of the > upgrade, or should I run every time the application is up? > - `static` sounds a very simple fix to get rid of this. Any drawbacks here? > > > > > --- > Oytun Tez > > *M O T A W O R D* > The World's Fastest Human Translation Platform. > oy...@motaword.com — www.motaword.com > > > On Tue, Apr 23, 2019 at 2:56 AM Dawid Wysakowicz <dwysakow...@apache.org> > wrote: > >> Hi Oytun, >> >> I think there is a regression introduced in 1.8 how we handle output >> tags. The problem is we do not call ClosureCleaner on OutputTag. >> >> There are two options how you can workaround this issue: >> >> 1. Declare the OutputTag static >> >> 2. Clean the closure explicitly as Guowei suggested: >> StreamExecutionEnvironment.clean(pendingProjectsTag) >> >> I also opened a jira issue to fix this (FLINK-12297[1]) >> >> Best, >> >> Dawid >> >> [1] https://issues.apache.org/jira/browse/FLINK-12297 >> On 22/04/2019 03:06, Guowei Ma wrote: >> >> I think you could try >> StreamExecutionEnvironment.clean(pendingProjectsTag). >> >> >> Oytun Tez <oy...@motaword.com>于2019年4月19日 周五下午9:58写道: >> >>> Forgot to answer one of your points: the parent class compiles well >>> without this CEP selector (with timeout signature)... >>> >>> >>> --- >>> Oytun Tez >>> >>> *M O T A W O R D* >>> The World's Fastest Human Translation Platform. >>> oy...@motaword.com — www.motaword.com >>> >>> >>> On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <oy...@motaword.com> wrote: >>> >>>> Hey JingsongLee! >>>> >>>> Here are some findings... >>>> >>>> - flatSelect *without timeout* works normally: >>>> patternStream.flatSelect(PatternFlatSelectFunction), this compiles >>>> well. >>>> - Converted the both timeout and select selectors to an *inner >>>> class* (not static), yielded the same results, doesn't compile. >>>> - flatSelect *without* timeout, but with an inner class for >>>> PatternFlatSelectFunction, it compiles (same as first bullet). >>>> - Tried both of these selectors with *empty* body. Just a skeleton >>>> class. Doesn't compile either. Empty body example is in my first email. >>>> - Tried making both selectors *static public inner* classes, >>>> doesn't compile either. >>>> - Extracted both timeout and flat selectors to their own *independent >>>> classes* in separate files. Doesn't compile. >>>> - I am putting the *error stack* below. >>>> - Without the timeout selector in any class or lambda shape, with >>>> empty or full body, flatSelect compiles well. >>>> >>>> Would these findings help? Any ideas? >>>> >>>> Here is an error stack: >>>> >>>> 09:36:51,925 ERROR >>>> com.motaword.ipm.kernel.error.controller.ExceptionHandler - >>>> org.apache.flink.api.common.InvalidProgramException: The implementation >>>> of the PatternFlatSelectAdapter is not serializable. The object probably >>>> contains or references non serializable fields. >>>> at >>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99) >>>> at >>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558) >>>> at >>>> org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86) >>>> at org.apache.flink.cep.PatternStream.process(PatternStream.java:114) >>>> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451) >>>> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408) >>>> at >>>> com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89) >>>> at >>>> com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45) >>>> at >>>> com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31) >>>> at com.motaword.ipm.kernel.Application.main(Application.java:63) >>>> Caused by: java.io.NotSerializableException: >>>> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator >>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) >>>> at >>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) >>>> at >>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) >>>> at >>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) >>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) >>>> at >>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) >>>> at >>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) >>>> at >>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) >>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) >>>> at >>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) >>>> at >>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) >>>> at >>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) >>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) >>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) >>>> at >>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576) >>>> at >>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81) >>>> ... 9 more >>>> >>>> >>>> >>>> >>>> >>>> >>>> --- >>>> Oytun Tez >>>> >>>> *M O T A W O R D* >>>> The World's Fastest Human Translation Platform. >>>> oy...@motaword.com — www.motaword.com >>>> >>>> >>>> On Fri, Apr 19, 2019 at 3:14 AM JingsongLee <lzljs3620...@aliyun.com> >>>> wrote: >>>> >>>>> Hi @Oytun Tez >>>>> It Looks like your *PatternFlatSelectFunction* is not serializable. >>>>> Because you use anonymous inner class, >>>>> Check the class to which getPending belongs, maybe that class is not >>>>> serializable? >>>>> >>>>> Or you may be advised not to use internal classes, but to use a static >>>>> internal class. >>>>> >>>>> Best, JingsongLee >>>>> >>>>> ------------------------------------------------------------------ >>>>> From:Oytun Tez <oy...@motaword.com> >>>>> Send Time:2019年4月19日(星期五) 03:38 >>>>> To:user <user@flink.apache.org> >>>>> Subject:PatternFlatSelectAdapter - Serialization issue after 1.8 >>>>> upgrade >>>>> >>>>> Hi all, >>>>> >>>>> We are just migration from 1.6 to 1.8. I encountered a serialization >>>>> error which we didn't have before if memory serves: The >>>>> implementation of the *PatternFlatSelectAdapter* is not serializable. >>>>> The object probably contains or references non serializable fields. >>>>> >>>>> The method below simply intakes a PatternStream from CEP.pattern() and >>>>> makes use of the sideoutput for timed-out events. Can you see anything >>>>> weird here (WorkerEvent is the input, but collectors collect Project >>>>> object)? >>>>> >>>>> protected DataStream<Project> getPending(PatternStream<WorkerEvent> >>>>> patternStream) { >>>>> OutputTag<Project> pendingProjectsTag = new *OutputTag* >>>>> <Project>("invitation-pending-projects"){}; >>>>> >>>>> return patternStream.*flatSelect*( >>>>> pendingProjectsTag, >>>>> new *PatternFlatTimeoutFunction*<WorkerEvent, >>>>> Project>() { >>>>> @Override >>>>> public void *timeout*(Map<String, >>>>> List<WorkerEvent>> map, long l, Collector<Project> collector) { >>>>> } >>>>> }, >>>>> new *PatternFlatSelectFunction*<WorkerEvent, >>>>> Project>() { >>>>> @Override >>>>> public void *flatSelect*(Map<String, >>>>> List<WorkerEvent>> pattern, Collector<Project> collector) { >>>>> } >>>>> } >>>>> ).name("Select pending projects for invitation"). >>>>> *getSideOutput*(pendingProjectsTag); >>>>> } >>>>> >>>>> --- >>>>> Oytun Tez >>>>> >>>>> *M O T A W O R D* >>>>> The World's Fastest Human Translation Platform. >>>>> oy...@motaword.com — www.motaword.com >>>>> >>>>> -- >> Best, >> Guowei >> >>