Hello guys,

Thank you for your prompt reply. I followed Akhil's suggestion with no success. Then I tried again, replacing S3 with HDFS, and the job seems to work properly. TD, I'm not using speculative execution.

I think I've just realized what is happening. Due to S3's eventual consistency, these temporary files are sometimes found and sometimes not. I confirmed this hypothesis via s3cmd. So I've come up with two questions/suggestions:

1. Does Spark support writing these temporary files to HDFS while the final output goes to S3?
2. What do you think about adding a property like 'spark.s3.maxRetries' that determines the number of retries before assuming that a file is indeed not found? I can contribute this patch if you want. (Hadoop already has a similar property, 'fs.s3.maxRetries', but it only covers IOException and S3Exception.)
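To make (2) a bit more concrete, here is roughly the behaviour I have in mind. It is only an untested sketch in user code (the helper name withS3Retries and the retry/backoff numbers are made up); a real patch would sit in the commit path and read the limit from the proposed 'spark.s3.maxRetries'-style property:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Retry an S3 listing that fails with FileNotFoundException a few times before
// concluding the file really is missing, to ride out eventual consistency.
def withS3Retries[T](maxRetries: Int, waitMs: Long)(op: => T): T = {
  var attempt = 0
  var result: Option[T] = None
  while (result.isEmpty) {
    try {
      result = Some(op)
    } catch {
      case _: java.io.FileNotFoundException if attempt < maxRetries =>
        attempt += 1
        Thread.sleep(waitMs * attempt) // simple linear backoff between attempts
    }
  }
  result.get
}

val conf = new Configuration()
val fs = FileSystem.get(new java.net.URI("s3n://BUCKET"), conf)
// Only give up on a temporary task directory after maxRetries attempts:
val statuses = withS3Retries(maxRetries = 4, waitMs = 500) {
  fs.listStatus(new Path("s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033"))
}

And for (1), since everything behaves correctly when the output goes to HDFS, the interim workaround I'm considering is to let the committer run entirely against HDFS and only afterwards push the finished batch to S3 in one pass. Again just a rough, untested sketch; the paths and bucket are placeholders:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Point the existing saveAsHadoopFile call at hdfsDir instead of s3://BUCKET,
// then copy the committed batch to S3 and remove the HDFS copy.
val hdfsDir = "hdfs:///tmp/dumps/current-batch" // placeholder, one directory per batch
val conf = new Configuration()
val srcFs = FileSystem.get(new java.net.URI(hdfsDir), conf)
val dstFs = FileSystem.get(new java.net.URI("s3n://BUCKET"), conf)
FileUtil.copy(srcFs, new Path(hdfsDir), dstFs, new Path("s3n://BUCKET/somedir"),
  true /* deleteSource */, conf)

The extra copy costs some time per batch, but it keeps S3's eventually consistent listings out of the commit step entirely.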
Thanks again, and I look forward to your comments,

--Flávio R. Santos
Chaordic | Platform
www.chaordic.com.br
+55 48 3232.3200

On Thu, Dec 11, 2014 at 10:03 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> Following Gerard's thoughts, here are possible things that could be happening.
>
> 1. Is there another process in the background that is deleting files in the directory where you are trying to write? It seems like the temporary file generated by one of the tasks is getting deleted before it is renamed to the final output file. I suggest trying to not write to S3, but rather just count and print (with the rest of the computation staying exactly the same) and see if the error still occurs. That would narrow down the culprit to what Gerard suggested.
> 2. Do you have speculative execution turned on? If so, could you turn it off and try?
>
> TD
>
> On Thu, Dec 11, 2014 at 1:42 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
> > If the timestamps in the logs are to be trusted, it looks like your driver is dying with that java.io.FileNotFoundException, and therefore the workers lose their connection and close down.
> >
> > -kr, Gerard.
> >
> > On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> >> Try to add the following to the sparkConf:
> >>
> >>   .set("spark.core.connection.ack.wait.timeout", "6000")
> >>   .set("spark.akka.frameSize", "60")
> >>
> >> I used to face that issue with Spark 1.1.0.
> >>
> >> Thanks
> >> Best Regards
> >>
> >> On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos <bar...@chaordicsystems.com> wrote:
> >>> Dear Spark'ers,
> >>>
> >>> I'm trying to run a simple job using Spark Streaming (version 1.1.1) and YARN (yarn-cluster), but unfortunately I'm facing many issues. In short, my job does the following:
> >>> - Consumes a specific Kafka topic
> >>> - Writes its content to S3 or HDFS
> >>>
> >>> Records in Kafka are in the form:
> >>> {"key": "someString"}
> >>>
> >>> This is important because I use the value of "key" to define the output file name in S3.
> >>> Here are the Spark and Kafka parameters I'm using:
> >>>
> >>>> val sparkConf = new SparkConf()
> >>>>   .setAppName("MyDumperApp")
> >>>>   .set("spark.task.maxFailures", "100")
> >>>>   .set("spark.hadoop.validateOutputSpecs", "false")
> >>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> >>>>   .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
> >>>> val kafkaParams = Map(
> >>>>   "zookeeper.connect" -> zkQuorum,
> >>>>   "zookeeper.session.timeout.ms" -> "10000",
> >>>>   "rebalance.backoff.ms" -> "8000",
> >>>>   "rebalance.max.retries" -> "10",
> >>>>   "group.id" -> group,
> >>>>   "auto.offset.reset" -> "largest"
> >>>> )
> >>>
> >>> My application is the following:
> >>>
> >>>> KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
> >>>>   .foreachRDD((rdd, time) =>
> >>>>     rdd.map {
> >>>>       case (_, line) =>
> >>>>         val json = parse(line)
> >>>>         val key = extract(json, "key").getOrElse("key_not_found")
> >>>>         (key, dateFormatter.format(time.milliseconds)) -> line
> >>>>     }
> >>>>     .partitionBy(new HashPartitioner(10))
> >>>>     .saveAsHadoopFile[KeyBasedOutput[(String, String), String]]("s3://BUCKET", classOf[BZip2Codec])
> >>>>   )
> >>>
> >>> And the last piece:
> >>>
> >>>> class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T, V] {
> >>>>   override protected def generateFileNameForKeyValue(key: T, value: V, leaf: String) = key match {
> >>>>     case (myKey, batchId) =>
> >>>>       "somedir" + "/" + myKey + "/" + "prefix-" + myKey + "_" + batchId + "_" + leaf
> >>>>   }
> >>>>   override protected def generateActualKey(key: T, value: V) = null
> >>>> }
> >>>
> >>> I use batch sizes of 5 minutes with checkpoints activated.
> >>> The job fails nondeterministically (I think it never ran longer than ~5 hours). I have no clue why, it simply fails.
> >>> Please find below the exceptions thrown by my application.
> >>>
> >>> I really appreciate any kind of hint.
> >>> Thank you very much in advance.
> >>>
> >>> Regards,
> >>> -- Flávio
> >>>
> >>> ==== Executor 1
> >>>
> >>> 2014-12-10 19:05:15,150 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> >>> 2014-12-10 19:05:15,201 INFO [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with curMem=194463488, maxMem=4445479895
> >>> 2014-12-10 19:05:15,202 INFO [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in memory (estimated size 96.4 KB, free 4.0 GB)
> >>> 2014-12-10 19:05:16,506 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> >>> 2014-12-10 19:05:16,506 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> >>> 2014-12-10 19:05:16,506 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
> >>> 2014-12-10 19:05:16,649 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> >>> 2014-12-10 19:05:16,649 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> >>> 2014-12-10 19:05:16,649 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
> >>> 2014-12-10 19:05:16,650 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@da2e041
> >>> 2014-12-10 19:05:16,651 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@da2e041
> >>> java.nio.channels.CancelledKeyException
> >>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:316)
> >>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
> >>> 2014-12-10 19:05:16,679 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> >>> 2014-12-10 19:05:16,679 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> >>> 2014-12-10 19:05:16,679 INFO [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
> >>> 2014-12-10 19:05:16,680 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
> >>> 2014-12-10 19:05:16,681 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
> >>> java.nio.channels.CancelledKeyException
> >>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
> >>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
> >>> 2014-12-10 19:05:16,717 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> >>> 2014-12-10 19:05:16,717 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> >>> 2014-12-10 19:05:16,717 INFO [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
> >>> 2014-12-10 19:05:17,182 ERROR [SIGTERM handler] executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) - RECEIVED SIGNAL 15: SIGTERM
> >>>
> >>> ==== Executor 2
> >>>
> >>> 2014-12-10 19:05:15,010 INFO [handle-message-executor-11] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1418238314800
> >>> 2014-12-10 19:05:15,157 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@66ea19c
> >>> 2014-12-10 19:05:15,157 INFO [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> >>> 2014-12-10 19:05:15,157 INFO [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
> >>> 2014-12-10 19:05:15,158 ERROR [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logError(75)) - Corresponding SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163) not found
> >>> 2014-12-10 19:05:15,158 INFO [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@66ea19c
> >>> java.nio.channels.CancelledKeyException
> >>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
> >>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
> >>>
> >>> ==== Driver
> >>>
> >>> 2014-12-10 19:05:13,805 INFO [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1418238313600 in memory on ec2-EXECUTOR.compute-1.amazonaws.com:39444 (size: 38.2 KB, free: 4.1 GB)
> >>> 2014-12-10 19:05:13,823 ERROR [sparkDriver-akka.actor.default-dispatcher-16] scheduler.JobScheduler (Logging.scala:logError(96)) - Error running job streaming job 1418238300000 ms.0
> >>> java.io.FileNotFoundException: File s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033 does not exist.
> >>>         at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
> >>>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
> >>>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
> >>>         at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
> >>>         at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:845)
> >>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:803)
> >>>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:100)
> >>>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:79)
> >>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
> >>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> >>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> >>>         at scala.util.Try$.apply(Try.scala:161)
> >>>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
> >>>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
> >>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>>         at java.lang.Thread.run(Thread.java:745)
> >>> 2014-12-10 19:05:13,829 INFO [Driver] yarn.ApplicationMaster (Logging.scala:logInfo(59)) - Unregistering ApplicationMaster with FAILED
> >>>
> >>> --
> >>> Flávio R. Santos
> >>>
> >>> Chaordic | Platform
> >>> www.chaordic.com.br
> >>> +55 48 3232.3200