Ravi, did this issue ever get solved?

I think I've been hitting the same thing. It looks like
spark.sql.autoBroadcastJoinThreshold isn't kicking in as expected: if I set
it to -1, the computation proceeds successfully.
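
For anyone who wants to try the same workaround, here is a minimal sketch of two ways to set that property. The app name and the local master are placeholders of mine, not anything from Ravi's setup. A value of -1 disables automatic broadcast joins, so the planner has to pick another strategy such as sort-merge.

```scala
import org.apache.spark.sql.SparkSession

object DisableBroadcastJoin {
  def main(args: Array[String]): Unit = {
    // Option 1: set the threshold when the session is built.
    val spark = SparkSession.builder()
      .appName("disable-broadcast-join") // hypothetical app name
      .master("local[*]")                // assumption: local run for illustration
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()

    // Option 2: change it at runtime on an already running session.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    // Print the effective value to confirm the setting was applied.
    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

    spark.stop()
  }
}
```

The same property can also be passed at submit time with --conf spark.sql.autoBroadcastJoinThreshold=-1.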

On Tue, Jun 14, 2016 at 12:28 AM, Ravi Aggarwal <raagg...@adobe.com> wrote:

> Hi,
>
>
>
> Is there any breakthrough here?
>
>
>
> I had one more observation while debugging the issue
>
> Here are the 4 types of data I had:
>
>
>
> Da -> stored in parquet
>
> Di -> stored in parquet
>
> Dl1 -> parquet version of lookup
>
> Dl2 -> hbase version of lookup
>
>
>
> Joins performed and the join type chosen by Spark:
>
> Join          Join type        Result
> Da and Di     Sort-merge       failed (OOM)
> Da and Dl1    Broadcast-hash   passed
> Da and Dl2    Sort-merge       passed
> Di and Dl1    Broadcast-hash   passed
> Di and Dl2    Sort-merge       failed (OOM)
>
>
>
> From these entries I can deduce that the problem is with sort-merge joins
> involving Di.
>
> So HBase is out of the equation; it is not the culprit.
>
> In the physical plan I could see that sort-merge performs only two
> additional operations compared to broadcast-hash:
>
> -> Exchange Hashpartitioning
>
> -> Sort
>
> followed finally by the sort-merge join itself.
>
>
>
> Can we deduce anything from this?
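
To compare the two strategies on something small, a sketch like the following prints the physical plan for the same join under two threshold settings. The table sizes, column names and local master are assumptions for illustration, not the data from this thread, and the exact plan text varies by Spark version.

```scala
import org.apache.spark.sql.SparkSession

object JoinPlanCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("join-plan-check") // hypothetical app name
      .master("local[*]")         // assumption: local run for illustration
      .getOrCreate()

    // Two small stand-in tables; the real Da/Di/Dl data is not reproduced here.
    val left = spark.range(0, 1000).toDF("field1")
    val right = spark.range(0, 1000).toDF("field2")

    // Plan the same join under a given broadcast threshold and return the
    // physical plan as text.
    def planFor(threshold: String): String = {
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", threshold)
      val joined = left.join(right, left("field1") === right("field2"), "inner")
      joined.queryExecution.executedPlan.toString
    }

    println(planFor("10485760")) // 10 MB: small tables are eligible for broadcast
    println(planFor("-1"))       // broadcast disabled

    spark.stop()
  }
}
```

Diffing the two printed plans should make the extra Exchange and Sort steps visible on the non-broadcast side.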
>
>
>
> Thanks
>
> Ravi
>
> *From:* Ravi Aggarwal
> *Sent:* Friday, June 10, 2016 12:31 PM
> *To:* 'Ted Yu' <yuzhih...@gmail.com>
> *Cc:* user <user@spark.apache.org>
> *Subject:* RE: OutOfMemory when doing joins in spark 2.0 while same code
> runs fine in spark 1.5.2
>
>
>
> Hi Ted,
>
> Thanks for the reply.
>
>
>
> Here is the code
>
> Btw, df.count runs fine on the dataframe generated from this default
> source. I think something in the combination of the join and the HBase data
> source is causing the issue, but I am not entirely sure.
>
> I have also dumped the physical plans of both approaches (s3a/s3a join and
> s3a/hbase join). Let me know if you want them.
>
>
>
> import org.apache.hadoop.fs.FileStatus
>
> import org.apache.hadoop.hbase.client._
>
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>
> import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
>
> import org.apache.hadoop.hbase._
>
> import org.apache.hadoop.mapreduce.Job
>
> import org.apache.spark.rdd.RDD
>
> import org.apache.spark.sql.Row
>
> import org.apache.spark.sql.catalyst.CatalystTypeConverters
>
> import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
>
> import org.apache.spark.sql.execution.datasources.{OutputWriterFactory,
> FileFormat}
>
> import org.apache.spark.sql.sources._
>
> import org.apache.spark.sql.types._
>
> import org.apache.spark.sql._
>
> import org.slf4j.LoggerFactory
>
>
>
> class DefaultSource extends SchemaRelationProvider with FileFormat {
>
>
>
>   override def createRelation(sqlContext: SQLContext, parameters:
> Map[String, String], schema: StructType) = {
>
>     new HBaseRelation(schema, parameters)(sqlContext)
>
>   }
>
>
>
>   def inferSchema(sparkSession: SparkSession,
>
>                   options: Map[String, String],
>
>                   files: Seq[FileStatus]): Option[StructType] = ???
>
>
>
>   def prepareWrite(sparkSession: SparkSession,
>
>                    job: Job,
>
>                    options: Map[String, String],
>
>                    dataSchema: StructType): OutputWriterFactory = ???
>
> }
>
>
>
> object HBaseConfigurationUtil {
>
>   lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")
>
>   val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
>
>     val conf = HBaseConfiguration.create()
>
>     conf.set(TableInputFormat.INPUT_TABLE, tableName)
>
>     conf.set("hbase.mapred.outputtable", tableName)
>
>     conf.set("hbase.zookeeper.quorum", hbaseQuorum)
>
>     conf
>
>   }
>
> }
>
>
>
> class HBaseRelation(val schema: StructType, parameters: Map[String,
> String])
>
>                    (@transient val sqlContext: SQLContext) extends
> BaseRelation with TableScan {
>
>
>
>   import sqlContext.sparkContext
>
>
>
>   override def buildScan(): RDD[Row] = {
>
>
>
>     val bcDataSchema = sparkContext.broadcast(schema)
>
>
>
>     val tableName = parameters.get("path") match {
>
>       case Some(t) => t
>
>       case _ => throw new RuntimeException("Table name (path) not provided in parameters")
>
>     }
>
>
>
>     val hbaseQuorum = parameters.get("hbaseQuorum") match {
>
>       case Some(s: String) => s
>
>       case _ => throw new RuntimeException("hbaseQuorum not provided in options")
>
>     }
>
>
>
>     val rdd = sparkContext.newAPIHadoopRDD(
>
>       HBaseConfigurationUtil.hbaseConfiguration(tableName, hbaseQuorum),
>
>       classOf[TableInputFormat],
>
>       classOf[ImmutableBytesWritable],
>
>       classOf[Result]
>
>     )
>
>
>
>     val rowRdd = rdd
>
>       .map(tuple => tuple._2)
>
>       .map { record =>
>
>
>
>       val cells: java.util.List[Cell] = record.listCells()
>
>
>
>       val splitRec =
> cells.toArray.foldLeft(Array(CellUtil.cloneRow(cells.get(0)))) {(a, b) =>
>
>         a :+ CellUtil.cloneValue(b.asInstanceOf[Cell])
>
>       }
>
>
>
>       val keyFieldName = bcDataSchema.value.fields.filter(e =>
> e.metadata.contains("isPrimary") &&
> e.metadata.getBoolean("isPrimary"))(0).name
>
>
>
>       val schemaArr = cells.toArray.foldLeft(Array(keyFieldName)) {(a, b)
> => {
>
>         val fieldCell = b.asInstanceOf[Cell]
>
>         // Copy out the qualifier bytes before decoding; offset and length are byte positions.
>         a :+ new String(CellUtil.cloneQualifier(fieldCell), "UTF-8")
>
>       }
>
>       }
>
>
>
>       val res = Map(schemaArr.zip(splitRec).toArray: _*)
>
>
>
>       val recordFields = res.map(value => {
>
>         val colDataType =
>
>           try {
>
>             bcDataSchema.value.fields.filter(_.name ==
> value._1)(0).dataType
>
>           } catch {
>
>             case e: ArrayIndexOutOfBoundsException => throw new
> RuntimeException("Schema doesn't contain the fieldname")
>
>           }
>
>         CatalystTypeConverters.convertToScala(
>
>           Cast(Literal(value._2), colDataType).eval(),
>
>           colDataType)
>
>       }).toArray
>
>       Row(recordFields: _*)
>
>     }
>
>
>
>     rowRdd
>
>   }
>
> }
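
A side note on decoding qualifiers from a Cell's backing array: getQualifierOffset and getQualifierLength are byte positions, so it is safer to slice the bytes first and decode afterwards. The sketch below uses plain byte arrays as a stand-in for an HBase Cell (the object and method names are hypothetical, and UTF-8 is an assumption) to show how decoding the whole array and then calling substring can drift when earlier bytes are not single-byte characters.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object QualifierDecoding {
  // Slice `length` bytes starting at byte `offset`, then decode only that slice.
  def decodeSlice(backing: Array[Byte], offset: Int, length: Int): String =
    new String(backing, offset, length, UTF_8)

  // Decode the whole array first, then cut by byte offsets. This is only
  // correct when every preceding byte maps to exactly one character.
  def decodeThenSubstring(backing: Array[Byte], offset: Int, length: Int): String =
    new String(backing, UTF_8).substring(offset, offset + length)

  def main(args: Array[String]): Unit = {
    // Hypothetical backing array: a row key holding one two-byte UTF-8
    // character, then the qualifier "col1", then one value byte.
    val rowKey = "\u00e9".getBytes(UTF_8)
    val qualifier = "col1".getBytes(UTF_8)
    val backing = rowKey ++ qualifier ++ "v".getBytes(UTF_8)

    println(decodeSlice(backing, rowKey.length, qualifier.length))         // prints "col1"
    println(decodeThenSubstring(backing, rowKey.length, qualifier.length)) // prints "ol1v"
  }
}
```

With real cells, CellUtil.cloneQualifier returns just the qualifier bytes, which avoids the offset arithmetic entirely.
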
>
>
>
> Thanks
>
> Ravi
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com <yuzhih...@gmail.com>]
> *Sent:* Thursday, June 9, 2016 7:56 PM
> *To:* Ravi Aggarwal <raagg...@adobe.com>
> *Cc:* user <user@spark.apache.org>
> *Subject:* Re: OutOfMemory when doing joins in spark 2.0 while same code
> runs fine in spark 1.5.2
>
>
>
> bq. Read data from hbase using custom DefaultSource (implemented using
> TableScan)
>
>
>
> Did you use the DefaultSource from hbase-spark module in hbase master
> branch ?
>
> If you wrote your own, mind sharing related code ?
>
>
>
> Thanks
>
>
>
> On Thu, Jun 9, 2016 at 2:53 AM, raaggarw <raagg...@adobe.com> wrote:
>
> Hi,
>
> I was trying to port my code from Spark 1.5.2 to Spark 2.0, but I ran into
> OutOfMemory errors. On drilling down I could see that the OOM is caused by
> the join, because removing the join fixes the issue. I then created a small
> Spark app to reproduce this:
>
> (48 cores, 300gb ram - divided among 4 workers)
>
> line1: df1 = Read a set of parquet files into a dataframe
> line2: df1.count
> line3: df2 = Read data from hbase using custom DefaultSource (implemented
> using TableScan)
> line4: df2.count
> line5: df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
> line6: df3.count -> *this is where it fails in Spark 2.0 and runs fine in
> spark 1.5.2*
>
> Problem:
> First, a lot of WARN messages:
> 2016-06-09 08:14:18,884 WARN  [broadcast-exchange-0] memory.TaskMemoryManager (TaskMemoryManager.java:allocatePage(264)) - Failed to allocate a page (1048576 bytes), try again.
> And then OOM.
>
> I then dumped the data fetched from HBase into S3 and created df2 from S3
> rather than from HBase; with that change it worked fine in Spark 2.0 as well.
>
> Could someone please guide me through next steps?
>
> Thanks
> Ravi
> Computer Scientist @ Adobe
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-when-doing-joins-in-spark-2-0-while-same-code-runs-fine-in-spark-1-5-2-tp27124.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
>
