Greetings!
Thank you very much for taking the time to respond.
My apologies, but at the moment I don't have an example that I feel comfortable 
posting.  Frankly, I've been struggling with variants of this for the last two 
weeks and probably won't be able to work on this particular issue for a few 
days.
However, I am intrigued by your comment.  You mention "when I close the fs 
object inside map/mapPartition etc."  Where else can one close the object?  If I 
don't close it, the output file is generally truncated.
Again, the code seems to work for a few hundred files, then I get these weird 
errors. Is this something subtle related to the shipping of the closure that I'm 
not aware of?
Can you give a general idea of how you handled this? Is it necessary to create a 
custom OutputFormat class? I was looking at the OutputFormat code and it looks 
like it also creates an "fs" object and starts writing, but perhaps there is 
some subtle difference in the context?
Thank you.
Sincerely, Mike

      From: Akhil Das <ak...@sigmoidanalytics.com>
 To: Michael Albert <m_albert...@yahoo.com> 
Cc: "user@spark.apache.org" <user@spark.apache.org> 
 Sent: Monday, January 5, 2015 1:21 AM
 Subject: Re: a vague question, but perhaps it might ring a bell
   
What are you trying to do? Can you paste the whole code? I used to see this 
sort of Exception when I close the fs object inside map/mapPartition etc.
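
For illustration, here is a minimal sketch of the distinction between closing the output stream and closing the fs object. It assumes Hadoop's default behaviour, where FileSystem.get(conf) returns a cached instance shared by everything in the same executor JVM, so calling fs.close() inside a task can break other tasks that hold the same instance. The names PartitionWriter, writeRecords, writeEachPartition and outDir, the RDD[String] element type and the output file naming are all hypothetical, not taken from the code under discussion.

```scala
import java.io.OutputStream
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD

object PartitionWriter {

  // Write each record as one UTF-8 line and return the record count.
  // Only the stream is closed (in finally), so the file is flushed
  // and not truncated even if a write fails part-way.
  def writeRecords(out: OutputStream, records: Iterator[String]): Long = {
    var n = 0L
    try {
      records.foreach { r =>
        out.write((r + "\n").getBytes(StandardCharsets.UTF_8))
        n += 1
      }
    } finally {
      out.close()
    }
    n
  }

  // One output file per partition; returns (partition id, records written).
  // outDir and the "part-NNNNN.out" naming are assumptions for this sketch.
  def writeEachPartition(rdd: RDD[String], outDir: String): Array[(Int, Long)] =
    rdd.mapPartitionsWithIndex { (partId, iter) =>
      // Assumed to be a cached, shared instance: deliberately no fs.close() here.
      val fs = FileSystem.get(new Configuration())
      val strm = fs.create(new Path(outDir, f"part-$partId%05d.out"))
      Iterator((partId, writeRecords(strm, iter)))
    }.collect()
}
```

Closing the stream is what prevents truncated output; the fs object itself can be left open for the lifetime of the executor. Whether this explains the failure in this thread is not established, since the posted pseudocode closes only the stream.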
Thanks
Best Regards


On Mon, Jan 5, 2015 at 6:43 AM, Michael Albert <m_albert...@yahoo.com.invalid> 
wrote:

Greetings!
So, I think I have data saved so that each partition (part-r-00000, etc) is 
exactly what I want to translate into an output file of a format not related to 
hadoop.
I believe I've figured out how to tell Spark to read the data set without 
re-partitioning (in another post I mentioned this -- I have a non-splittable 
InputFormat).
I do something like:

    mapPartitionsWithIndex { (partId, iter) =>
        val conf = new Configuration()
        val fs = FileSystem.get(conf)
        val strm = fs.create(new Path(...))
        //  write data to stream
        strm.close() // in finally block
    }

This runs for a few hundred input files (so each executor sees tens of 
files), and it chugs along nicely, then suddenly everything shuts down. I can 
restart (telling it to skip the partIds which it has already completed), and 
it chugs along again for a while (going past the previous stopping point) and 
again dies.
I am at a loss.  This works for the first tens of files (so it runs for about 
1hr) then quits, and I see no useful error information (no Exceptions except the 
stuff below). I'm not shutting it down.
Any idea what I might check? I've bumped up the memory multiple times (16G 
currently) and fiddled with increasing other parameters.
Thanks.

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
           



  
