Thanks, all.
The code that finally works is below:
import scala.collection.JavaConverters._

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.filter.{BinaryComparator, FilterList,
  RegexStringComparator, SingleColumnValueFilter}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

object PlayRecord {
  // DAY_MILLIS and getUserRowKeyRange are defined elsewhere and not shown here.
  def getUserActions(accounts: RDD[String], idType: Int, timeStart: Long,
                     timeStop: Long, cacheSize: Int,
                     filterSongDays: Int, filterPlaylistDays: Int):
      RDD[(String, (Int, Set[Long], Set[Long]))] = {
    accounts.mapPartitions(iterator => {
      if (iterator.nonEmpty) {
        // Create the table inside the task so that nothing non-serializable
        // is captured by the closure.
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, "user_action")
        // A row passes if either of the two column filters accepts it.
        val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"),
          Bytes.toBytes("song_id"), CompareOp.EQUAL, new RegexStringComparator("^\\d+$")))
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"),
          Bytes.toBytes("module"), CompareOp.EQUAL,
          new BinaryComparator(Bytes.toBytes("displayed"))))
        iterator.map(id => {
          val scan = new Scan()
          scan.setCaching(cacheSize)
          scan.addFamily(Bytes.toBytes("stat"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("module"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("song_id"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("time"))
          val rowKeyRange = getUserRowKeyRange(id, idType, timeStart, timeStop)
          scan.setStartRow(rowKeyRange._1)
          scan.setStopRow(rowKeyRange._2)
          scan.setFilter(filterList)
          // toList reads the whole range, so the scanner can be closed right after.
          val scanner = table.getScanner(scan)
          val userData = try {
            scanner.iterator().asScala.map(r => {
              val module = Bytes.toString(r.getValue(Bytes.toBytes("stat"),
                Bytes.toBytes("module")))
              val time = Bytes.toLong(r.getValue(Bytes.toBytes("stat"),
                Bytes.toBytes("time")))
              module match {
                case "listen" =>
                  val songId = Bytes.toString(r.getValue(Bytes.toBytes("stat"),
                    Bytes.toBytes("song_id")))
                  (module, (time / DAY_MILLIS, songId))
                case "displayed" =>
                  val playlistIds = Bytes.toString(r.getValue(Bytes.toBytes("stat"),
                    Bytes.toBytes("playlist_ids")))
                  (module, (time / DAY_MILLIS, playlistIds))
                case _ =>
                  (module, (0L, ""))
              }
            }).toList.groupBy(_._1)
          } finally {
            scanner.close()
          }
          // Song ids from the filterSongDays most recent days (days sorted descending).
          val playRecordData = userData.get("listen")
          val playRecords = if (playRecordData.nonEmpty)
            playRecordData.get.map(_._2).groupBy(_._1).toList.sortBy(-_._1)
              .take(filterSongDays).flatMap(_._2).map(_._2.toLong).toSet
          else Set[Long]()
          // Note: days are sorted ascending here, unlike the song branch above.
          val playlistDisPlayData = userData.get("displayed")
          val playlistRecords = if (playlistDisPlayData.nonEmpty)
            playlistDisPlayData.get.map(_._2).groupBy(_._1).toList.sortBy(_._1)
              .take(filterPlaylistDays).flatMap(_._2).flatMap(_._2.split(','))
              .map(_.toLong).toSet
          else Set[Long]()
          val result = (id, (idType, playRecords, playlistRecords))
          // Close the table only after the last element has been processed.
          if (!iterator.hasNext) {
            table.close()
          }
          result
        })
      } else {
        iterator.map(id => {
          (id, (idType, Set[Long](), Set[Long]()))
        })
      }
    })
  }
}
Following Sean Owen's post that Shixiong mentioned
(http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/),
I close the table once iterator.hasNext is false; otherwise the application
hangs. There is also another interesting project,
http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/, which
I will try later.
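The close-on-last-element idea can be sketched without HBase or Spark. This is
only a minimal illustration under stated assumptions, not the actual job:
FakeTable and CloseOnLastElement are hypothetical names standing in for HTable
and the body passed to mapPartitions.

```scala
// Hypothetical stand-in for HTable so the sketch runs without HBase.
final class FakeTable {
  var closed = false
  def lookup(id: String): Int = {
    require(!closed, "table already closed")
    id.length
  }
  def close(): Unit = closed = true
}

object CloseOnLastElement {
  // Iterator.map is lazy, so close() must run only after the last element
  // has actually been produced, not when the mapped iterator is created.
  def process(ids: Iterator[String], table: FakeTable): Iterator[(String, Int)] =
    if (ids.isEmpty) {
      table.close()
      Iterator.empty
    } else {
      ids.map { id =>
        val result = (id, table.lookup(id))
        if (!ids.hasNext) table.close() // last element: safe to release
        result
      }
    }
}
```

One caveat with this pattern: if the consumer stops reading before the
iterator is exhausted, close() is never called.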
At 2014-12-15 17:52:47, "Aniket Bhatnagar" <[email protected]> wrote:
"The reason for not using sc.newAPIHadoopRDD is that it only supports one scan at a time."
I am not sure that's true. You can use multiple scans as follows:
val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings : _*)
where convertScanToString is implemented as:
/**
 * Serializes an HBase scan into a string.
 * @param scan Scan to serialize.
 * @return Base64-encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}
Thanks,
Aniket
On Mon Dec 15 2014 at 13:31:03 Shixiong Zhu <[email protected]> wrote:
Just to point out a bug in your code: you should not use `mapPartitions` like
that. For details, I recommend the section "setup() and cleanup()" in Sean
Owen's post:
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
Best Regards,
Shixiong Zhu
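
To illustrate the bug being pointed out, here is a small sketch that runs
without Spark or HBase. StubTable and LazyMapPitfall are hypothetical names;
the point is only that Iterator.map is lazy, so a close() placed after the map
call runs before any element has been processed.

```scala
// Hypothetical stand-in for HTable.
final class StubTable {
  private var open = true
  def get(id: String): String = {
    if (!open) throw new IllegalStateException("table is closed")
    id.toUpperCase
  }
  def close(): Unit = open = false
}

object LazyMapPitfall {
  // Same shape as Option 1 in the original question.
  def closeTooEarly(ids: Iterator[String]): Iterator[String] = {
    val table = new StubTable
    val result = ids.map(table.get) // nothing is evaluated yet
    table.close()                   // runs before any element is read
    result                          // consuming this now hits a closed table
  }
}
```

Consuming the returned iterator (for example with toList) throws
IllegalStateException, because every lookup happens after close().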
2014-12-14 16:35 GMT+08:00 Yanbo <[email protected]>:
In #1, the HTable class is not serializable.
You also need to check your self-defined function getUserActions and make sure
it is a member function of a class that implements the Serializable interface.
Sent from my iPad
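
A small sketch of the serialization point, using plain Java serialization and
no Spark. Connection, EagerLookup, LazyLookup and SerializationCheck are
hypothetical names; Connection stands in for a non-serializable client such as
HTable or Configuration. Marking the field @transient lazy is one common
workaround, shown here as an assumption rather than as what the job above does.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable client.
class Connection {
  def query(id: String): String = s"row-$id"
}

// Fails to serialize: the Connection field is part of the object graph.
class EagerLookup extends Serializable {
  val conn = new Connection
  def apply(id: String): String = conn.query(id)
}

// Serializes: the field is skipped and rebuilt lazily on first use.
class LazyLookup extends Serializable {
  @transient lazy val conn = new Connection
  def apply(id: String): String = conn.query(id)
}

object SerializationCheck {
  // Returns false if Java serialization rejects the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Creating the client inside mapPartitions, as in the final code in this thread,
avoids the problem in a different way: the client is never captured by the
closure in the first place.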
> On Dec 12, 2014, at 4:35 PM, yangliuyu <[email protected]> wrote:
>
> The scenario is using an HTable instance to scan multiple rowkey ranges in
> Spark tasks, as shown below:
> Option 1:
> val users = input
>   .map { case (deviceId, uid) => uid }
>   .distinct()
>   .sortBy(x => x)
>   .mapPartitions(iterator => {
>     val conf = HBaseConfiguration.create()
>     val table = new HTable(conf, "actions")
>     val result = iterator.map { userId =>
>       (userId, getUserActions(table, userId, timeStart, timeStop))
>     }
>     table.close()
>     result
>   })
>
> But I got this exception:
> org.apache.spark.SparkException: Task not serializable
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>     at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)...
> ...
> Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
> The reason for not using sc.newAPIHadoopRDD is that it only supports one scan
> at a time.
> val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
>
> And with MultiTableInputFormat, it is not possible for the driver to put all
> the rowkeys into HBaseConfiguration.
> Option 2:
> sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
>
> I could divide the rowkey ranges into several parts and then use option 2,
> but I prefer option 1. Is there any solution for option 1?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>