Ravi, did your issue ever get solved? I think I've been hitting the same thing: it looks like spark.sql.autoBroadcastJoinThreshold isn't kicking in as expected, and if I set it to -1 the computation completes successfully.
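For reference, this is roughly the workaround I'm using from a spark-shell session (where "spark" is the 2.0 SparkSession); the same property can also go into spark-defaults.conf or on the spark-submit command line:

    // Setting the threshold to -1 disables broadcast-hash joins entirely,
    // so equi-joins fall back to sort-merge (the 2.0 default shuffle join).
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    // The same thing at submit time:
    //   spark-submit --conf spark.sql.autoBroadcastJoinThreshold=-1 ...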
On Tue, Jun 14, 2016 at 12:28 AM, Ravi Aggarwal <raagg...@adobe.com> wrote:

> Hi,
>
> Is there any breakthrough here?
>
> I had one more observation while debugging the issue. Here are the four
> types of data I had:
>
>   Da  -> stored in parquet
>   Di  -> stored in parquet
>   Dl1 -> parquet version of the lookup
>   Dl2 -> hbase version of the lookup
>
> Joins performed, and the join type Spark chose:
>
>   Da and Di   ->  Sort-merge      ->  failed (OOM)
>   Da and Dl1  ->  Broadcast-hash  ->  passed
>   Da and Dl2  ->  Sort-merge      ->  passed
>   Di and Dl1  ->  Broadcast-hash  ->  passed
>   Di and Dl2  ->  Sort-merge      ->  failed (OOM)
>
> From these entries I can deduce that the problem is with sort-merge joins
> involving Di, so the hbase source is out of the equation; it is not the
> culprit.
>
> In the physical plan I could see only two operations that sort-merge does
> in addition to broadcast-hash:
>
>   - Exchange (hashpartitioning)
>   - Sort
>
> followed by the sort-merge join itself.
>
> Can we deduce anything from this?
>
> Thanks
> Ravi
>
> From: Ravi Aggarwal
> Sent: Friday, June 10, 2016 12:31 PM
> To: 'Ted Yu' <yuzhih...@gmail.com>
> Cc: user <user@spark.apache.org>
> Subject: RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2
>
> Hi Ted,
>
> Thanks for the reply. Here is the code.
>
> By the way, df.count runs fine on a dataframe generated from this default
> source, so I think it is something in the combination of the join and the
> hbase data source that is creating the issue, but I am not entirely sure.
> I have also dumped the physical plans of both approaches (s3a/s3a join and
> s3a/hbase join); let me know in case you want them.
>
> import org.apache.hadoop.fs.FileStatus
> import org.apache.hadoop.hbase.client._
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.hbase._
> import org.apache.hadoop.mapreduce.Job
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.catalyst.CatalystTypeConverters
> import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
> import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
> import org.apache.spark.sql.sources._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql._
> import org.slf4j.LoggerFactory
>
> class DefaultSource extends SchemaRelationProvider with FileFormat {
>
>   override def createRelation(sqlContext: SQLContext,
>                               parameters: Map[String, String],
>                               schema: StructType) = {
>     new HBaseRelation(schema, parameters)(sqlContext)
>   }
>
>   def inferSchema(sparkSession: SparkSession,
>                   options: Map[String, String],
>                   files: Seq[FileStatus]): Option[StructType] = ???
>
>   def prepareWrite(sparkSession: SparkSession,
>                    job: Job,
>                    options: Map[String, String],
>                    dataSchema: StructType): OutputWriterFactory = ???
> }
>
> object HBaseConfigurationUtil {
>
>   lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")
>
>   val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
>     val conf = HBaseConfiguration.create()
>     conf.set(TableInputFormat.INPUT_TABLE, tableName)
>     conf.set("hbase.mapred.outputtable", tableName)
>     conf.set("hbase.zookeeper.quorum", hbaseQuorum)
>     conf
>   }
> }
>
> class HBaseRelation(val schema: StructType, parameters: Map[String, String])
>                    (@transient val sqlContext: SQLContext)
>   extends BaseRelation with TableScan {
>
>   import sqlContext.sparkContext
>
>   override def buildScan(): RDD[Row] = {
>
>     val bcDataSchema = sparkContext.broadcast(schema)
>
>     val tableName = parameters.get("path") match {
>       case Some(t) => t
>       case _ => throw new RuntimeException("Table name (path) not provided in parameters")
>     }
>
>     val hbaseQuorum = parameters.get("hbaseQuorum") match {
>       case Some(s: String) => s
>       case _ => throw new RuntimeException("hbaseQuorum not provided in options")
>     }
>
>     val rdd = sparkContext.newAPIHadoopRDD(
>       HBaseConfigurationUtil.hbaseConfiguration(tableName, hbaseQuorum),
>       classOf[TableInputFormat],
>       classOf[ImmutableBytesWritable],
>       classOf[Result]
>     )
>
>     val rowRdd = rdd
>       .map(tuple => tuple._2)
>       .map { record =>
>
>         val cells: java.util.List[Cell] = record.listCells()
>
>         val splitRec = cells.toArray.foldLeft(Array(CellUtil.cloneRow(cells.get(0)))) { (a, b) =>
>           a :+ CellUtil.cloneValue(b.asInstanceOf[Cell])
>         }
>
>         val keyFieldName = bcDataSchema.value.fields.filter(e =>
>           e.metadata.contains("isPrimary") && e.metadata.getBoolean("isPrimary"))(0).name
>
>         val schemaArr = cells.toArray.foldLeft(Array(keyFieldName)) { (a, b) =>
>           val fieldCell = b.asInstanceOf[Cell]
>           a :+ new String(fieldCell.getQualifierArray).substring(
>             fieldCell.getQualifierOffset,
>             fieldCell.getQualifierLength + fieldCell.getQualifierOffset)
>         }
>
>         val res = Map(schemaArr.zip(splitRec).toArray: _*)
>
>         val recordFields = res.map(value => {
>           val colDataType =
>             try {
>               bcDataSchema.value.fields.filter(_.name == value._1)(0).dataType
>             } catch {
>               case e: ArrayIndexOutOfBoundsException =>
>                 throw new RuntimeException("Schema doesn't contain the fieldname")
>             }
>           CatalystTypeConverters.convertToScala(
>             Cast(Literal(value._2), colDataType).eval(),
>             colDataType)
>         }).toArray
>
>         Row(recordFields: _*)
>       }
>
>     rowRdd
>   }
> }
>
> Thanks
> Ravi
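For what it's worth, this is roughly how I've been checking which strategy the 2.0 planner actually picks for each pairing. It's only a spark-shell sketch; df1/df2, the paths and the join key below are placeholders, not your code:

    // spark-shell (Spark 2.0), where "spark" is the predefined SparkSession.
    import org.apache.spark.sql.functions.broadcast

    val df1 = spark.read.parquet("/path/to/Di")    // placeholder path
    val df2 = spark.read.parquet("/path/to/Dl1")   // placeholder path

    // Extended explain prints the physical plan: look for SortMergeJoin
    // (preceded by Exchange hashpartitioning + Sort) vs BroadcastHashJoin.
    df1.join(df2, df1("field1") === df2("field2"), "inner").explain(true)

    // Forcing one side to broadcast is a quick way to isolate the
    // sort-merge vs broadcast-hash difference for a given pair of inputs.
    df1.join(broadcast(df2), df1("field1") === df2("field2"), "inner").explain(true)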
> From: Ted Yu [mailto:yuzhih...@gmail.com]
> Sent: Thursday, June 9, 2016 7:56 PM
> To: Ravi Aggarwal <raagg...@adobe.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2
>
> bq. Read data from hbase using custom DefaultSource (implemented using TableScan)
>
> Did you use the DefaultSource from the hbase-spark module in the hbase master branch?
> If you wrote your own, would you mind sharing the related code?
>
> Thanks
>
> On Thu, Jun 9, 2016 at 2:53 AM, raaggarw <raagg...@adobe.com> wrote:
>
> Hi,
>
> I was trying to port my code from spark 1.5.2 to spark 2.0, but I hit some
> OutOfMemory issues. On drilling down I could see that the OOM is caused by
> the join, because removing the join fixes the issue. I then created a small
> spark app to reproduce it (48 cores, 300 GB RAM, divided among 4 workers):
>
>   line1: df1 = read a set of parquet files into a dataframe
>   line2: df1.count
>   line3: df2 = read data from hbase using a custom DefaultSource (implemented using TableScan)
>   line4: df2.count
>   line5: df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
>   line6: df3.count   <- this is where it fails in Spark 2.0 and runs fine in spark 1.5.2
>
> Problem: first a lot of WARN messages such as
>
>   2016-06-09 08:14:18,884 WARN [broadcast-exchange-0] memory.TaskMemoryManager
>   (TaskMemoryManager.java:allocatePage(264)) - Failed to allocate a page (1048576 bytes), try again.
>
> and then the OOM.
>
> I then tried dumping the data fetched from hbase into s3 and creating df2
> from s3 rather than hbase, and then it worked fine in spark 2.0 as well.
>
> Could someone please guide me through the next steps?
>
> Thanks
> Ravi
> Computer Scientist @ Adobe
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-when-doing-joins-in-spark-2-0-while-same-code-runs-fine-in-spark-1-5-2-tp27124.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
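P.S. The skeleton I've been using to reproduce this on my side is essentially the six lines quoted above. A rough Spark 2.0 sketch of it follows; the paths, the data source package name, the schema and the zookeeper quorum are all placeholders for illustration, not the actual code:

    // spark-shell (Spark 2.0), where "spark" is the predefined SparkSession.
    import org.apache.spark.sql.types._

    // line1/line2: parquet-backed side.
    val df1 = spark.read.parquet("s3a://bucket/path/to/Da")    // placeholder path
    df1.count()

    // Hypothetical schema for the lookup table; the custom relation above expects
    // the row-key field to carry an "isPrimary" metadata flag.
    val keyMeta = new MetadataBuilder().putBoolean("isPrimary", true).build()
    val lookupSchema = StructType(Seq(
      StructField("field2", StringType, nullable = true, metadata = keyMeta),
      StructField("value", StringType)))

    // line3/line4: hbase-backed side through the custom DefaultSource shown earlier.
    // The format name is a hypothetical package; load() passes the table name as
    // the "path" parameter the relation reads.
    val df2 = spark.read
      .format("com.example.hbasesource")
      .schema(lookupSchema)
      .option("hbaseQuorum", "zk-host:2181")
      .load("lookup_table")
    df2.count()

    // line5/line6: the join that OOMs under 2.0 for me unless
    // spark.sql.autoBroadcastJoinThreshold is set to -1 first.
    val df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
    df3.count()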