Yiannis, sorry for the late response.

It is indeed not possible to create a new RDD inside foreachPartition, so you have to write the data out manually. I haven't tried this and haven't hit such an exception, but I'd suggest writing locally first and then uploading the file into HDFS. FileSystem has a specific method for that: "copyFromLocalFile".
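Something like this (a rough, untested sketch; here rdd is assumed to be an RDD of strings, "/user/out" is just a placeholder HDFS directory, and the executors need the HDFS configuration on their classpath for new Configuration() to resolve the cluster):

rdd.foreachPartition { iter =>
  import java.io.{File, PrintWriter}
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // write this partition's records to a local temp file first
  val local = File.createTempFile("part-", ".txt")
  val out = new PrintWriter(local)
  try iter.foreach(rec => out.println(rec)) finally out.close()

  // then upload the local file into HDFS
  val fs = FileSystem.get(new Configuration())
  fs.copyFromLocalFile(new Path(local.getAbsolutePath), new Path("/user/out/" + local.getName))
}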
Another approach would be to try to split the RDD into multiple RDDs by key. You can get the distinct keys, collect them on the driver, and loop over the keys, filtering a new RDD out of the original one by each key:

for (key : keys) { rdd.filter(key).saveAsTextFile() }

It might help to cache the original RDD (a fuller sketch of this loop is appended at the end of this message).

On 16 Jul 2015, at 12:21, Yiannis Gkoufas <johngou...@gmail.com> wrote:

> Hi Eugene,
>
> thanks for your response!
> Your recommendation makes sense, that's what I more or less tried.
> The problem that I am facing is that inside foreachPartition() I cannot
> create a new RDD and use saveAsTextFile.
> It would probably make sense to write directly to HDFS using the Java API.
> When I tried that I was getting errors similar to this:
>
> Failed on local exception: java.io.InterruptedIOException: Interruped while
> waiting for IO on channel java.nio.channels.SocketChannel
>
> Probably it's hitting a race condition.
>
> Has anyone else faced this situation? Any suggestions?
>
> Thanks a lot!
>
> On 15 July 2015 at 14:04, Eugene Morozov <fathers...@list.ru> wrote:
> Yiannis,
>
> It looks like you might explore another approach.
>
> sc.textFile("input/path")
>   .map() // your own implementation
>   .partitionBy(new HashPartitioner(num))
>   .groupBy() // your own implementation; as a result, a PairRDD of key vs Iterable of values
>   .foreachPartition()
>
> On the last step you could sort all values for the key and store them into a
> separate file, even into the same directory as all the other files for other keys.
> HashPartitioner must guarantee that all values for a specific key will reside
> in just one partition, but it might happen that one partition contains more
> than one key (with values). Of this I'm not sure, but that shouldn't be a big
> deal, as you would iterate over tuple<key, Iterable<value>> and store one key
> to a specific file.
>
> On 15 Jul 2015, at 03:23, Yiannis Gkoufas <johngou...@gmail.com> wrote:
>
>> Hi there,
>>
>> I have been using the approach described here:
>>
>> http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
>>
>> In addition to that, I was wondering if there is a way to customize the
>> order of those values contained in each file.
>>
>> Thanks a lot!
>
> Eugene Morozov
> fathers...@list.ru

Eugene Morozov
fathers...@list.ru
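P.S. A rough, untested Scala sketch of the per-key loop mentioned at the top of this message. It assumes rdd is a pair RDD of (key, value) with string values (so they have an Ordering for sorting), that the number of distinct keys is small enough to collect, and that "output/" is just a placeholder directory:

rdd.cache() // the original RDD is scanned once per key, so caching helps
val keys = rdd.keys.distinct().collect() // assumes a manageable number of keys
for (key <- keys) {
  rdd.filter { case (k, _) => k == key }
     .values
     .sortBy(identity) // optional: controls the order of the values within each output
     .saveAsTextFile("output/" + key)
}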