Hi Jeff, I’ll try using POJO classes instead.
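A minimal sketch of what such a POJO-style replacement for `case class C(x: Int)` might look like. The POJO rules assumed here are a public class, a public no-argument constructor, and public fields or getters/setters; the `equals`/`hashCode`/`toString` overrides are illustrative (they recover case-class-like behavior), not required by Flink:

```scala
// POJO-style stand-in for `case class C(x: Int)`.
// Assumed requirements: public class, public no-arg constructor,
// and public (or getter/setter-accessible) fields.
class C(var x: Int) {
  // No-arg constructor so the class can be instantiated reflectively.
  def this() = this(0)

  // Case-class-like semantics, purely for convenience.
  override def equals(other: Any): Boolean = other match {
    case c: C => c.x == x
    case _    => false
  }
  override def hashCode: Int = x.hashCode
  override def toString: String = s"C($x)"
}
```

The map in the minimal reproduction below would then become `xs.map(new C(_))`, avoiding the case-class constructor lookup that fails in the stack traces quoted in this thread.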
Thanks,

Juan

On Sun, 19 Apr 2020 at 15:51, Jeff Zhang <zjf...@gmail.com> wrote:

> Hi Juan,
>
> This is an issue of Flink; I have created a ticket in the Flink community:
> https://issues.apache.org/jira/browse/FLINK-16969
> The workaround is to use a POJO class instead of a case class.
>
> On Sun, Apr 19, 2020 at 7:58 PM, Juan Rodríguez Hortalá
> <juan.rodriguez.hort...@gmail.com> wrote:
>
>> Thanks also to Som and Zahid for your answers. But as I show in my
>> previous answer, this issue happens without using the CSV source, and it
>> doesn't show up in the Flink Scala shell, so it looks like an issue with
>> the Zeppelin interpreter for Flink.
>>
>> On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>>> Hi Ravi,
>>>
>>> I posted another message with the minimal reproduction; I repeat it here:
>>>
>>> ```scala
>>> case class C(x: Int)
>>>
>>> val xs = benv.fromCollection(1 to 10)
>>> val cs = xs.map{C(_)}
>>>
>>> cs.count
>>> ```
>>>
>>> defined class C
>>> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
>>> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
>>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>>> instance class, meaning it is not a member of a toplevel object, or of an
>>> object contained in a toplevel object, therefore it requires an outer
>>> instance to be instantiated, but we don't have a reference to the outer
>>> instance. Please consider changing the outer class to an object.
>>>   at scala.Predef$.require(Predef.scala:224)
>>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>>   ... 125 elided
>>>
>>> As you can see, this minimal example doesn't use CSV files, so I don't
>>> think the CSV connector is the problem.
>>>
>>> I don't think this is a Flink issue either, as that example works fine
>>> in the Flink shell:
>>>
>>> ```
>>> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh local
>>> scala> val xs = benv.fromCollection(1 to 10)
>>> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@6012bee8
>>>
>>> scala> xs.collect
>>> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>>
>>> scala> val cs = xs.map{C(_)}
>>> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@36f807f5
>>>
>>> scala> cs.collect
>>> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8), C(9), C(10))
>>>
>>> scala>
>>> ```
>>>
>>> Note I was running Zeppelin in a Docker container on my laptop, also
>>> using Flink's local mode.
>>>
>>> I think this is a problem with the Zeppelin integration with Flink, and
>>> how it processes case class definitions in a cell.
>>>
>>> Thanks for your answer,
>>>
>>> Juan
>>>
>>>
>>> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
>>> ravi.pullare...@minlog.com.au> wrote:
>>>
>>>> Hi Juan
>>>>
>>>> I have written various applications for continuous processing of CSV
>>>> files. Please post your entire code and how you are mapping. It becomes
>>>> easy to highlight the issue.
>>>>
>>>> Thanks
>>>>
>>>> Ravi Pullareddy
>>>>
>>>> *From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
>>>> *Sent:* Sunday, 19 April 2020 6:25 PM
>>>> *To:* users@zeppelin.apache.org
>>>> *Subject:* Re: Serialization issues with case classes with Flink
>>>>
>>>> Just for the record, the Spark version of that works fine:
>>>>
>>>> ```
>>>> %spark
>>>>
>>>> case class C2(x: Int)
>>>>
>>>> val xs = sc.parallelize(1 to 10)
>>>> val csSpark = xs.map{C2(_)}
>>>>
>>>> csSpark.collect
>>>>
>>>> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7), C2(8), C2(9), C2(10))
>>>> ```
>>>>
>>>> Thanks,
>>>>
>>>> Juan
>>>>
>>>> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
>>>> juan.rodriguez.hort...@gmail.com> wrote:
>>>>
>>>> Minimal reproduction:
>>>>
>>>> - First option
>>>>
>>>> ```scala
>>>> case class C(x: Int)
>>>>
>>>> val xs = benv.fromCollection(1 to 10)
>>>> val cs = xs.map{C(_)}
>>>>
>>>> cs.count
>>>> ```
>>>>
>>>> defined class C
>>>> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
>>>> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
>>>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>>>> instance class, meaning it is not a member of a toplevel object, or of an
>>>> object contained in a toplevel object, therefore it requires an outer
>>>> instance to be instantiated, but we don't have a reference to the outer
>>>> instance. Please consider changing the outer class to an object.
>>>>   at scala.Predef$.require(Predef.scala:224)
>>>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>>>   ... 125 elided
>>>>
>>>> - Second option
>>>>
>>>> ```scala
>>>> object Types {
>>>>   case class C(x: Int)
>>>> }
>>>>
>>>> val cs2 = xs.map{Types.C(_)}
>>>>
>>>> cs2.count
>>>> ```
>>>>
>>>> defined object Types
>>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>>>   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>>>   at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>>>   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>>>   at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>>>   at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>>>   at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>>>   ... 106 elided
>>>> Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>
>>>> Greetings,
>>>>
>>>> Juan
>>>>
>>>>
>>>> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
>>>> juan.rodriguez.hort...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm using the Flink interpreter and the benv environment. I'm reading
>>>> some CSV files using benv.readCsvFile and it works OK.
I have also defined
>>>> a case class C for the CSV records. The problem happens when I apply a
>>>> map operation on the DataSet of tuples returned by benv.readCsvFile, to
>>>> convert it into a DataSet[C].
>>>>
>>>> - If I define the case class C in some cell I get this error:
>>>>
>>>> java.lang.IllegalArgumentException: requirement failed: The class C is
>>>> an instance class, meaning it is not a member of a toplevel object, or of
>>>> an object contained in a toplevel object, therefore it requires an outer
>>>> instance to be instantiated, but we don't have a reference to the outer
>>>> instance. Please consider changing the outer class to an object.
>>>>
>>>> - That sounds related to
>>>> https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink;
>>>> it looks like the Zeppelin Flink interpreter is wrapping the case class
>>>> definition as an inner class. I tried defining the case class C inside
>>>> an object Types that I define in another cell. With that I also get a
>>>> serialization exception.
>>>>
>>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>>>   org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>>>   org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>>>   org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>>>   org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>>>   org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>>>   org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>>>   $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>>>>   $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>>>>
>>>> I guess that didn't work because the object Types is still defined
>>>> inside some class implicitly defined by the interpreter.
>>>>
>>>> Any thoughts about how I can fix this? Also, I understand $line163 etc.
>>>> refer to the code in the cells; is there some convention I can use to
>>>> understand which line in the notebook those error messages are
>>>> referring to?
>>>>
>>>> Thanks in advance,
>>>>
>>>> Juan
>>>>

>
> --
> Best Regards
>
> Jeff Zhang
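The "outer instance" behavior Juan guesses at can be sketched outside Zeppelin. This is a minimal illustration, assuming (as the thread does) that the interpreter compiles each cell into a generated wrapper class; `CellWrapper` is a hypothetical name for that class:

```scala
// Sketch: an interpreter compiles a cell roughly into a wrapper class,
// so a case class defined in the cell becomes an *inner* class.
class CellWrapper {
  // Every C instance carries a hidden reference to CellWrapper.this --
  // the "outer instance" the Flink error message refers to.
  case class C(x: Int)
}

val w = new CellWrapper
val c = w.C(3) // works here only because the outer instance `w` is in scope
// A serializer that holds only the class CellWrapper#C cannot invoke the
// constructor without some CellWrapper instance, which is why moving the
// case class to a top-level object (or using a POJO class) is advised.
```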