Yiannis, 

Sorry for the late response.
It is indeed not possible to create a new RDD inside foreachPartition (the 
SparkContext is only available on the driver), so you have to write the data 
manually. I haven’t tried that myself and haven’t hit such an exception, but 
I’d suggest writing the files locally and then uploading them to HDFS. 
FileSystem has a specific method for that: “copyFromLocalFile”.
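
A rough, untested-on-a-cluster sketch of the "write locally, then upload" idea, in plain Java. The names KeyFileWriter, Uploader and writeSorted are made up for illustration. The upload step is left behind a small interface so the sketch compiles without Hadoop; the comment shows how it could wrap FileSystem.copyFromLocalFile. It also assumes the values are strings and that each key is safe to use as a file name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical hook for the upload step, so the sketch compiles without Hadoop.
// With Hadoop on the classpath it could wrap FileSystem.copyFromLocalFile, e.g.:
//   FileSystem fs = FileSystem.get(new Configuration());
//   Uploader toHdfs = (local, dst) -> fs.copyFromLocalFile(
//       new org.apache.hadoop.fs.Path(local.toString()),
//       new org.apache.hadoop.fs.Path(dst));
interface Uploader {
    void upload(Path localFile, String destination) throws IOException;
}

final class KeyFileWriter {
    private KeyFileWriter() {}

    // Sorts the values of one key, writes them to a local temp file (one value
    // per line), hands the file to the uploader, then deletes the local copy.
    static void writeSorted(String key, Iterable<String> values,
                            String targetDir, Uploader uploader) {
        List<String> sorted = new ArrayList<>();
        values.forEach(sorted::add);
        Collections.sort(sorted); // natural order; use sorted.sort(comparator) for a custom one
        try {
            Path local = Files.createTempFile("key-", ".txt");
            try {
                Files.write(local, sorted, StandardCharsets.UTF_8);
                // Assumption: the key is safe to use as a file name.
                uploader.upload(local, targetDir + "/" + key + ".txt");
            } finally {
                Files.deleteIfExists(local);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Inside foreachPartition you would iterate over the (key, values) tuples and call writeSorted once per tuple. Creating the FileSystem once per partition rather than once per key is probably the safer choice.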

Another approach would be to split the RDD into multiple RDDs by key. You 
can get the distinct keys, collect them on the driver, loop over the keys, 
and filter a new RDD out of the original one for each key.

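// Assumes rdd is the original JavaPairRDD<String, String>; the output path is a placeholder
for (String key : keys) {
    rdd.filter(t -> t._1().equals(key)).values().saveAsTextFile("output/path/" + key);
}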

It might help to cache the original RDD, since otherwise each filter 
recomputes it from the source.

On 16 Jul 2015, at 12:21, Yiannis Gkoufas <johngou...@gmail.com> wrote:

> Hi Eugene,
> 
> thanks for your response!
> Your recommendation makes sense, that's what I more or less tried.
> The problem that I am facing is that inside foreachPartition() I cannot 
> create a new rdd and use saveAsTextFile.
> It would probably make sense to write directly to HDFS using the Java API.
> When I tried that I was getting errors similar to this:
> 
> Failed on local exception: java.io.InterruptedIOException: Interruped while 
> waiting for IO on channel java.nio.channels.SocketChannel
> 
> Probably it's hitting a race condition.
> 
> Has anyone else faced this situation? Any suggestions?
> 
> Thanks a lot! 
> 
> On 15 July 2015 at 14:04, Eugene Morozov <fathers...@list.ru> wrote:
> Yiannis,
> 
> It looks like you might explore another approach.
> 
> sc.textFile("input/path")
> .map() // your own implementation
> .partitionBy(new HashPartitioner(num))
> .groupBy() // your own implementation; the result is a PairRDD of key vs
>            // Iterable of values
> .foreachPartition()
> 
> In the last step you could sort all values for each key and store them in a 
> separate file, even in the same directory as the files for the other keys. 
> HashPartitioner should guarantee that all values for a specific key reside 
> in just one partition, but one partition might contain more than one key 
> (with its values). I’m not sure about this, but it shouldn’t be a big deal, 
> as you would iterate over tuple<key, Iterable<value>> and store each key in 
> its own file.
> 
> On 15 Jul 2015, at 03:23, Yiannis Gkoufas <johngou...@gmail.com> wrote:
> 
>> Hi there,
>> 
>> I have been using the approach described here:
>> 
>> http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
>> 
>> In addition to that, I was wondering if there is a way to customize 
>> the order of the values contained in each file.
>> 
>> Thanks a lot!
> 
> Eugene Morozov
> fathers...@list.ru

Eugene Morozov
fathers...@list.ru



