[ 
https://issues.apache.org/jira/browse/FLINK-4331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15415410#comment-15415410
 ] 

Stephan Ewen commented on FLINK-4331:
-------------------------------------

Can you share some more of the code? Without knowing what "func" is, what the 
data types of any of the other objects are, it is hard to figure something out.

In the end, this is a Scala Closure serialization issue. Users always have to 
bear the closures they form in mind (that they are serializable)

Flink can only try to help via the CosureCleaner. It could be a case where the 
Flink Closure Cleaner does not clean as much as it could, which should be 
checked.

> Flink is not able to serialize scala classes / Task Not Serializable
> --------------------------------------------------------------------
>
>                 Key: FLINK-4331
>                 URL: https://issues.apache.org/jira/browse/FLINK-4331
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.1.0
>            Reporter: Pushpendra Jaiswal
>            Priority: Minor
>
> I have scala class having 2 fields which are vals but flink is saying it 
> doesn't have setters. Thus task is not serializable.
> I tried setters using var but then it says duplicate setter. vals are public 
> then why it is asking for setters. Flink version 1.1.0
> class Impression(val map: Map[String, String],val keySet:Set[String])
> ==========================================================================
>   val preAggregate = stream
> .filter(impression => {
>     true
> })
>  .map(impression => {
>   val xmap = impression.map
>   val values = valFunction(xmap)
>   new ImpressionRecord(impression, values._1, values._2, values._3)
> })
> class Impression does not contain a setter for field map 19:54:49.995 [main] 
> INFO o.a.f.a.java.typeutils.TypeExtractor - class Impression is not a valid 
> POJO type 19:54:49.997 [main] DEBUG o.a.flink.api.scala.ClosureCleaner$ - 
> accessedFields: Map(class -> Set()) Exception in thread "main" 
> org.apache.flink.api.common.InvalidProgramException: Task not serializable at 
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
>  at ) Caused by: java.io.NotSerializableException: 
> org.apache.flink.streaming.api.scala.DataStream at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at 
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
>  at 
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
>  ... 18 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to