I am getting an error for the following code snippet:
object SparkTaskTry extends Logging {
  /**
   * Extends the normal Try constructor to allow TaskKilledExceptions to propagate
   */
  def apply[T](r: => T): Try[T] =
    try scala.util.Success(r) catch {
      case e: TaskKilledException => throw e
      case NonFatal(e) =>
        logInfo("Caught and Ignored Exception: " + e.toString)
        e.printStackTrace()
        Failure(e)
    }
}
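(Aside, just to show the behaviour I expect from SparkTaskTry: below is a stand-alone analogue I put together, with made-up names TaskTryDemo/safeTry, plain println instead of logInfo, and without the Spark-specific parts (Logging, TaskKilledException). Non-fatal failures become Failure values that get dropped later, like the .filter(_.isSuccess).map(_.get) at the end of buildScan further down.)

import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

// Stand-alone analogue of SparkTaskTry, only to illustrate the intended behaviour.
object TaskTryDemo {
  def safeTry[T](r: => T): Try[T] =
    try Success(r) catch {
      case NonFatal(e) =>
        println("Caught and ignored exception: " + e)
        Failure(e)
    }

  def main(args: Array[String]): Unit = {
    val raw = Iterator("1", "two", "3")
    // Bad records become Failure and are filtered out, mirroring the
    // .filter(_.isSuccess).map(_.get) step in buildScan.
    val parsed = raw.map(s => safeTry(s.toInt)).filter(_.isSuccess).map(_.get)
    println(parsed.toList) // List(1, 3)
  }
}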
override def buildScan(
    requiredColumns: Array[String],
    filters: Array[Filter],
    inputFiles: Array[FileStatus],
    broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
  val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
  val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
  val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
  val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
  val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec

  // When merging schemas is enabled and the column of the given filter does not exist,
  // Parquet emits an exception which is an issue of Parquet (PARQUET-389).
  val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown

  // Parquet row group size. We will use this value as the value for
  // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
  // of these flags is smaller than the parquet row group size.
  val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)

  // Create the function to set variable Parquet confs at both driver and executor side.
  val initLocalJobFuncOpt =
    ParquetRelation.initializeLocalJobFunc(
      requiredColumns,
      filters,
      dataSchema,
      parquetBlockSize,
      useMetadataCache,
      safeParquetFilterPushDown,
      assumeBinaryIsString,
      assumeInt96IsTimestamp,
      followParquetFormatSpec) _

  // Create the function to set input paths at the driver side.
  val setInputPaths =
    ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _

  Utils.withDummyCallSite(sqlContext.sparkContext) {
    new RDD[Try[InternalRow]](sqlContext.sparkContext, Nil) with Logging {

      override def getPartitions: Array[SparkPartition] = internalRDD.getPartitions

      override def getPreferredLocations(split: SparkPartition): Seq[String] =
        internalRDD.getPreferredLocations(split)

      override def checkpoint() {
        // Do nothing. Hadoop RDD should not be checkpointed.
      }

      override def persist(storageLevel: StorageLevel): this.type = {
        super.persist(storageLevel)
      }

      val internalRDD: SqlNewHadoopRDD[InternalRow] = new SqlNewHadoopRDD(
        sc = sqlContext.sparkContext,
        broadcastedConf = broadcastedConf,
        initDriverSideJobFuncOpt = Some(setInputPaths),
        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
        inputFormatClass = if (isSplittable) {
          classOf[ParquetInputFormat[InternalRow]]
        } else {
          classOf[ParquetRowInputFormatIndivisible]
        },
        valueClass = classOf[InternalRow]) {

        val cacheMetadata = useMetadataCache

        @transient val cachedStatuses = inputFiles.map { f =>
          // In order to encode the authority of a Path containing special characters such as '/'
          // (which does happen in some S3N credentials), we need to use the string returned by the
          // URI of the path to create a new Path.
          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
          new FileStatus(
            f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime,
            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
        }.toSeq

        private def escapePathUserInfo(path: Path): Path = {
          val uri = path.toUri
          new Path(new URI(
            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
            uri.getQuery, uri.getFragment))
        }

        // Overridden so we can inject our own cached file statuses.
        override def getPartitions: Array[SparkPartition] = {
          val inputFormat = new ParquetInputFormat[InternalRow] {
            override def listStatus(jobContext: JobContext): JList[FileStatus] = {
              if (cacheMetadata) cachedStatuses else super.listStatus(jobContext)
            }
          }

          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
          val rawSplits = inputFormat.getSplits(jobContext)

          Array.tabulate[SparkPartition](rawSplits.size) { i =>
            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
          }
        }
      }

      override def compute(
          part: SparkPartition,
          context: TaskContext): InterruptibleIterator[Try[InternalRow]] = {
        val iter: Iterator[InternalRow] = internalRDD.constructIter(part, context)
        val tryIter = new Iterator[Try[InternalRow]] {
          override def next(): Try[InternalRow] = {
            val readAttempt = SparkTaskTry(iter.next())
            readAttempt
          }

          override def hasNext: Boolean = {
            SparkTaskTry[Boolean](iter.hasNext) match {
              case scala.util.Success(r) => r
              case _ => false
            }
          }
        }
        new InterruptibleIterator[Try[InternalRow]](context, tryIter)
      }

    }.filter(_.isSuccess).map(_.get)
      .asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row]
  }
}
Error stack trace:

Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation, value: ParquetRelation)
    - field (class: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1, name: $outer, type: class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation)
    - object (class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1, <function0>)
    - field (class: org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$2, name: $outer, type: class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1)
    - object (class org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$2, $anonfun$buildInternalScan$1$$anon$2[2] at )
    - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@47293a0b)
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@47293a0b))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[4] at )
    - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: prev, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[5] at )
    - field (class: org.apache.spark.sql.execution.PhysicalRDD, name: rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.sql.execution.PhysicalRDD, Scan ParquetRelation[_1#0] InputPaths: hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet)
    - field (class: org.apache.spark.sql.execution.ConvertToSafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.ConvertToSafe, ConvertToSafe
      +- Scan ParquetRelation[_1#0] InputPaths: hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet)
    - field (class: org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.ConvertToSafe)
    - object (class org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
  ... 78 more
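From the serialization stack it looks like the anonymous RDD[Try[InternalRow]] subclass keeps an $outer reference back to the enclosing ParquetRelation (which is not serializable), and that reference gets dragged along when the plan's closures are serialized. My guess is that the usual workaround applies, i.e. copy whatever the anonymous class and its closures need into local vals so that only serializable values are captured. Here is a minimal example of the pattern I suspect is biting me (made-up class, nothing to do with Parquet):

import org.apache.spark.SparkContext

// Minimal illustration of the $outer-capture problem, not the actual code above.
class NotSerializableOwner(sc: SparkContext) {
  val factor = 10

  // Fails with "Task not serializable": the closure reads `factor` through
  // `this`, so the whole non-serializable NotSerializableOwner is captured.
  def scaledBad = sc.parallelize(1 to 3).map(_ * factor)

  // Works: copy the field into a local val first, so only the Int is captured.
  def scaledOk = {
    val localFactor = factor
    sc.parallelize(1 to 3).map(_ * localFactor)
  }
}

Is that the right direction here, or is something else holding on to the ParquetRelation?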
Please help!