Hi Juan,

This is an issue in Flink; I have created a ticket in the Flink community: https://issues.apache.org/jira/browse/FLINK-16969. The workaround is to use a POJO class instead of a case class.
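For illustration, here is a minimal sketch of that workaround (the names are mine, not from the ticket). A Flink POJO is a public class with a public zero-argument constructor whose fields are public or reachable through getters/setters; whether Flink analyzes the class as a POJO or falls back to its generic Kryo serializer, it avoids the case-class constructor lookup that fails in the notebook:

```scala
// Sketch of the POJO workaround (illustrative, not verified in Zeppelin).
// Flink's POJO rules: public class, public zero-arg constructor, and
// fields that are public or reachable through getters/setters.
class C(var x: Int) {
  def this() = this(0) // zero-arg constructor required by the POJO rules

  override def toString: String = s"C($x)"
}

val xs = benv.fromCollection(1 to 10)
val cs = xs.map(new C(_))
cs.count
```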
On Sun, Apr 19, 2020 at 7:58 PM Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Thanks also to Som and Zahid for your answers. But as I show in my
> previous answer, this issue happens without using the CSV source, and it
> doesn't show up in the Flink Scala shell, so it looks like an issue with
> the Zeppelin interpreter for Flink.
>
> On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi Ravi,
>>
>> I posted another message with the minimal reproduction; I repeat it here:
>>
>> ```scala
>> case class C(x: Int)
>>
>> val xs = benv.fromCollection(1 to 10)
>> val cs = xs.map{C(_)}
>>
>> cs.count
>> ```
>>
>> defined class C
>> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
>> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>> instance class, meaning it is not a member of a toplevel object, or of an
>> object contained in a toplevel object, therefore it requires an outer
>> instance to be instantiated, but we don't have a reference to the outer
>> instance. Please consider changing the outer class to an object.
>>   at scala.Predef$.require(Predef.scala:224)
>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>   ... 125 elided
>>
>> As you can see, this minimal example doesn't use CSV files, so I don't
>> think the CSV connector is the problem.
>>
>> I don't think this is a Flink issue either, as that example works fine in
>> the Flink shell:
>>
>> ```
>> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh local
>> scala> val xs = benv.fromCollection(1 to 10)
>> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@6012bee8
>>
>> scala> xs.collect
>> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>
>> scala> val cs = xs.map{C(_)}
>> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@36f807f5
>>
>> scala> cs.collect
>> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8), C(9), C(10))
>>
>> scala>
>> ```
>>
>> Note that I was running Zeppelin in a Docker container on my laptop, also
>> using Flink's local mode.
>>
>> I think this is a problem with the Zeppelin integration with Flink, and
>> with how it processes case class definitions in a cell.
>>
>> Thanks for your answer,
>>
>> Juan
>>
>>
>> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
>> ravi.pullare...@minlog.com.au> wrote:
>>
>>> Hi Juan
>>>
>>> I have written various applications for continuous processing of CSV
>>> files. Please post your entire code and how you are mapping; that makes
>>> it easier to pinpoint the issue.
>>>
>>> Thanks
>>>
>>> Ravi Pullareddy
>>>
>>> *From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
>>> *Sent:* Sunday, 19 April 2020 6:25 PM
>>> *To:* users@zeppelin.apache.org
>>> *Subject:* Re: Serialization issues with case classes with Flink
>>>
>>> Just for the record, the Spark version of that works fine:
>>>
>>> ```
>>> %spark
>>>
>>> case class C2(x: Int)
>>>
>>> val xs = sc.parallelize(1 to 10)
>>> val csSpark = xs.map{C2(_)}
>>>
>>> csSpark.collect
>>>
>>> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7), C2(8), C2(9), C2(10))
>>> ```
>>>
>>> Thanks,
>>>
>>> Juan
>>>
>>> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
>>> juan.rodriguez.hort...@gmail.com> wrote:
>>>
>>> Minimal reproduction:
>>>
>>> - First option
>>>
>>> ```scala
>>> case class C(x: Int)
>>>
>>> val xs = benv.fromCollection(1 to 10)
>>> val cs = xs.map{C(_)}
>>>
>>> cs.count
>>> ```
>>>
>>> defined class C
>>> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
>>> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
>>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>>> instance class, meaning it is not a member of a toplevel object, or of an
>>> object contained in a toplevel object, therefore it requires an outer
>>> instance to be instantiated, but we don't have a reference to the outer
>>> instance. Please consider changing the outer class to an object.
>>>   at scala.Predef$.require(Predef.scala:224)
>>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>>   ... 125 elided
>>>
>>> - Second option
>>>
>>> ```scala
>>> object Types {
>>>   case class C(x: Int)
>>> }
>>>
>>> val cs2 = xs.map{Types.C(_)}
>>>
>>> cs2.count
>>> ```
>>>
>>> defined object Types
>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>>   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>>   at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>>   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>>   at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>>   at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>>   at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>>   ... 106 elided
>>> Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>
>>> Greetings,
>>>
>>> Juan
>>>
>>> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
>>> juan.rodriguez.hort...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm using the Flink interpreter and the benv environment. I'm reading
>>> some CSV files using benv.readCsvFile and that works OK. I have also
>>> defined a case class C for the CSV records. The problem happens when I
>>> apply a map operation to the DataSet of tuples returned by
>>> benv.readCsvFile, to convert it into a DataSet[C].
>>>
>>> - If I define the case class C in some cell I get this error:
>>>
>>> java.lang.IllegalArgumentException: requirement failed: The class C is
>>> an instance class, meaning it is not a member of a toplevel object, or of
>>> an object contained in a toplevel object, therefore it requires an outer
>>> instance to be instantiated, but we don't have a reference to the outer
>>> instance. Please consider changing the outer class to an object.
>>>
>>> - That sounds related to
>>> https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink;
>>> it looks like the Zeppelin Flink interpreter is wrapping the case class
>>> definition as an inner class. I tried defining the case class C inside an
>>> object Types that I define in another cell. With that I also get a
>>> serialization exception:
>>>
>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>>   org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>>   org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>>   org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>>   org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>>   org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>>   org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>>   $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>>>   $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>>>
>>> I guess that didn't work because the object Types is still defined
>>> inside some class implicitly defined by the interpreter (the sketch
>>> below illustrates that failure mode).
>>>
>>> Any thoughts on how I can fix this? Also, I understand $line163 etc.
>>> refer to the code in the cells; is there some convention I can use to
>>> work out which lines in the notebook those error messages refer to?
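>>> A minimal, self-contained sketch of that failure mode in plain Scala
>>> (illustrative names; `Wrapper` stands in for the interpreter-generated
>>> class, and `notShippable` for its non-serializable DataSet fields):
>>>
>>> ```scala
>>> import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
>>>
>>> // `Wrapper` plays the role of the class the interpreter wraps cells in:
>>> // serializable itself (so closures can be shipped), but holding a field
>>> // that is not.
>>> class Wrapper extends Serializable {
>>>   val notShippable = new Object // stands in for a DataSet
>>>   object Types { case class C(x: Int) }
>>>   // Referencing Types.C closes over `this`, dragging in notShippable.
>>>   val f: Int => Types.C = Types.C(_)
>>> }
>>>
>>> object Demo extends App {
>>>   val out = new ObjectOutputStream(new ByteArrayOutputStream())
>>>   try out.writeObject((new Wrapper).f)
>>>   catch { case e: NotSerializableException => println(s"fails on: ${e.getMessage}") }
>>> }
>>> ```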
>>>
>>> Thanks in advance,
>>>
>>> Juan

--
Best Regards

Jeff Zhang