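I just want to point out a bug in your code: you should not use `mapPartitions`
like that. `iterator.map` is lazy, so `table.close()` runs before any element
has actually been processed. For details, I recommend the section
"setup() and cleanup()" in Sean Owen's post: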
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
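
A minimal sketch of the pitfall in plain Scala, with no Spark or HBase
required. `FakeTable` is a hypothetical stand-in for `HTable`, and the
object and function names are illustrative assumptions, not code from the
original post. It shows the buggy shape and two possible ways around it.

```scala
object LazyCleanupDemo {
  // Hypothetical stand-in for an HBase table handle; NOT the real HTable API.
  final class FakeTable {
    private var open = true
    def get(key: String): String = {
      if (!open) throw new IllegalStateException("table is closed")
      s"actions-of-$key"
    }
    def close(): Unit = { open = false }
    def isOpen: Boolean = open
  }

  type Row = (String, String)

  // Buggy shape: Iterator.map is lazy, so close() runs before any get().
  def lookupBuggy(ids: Iterator[String], table: FakeTable): Iterator[Row] = {
    val result = ids.map(id => (id, table.get(id)))
    table.close()
    result
  }

  // Fix A: materialize the whole partition before closing (costs memory).
  def lookupEager(ids: Iterator[String], table: FakeTable): Iterator[Row] = {
    val rows =
      try ids.map(id => (id, table.get(id))).toVector
      finally table.close()
    rows.iterator
  }

  // Fix B: stay lazy and close the table once the input is exhausted.
  def lookupLazy(ids: Iterator[String], table: FakeTable): Iterator[Row] =
    new Iterator[Row] {
      def hasNext: Boolean = {
        val more = ids.hasNext
        if (!more) table.close()
        more
      }
      def next(): Row = {
        val id = ids.next()
        (id, table.get(id))
      }
    }
}
```

In a real job the body of `mapPartitions` would take the place of these
functions. Fix A is simpler but holds a whole partition in memory; Fix B
keeps the laziness but leaves the table open if the consumer stops early.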

Best Regards,
Shixiong Zhu

2014-12-14 16:35 GMT+08:00 Yanbo <yanboha...@gmail.com>:
>
> In option 1, the class HTable is not serializable.
> You also need to check your self-defined function getUserActions and make
> sure it is a member function of a class that implements the Serializable
> interface.
>
> Sent from my iPad
>
> > On December 12, 2014, at 4:35 PM, yangliuyu <yangli...@163.com> wrote:
> >
> > The scenario is using an HTable instance to scan multiple rowkey ranges in
> > Spark tasks, as shown below:
> > Option 1:
> > val users = input
> >      .map { case (deviceId, uid) => uid }
> >      .distinct()
> >      .sortBy(x => x)
> >      .mapPartitions(iterator => {
> >      val conf = HBaseConfiguration.create()
> >      val table = new HTable(conf, "actions")
> >      val result = iterator.map{ userId=>
> >        (userId, getUserActions(table, userId, timeStart, timeStop))
> >      }
> >      table.close()
> >      result
> >    })
> >
> > But I got this exception:
> > org.apache.spark.SparkException: Task not serializable
> >        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
> >        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
> >        at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
> >        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
> >        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)...
> > ...
> > Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
> >        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> >        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >
> > The reason for not using sc.newAPIHadoopRDD is that it supports only one
> > scan at a time.
> > val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
> >      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
> >      classOf[org.apache.hadoop.hbase.client.Result])
> >
> > And with MultiTableInputFormat, it is not feasible for the driver to put
> > all the rowkeys into the HBaseConfiguration.
> > Option 2:
> > sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
> >      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
> >      classOf[org.apache.hadoop.hbase.client.Result])
> >
> > It would be possible to divide all the rowkey ranges into several parts
> > and then use option 2, but I prefer option 1. So is there any solution
> > for option 1?
> >
> >
> >
> > --
> > View this message in context:
> > http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >