Just to point out a bug in your code: you should not use `mapPartitions` like that. Because `iterator.map` is lazy, `table.close()` runs before any record has actually been read from the table. For details, I recommend the section "setup() and cleanup()" in Sean Owen's post: http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
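A minimal sketch of that pattern, reusing the names from your snippet (`input`, `getUserActions`, `timeStart` and `timeStop` are assumed to be defined and serializable in your scope; `String` for the uid and `Any` for the value are placeholders for your actual types):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable

val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions { iterator =>
    // Setup: one HTable per partition, created on the executor.
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")

    // Wrap the lazy iterator so that close() runs only after the last
    // element has been consumed, instead of before the first one.
    new Iterator[(String, Any)] {
      private var closed = false
      def hasNext: Boolean = {
        val more = iterator.hasNext
        if (!more && !closed) {
          table.close()   // cleanup once the partition is exhausted
          closed = true
        }
        more
      }
      def next(): (String, Any) = {
        val userId = iterator.next()
        (userId, getUserActions(table, userId, timeStart, timeStop))
      }
    }
  }

The alternative is to force the results with something like .toList before closing the table, but that buffers the whole partition in memory; the wrapper above keeps the computation streaming.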
Best Regards,
Shixiong Zhu

2014-12-14 16:35 GMT+08:00 Yanbo <yanboha...@gmail.com>:
>
> In #1, class HTable is not serializable.
> You also need to check your self-defined function getUserActions and make
> sure it is a member function of a class that implements the Serializable
> interface.
>
> Sent from my iPad
>
> On 2014-12-12, at 4:35 PM, yangliuyu <yangli...@163.com> wrote:
> >
> > The scenario is using an HTable instance to scan multiple rowkey ranges in
> > Spark tasks, which looks like below:
> >
> > Option 1:
> > val users = input
> >   .map { case (deviceId, uid) => uid }
> >   .distinct()
> >   .sortBy(x => x)
> >   .mapPartitions { iterator =>
> >     val conf = HBaseConfiguration.create()
> >     val table = new HTable(conf, "actions")
> >     val result = iterator.map { userId =>
> >       (userId, getUserActions(table, userId, timeStart, timeStop))
> >     }
> >     table.close()
> >     result
> >   }
> >
> > But got the exception:
> > org.apache.spark.SparkException: Task not serializable
> >     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
> >     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
> >     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
> >     at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
> >     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)...
> > ...
> > Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >
> > The reason for not using sc.newAPIHadoopRDD is that it only supports one
> > scan at a time:
> > val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
> >   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
> >   classOf[org.apache.hadoop.hbase.client.Result])
> >
> > And if using MultiTableInputFormat, it is not possible for the driver to
> > put all rowkeys into the HBaseConfiguration.
> >
> > Option 2:
> > sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
> >   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
> >   classOf[org.apache.hadoop.hbase.client.Result])
> >
> > It may be possible to divide all rowkey ranges into several parts and then
> > use option 2, but I prefer option 1. So is there any solution for option 1?
> >
> > --
> > View this message in context:
> > http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
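On the NotSerializableException for org.apache.hadoop.conf.Configuration: as Yanbo said above, the closure is most likely capturing the enclosing instance of getUserActions, which holds a Configuration. One way to avoid that is to move the helper into a standalone object so that only the method is referenced, not an outer object. A sketch is below; the object name, the Seq[Result] return type and the rowkey layout are purely illustrative and need to be adapted to your schema:

import org.apache.hadoop.hbase.client.{HTable, Result, Scan}
import org.apache.hadoop.hbase.util.Bytes

// Calling a method of a top-level object from a closure does not drag a
// non-serializable enclosing instance (and its Configuration) into the task.
object UserActions extends Serializable {
  def getUserActions(table: HTable, userId: String,
                     timeStart: Long, timeStop: Long): Seq[Result] = {
    // Illustrative rowkey layout: "<userId>-<timestamp>".
    val scan = new Scan(
      Bytes.toBytes(s"$userId-$timeStart"),
      Bytes.toBytes(s"$userId-$timeStop"))
    val scanner = table.getScanner(scan)
    try {
      // Materialize the results before the scanner is closed.
      Iterator.continually(scanner.next()).takeWhile(_ != null).toVector
    } finally {
      scanner.close()
    }
  }
}

With this, the mapPartitions closure only references UserActions.getUserActions plus the primitive timeStart/timeStop values, which serialize cleanly.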