I asked a similar question a day or so ago, but this is a much more concrete example showing the difficulty I am running into.

I am trying to use Datasets. I have an object which I want to encode with its fields as columns. The object is a well-behaved Java bean, but one field holds an object (or a collection of objects) which is not a bean.

My simple test case is below. What I want is a Dataset of MyBeans with columns count, name, and unBean.
import java.io.Serializable;

/**
 * This class is a good Java bean, but one field holds an object
 * which is not a bean.
 */
public class MyBean implements Serializable {
    private int m_count;
    private String m_Name;
    private MyUnBean m_UnBean;

    public MyBean() { } // a proper bean needs a public no-arg constructor

    public MyBean(int count, String name, MyUnBean unBean) {
        m_count = count;
        m_Name = name;
        m_UnBean = unBean;
    }

    public int getCount() { return m_count; }
    public void setCount(int count) { m_count = count; }

    public String getName() { return m_Name; }
    public void setName(String name) { m_Name = name; }

    public MyUnBean getUnBean() { return m_UnBean; }
    public void setUnBean(MyUnBean unBean) { m_UnBean = unBean; }
}
import java.io.Serializable;

/**
 * This is a Java object which is not a bean:
 * no getters or setters, but it is serializable.
 */
public class MyUnBean implements Serializable {
    public final int count;
    public final String name;

    public MyUnBean(int count, String name) {
        this.count = count;
        this.name = name;
    }
}
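For comparison, if I understand the bean encoder's requirements correctly, a rewrite of MyUnBean along these lines (the name MyUnBeanAsBean is hypothetical) should encode cleanly; however, rewriting every non-bean class this way is exactly what I am hoping to avoid:

import java.io.Serializable;

/**
 * Hypothetical bean-style rewrite of MyUnBean. Encoders.bean should be
 * able to map a class like this to count/name columns, but it forces
 * mutable fields, a no-arg constructor, and getters/setters.
 */
public class MyUnBeanAsBean implements Serializable {
    private int count;
    private String name;

    public MyUnBeanAsBean() { } // required no-arg constructor

    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}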
/**
 * This code creates a list of MyBean objects -
 * a Java bean with one field that is not a bean.
 * It then attempts, and fails, to use a bean encoder
 * to make a Dataset.
 */
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SQLContext;
import scala.Option;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class DatasetTest {
    public static final Random RND = new Random();
    public static final int LIST_SIZE = 100;

    public static String makeName() {
        return Integer.toString(RND.nextInt());
    }

    public static MyUnBean makeUnBean() {
        return new MyUnBean(RND.nextInt(), makeName());
    }

    public static MyBean makeBean() {
        return new MyBean(RND.nextInt(), makeName(), makeUnBean());
    }

    /**
     * Make a list of MyBeans.
     * @return a list of LIST_SIZE randomly populated beans
     */
    public static List<MyBean> makeBeanList() {
        List<MyBean> holder = new ArrayList<MyBean>();
        for (int i = 0; i < LIST_SIZE; i++) {
            holder.add(makeBean());
        }
        return holder;
    }

    public static SQLContext getSqlContext() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("BeanTest");
        Option<String> option = sparkConf.getOption("spark.master");
        if (!option.isDefined()) // use local[*] if no master is set
            sparkConf.setMaster("local[*]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        return new SQLContext(ctx);
    }

    public static void main(String[] args) {
        SQLContext sqlContext = getSqlContext();
        // The bean encoder fails on the MyUnBean field (see log below).
        Encoder<MyBean> evidence = Encoders.bean(MyBean.class);
        // I can build an encoder for MyUnBean, but I see no way to tell
        // the bean encoder to use it for the unBean field.
        Encoder<MyUnBean> evidence2 = Encoders.javaSerialization(MyUnBean.class);
        List<MyBean> holder = makeBeanList();
        Dataset<MyBean> beanSet = sqlContext.createDataset(holder, evidence);
        long count = beanSet.count();
        if (count != LIST_SIZE)
            throw new IllegalStateException("bad count");
    }
}
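For what it is worth, the only variant I have found that runs at all is to serialize the whole bean (I assume Encoders.kryo would behave the same way). The sketch below, dropped into main in place of the bean encoder, runs, but the resulting Dataset has a single opaque binary column rather than count, name, and unBean columns, which defeats the purpose:

// Workaround sketch (not what I want): encode the entire MyBean with
// Java serialization instead of the bean encoder. This runs, but the
// Dataset then has one opaque binary column, so the per-field columns
// are lost.
Encoder<MyBean> wholeBeanEncoder = Encoders.javaSerialization(MyBean.class);
Dataset<MyBean> opaqueSet = sqlContext.createDataset(makeBeanList(), wholeBeanEncoder);
long opaqueCount = opaqueSet.count(); // counting works; column access does not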
This is the last section of the log showing the errors I get when I run the bean-encoder version above:
16/03/01 09:21:31 INFO SparkUI: Started SparkUI at http://169.254.87.23:4040
16/03/01 09:21:31 INFO Executor: Starting executor ID driver on host localhost
16/03/01 09:21:31 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 61922.
16/03/01 09:21:31 INFO NettyBlockTransferService: Server created on 61922
16/03/01 09:21:31 INFO BlockManagerMaster: Trying to register BlockManager
16/03/01 09:21:31 INFO BlockManagerMasterEndpoint: Registering block manager localhost:61922 with 5.1 GB RAM, BlockManagerId(driver, localhost, 61922)
16/03/01 09:21:31 INFO BlockManagerMaster: Registered BlockManager
Exception in thread "main" java.lang.UnsupportedOperationException: no encoder found for com.lordjoe.testing.MyUnBean
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$extractorFor(JavaTypeInference.scala:403)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$org$apache$spark$sql$catalyst$JavaTypeInference$$extractorFor$1.apply(JavaTypeInference.scala:400)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$org$apache$spark$sql$catalyst$JavaTypeInference$$extractorFor$1.apply(JavaTypeInference.scala:393)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$extractorFor(JavaTypeInference.scala:393)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.extractorsFor(JavaTypeInference.scala:314)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:75)
    at org.apache.spark.sql.Encoders$.bean(Encoder.scala:176)
    at org.apache.spark.sql.Encoders.bean(Encoder.scala)
    at com.lordjoe.testing.DatasetTest.main(DatasetTest.java:57)