Aah yes, that makes sense. You could write to HDFS first and, once that
works, copy from HDFS to S3. That should work, as it won't depend on the
temporary files being in S3.
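
A minimal sketch of that write-then-copy idea, assuming two local directories as stand-ins: `staging` plays the role of HDFS and `published` plays S3. The object and method names are hypothetical. A real job would do the copy with Hadoop tooling (for example `hadoop distcp` or `org.apache.hadoop.fs.FileUtil.copy`) rather than `java.nio.file`:

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical sketch: local directories stand in for HDFS (staging) and S3 (published).
object StageThenPublish {

  // Step 1: write every part file into the staging area only.
  // A failure here leaves the published area untouched.
  def writeStaged(staging: Path, parts: Map[String, String]): Unit =
    parts.foreach { case (name, content) =>
      Files.write(staging.resolve(name), content.getBytes(StandardCharsets.UTF_8))
    }

  // Step 2: once all writes have succeeded, copy the finished files across.
  // The target store never sees temporary files, so nothing there has to be
  // listed or renamed. Returns the published file names in sorted order.
  def publish(staging: Path, published: Path): Seq[String] = {
    val files = Option(staging.toFile.listFiles()).getOrElse(Array.empty[File]).filter(_.isFile)
    files.toSeq.sortBy(_.getName).map { f =>
      Files.copy(f.toPath, published.resolve(f.getName), StandardCopyOption.REPLACE_EXISTING)
      f.getName
    }
  }
}
```

Because the target only ever receives finished files, readers of it never list temporary task directories. That listing step is the one that failed against S3 in the driver trace quoted below.
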
I am not sure how much you can customize just for S3 in Spark code. In
Spark, since we just use the Hadoop API to write, there isn't much code,
if any at all, that is customized to a particular file system. An
alternative is to see whether you could do retries efficiently for all
file systems.
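
As a rough illustration of file-system-agnostic retries, here is a hypothetical helper that retries an operation when it throws `FileNotFoundException` and doubles the wait after each failure. The 'spark.s3.maxRetries' property proposed below is only a suggestion, not an existing Spark setting. The helper's name and parameters are assumptions, and this sketch does not address where such a wrapper would hook into Spark's or Hadoop's commit path:

```scala
import java.io.FileNotFoundException
import scala.annotation.tailrec

// Hypothetical helper, not part of Spark or Hadoop.
object RetryOnNotFound {

  // Runs `op`, retrying up to `maxRetries` extra times when it throws
  // FileNotFoundException (the symptom of an eventually consistent listing).
  // The wait starts at `initialBackoffMs` and doubles after each failure.
  // Any other exception, or the final FileNotFoundException, propagates.
  def apply[A](maxRetries: Int, initialBackoffMs: Long)(op: => A): A = {
    @tailrec
    def loop(attempt: Int, backoffMs: Long): A = {
      // Capture the outcome first so the recursive call stays outside try/catch.
      val result: Either[FileNotFoundException, A] =
        try Right(op)
        catch { case e: FileNotFoundException if attempt < maxRetries => Left(e) }
      result match {
        case Right(value) => value
        case Left(_) =>
          Thread.sleep(backoffMs)
          loop(attempt + 1, backoffMs * 2)
      }
    }
    loop(0, initialBackoffMs)
  }
}
```

Only `FileNotFoundException` is retried, so unrelated failures still surface immediately instead of being masked by the backoff.
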

TD

On Thu, Dec 11, 2014 at 6:50 AM, Flávio Santos
<bar...@chaordicsystems.com> wrote:
> Hello guys,
>
> Thank you for your prompt reply.
> I followed Akhil's suggestion with no success. Then I tried again,
> replacing S3 with HDFS, and the job seems to work properly.
> TD, I'm not using speculative execution.
>
> I think I've just realized what is happening. Due to S3's eventual
> consistency, these temporary files are sometimes found and sometimes
> not. I confirmed this hypothesis via s3cmd.
> So I have two questions/suggestions:
>
> 1. Does Spark support writing these temporary files to HDFS while my
> final output goes to S3?
> 2. What do you think about adding a property like 'spark.s3.maxRetries' that
> determines the number of retries before assuming that a file is indeed not
> found? I can contribute this patch if you want.
> (Hadoop already has a similar property, 'fs.s3.maxRetries', but only for
> IOException and S3Exception.)
>
> Thanks again, and I look forward to your comments,
>
> --
> Flávio R. Santos
>
> Chaordic | Platform
> www.chaordic.com.br
> +55 48 3232.3200
>
> On Thu, Dec 11, 2014 at 10:03 AM, Tathagata Das
> <tathagata.das1...@gmail.com> wrote:
>>
>> Following Gerard's thoughts, here are possible things that could be
>> happening.
>>
>> 1. Is there another process in the background that is deleting files
>> in the directory where you are trying to write? It seems like the
>> temporary file generated by one of the tasks is getting deleted before
>> it is renamed to the final output file. I suggest not writing to S3
>> at all; instead, just count and print (keeping the rest of the
>> computation exactly the same) and see whether the error still occurs.
>> That would narrow the culprit down to what Gerard suggested.
>> 2. Do you have speculative execution turned on? If so, could you turn
>> it off and try again?
>>
>> TD
>>
>> On Thu, Dec 11, 2014 at 1:42 AM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>> > If the timestamps in the logs are to be trusted, it looks like your
>> > driver is dying with that java.io.FileNotFoundException, and therefore
>> > the workers lose their connection and shut down.
>> >
>> > -kr, Gerard.
>> >
>> > On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> > wrote:
>> >>
>> >> Try adding the following to your SparkConf:
>> >>
>> >>   .set("spark.core.connection.ack.wait.timeout", "6000")
>> >>   .set("spark.akka.frameSize", "60")
>> >>
>> >> I used to face that issue with Spark 1.1.0.
>> >>
>> >> Thanks
>> >> Best Regards
>> >>
>> >> On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos
>> >> <bar...@chaordicsystems.com> wrote:
>> >>>
>> >>> Dear Spark'ers,
>> >>>
>> >>> I'm trying to run a simple job using Spark Streaming (version 1.1.1)
>> >>> and YARN (yarn-cluster), but unfortunately I'm facing many issues.
>> >>> In short, my job does the following:
>> >>> - Consumes a specific Kafka topic
>> >>> - Writes its content to S3 or HDFS
>> >>>
>> >>> Records in Kafka are in the form:
>> >>> {"key": "someString"}
>> >>>
>> >>> This is important because I use the value of "key" to define the
>> >>> output file name in S3.
>> >>> Here are the Spark and Kafka parameters I'm using:
>> >>>
>> >>>> val sparkConf = new SparkConf()
>> >>>>   .setAppName("MyDumperApp")
>> >>>>   .set("spark.task.maxFailures", "100")
>> >>>>   .set("spark.hadoop.validateOutputSpecs", "false")
>> >>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>> >>>>   .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
>> >>>> val kafkaParams = Map(
>> >>>>   "zookeeper.connect" -> zkQuorum,
>> >>>>   "zookeeper.session.timeout.ms" -> "10000",
>> >>>>   "rebalance.backoff.ms" -> "8000",
>> >>>>   "rebalance.max.retries" -> "10",
>> >>>>   "group.id" -> group,
>> >>>>   "auto.offset.reset" -> "largest"
>> >>>> )
>> >>>
>> >>>
>> >>> My application is the following:
>> >>>
>> >>>> KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>> >>>>     ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
>> >>>>   .foreachRDD((rdd, time) =>
>> >>>>     rdd.map {
>> >>>>       case (_, line) =>
>> >>>>         val json = parse(line)
>> >>>>         val key = extract(json, "key").getOrElse("key_not_found")
>> >>>>         (key, dateFormatter.format(time.milliseconds)) -> line
>> >>>>     }
>> >>>>       .partitionBy(new HashPartitioner(10))
>> >>>>       .saveAsHadoopFile[KeyBasedOutput[(String, String), String]](
>> >>>>         "s3://BUCKET", classOf[BZip2Codec])
>> >>>>   )
>> >>>
>> >>>
>> >>> And the last piece:
>> >>>
>> >>>> class KeyBasedOutput[T >: Null, V <: AnyRef] extends
>> >>>>     MultipleTextOutputFormat[T, V] {
>> >>>>   override protected def generateFileNameForKeyValue(
>> >>>>       key: T, value: V, leaf: String) = key match {
>> >>>>     case (myKey, batchId) =>
>> >>>>       "somedir" + "/" + myKey + "/" +
>> >>>>         "prefix-" + myKey + "_" + batchId + "_" + leaf
>> >>>>   }
>> >>>>   override protected def generateActualKey(key: T, value: V) = null
>> >>>> }
>> >>>
>> >>>
>> >>> I use batch sizes of 5 minutes with checkpointing enabled.
>> >>> The job fails nondeterministically (I think it has never run longer
>> >>> than ~5 hours). I have no clue why; it simply fails.
>> >>> Please find below the exceptions thrown by my application.
>> >>>
>> >>> I'd really appreciate any kind of hint.
>> >>> Thank you very much in advance.
>> >>>
>> >>> Regards,
>> >>> -- Flávio
>> >>>
>> >>> ==== Executor 1
>> >>>
>> >>> 2014-12-10 19:05:15,150 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>> >>> 2014-12-10 19:05:15,201 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with curMem=194463488, maxMem=4445479895
>> >>> 2014-12-10 19:05:15,202 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in memory (estimated size 96.4 KB, free 4.0 GB)
>> >>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> >>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> >>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>> >>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>> >>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>> >>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>> >>> 2014-12-10 19:05:16,650 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@da2e041
>> >>> 2014-12-10 19:05:16,651 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@da2e041
>> >>> java.nio.channels.CancelledKeyException
>> >>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:316)
>> >>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>> >>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>> >>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>> >>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>> >>> 2014-12-10 19:05:16,680 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
>> >>> 2014-12-10 19:05:16,681 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
>> >>> java.nio.channels.CancelledKeyException
>> >>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>> >>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>> >>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>> >>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>> >>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>> >>> 2014-12-10 19:05:17,182 ERROR [SIGTERM handler] executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) - RECEIVED SIGNAL 15: SIGTERM
>> >>>
>> >>> ==== Executor 2
>> >>>
>> >>> 2014-12-10 19:05:15,010 INFO  [handle-message-executor-11] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1418238314800
>> >>> 2014-12-10 19:05:15,157 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@66ea19c
>> >>> 2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>> >>> 2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>> >>> 2014-12-10 19:05:15,158 ERROR [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logError(75)) - Corresponding SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163) not found
>> >>> 2014-12-10 19:05:15,158 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@66ea19c
>> >>> java.nio.channels.CancelledKeyException
>> >>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>> >>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>> >>>
>> >>> ==== Driver
>> >>>
>> >>> 2014-12-10 19:05:13,805 INFO  [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1418238313600 in memory on ec2-EXECUTOR.compute-1.amazonaws.com:39444 (size: 38.2 KB, free: 4.1 GB)
>> >>> 2014-12-10 19:05:13,823 ERROR [sparkDriver-akka.actor.default-dispatcher-16] scheduler.JobScheduler (Logging.scala:logError(96)) - Error running job streaming job 1418238300000 ms.0
>> >>> java.io.FileNotFoundException: File s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033 does not exist.
>> >>>         at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
>> >>>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
>> >>>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
>> >>>         at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
>> >>>         at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
>> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
>> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
>> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:845)
>> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:803)
>> >>>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:100)
>> >>>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:79)
>> >>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>> >>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>> >>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>> >>>         at scala.util.Try$.apply(Try.scala:161)
>> >>>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>> >>>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>> >>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> >>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> >>>         at java.lang.Thread.run(Thread.java:745)
>> >>> 2014-12-10 19:05:13,829 INFO  [Driver] yarn.ApplicationMaster (Logging.scala:logInfo(59)) - Unregistering ApplicationMaster with FAILED
>> >>>
>> >>
>> >>
>> >
>
>
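
On the `KeyBasedOutput` class in the original message: its naming rule can be checked without a cluster. The sketch below assumes a hypothetical pure-function copy named `OutputLayout.fileName`. The key is `(recordKey, batchId)`, and `leaf` is the part-file name that Hadoop's `MultipleTextOutputFormat` passes in. The batch id format depends on `dateFormatter`, which the thread does not show, so `"201412101900"` below is a made-up value:

```scala
// Hypothetical copy of generateFileNameForKeyValue from KeyBasedOutput,
// specialised to the (String, String) keys the job actually produces.
object OutputLayout {
  def fileName(key: (String, String), leaf: String): String = key match {
    case (myKey, batchId) =>
      // Braces are needed before "_" so the interpolator does not read `myKey_`.
      s"somedir/$myKey/prefix-${myKey}_${batchId}_$leaf"
  }
}
```

With the output root `s3://BUCKET`, a record keyed `("someString", "201412101900")` in part `part-00033` would end up at `s3://BUCKET/somedir/someString/prefix-someString_201412101900_part-00033`. While the job runs, however, task output sits under `s3://BUCKET/_temporary/...`, the path the driver could not list.
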
