Looking at your original message
Your first error was
java.lang.IllegalArgumentException: requirement failed:

Not serialisation error.

As a golden rule of thumb always look at the first error , irrespective of
programming language.


On Mon, 20 Apr 2020, 03:13 Ravi Pullareddy, <ravi.pullare...@minlog.com.au>
wrote:

> Hi Juan
>
>
>
> I see what your problem is. You have declared class C with field x of type
> Int. When you map to class fields you have to specify the type. Try the
> below code it works.
>
>
>
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10).map(y => C(y.toInt))
>
> xs.count
>
>
>
> benv.execute("Count Collection")
>
>
>
> Cheers
>
> Ravi Pullareddy
>
>
>
> *From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
> *Sent:* Monday, 20 April 2020 2:44 AM
> *To:* users@zeppelin.apache.org
> *Subject:* Re: Serialization issues with case classes with Flink
>
>
>
> Hi Jeff,
>
>
>
> I’ll try using POJO clases instead.
>
>
>
> Thanks,
>
>
>
> Juan
>
>
>
> El El dom, 19 abr 2020 a las 15:51, Jeff Zhang <zjf...@gmail.com>
> escribió:
>
> Hi Juan,
>
>
>
> This is an issue of flink, I have created ticket in flink community,
> https://issues.apache.org/jira/browse/FLINK-16969
>
> The workaround is to use POJO class instead of case class.
>
>
>
> Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> 于2020年4月19日周日 下午
> 7:58写道:
>
> Thanks also to Som and Zahid for your answers. But as I show in my
> previous answer, this issue happens without using the CSV source, and it
> doesn't show in the Flink Scala shell, so it looks like an issue with
> Zeppelin interpreter for Flink
>
>
>
> On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
> Hi Ravi,
>
>
>
> I posted another message with the minimal reproduction, I repeat it here:
>
>
>
> ```scala
>
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
>
> ```
>
>
>
> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@39c713c6 cs:
> org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object. at
> scala.Predef$.require(Predef.scala:224) at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
> at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
> ... 125 elided
>
>
>
> As you can see, this minimal example doesn't use CSV files, so I don't
> think the CSV connector it the problem.
>
>
>
> I don't think this is a Flink issue either, as that example works fine in
> the Flink shell:
>
>
>
> ```
>
> root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh  local
>
> scala> val xs = benv.fromCollection(1 to 10)
> xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@6012bee8
>
> scala> xs.collect
> res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>
> scala> val cs = xs.map{C(_)}
> cs: org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@36f807f5
>
> scala> cs.collect
> res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8),
> C(9), C(10))
>
> scala>
>
> ```
>
>
>
> Note I was running Zeppelin in a docker container in my laptop, also using
> Flink's local mode
>
>
>
> I think this is a problem with the Zeppelin integratin with Flink, and how
> it processes case class definitions in a cell.
>
>
>
> Thanks for your answer,
>
>
>
> Juan
>
>
>
>
>
> On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
> ravi.pullare...@minlog.com.au> wrote:
>
> Hi Juan
>
>
>
> I have written various applications for continuous processing of csv
> files. Please post your entire code and how you are mapping. It becomes
> easy to highlight the issue.
>
>
>
> Thanks
>
> Ravi Pullareddy
>
>
>
>
>
> *From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
> *Sent:* Sunday, 19 April 2020 6:25 PM
> *To:* users@zeppelin.apache.org
> *Subject:* Re: Serialization issues with case classes with Flink
>
>
>
> Just for the record, the Spark version of that works fine:
>
>
>
> ```
> %spark
>
> case class C2(x: Int)
>
> val xs = sc.parallelize(1 to 10)
> val csSpark = xs.map{C2(_)}
>
> csSpark.collect
>
>
>
> res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
> C2(8), C2(9), C2(10))
>
> ```
>
>
>
> Thanks,
>
>
>
> Juan
>
>
>
> On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
> Minimal reproduction:
>
>    - Fist option
>
> ```scala
>
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
>
> ```
>
>
>
> defined class C xs: org.apache.flink.api.scala.DataSet[Int] =
> org.apache.flink.api.scala.DataSet@39c713c6 cs:
> org.apache.flink.api.scala.DataSet[C] =
> org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object. at
> scala.Predef$.require(Predef.scala:224) at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
> at
> org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
> ... 125 elided
>
>    - Second option
>
> ```scala
>
> object Types {
>     case class C(x: Int)
> }
>
> val cs2 = xs.map{Types.C(_)}
>
> cs2.count
>
> ```
>
>
>
> defined object Types org.apache.flink.api.common.InvalidProgramException:
> Task not serializable at
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
> at
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
> at
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
> at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125) at
> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489) at
> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488) ... 106 elided
> Caused by: java.io.NotSerializableException:
> org.apache.flink.api.scala.DataSet at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
>
>
> Greetings,
>
>
>
> Juan
>
>
>
>
>
> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
> Hi,
>
>
>
> I'm using the Flink interpreter and the benv environment. I'm reading some
> csv files using benv.readCsvFile and it works ok. I have also defined a
> case class C for the csv records. The problem happens when I apply a
> map operation on the DataSet of tuples returned by benv.readCsvFile, to
> convert it into a DataSet[C].
>
>    - If I define the case class C in some cell I get this error:
>
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object.
>
>
>
>    - That sounds related to this
>    
> https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
>    it looks like the zeppelin flink interpreter is wrapping the case class
>    definition as an inner class. I tried defining the case class C inside an
>    object Types that I define in another cell. With that I also get a
>    serialization exception.
>
> org.apache.flink.api.common.InvalidProgramException: Task not serializable
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>
>
>
> I guess that didn't work because the object Types is still defined inside
> some class implicitly defined by the interpreter.
>
>
>
> Any thoughs about how can I fix this? Also, I understand $line163 etc
> refer to the code in the cells, is there some convention I can use to
> understand to which line in the notebook those error messages are referring
> to?
>
>
>
> Thanks in advance,
>
>
>
> Juan
>
>
>
>
> --
>
> Best Regards
>
> Jeff Zhang
>
>

Reply via email to