You may wish to have a look at the Apache Avro project. It shows how to generate source code for schema classes, which you can then use to create the objects populated when reading CSV files etc., for the purpose of serialisation and deserialisation.
https://avro.apache.org/docs/current/gettingstartedjava.html

The Avro project is mentioned in the Flink documents.

On Sun, 19 Apr 2020, 09:05 Juan Rodríguez Hortalá, <juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> I'm using the Flink interpreter and the benv environment. I'm reading some
> csv files using benv.readCsvFile and it works ok. I have also defined a
> case class C for the csv records. The problem happens when I apply a
> map operation on the DataSet of tuples returned by benv.readCsvFile, to
> convert it into a DataSet[C].
>
> - If I define the case class C in some cell I get this error:
>
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object.
>
>
> - That sounds related to this
>
> https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
> it looks like the zeppelin flink interpreter is wrapping the case class
> definition as an inner class. I tried defining the case class C inside an
> object Types that I define in another cell. With that I also get a
> serialization exception.
>
> org.apache.flink.api.common.InvalidProgramException: Task not serializable
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
> org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
> org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
> org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
> $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>
> I guess that didn't work because the object Types is still defined inside
> some class implicitly defined by the interpreter.
>
> Any thoughts about how I can fix this? Also, I understand $line163 etc
> refer to the code in the cells, is there some convention I can use to
> understand which line in the notebook those error messages are referring
> to?
>
> Thanks in advance,
>
> Juan
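
For what it's worth, the "instance class" problem described above can probably be illustrated outside Zeppelin. The following is only a minimal sketch, assuming Scala 2.x and plain Java serialization rather than Flink's own ClosureCleaner and type extraction. `Wrapper`, `OuterRefDemo` and `isJavaSerializable` are hypothetical names made up for this illustration; `Wrapper` merely stands in for whatever class the interpreter generates around a cell.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the wrapper class an interpreter generates
// around a cell. It is deliberately NOT Serializable.
class Wrapper {
  // Instance class: every C created here is tied to a Wrapper instance.
  case class C(id: Int, name: String)
}

// A case class nested in a top-level object has no outer instance.
object Types {
  case class C(id: Int, name: String)
}

object OuterRefDemo {
  // Returns true if plain Java serialization of `value` succeeds,
  // false if some object in its graph is not Serializable.
  def isJavaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val wrapper  = new Wrapper
    val inner    = wrapper.C(1, "a") // created through a Wrapper instance
    val topLevel = Types.C(1, "a")   // no outer instance involved

    println(s"inner case class serializable:     ${isJavaSerializable(inner)}")
    println(s"top-level case class serializable: ${isJavaSerializable(topLevel)}")
  }
}
```

Under those assumptions I would expect the check on the inner instance to fail, because it keeps a reference to the non-serializable `Wrapper`, while the one defined in the top-level object succeeds. If the interpreter wraps `object Types` in a generated class as well, that would be consistent with the second exception in the quoted message, but this sketch does not confirm what Zeppelin actually does.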