Hello guys,

Thank you for your prompt reply.
I followed Akhil's suggestion with no success. Then I tried again, replacing
S3 with HDFS, and the job seems to work properly.
TD, I'm not using speculative execution.
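For reference, this is roughly the relevant part of the configuration in the run that works. It is only a sketch: the HDFS path is a placeholder, and speculation is shown explicitly disabled even though that is already the default.

import org.apache.spark.SparkConf

// Sketch only -- the output path below is a placeholder, not my real one.
val sparkConf = new SparkConf()
  .setAppName("MyDumperApp")
  .set("spark.speculation", "false")   // speculative execution explicitly off
val output = "hdfs:///user/flavio/dumps"   // previously "s3://BUCKET"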

I think I've just realized what is happening. Due to S3's eventual
consistency, these temporary files are sometimes found and sometimes not.
I confirmed this hypothesis via s3cmd.
So I've come up with two questions/suggestions:

1. Does Spark support writing these temporary files to HDFS while keeping the
final output on S3?
2. What do you think about adding a property like 'spark.s3.maxRetries'
that determines the number of retries before assuming that a file is indeed
not found? I can contribute this patch if you want; a rough sketch follows
below. (Hadoop already has a similar property, 'fs.s3.maxRetries', but it
applies to IOException and S3Exception.)

Thanks again, and I look forward to your comments,


--Flávio R. Santos

Chaordic | Platform
www.chaordic.com.br
+55 48 3232.3200

On Thu, Dec 11, 2014 at 10:03 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Following Gerard's thoughts, here are possible things that could be
> happening.
>
> 1. Is there another process in the background that is deleting files
> in the directory where you are trying to write? It seems like the
> temporary file generated by one of the tasks is getting deleted before
> it is renamed to the final output file. I suggest trying to not write
> to S3, but rather just count and print (with the rest of the computation
> staying exactly the same) and see if the error still occurs. That would
> narrow down the culprit to what Gerard suggested.
> 2. Do you have speculative execution turned on? If so, could you turn
> it off and try?
>
> TD
>
> On Thu, Dec 11, 2014 at 1:42 AM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
> > If the timestamps in the logs are to be trusted, it looks like your
> > driver is dying with that java.io.FileNotFoundException, and therefore
> > the workers lose their connection and close down.
> >
> > -kr, Gerard.
> >
> > On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das <ak...@sigmoidanalytics.com>
> > wrote:
> >>
> >> Try adding the following to the SparkConf:
> >>
> >>   .set("spark.core.connection.ack.wait.timeout", "6000")
> >>   .set("spark.akka.frameSize", "60")
> >>
> >> I used to face that issue with Spark 1.1.0.
> >>
> >> Thanks
> >> Best Regards
> >>
> >> On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos
> >> <bar...@chaordicsystems.com> wrote:
> >>>
> >>> Dear Spark'ers,
> >>>
> >>> I'm trying to run a simple job using Spark Streaming (version 1.1.1)
> >>> and YARN (yarn-cluster), but unfortunately I'm facing many issues.
> >>> In short, my job does the following:
> >>> - Consumes a specific Kafka topic
> >>> - Writes its content to S3 or HDFS
> >>>
> >>> Records in Kafka are in the form:
> >>> {"key": "someString"}
> >>>
> >>> This is important because I use the value of "key" to define the output
> >>> file name in S3.
> >>> Here are the Spark and Kafka parameters I'm using:
> >>>
> >>>> val sparkConf = new SparkConf()
> >>>>   .setAppName("MyDumperApp")
> >>>>   .set("spark.task.maxFailures", "100")
> >>>>   .set("spark.hadoop.validateOutputSpecs", "false")
> >>>>   .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> >>>>   .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
> >>>> val kafkaParams = Map(
> >>>>   "zookeeper.connect" -> zkQuorum,
> >>>>   "zookeeper.session.timeout.ms" -> "10000",
> >>>>   "rebalance.backoff.ms" -> "8000",
> >>>>   "rebalance.max.retries" -> "10",
> >>>>   "group.id" -> group,
> >>>>   "auto.offset.reset" -> "largest"
> >>>> )
> >>>
> >>>
> >>> My application is the following:
> >>>
> >>>> KafkaUtils.createStream[String, String, StringDecoder,
> >>>> StringDecoder](ssc, kafkaParams, Map(topic -> 1),
> >>>> StorageLevel.MEMORY_AND_DISK_SER_2)
> >>>>   .foreachRDD((rdd, time) =>
> >>>>     rdd.map {
> >>>>       case (_, line) =>
> >>>>         val json = parse(line)
> >>>>         val key = extract(json, "key").getOrElse("key_not_found")
> >>>>         (key, dateFormatter.format(time.milliseconds)) -> line
> >>>>     }
> >>>>       .partitionBy(new HashPartitioner(10))
> >>>>       .saveAsHadoopFile[KeyBasedOutput[(String,String), String]]("s3://BUCKET", classOf[BZip2Codec])
> >>>>   )
> >>>
> >>>
> >>> And the last piece:
> >>>
> >>>> class KeyBasedOutput[T >: Null, V <: AnyRef] extends
> >>>> MultipleTextOutputFormat[T , V] {
> >>>>   override protected def generateFileNameForKeyValue(key: T, value: V,
> >>>> leaf: String) = key match {
> >>>>     case (myKey, batchId) =>
> >>>>       "somedir" + "/" + myKey + "/" +
> >>>>         "prefix-" + myKey + "_" + batchId + "_" + leaf
> >>>>   }
> >>>>   override protected def generateActualKey(key: T, value: V) = null
> >>>> }
> >>>
> >>>
> >>> I use batch sizes of 5 minutes with checkpoints activated.
> >>> The job fails nondeterministically (I think it never ran longer than ~5
> >>> hours). I have no clue why; it simply fails.
> >>> Please find below the exceptions thrown by my application.
> >>>
> >>> I really appreciate any kind of hint.
> >>> Thank you very much in advance.
> >>>
> >>> Regards,
> >>> -- Flávio
> >>>
> >>> ==== Executor 1
> >>>
> >>> 2014-12-10 19:05:15,150 INFO  [handle-read-write-executor-3]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> SendingConnection
> >>>  to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> >>> 2014-12-10 19:05:15,201 INFO  [Thread-6] storage.MemoryStore
> >>> (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with
> >>> curMem=194463488,
> >>>  maxMem=4445479895
> >>> 2014-12-10 19:05:15,202 INFO  [Thread-6] storage.MemoryStore
> >>> (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as
> >>> bytes in memory (estimated size 96.4 KB, free 4.0 GB)
> >>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> >>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-1]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> SendingConnection
> >>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> >>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> SendingConnection
> >>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> >>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> >>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-3]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> SendingConnection
> >>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> >>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> SendingConnection
> >>>  to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> >>> 2014-12-10 19:05:16,650 INFO  [connection-manager-thread]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ?
> >>> sun.nio.ch.SelectionKeyImpl@da2e041
> >>> 2014-12-10 19:05:16,651 INFO  [connection-manager-thread]
> >>> network.ConnectionManager (Logging.scala:logInfo(80)) - key already
> >>> cancelled ? sun.nio.ch.SelectionKeyImpl@da2e041
> >>> java.nio.channels.CancelledKeyException
> >>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:316)
> >>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
> >>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> ReceivingConnection to
> >>> ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> >>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-2]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> SendingConnection to
> >>> ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> >>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> SendingConnection to
> >>> ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> >>> 2014-12-10 19:05:16,680 INFO  [connection-manager-thread]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ?
> >>> sun.nio.ch.SelectionKeyImpl@6a0dd98a
> >>> 2014-12-10 19:05:16,681 INFO  [connection-manager-thread]
> >>> network.ConnectionManager (Logging.scala:logInfo(80)) - key already
> >>> cancelled ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
> >>> java.nio.channels.CancelledKeyException
> >>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
> >>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
> >>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> ReceivingConnection to
> >>> ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> >>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-0]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> SendingConnection to
> >>> ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> >>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> SendingConnection to
> >>> ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> >>> 2014-12-10 19:05:17,182 ERROR [SIGTERM handler]
> >>> executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) -
> >>> RECEIVED SIGNAL 15: SIGTERM
> >>>
> >>> ==== Executor 2
> >>>
> >>> 2014-12-10 19:05:15,010 INFO  [handle-message-executor-11]
> >>> storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of
> >>> block input-0-1418238314800
> >>> 2014-12-10 19:05:15,157 INFO  [connection-manager-thread]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ?
> >>> sun.nio.ch.SelectionKeyImpl@66ea19c
> >>> 2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-2]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> SendingConnection
> >>>  to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> >>> 2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-0]
> >>> network.ConnectionManager (Logging.scala:logInfo(59)) - Removing
> >>> ReceivingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> >>> 2014-12-10 19:05:15,158 ERROR [handle-read-write-executor-0]
> >>> network.ConnectionManager (Logging.scala:logError(75)) - Corresponding
> >>> SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163) not found
> >>> 2014-12-10 19:05:15,158 INFO  [connection-manager-thread]
> >>> network.ConnectionManager (Logging.scala:logInfo(80)) - key already
> >>> cancelled ? sun.nio.ch.SelectionKeyImpl@66ea19c
> >>> java.nio.channels.CancelledKeyException
> >>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
> >>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
> >>>
> >>> ==== Driver
> >>>
> >>> 2014-12-10 19:05:13,805 INFO
> >>> [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo
> >>> (Logging.scala:logInfo(59)) - Added input-0-1418238313600 in memory on
> >>> ec2-EXECUTOR.compute-1.amazonaws.com:39444 (size: 38.2 KB, free: 4.1 GB)
> >>> 2014-12-10 19:05:13,823 ERROR
> >>> [sparkDriver-akka.actor.default-dispatcher-16] scheduler.JobScheduler
> >>> (Logging.scala:logError(96)) - Error running job streaming job 1418238300000 ms.0
> >>> java.io.FileNotFoundException: File
> >>> s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033 does not exist.
> >>>         at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
> >>>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
> >>>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
> >>>         at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
> >>>         at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:845)
> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:803)
> >>>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:100)
> >>>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:79)
> >>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
> >>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> >>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> >>>         at scala.util.Try$.apply(Try.scala:161)
> >>>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
> >>>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
> >>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>>         at java.lang.Thread.run(Thread.java:745)
> >>> 2014-12-10 19:05:13,829 INFO  [Driver] yarn.ApplicationMaster
> >>> (Logging.scala:logInfo(59)) - Unregistering ApplicationMaster with FAILED
> >>>
> >>> --
> >>> Flávio R. Santos
> >>>
> >>> Chaordic | Platform
> >>> www.chaordic.com.br
> >>> +55 48 3232.3200
> >>
> >>
> >
>
