Thanks also to Som and Zahid for your answers. But as I showed in my previous
answer, this issue happens without using the CSV source, and it doesn't show
up in the Flink Scala shell, so it looks like an issue with the Zeppelin
interpreter for Flink.
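
For anyone hitting the same thing, one workaround I'd expect to sidestep the
serializer problem entirely (just a sketch, untested in Zeppelin) is to use
plain tuples inside the notebook, since Scala's TupleN classes are top-level
and don't need an outer instance:

```scala
// Sketch (untested in Zeppelin): Tuple1 is a top-level class, so Flink's
// serializer never has to reflect on a case class defined in a cell.
val xs = benv.fromCollection(1 to 10)
val ts = xs.map(x => Tuple1(x))
ts.count
```

The downside is of course losing the field names of the case class.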

On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi Ravi,
>
> I posted another message with the minimal reproduction, I repeat it here:
>
> ```scala
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
> ```
>
> ```
> defined class C
> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an instance class, meaning it is not a member of a toplevel object, or of an object contained in a toplevel object, therefore it requires an outer instance to be instantiated, but we don't have a reference to the outer instance. Please consider changing the outer class to an object.
>   at scala.Predef$.require(Predef.scala:224)
>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>   ... 125 elided
> ```
>
> As you can see, this minimal example doesn't use CSV files, so I don't
> think the CSV connector is the problem.
>
> I don't think this is a Flink issue either, as that example works fine in
> the Flink shell:
>
> ```
> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh  local
> scala> val xs = benv.fromCollection(1 to 10)
> xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@6012bee8
>
> scala> xs.collect
> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>
> scala> val cs = xs.map{C(_)}
> cs: org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@36f807f5
>
> scala> cs.collect
> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8),
> C(9), C(10))
>
> scala>
> ```
>
> Note that I was running Zeppelin in a Docker container on my laptop, also
> using Flink's local mode.
>
> I think this is a problem with the Zeppelin integration with Flink, and
> with how it processes case class definitions in a cell.
>
> Thanks for your answer,
>
> Juan
>
>
> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
> ravi.pullare...@minlog.com.au> wrote:
>
>> Hi Juan
>>
>>
>>
>> I have written various applications for continuous processing of CSV
>> files. Please post your entire code and how you are mapping; that will
>> make it easier to pinpoint the issue.
>>
>>
>>
>> Thanks
>>
>> Ravi Pullareddy
>>
>>
>>
>>
>>
>> *From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
>> *Sent:* Sunday, 19 April 2020 6:25 PM
>> *To:* users@zeppelin.apache.org
>> *Subject:* Re: Serialization issues with case classes with Flink
>>
>>
>>
>> Just for the record, the Spark version of that works fine:
>>
>>
>>
>> ```
>> %spark
>>
>> case class C2(x: Int)
>>
>> val xs = sc.parallelize(1 to 10)
>> val csSpark = xs.map{C2(_)}
>>
>> csSpark.collect
>>
>>
>>
>> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
>> C2(8), C2(9), C2(10))
>>
>> ```
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Juan
>>
>>
>>
>> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>> Minimal reproduction:
>>
>>    - First option
>>
>> ```scala
>>
>> case class C(x: Int)
>>
>> val xs = benv.fromCollection(1 to 10)
>> val cs = xs.map{C(_)}
>>
>> cs.count
>>
>> ```
>>
>>
>>
>> ```
>> defined class C
>> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
>> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
>> java.lang.IllegalArgumentException: requirement failed: The class C is an instance class, meaning it is not a member of a toplevel object, or of an object contained in a toplevel object, therefore it requires an outer instance to be instantiated, but we don't have a reference to the outer instance. Please consider changing the outer class to an object.
>>   at scala.Predef$.require(Predef.scala:224)
>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>>   ... 125 elided
>> ```
>>
>>    - Second option
>>
>> ```scala
>>
>> object Types {
>>     case class C(x: Int)
>> }
>>
>> val cs2 = xs.map{Types.C(_)}
>>
>> cs2.count
>>
>> ```
>>
>>
>>
>> ```
>> defined object Types
>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>   at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>   at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>   at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>   at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>   ... 106 elided
>> Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> ```
>>
>>
>> Greetings,
>>
>>
>>
>> Juan
>>
>>
>>
>>
>>
>> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>> Hi,
>>
>>
>>
>> I'm using the Flink interpreter and the benv environment. I'm reading
>> some csv files using benv.readCsvFile and it works ok. I have also defined
>> a case class C for the csv records. The problem happens when I apply a
>> map operation on the DataSet of tuples returned by benv.readCsvFile, to
>> convert it into a DataSet[C].
>>
>>    - If I define the case class C in some cell I get this error:
>>
>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>> instance class, meaning it is not a member of a toplevel object, or of an
>> object contained in a toplevel object, therefore it requires an outer
>> instance to be instantiated, but we don't have a reference to the outer
>> instance. Please consider changing the outer class to an object.
>>
>>
>>
>>    - That sounds related to
>>    https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink;
>>    it looks like the Zeppelin Flink interpreter is wrapping the case class
>>    definition as an inner class. I tried defining the case class C inside an
>>    object Types defined in another cell, but with that I also get a
>>    serialization exception.
>>
>> ```
>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>   at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>   at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>   at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>   at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>   at $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>>   at $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>> ```
>>
>>
>> I guess that didn't work because the object Types is still defined inside
>> some class implicitly defined by the interpreter.
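>>
>> In case it helps to see the mechanism outside Zeppelin, here is a
>> plain-Scala sketch (Wrapper is a hypothetical stand-in for the class the
>> interpreter implicitly wraps a cell in) showing that a case class nested
>> in a class gets a constructor taking the outer instance as an extra
>> parameter, which is exactly what Flink's ScalaCaseClassSerializer cannot
>> supply:
>>
>> ```scala
>> // Hypothetical stand-in for the class Zeppelin wraps a cell's code in.
>> class Wrapper {
>>   case class C(x: Int)
>> }
>>
>> object Demo extends App {
>>   // The reflective constructor of the inner case class takes the outer
>>   // Wrapper instance as its first parameter.
>>   val ctor = classOf[Wrapper#C].getConstructors.head
>>   println(ctor.getParameterTypes.toList) // first element is class Wrapper
>> }
>> ```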
>>
>>
>>
>> Any thoughts on how I can fix this? Also, I understand that $line163 etc.
>> refer to the code in the cells; is there some convention I can use to map
>> those error messages back to the corresponding lines in the notebook?
>>
>>
>>
>> Thanks in advance,
>>
>>
>>
>> Juan
>>
>>
