Looking at your original message, your first error was:

    java.lang.IllegalArgumentException: requirement failed:

That is not a serialisation error. As a golden rule of thumb, always look at the first error, irrespective of programming language.

On Mon, 20 Apr 2020, 03:13 Ravi Pullareddy, <ravi.pullare...@minlog.com.au> wrote:

> Hi Juan
>
> I see what your problem is. You have declared class C with a field x of
> type Int. When you map to class fields you have to specify the type. Try
> the code below; it works.
>
> ```scala
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10).map(y => C(y.toInt))
>
> xs.count
>
> benv.execute("Count Collection")
> ```
>
> Cheers
> Ravi Pullareddy
>
> *From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
> *Sent:* Monday, 20 April 2020 2:44 AM
> *To:* users@zeppelin.apache.org
> *Subject:* Re: Serialization issues with case classes with Flink
>
> Hi Jeff,
>
> I'll try using POJO classes instead.
>
> Thanks,
>
> Juan
>
> On Sun, 19 Apr 2020 at 15:51, Jeff Zhang <zjf...@gmail.com> wrote:
>
> Hi Juan,
>
> This is an issue of Flink; I have created a ticket in the Flink community:
> https://issues.apache.org/jira/browse/FLINK-16969
>
> The workaround is to use a POJO class instead of a case class.
>
> On Sun, 19 Apr 2020 at 19:58, Juan Rodríguez Hortalá
> <juan.rodriguez.hort...@gmail.com> wrote:
>
> Thanks also to Som and Zahid for your answers.
> But as I show in my previous answer, this issue happens without using the
> CSV source, and it doesn't show in the Flink Scala shell, so it looks like
> an issue with the Zeppelin interpreter for Flink.

On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

> Hi Ravi,
>
> I posted another message with the minimal reproduction; I repeat it here:
>
> ```scala
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
> ```
>
> defined class C
> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object.
>   at scala.Predef$.require(Predef.scala:224)
>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>   ... 125 elided
>
> As you can see, this minimal example doesn't use CSV files, so I don't
> think the CSV connector is the problem.
>
> I don't think this is a Flink issue either, as that example works fine in
> the Flink shell:
>
> ```
> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh local
>
> scala> val xs = benv.fromCollection(1 to 10)
> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@6012bee8
>
> scala> xs.collect
> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>
> scala> val cs = xs.map{C(_)}
> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@36f807f5
>
> scala> cs.collect
> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8), C(9), C(10))
> ```
>
> Note I was running Zeppelin in a Docker container on my laptop, also using
> Flink's local mode.
>
> I think this is a problem with the Zeppelin integration with Flink, and
> how it processes case class definitions in a cell.
>
> Thanks for your answer,
>
> Juan

On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <ravi.pullare...@minlog.com.au> wrote:

> Hi Juan
>
> I have written various applications for continuous processing of csv
> files. Please post your entire code and how you are mapping. It becomes
> easy to highlight the issue.
> Thanks
> Ravi Pullareddy
>
> *From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
> *Sent:* Sunday, 19 April 2020 6:25 PM
> *To:* users@zeppelin.apache.org
> *Subject:* Re: Serialization issues with case classes with Flink
>
> Just for the record, the Spark version of that works fine:
>
> ```
> %spark
>
> case class C2(x: Int)
>
> val xs = sc.parallelize(1 to 10)
> val csSpark = xs.map{C2(_)}
>
> csSpark.collect
>
> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7), C2(8), C2(9), C2(10))
> ```
>
> Thanks,
>
> Juan

On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

> Minimal reproduction:
>
> - First option:
>
> ```scala
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
> ```
>
> defined class C
> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object.
>   at scala.Predef$.require(Predef.scala:224)
>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>   ... 125 elided
>
> - Second option:
>
> ```scala
> object Types {
>   case class C(x: Int)
> }
>
> val cs2 = xs.map{Types.C(_)}
>
> cs2.count
> ```
>
> defined object Types
> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>   at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>   at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>   at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>   at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>   ... 106 elided
> Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> Greetings,
>
> Juan

On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> I'm using the Flink interpreter and the benv environment. I'm reading some
> csv files using benv.readCsvFile and it works ok. I have also defined a
> case class C for the csv records. The problem happens when I apply a map
> operation on the DataSet of tuples returned by benv.readCsvFile, to
> convert it into a DataSet[C].
> - If I define the case class C in some cell, I get this error:
>
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object.
>
> - That sounds related to
> https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink;
> it looks like the Zeppelin Flink interpreter is wrapping the case class
> definition as an inner class. I tried defining the case class C inside an
> object Types that I define in another cell. With that I also get a
> serialization exception:
>
> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>   org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>   org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>   org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>   org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>   org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>   org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>   $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>   $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>
> I guess that didn't work because the object Types is still defined inside
> some class implicitly defined by the interpreter.
>
> Any thoughts on how I can fix this? Also, I understand $line163 etc. refer
> to the code in the cells; is there some convention I can use to work out
> which line in the notebook those error messages are referring to?
>
> Thanks in advance,
>
> Juan

> --
> Best Regards
>
> Jeff Zhang
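
For reference, a minimal sketch of the POJO workaround Jeff points to (FLINK-16969). The class name `CPojo` and the exact shape are assumptions, not code from the thread: Flink's POJO serialization rules require a public class with a public no-arg constructor and public (or getter/setter-accessible) fields, which sidesteps the case-class constructor lookup that fails with "requires an outer instance" for a cell-defined case class.

```scala
// Hypothetical POJO-style replacement for `case class C(x: Int)`.
// A public no-arg constructor plus public mutable fields is what
// Flink's POJO serializer needs; it never looks for an outer instance.
class CPojo(var x: Int) {
  def this() = this(0) // no-arg constructor required by Flink's POJO rules
  override def toString: String = s"CPojo($x)"
}

// In a Zeppelin cell the mapping from the thread would then become
// (benv only exists inside the Flink interpreter, so this part is
// shown as a comment, untested here):
//   val cs = benv.fromCollection(1 to 10).map(i => new CPojo(i))
//   cs.count
```

The same class can also be used as the target of benv.readCsvFile mappings in place of the case class, at the cost of losing case-class conveniences like pattern matching and structural equality.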