Hi Juan
I see what your problem is. You have declared class C with field x of type Int. When you map to class fields you have to specify the type. Try the code below; it works:

```scala
case class C(x: Int)
val xs = benv.fromCollection(1 to 10).map(y => C(y.toInt))
xs.count
benv.execute("Count Collection")
```

Cheers,
Ravi Pullareddy

*From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
*Sent:* Monday, 20 April 2020 2:44 AM
*To:* users@zeppelin.apache.org
*Subject:* Re: Serialization issues with case classes with Flink

Hi Jeff,

I'll try using POJO classes instead.

Thanks,
Juan

On Sun, 19 Apr 2020 at 15:51, Jeff Zhang <zjf...@gmail.com> wrote:

Hi Juan,

This is an issue in Flink; I have created a ticket in the Flink community: https://issues.apache.org/jira/browse/FLINK-16969

The workaround is to use a POJO class instead of a case class.

Best Regards,
Jeff Zhang
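For reference, a minimal sketch of the kind of POJO Jeff describes; the class name and the use of `@BeanProperty` are illustrative choices, not taken from the thread. Flink's POJO rules require a public standalone class, a public no-arg constructor, and fields that are public or reachable through getters/setters:

```scala
import scala.beans.BeanProperty

// Illustrative POJO replacement for case class C(x: Int).
// @BeanProperty generates the getX/setX pair that Flink's POJO
// analyzer looks for; the primary constructor takes no arguments.
class CPojo {
  @BeanProperty var x: Int = 0
}

val cs = benv.fromCollection(1 to 10).map { i =>
  val c = new CPojo
  c.x = i
  c
}
cs.count
```

Whether Flink actually picks its POJO serializer here or falls back to Kryo may depend on how the interpreter wraps cell code, so treat this as a sketch of the workaround rather than a verified fix.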
On Sun, Apr 19, 2020 at 7:58 PM, Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

Thanks also to Som and Zahid for your answers. But as I show in my previous answer, this issue happens without using the CSV source, and it doesn't show up in the Flink Scala shell, so it looks like an issue with the Zeppelin interpreter for Flink.

On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

Hi Ravi,

I posted another message with the minimal reproduction; I repeat it here:

```scala
case class C(x: Int)
val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}
cs.count
```

defined class C
xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an instance class, meaning it is not a member of a toplevel object, or of an object contained in a toplevel object, therefore it requires an outer instance to be instantiated, but we don't have a reference to the outer instance. Please consider changing the outer class to an object.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
  at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
  ... 125 elided

As you can see, this minimal example doesn't use CSV files, so I don't think the CSV connector is the problem. I don't think this is a Flink issue either, as the example works fine in the Flink shell:

```
root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh local

scala> val xs = benv.fromCollection(1 to 10)
xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@6012bee8

scala> xs.collect
res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val cs = xs.map{C(_)}
cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@36f807f5

scala> cs.collect
res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8), C(9), C(10))

scala>
```

Note I was running Zeppelin in a Docker container on my laptop, also using Flink's local mode. I think this is a problem with the Zeppelin integration with Flink, and with how it processes case class definitions in a cell.

Thanks for your answer,
Juan

On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <ravi.pullare...@minlog.com.au> wrote:

Hi Juan,

I have written various applications for continuous processing of CSV files. Please post your entire code and how you are mapping; that makes it easy to highlight the issue.

Thanks,
Ravi Pullareddy

*From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
*Sent:* Sunday, 19 April 2020 6:25 PM
*To:* users@zeppelin.apache.org
*Subject:* Re: Serialization issues with case classes with Flink

Just for the record, the Spark version of this works fine:

```
%spark
case class C2(x: Int)
val xs = sc.parallelize(1 to 10)
val csSpark = xs.map{C2(_)}
csSpark.collect

res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7), C2(8), C2(9), C2(10))
```

Thanks,
Juan

On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

Minimal reproduction:

- First option:

```scala
case class C(x: Int)
val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}
cs.count
```

defined class C
xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an instance class, meaning it is not a member of a toplevel object, or of an object contained in a toplevel object, therefore it requires an outer instance to be instantiated, but we don't have a reference to the outer instance. Please consider changing the outer class to an object.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
  at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
  ... 125 elided

- Second option:

```scala
object Types {
  case class C(x: Int)
}
val cs2 = xs.map{Types.C(_)}
cs2.count
```

defined object Types
org.apache.flink.api.common.InvalidProgramException: Task not serializable
  at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
  at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
  at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
  at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
  at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
  at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
  ... 106 elided
Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

Greetings,
Juan
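Both failures point at the same root cause, which the first error message spells out: a case class compiled inside a class instance (which is how the interpreter wraps each cell) carries a hidden reference to its enclosing instance. A self-contained sketch of that hidden parameter, with no Flink involved; `Wrapper` is an illustrative stand-in for the interpreter's generated wrapper class, not Zeppelin's actual one:

```scala
// An inner case class cannot be constructed without an outer instance.
class Wrapper {
  case class C(x: Int)
}

object OuterDemo extends App {
  val w = new Wrapper
  val c = w.C(1)
  // The compiled constructor takes the enclosing Wrapper before the Int,
  // so there is no plain (Int => C) constructor for a serializer to call:
  println(c.getClass.getDeclaredConstructors.head.getParameterTypes.toList)
  // prints: List(class Wrapper, int)
}
```

This is what `ScalaCaseClassSerializer.lookupConstructor` trips over: on a remote task there is no enclosing instance to supply. That the same snippet works in the Flink Scala shell suggests the shell wraps REPL lines in objects (which need no outer instance), while the Zeppelin interpreter wraps them in classes.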
On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

Hi,

I'm using the Flink interpreter and the benv environment. I'm reading some CSV files using benv.readCsvFile and it works ok. I have also defined a case class C for the CSV records. The problem happens when I apply a map operation on the DataSet of tuples returned by benv.readCsvFile, to convert it into a DataSet[C] (the shape of this pattern is sketched after this message).

- If I define the case class C in some cell, I get this error:

java.lang.IllegalArgumentException: requirement failed: The class C is an instance class, meaning it is not a member of a toplevel object, or of an object contained in a toplevel object, therefore it requires an outer instance to be instantiated, but we don't have a reference to the outer instance. Please consider changing the outer class to an object.

- That sounds related to https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink; it looks like the Zeppelin Flink interpreter is wrapping the case class definition as an inner class. I tried defining the case class C inside an object Types that I define in another cell. With that I also get a serialization exception:

org.apache.flink.api.common.InvalidProgramException: Task not serializable
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)

I guess that didn't work because the object Types is still defined inside some class implicitly defined by the interpreter. Any thoughts on how I can fix this?

Also, I understand $line163 etc. refer to the code in the cells. Is there some convention I can use to work out which line in the notebook those error messages refer to?

Thanks in advance,
Juan
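For context, a sketch of the CSV pattern described at the top of that last message; the file path and single-column schema are illustrative guesses, since the original code was not posted:

```scala
// Illustrative reconstruction: read CSV rows as tuples, then map them
// into a case class. The map is where the case-class serializer error
// appears when this runs in a Zeppelin cell.
case class C(x: Int)

val rows = benv.readCsvFile[Tuple1[Int]]("/tmp/records.csv")
val cs = rows.map(t => C(t._1))
cs.count
```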