Hi Juan,

This is a Flink issue. I have created a ticket in the Flink community:
https://issues.apache.org/jira/browse/FLINK-16969
The workaround is to use a POJO class instead of a case class.
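
As a rough illustration of that workaround (a sketch only, not taken from the ticket and not verified in Zeppelin), a POJO-style version of the class C from the reproduction could look like the following. The no-argument constructor and the mutable field follow Flink's documented POJO rules; the default value 0 and the toString override are assumptions made for the example.

```scala
// Sketch: POJO-style replacement for `case class C(x: Int)`.
// `var x` gives the field a Scala getter and setter.
class C(var x: Int) {
  // Public no-argument constructor; the default 0 is an arbitrary choice.
  def this() = this(0)

  // Optional, only so that results print readably.
  override def toString: String = s"C($x)"
}

// In the notebook, the map step would then build instances with `new`
// (assumption, untested in Zeppelin):
//   val cs = xs.map(new C(_))
```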

Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote on Sunday,
April 19, 2020 at 7:58 PM:

> Thanks also to Som and Zahid for your answers. But as I showed in my
> previous answer, this issue happens without using the CSV source, and it
> doesn't show up in the Flink Scala shell, so it looks like an issue with
> the Zeppelin interpreter for Flink.
>
> On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi Ravi,
>>
>> I posted another message with the minimal reproduction; I repeat it here:
>>
>> ```scala
>> case class C(x: Int)
>>
>> val xs = benv.fromCollection(1 to 10)
>> val cs = xs.map{C(_)}
>>
>> cs.count
>> ```
>>
>> defined class C
>> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
>> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>> instance class, meaning it is not a member of a toplevel object, or of an
>> object contained in a toplevel object, therefore it requires an outer
>> instance to be instantiated, but we don't have a reference to the outer
>> instance. Please consider changing the outer class to an object.
>>   at scala.Predef$.require(Predef.scala:224)
>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>   ... 125 elided
>>
>> As you can see, this minimal example doesn't use CSV files, so I don't
>> think the CSV connector is the problem.
>>
>> I don't think this is a Flink issue either, as that example works fine in
>> the Flink shell:
>>
>> ```
>> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh  local
>> scala> val xs = benv.fromCollection(1 to 10)
>> xs: org.apache.flink.api.scala.DataSet[Int] =
>> org.apache.flink.api.scala.DataSet@6012bee8
>>
>> scala> xs.collect
>> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>
>> scala> val cs = xs.map{C(_)}
>> cs: org.apache.flink.api.scala.DataSet[C] =
>> org.apache.flink.api.scala.DataSet@36f807f5
>>
>> scala> cs.collect
>> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8),
>> C(9), C(10))
>>
>> scala>
>> ```
>>
>> Note that I was running Zeppelin in a Docker container on my laptop, also
>> using Flink's local mode.
>>
>> I think this is a problem with the Zeppelin integration with Flink, and
>> how it processes case class definitions in a cell.
>>
>> Thanks for your answer,
>>
>> Juan
>>
>>
>> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
>> ravi.pullare...@minlog.com.au> wrote:
>>
>>> Hi Juan
>>>
>>>
>>>
>>> I have written various applications for continuous processing of CSV
>>> files. Please post your entire code, including how you are doing the
>>> mapping. That will make it easier to pinpoint the issue.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Ravi Pullareddy
>>>
>>>
>>>
>>>
>>>
>>> *From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
>>> *Sent:* Sunday, 19 April 2020 6:25 PM
>>> *To:* users@zeppelin.apache.org
>>> *Subject:* Re: Serialization issues with case classes with Flink
>>>
>>>
>>>
>>> Just for the record, the Spark version of that works fine:
>>>
>>>
>>>
>>> ```
>>> %spark
>>>
>>> case class C2(x: Int)
>>>
>>> val xs = sc.parallelize(1 to 10)
>>> val csSpark = xs.map{C2(_)}
>>>
>>> csSpark.collect
>>>
>>>
>>>
>>> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6),
>>> C2(7), C2(8), C2(9), C2(10))
>>>
>>> ```
>>>
>>>
>>>
>>> Thanks,
>>>
>>>
>>>
>>> Juan
>>>
>>>
>>>
>>> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
>>> juan.rodriguez.hort...@gmail.com> wrote:
>>>
>>> Minimal reproduction:
>>>
>>>    - First option
>>>
>>> ```scala
>>>
>>> case class C(x: Int)
>>>
>>> val xs = benv.fromCollection(1 to 10)
>>> val cs = xs.map{C(_)}
>>>
>>> cs.count
>>>
>>> ```
>>>
>>>
>>>
>>> defined class C
>>> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
>>> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
>>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>>> instance class, meaning it is not a member of a toplevel object, or of an
>>> object contained in a toplevel object, therefore it requires an outer
>>> instance to be instantiated, but we don't have a reference to the outer
>>> instance. Please consider changing the outer class to an object.
>>>   at scala.Predef$.require(Predef.scala:224)
>>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>>   ... 125 elided
>>>
>>>    - Second option
>>>
>>> ```scala
>>>
>>> object Types {
>>>     case class C(x: Int)
>>> }
>>>
>>> val cs2 = xs.map{Types.C(_)}
>>>
>>> cs2.count
>>>
>>> ```
>>>
>>>
>>>
>>> defined object Types
>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>>   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>>   at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>>   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>>   at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>>   at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>>   at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>>   ... 106 elided
>>> Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>
>>>
>>>
>>> Greetings,
>>>
>>>
>>>
>>> Juan
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
>>> juan.rodriguez.hort...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> I'm using the Flink interpreter and the benv environment. I'm reading
>>> some CSV files using benv.readCsvFile and it works OK. I have also defined
>>> a case class C for the CSV records. The problem happens when I apply a
>>> map operation on the DataSet of tuples returned by benv.readCsvFile, to
>>> convert it into a DataSet[C].
>>>
>>>    - If I define the case class C in some cell I get this error:
>>>
>>> java.lang.IllegalArgumentException: requirement failed: The class C is
>>> an instance class, meaning it is not a member of a toplevel object, or of
>>> an object contained in a toplevel object, therefore it requires an outer
>>> instance to be instantiated, but we don't have a reference to the outer
>>> instance. Please consider changing the outer class to an object.
>>>
>>>
>>>
>>>    - That sounds related to this question:
>>>    https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink
>>>    It looks like the Zeppelin Flink interpreter is wrapping the case class
>>>    definition as an inner class. I tried defining the case class C inside an
>>>    object Types that I define in another cell. With that I also get a
>>>    serialization exception.
>>>
>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>>> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>>>
>>>
>>>
>>> I guess that didn't work because the object Types is still defined
>>> inside some class implicitly defined by the interpreter.
>>>
>>>
>>>
>>> Any thoughts on how I can fix this? Also, I understand that $line163 etc.
>>> refer to the code in the cells. Is there some convention I can use to
>>> work out which line in the notebook those error messages are referring
>>> to?
>>>
>>>
>>>
>>> Thanks in advance,
>>>
>>>
>>>
>>> Juan
>>>
>>>

-- 
Best Regards

Jeff Zhang
