Thank you Guowei and Dawid! I am trying your suggestions today and will
report back.

- I assume the cleaning operation needs to be done only once because of the
upgrade, or should I run it every time the application starts?
- `static` sounds like a very simple fix to get rid of this. Any drawbacks here?
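
A minimal sketch of why the `static` option sidesteps the error, assuming the
cause is the anonymous `OutputTag` subclass holding a hidden reference to its
non-serializable enclosing instance. It uses plain Java serialization with no
Flink dependency; `Tag` and `Job` are hypothetical stand-ins for `OutputTag`
and the job class, and it does not model what ClosureCleaner itself does.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class OutputTagCaptureSketch {

    // Hypothetical stand-in for OutputTag: a Serializable class that callers
    // instantiate as an anonymous subclass via the trailing "{}".
    static class Tag<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;

        Tag(String id) {
            this.id = id;
        }
    }

    // Hypothetical stand-in for the job class; deliberately NOT Serializable.
    static class Job {
        // Static context: the anonymous subclass has no enclosing instance.
        static final Tag<String> STATIC_TAG = new Tag<String>("pending") {};

        Tag<String> localTag() {
            // Instance context: the anonymous subclass carries a hidden
            // reference to this Job, which serialization then tries to write.
            return new Tag<String>("pending") {};
        }
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("static tag serializable:   " + isSerializable(Job.STATIC_TAG));
        System.out.println("instance tag serializable: " + isSerializable(new Job().localTag()));
    }
}
```

Running `main` reports `true` for the static tag and `false` for the tag
created inside the instance method, which matches the shape of the
NotSerializableException in the stack trace quoted below.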




---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Tue, Apr 23, 2019 at 2:56 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Oytun,
>
> I think there is a regression introduced in 1.8 in how we handle output tags.
> The problem is that we do not call ClosureCleaner on the OutputTag.
>
> There are two ways you can work around this issue:
>
> 1. Declare the OutputTag static
>
> 2. Clean the closure explicitly as Guowei suggested:
> StreamExecutionEnvironment.clean(pendingProjectsTag)
>
> I also opened a Jira issue to fix this (FLINK-12297 [1]).
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-12297
> On 22/04/2019 03:06, Guowei Ma wrote:
>
> I think you could try
> StreamExecutionEnvironment.clean(pendingProjectsTag).
>
>
> On Fri, Apr 19, 2019 at 9:58 PM Oytun Tez <oy...@motaword.com> wrote:
>
>> Forgot to answer one of your points: the parent class compiles well
>> without this CEP selector (with timeout signature)...
>>
>>
>> On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <oy...@motaword.com> wrote:
>>
>>> Hey JingsongLee!
>>>
>>> Here are some findings...
>>>
>>>    - flatSelect *without timeout* works normally:
>>>    patternStream.flatSelect(PatternFlatSelectFunction), this compiles
>>>    well.
>>>    - Converted both the timeout and select selectors to *inner classes*
>>>    (not static): same result, doesn't compile.
>>>    - flatSelect *without* timeout, but with an inner class for
>>>    PatternFlatSelectFunction, it compiles (same as first bullet).
>>>    - Tried both of these selectors with *empty* body. Just a skeleton
>>>    class. Doesn't compile either. Empty body example is in my first email.
>>>    - Tried making both selectors *static public inner* classes, doesn't
>>>    compile either.
>>>    - Extracted both timeout and flat selectors to their own *independent
>>>    classes* in separate files. Doesn't compile.
>>>    - I am putting the *error stack* below.
>>>    - Without the timeout selector in any class or lambda shape, with
>>>    empty or full body, flatSelect compiles well.
>>>
>>> Would these findings help? Any ideas?
>>>
>>> Here is an error stack:
>>>
>>> 09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler     -
>>> org.apache.flink.api.common.InvalidProgramException: The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.
>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
>>> at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
>>> at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
>>> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
>>> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
>>> at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
>>> at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
>>> at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
>>> at com.motaword.ipm.kernel.Application.main(Application.java:63)
>>> Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
>>> ... 9 more
>>>
>>>
>>> On Fri, Apr 19, 2019 at 3:14 AM JingsongLee <lzljs3620...@aliyun.com>
>>> wrote:
>>>
>>>> Hi @Oytun Tez
>>>> It looks like your *PatternFlatSelectFunction* is not serializable,
>>>> because you use an anonymous inner class.
>>>> Check the class to which getPending belongs; maybe that class is not
>>>> serializable?
>>>>
>>>> Alternatively, avoid inner classes and use a static nested class
>>>> instead.
>>>>
>>>> Best, JingsongLee
>>>>
>>>> ------------------------------------------------------------------
>>>> From:Oytun Tez <oy...@motaword.com>
>>>> Send Time: Fri, Apr 19, 2019 03:38
>>>> To:user <user@flink.apache.org>
>>>> Subject:PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade
>>>>
>>>> Hi all,
>>>>
>>>> We are just migrating from 1.6 to 1.8. I encountered a serialization
>>>> error which we didn't have before if memory serves: The implementation
>>>> of the *PatternFlatSelectAdapter* is not serializable. The object
>>>> probably contains or references non serializable fields.
>>>>
>>>> The method below simply takes a PatternStream from CEP.pattern() and
>>>> uses a side output for timed-out events. Can you see anything
>>>> weird here (WorkerEvent is the input, but the collectors collect Project
>>>> objects)?
>>>>
>>>> protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
>>>>     OutputTag<Project> pendingProjectsTag =
>>>>             new OutputTag<Project>("invitation-pending-projects"){};
>>>>
>>>>     return patternStream.flatSelect(
>>>>             pendingProjectsTag,
>>>>             new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
>>>>                 @Override
>>>>                 public void timeout(Map<String, List<WorkerEvent>> map, long l, Collector<Project> collector) {
>>>>                 }
>>>>             },
>>>>             new PatternFlatSelectFunction<WorkerEvent, Project>() {
>>>>                 @Override
>>>>                 public void flatSelect(Map<String, List<WorkerEvent>> pattern, Collector<Project> collector) {
>>>>                 }
>>>>             }
>>>>     ).name("Select pending projects for invitation")
>>>>      .getSideOutput(pendingProjectsTag);
>>>> }
>>>>
>>>> --
> Best,
> Guowei
>
>
