Hi Ted,
Thanks for the reply.
Here is the code.
Btw, df.count runs fine on the dataframe generated from this default source.
I think something in the combination of the join and the hbase data source
is causing the issue, but I am not entirely sure.
I have also dumped the physical plans of both approaches (s3a/s3a join and
s3a/hbase join). Let me know if you want them.
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.slf4j.LoggerFactory
class DefaultSource extends SchemaRelationProvider with FileFormat {

  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String],
                              schema: StructType) = {
    new HBaseRelation(schema, parameters)(sqlContext)
  }

  def inferSchema(sparkSession: SparkSession,
                  options: Map[String, String],
                  files: Seq[FileStatus]): Option[StructType] = ???

  def prepareWrite(sparkSession: SparkSession,
                   job: Job,
                   options: Map[String, String],
                   dataSchema: StructType): OutputWriterFactory = ???
}
object HBaseConfigurationUtil {
  lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")

  val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.mapred.outputtable", tableName)
    conf.set("hbase.zookeeper.quorum", hbaseQuorum)
    conf
  }
}
class HBaseRelation(val schema: StructType, parameters: Map[String, String])
                   (@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  import sqlContext.sparkContext

  override def buildScan(): RDD[Row] = {
    val bcDataSchema = sparkContext.broadcast(schema)

    // The HBase table name arrives as the data source "path" option.
    val tableName = parameters.get("path") match {
      case Some(t) => t
      case _ => throw new RuntimeException(
        "Table name (path) not provided in parameters")
    }

    val hbaseQuorum = parameters.get("hbaseQuorum") match {
      case Some(s: String) => s
      case _ => throw new RuntimeException(
        "hbaseQuorum not provided in options")
    }

    val rdd = sparkContext.newAPIHadoopRDD(
      HBaseConfigurationUtil.hbaseConfiguration(tableName, hbaseQuorum),
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    val rowRdd = rdd
      .map(tuple => tuple._2)
      .map { record =>
        val cells: java.util.List[Cell] = record.listCells()

        // Row key first, then one value per cell.
        val splitRec =
          cells.toArray.foldLeft(Array(CellUtil.cloneRow(cells.get(0)))) { (a, b) =>
            a :+ CellUtil.cloneValue(b.asInstanceOf[Cell])
          }

        // Name of the schema field flagged as primary; it maps to the row key.
        val keyFieldName = bcDataSchema.value.fields.filter(e =>
          e.metadata.contains("isPrimary") && e.metadata.getBoolean("isPrimary"))(0).name

        // Key field name first, then one column qualifier per cell.
        val schemaArr = cells.toArray.foldLeft(Array(keyFieldName)) { (a, b) =>
          val fieldCell = b.asInstanceOf[Cell]
          // Decode only this cell's qualifier slice of the backing array.
          a :+ new String(fieldCell.getQualifierArray,
            fieldCell.getQualifierOffset, fieldCell.getQualifierLength)
        }

        val res = Map(schemaArr.zip(splitRec).toArray: _*)

        // Cast each raw byte array to the type declared for that field in the schema.
        val recordFields = res.map(value => {
          val colDataType =
            try {
              bcDataSchema.value.fields.filter(_.name == value._1)(0).dataType
            } catch {
              case e: ArrayIndexOutOfBoundsException => throw new
                RuntimeException("Schema doesn't contain the fieldname")
            }
          CatalystTypeConverters.convertToScala(
            Cast(Literal(value._2), colDataType).eval(),
            colDataType)
        }).toArray
        Row(recordFields: _*)
      }
    rowRdd
  }
}
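A side note on the qualifier handling in buildScan: an HBase Cell exposes its qualifier as an (array, offset, length) slice of a backing byte array. The following is only a pure-Scala sketch with a made-up backing array (no HBase involved, and the object and method names are hypothetical). It compares decoding just the slice with decoding the whole array and then cutting by the same byte offsets; the two only agree while every preceding byte decodes to exactly one character.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object QualifierSliceSketch {
  // Decode only the bytes [offset, offset + length) of the backing array.
  def decodeSlice(backing: Array[Byte], offset: Int, length: Int): String =
    new String(backing, offset, length, UTF_8)

  // Decode the whole array first, then cut using the same byte offsets
  // as if they were character offsets.
  def decodeThenSubstring(backing: Array[Byte], offset: Int, length: Int): String =
    new String(backing, UTF_8).substring(offset, offset + length)

  def main(args: Array[String]): Unit = {
    // Hypothetical layout: row key, qualifier, value in one array.
    val rowKey = "k\u00e9".getBytes(UTF_8)     // 2 characters, 3 bytes
    val qualifier = "field2".getBytes(UTF_8)   // 6 characters, 6 bytes
    val value = "value".getBytes(UTF_8)
    val backing = rowKey ++ qualifier ++ value

    println(decodeSlice(backing, rowKey.length, qualifier.length))         // field2
    println(decodeThenSubstring(backing, rowKey.length, qualifier.length)) // ield2v
  }
}
```

With plain single-byte data both approaches return the same string, so this may never show up in practice; the slice form just avoids decoding the whole backing array for every cell.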
Thanks
Ravi
From: Ted Yu [mailto:[email protected]]
Sent: Thursday, June 9, 2016 7:56 PM
To: Ravi Aggarwal <[email protected]>
Cc: user <[email protected]>
Subject: Re: OutOfMemory when doing joins in spark 2.0 while same code runs
fine in spark 1.5.2
bq. Read data from hbase using custom DefaultSource (implemented using
TableScan)
Did you use the DefaultSource from the hbase-spark module in the hbase master branch?
If you wrote your own, would you mind sharing the related code?
Thanks
On Thu, Jun 9, 2016 at 2:53 AM, raaggarw <[email protected]> wrote:
Hi,
I was trying to port my code from Spark 1.5.2 to Spark 2.0, but I ran into
OutOfMemory errors. On drilling down, I could see that the OOM is caused by a
join, because removing the join fixes the issue. I then created a small
spark-app to reproduce this:
(48 cores, 300 GB RAM, divided among 4 workers)
line1: df1 = Read a set of parquet files into a dataframe
line2: df1.count
line3: df2 = Read data from hbase using custom DefaultSource (implemented
using TableScan)
line4: df2.count
line5: df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
line6: df3.count -> *this is where it fails in Spark 2.0 and runs fine in
Spark 1.5.2*
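For readers who want something concrete, the six lines above could look roughly like the sketch below on Spark 2.0. This is not the original app: the S3 path, the "com.example.hbase" package name, the table name, the quorum host and the "rowkey" field are all hypothetical placeholders. Only the join condition, the "hbaseQuorum" option and the "isPrimary" metadata flag come from the thread.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}

object JoinRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("join-oom-repro").getOrCreate()

    // line1, line2: parquet side. The bucket and prefix are hypothetical.
    val df1 = spark.read.parquet("s3a://example-bucket/parquet-input/")
    println(df1.count())

    // line3, line4: HBase side through the custom DefaultSource.
    // "com.example.hbase" stands in for whatever package holds DefaultSource;
    // the field names, table name and quorum host are hypothetical as well.
    val primaryKey = new MetadataBuilder().putBoolean("isPrimary", true).build()
    val hbaseSchema = StructType(Seq(
      StructField("rowkey", StringType, nullable = false, metadata = primaryKey),
      StructField("field2", StringType)
    ))
    val df2 = spark.read
      .format("com.example.hbase")
      .schema(hbaseSchema)
      .option("hbaseQuorum", "zk1.example.com")
      .load("example_table") // reaches the relation as the "path" option
    println(df2.count())

    // line5, line6: the join whose count fails on Spark 2.0.
    val df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
    println(df3.count())

    spark.stop()
  }
}
```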
Problem:
First, a lot of WARN messages:
2016-06-09 08:14:18,884 WARN [broadcast-exchange-0]
memory.TaskMemoryManager (TaskMemoryManager.java:allocatePage(264)) - Failed
to allocate a page (1048576 bytes), try again.
And then an OOM.
I then dumped the data fetched from hbase into s3 and created df2
from s3 rather than from hbase; with that, it worked fine in Spark 2.0 as well.
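A minimal sketch of that workaround, assuming the dump was written as parquet (the message does not say which format was used). The object name, method name and staging path are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object HBaseViaS3Workaround {
  // Writes the HBase-backed DataFrame out as parquet (an assumed format)
  // and returns a new DataFrame that reads the staged copy back.
  def roundTrip(spark: SparkSession, df2: DataFrame, stagingPath: String): DataFrame = {
    df2.write.mode("overwrite").parquet(stagingPath)
    spark.read.parquet(stagingPath)
  }
}
```

Usage would be along the lines of `val df2FromS3 = HBaseViaS3Workaround.roundTrip(spark, df2, "s3a://example-bucket/hbase-dump/")`, followed by the same join against df1. Calling `explain()` on both joined DataFrames prints their physical plans for comparison.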
Could someone please guide me through next steps?
Thanks
Ravi
Computer Scientist @ Adobe
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-when-doing-joins-in-spark-2-0-while-same-code-runs-fine-in-spark-1-5-2-tp27124.html