Hi Juan


I see what your problem is. You have declared class C with a field x of type
Int. When you map to class fields, you have to specify the type. Try the
code below; it works.



case class C(x: Int)

val xs = benv.fromCollection(1 to 10).map(y => C(y.toInt))

xs.count



benv.execute("Count Collection")



Cheers

Ravi Pullareddy



*From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
*Sent:* Monday, 20 April 2020 2:44 AM
*To:* users@zeppelin.apache.org
*Subject:* Re: Serialization issues with case classes with Flink



Hi Jeff,



I’ll try using POJO classes instead.



Thanks,



Juan



On Sun, 19 Apr 2020 at 15:51, Jeff Zhang <zjf...@gmail.com> wrote:

Hi Juan,



This is a Flink issue; I have created a ticket in the Flink community:
https://issues.apache.org/jira/browse/FLINK-16969

The workaround is to use a POJO class instead of a case class.
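
For reference, a Flink POJO written in Scala might look like the sketch below. It follows the POJO rules described in the Flink documentation (a public class with a public no-argument constructor whose fields are public or reachable through getters and setters). The name `CPojo` and the default value `0` are made up for illustration, and whether this avoids the error in a given Zeppelin version has not been checked here.

```scala
// Hypothetical POJO counterpart of `case class C(x: Int)`.
// `var x` generates an accessor and a mutator for the field, and the
// auxiliary constructor supplies the public no-argument constructor.
class CPojo(var x: Int) {
  def this() = this(0)

  override def toString: String = s"CPojo($x)"
}

// Assumed usage in a Zeppelin Flink paragraph, where `benv` is provided
// by the interpreter:
//   val ps = benv.fromCollection(1 to 10).map(y => new CPojo(y))
//   ps.count
```

Unlike a case class, this class has no generated `equals`, `hashCode`, or pattern-matching support, so those would have to be written by hand if needed.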



On Sun, Apr 19, 2020 at 7:58 PM, Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

Thanks also to Som and Zahid for your answers. But as I showed in my previous
reply, this issue happens without using the CSV source, and it doesn't
show up in the Flink Scala shell, so it looks like an issue with the Zeppelin
interpreter for Flink.



On Sun, Apr 19, 2020 at 1:56 PM Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

Hi Ravi,



I posted another message with the minimal reproduction; I repeat it here:



```scala

case class C(x: Int)

val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}

cs.count

```



defined class C
xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
  at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
  ... 125 elided



As you can see, this minimal example doesn't use CSV files, so I don't
think the CSV connector is the problem.



I don't think this is a Flink issue either, as that example works fine in
the Flink shell:



```

root@df7fa87580ed:/opt/flink/latest# bin/start-scala-shell.sh  local

scala> val xs = benv.fromCollection(1 to 10)
xs: org.apache.flink.api.scala.DataSet[Int] =
org.apache.flink.api.scala.DataSet@6012bee8

scala> xs.collect
res0: Seq[Int] = Buffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val cs = xs.map{C(_)}
cs: org.apache.flink.api.scala.DataSet[C] =
org.apache.flink.api.scala.DataSet@36f807f5

scala> cs.collect
res1: Seq[C] = Buffer(C(1), C(2), C(3), C(4), C(5), C(6), C(7), C(8), C(9),
C(10))

scala>

```



Note that I was running Zeppelin in a Docker container on my laptop, also using
Flink's local mode.



I think this is a problem with the Zeppelin integration with Flink, and how
it processes case class definitions in a cell.



Thanks for your answer,



Juan





On Sun, Apr 19, 2020 at 12:22 PM Ravi Pullareddy <
ravi.pullare...@minlog.com.au> wrote:

Hi Juan



I have written various applications for continuous processing of CSV files.
Please post your entire code and show how you are mapping. That will make it
easier to pinpoint the issue.



Thanks

Ravi Pullareddy





*From:* Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
*Sent:* Sunday, 19 April 2020 6:25 PM
*To:* users@zeppelin.apache.org
*Subject:* Re: Serialization issues with case classes with Flink



Just for the record, the Spark version of that works fine:



```
%spark

case class C2(x: Int)

val xs = sc.parallelize(1 to 10)
val csSpark = xs.map{C2(_)}

csSpark.collect



res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
C2(8), C2(9), C2(10))

```



Thanks,



Juan



On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

Minimal reproduction:

   - First option

```scala

case class C(x: Int)

val xs = benv.fromCollection(1 to 10)
val cs = xs.map{C(_)}

cs.count

```



defined class C
xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
  at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
  ... 125 elided

   - Second option

```scala

object Types {
    case class C(x: Int)
}

val cs2 = xs.map{Types.C(_)}

cs2.count

```



defined object Types
org.apache.flink.api.common.InvalidProgramException: Task not serializable
  at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
  at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
  at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
  at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
  at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
  at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
  ... 106 elided
Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
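
One plausible reading of the `NotSerializableException` on `DataSet` is that the function passed to `map` captures an enclosing object that also holds `xs`. The sketch below reproduces that general mechanism with plain Java serialization and no Flink at all; `NotSer`, `Cell`, and `ClosureDemo` are hypothetical names, and this is not a description of what the Zeppelin interpreter actually generates.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable value such as a DataSet.
class NotSer

// Hypothetical stand-in for a wrapper object that holds the cell's values.
class Cell extends Serializable {
  val xs = new NotSer
  val factor = 2

  // Reads this.factor, so the function captures the whole Cell, xs included.
  val capturing: Int => Int = y => y * factor

  // Copies the value into a local first, so only an Int is captured.
  def viaLocal: Int => Int = {
    val k = factor
    y => y * k
  }
}

object ClosureDemo {
  // True if Java serialization accepts the object, false if it hits a
  // non-serializable value somewhere in the object graph.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Calling `ClosureDemo.isSerializable` on `new Cell().capturing` and on `new Cell().viaLocal` shows the difference between the two functions.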



Greetings,



Juan





On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

Hi,



I'm using the Flink interpreter and the benv environment. I'm reading some
csv files using benv.readCsvFile and it works ok. I have also defined a
case class C for the csv records. The problem happens when I apply a
map operation on the DataSet of tuples returned by benv.readCsvFile, to
convert it into a DataSet[C].

   - If I define the case class C in some cell I get this error:

java.lang.IllegalArgumentException: requirement failed: The class C is an
instance class, meaning it is not a member of a toplevel object, or of an
object contained in a toplevel object, therefore it requires an outer
instance to be instantiated, but we don't have a reference to the outer
instance. Please consider changing the outer class to an object.



   - That sounds related to
   https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink:
   it looks like the Zeppelin Flink interpreter is wrapping the case class
   definition as an inner class. I tried defining the case class C inside an
   object Types that I define in another cell. With that I also get a
   serialization exception.

org.apache.flink.api.common.InvalidProgramException: Task not serializable
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
$line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)



I guess that didn't work because the object Types is still defined inside
some class implicitly defined by the interpreter.
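
The difference between an "instance class" and a member of a top-level object can be seen outside Flink with plain Java reflection. In the sketch below, `Wrapper`, `Holder`, and `OuterInstanceDemo` are hypothetical names; `Wrapper` only stands in for whatever class an interpreter might generate around a cell, and the reflection check is an illustration, not the check Flink itself performs.

```scala
// Hypothetical stand-in for a class generated around the code of a cell.
class Wrapper {
  // An inner case class: each instance belongs to a particular Wrapper.
  case class Inner(x: Int)
}

// A case class nested in a top-level object needs no enclosing instance.
object Holder {
  case class Nested(x: Int)
}

object OuterInstanceDemo {
  // Names of the parameter types of the first declared constructor.
  def ctorParams(cls: Class[_]): List[String] =
    cls.getDeclaredConstructors.head.getParameterTypes.map(_.getName).toList

  def main(args: Array[String]): Unit = {
    // The inner class's constructor carries an extra parameter for the
    // enclosing Wrapper instance; the nested one takes only x.
    println(ctorParams(classOf[Wrapper#Inner]))
    println(ctorParams(classOf[Holder.Nested]))
  }
}
```

A serializer that only knows the field values has nothing to pass for that extra parameter, which matches the wording of the error message above.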



Any thoughts about how I can fix this? Also, I understand $line163 etc. refer
to the code in the cells; is there some convention I can use to understand
which line in the notebook those error messages refer to?



Thanks in advance,



Juan




-- 

Best Regards

Jeff Zhang
