Yiannis,

Sorry for the late response.
It is indeed not possible to create a new RDD inside foreachPartition (that
code runs on the executors, while the SparkContext exists only on the driver),
so you have to write the data manually. I haven’t tried that and haven’t seen
such an exception, but I’d suggest writing the data locally first and then
uploading it into HDFS. FileSystem has a dedicated method for that:
copyFromLocalFile.
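Here is a minimal sketch of the "write locally, then upload" idea. It is plain Python, not Spark code. It assumes each task sees its partition as an iterable of (key, value) pairs plus a partition id. The function name write_partition_locally and the <key>/part-<partition_id>.txt layout are hypothetical. Putting the partition id in the file name keeps tasks that handle different partitions from writing to the same file. The upload step appears only as a comment; in Spark it would go through FileSystem.copyFromLocalFile.

```python
import os
import tempfile
from collections import defaultdict


def write_partition_locally(partition_id, records, local_dir):
    """Write one partition's (key, value) records to local files.

    Hypothetical layout: <local_dir>/<key>/part-<partition_id>.txt.
    Returns the written paths, which would then be uploaded to HDFS
    (in Spark/Hadoop, e.g. via FileSystem.copyFromLocalFile).
    """
    # Group this partition's values by key, preserving arrival order.
    by_key = defaultdict(list)
    for key, value in records:
        by_key[key].append(value)

    written = []
    for key, values in by_key.items():
        key_dir = os.path.join(local_dir, str(key))
        os.makedirs(key_dir, exist_ok=True)
        # One file per (key, partition), so tasks never share a file.
        path = os.path.join(key_dir, "part-%d.txt" % partition_id)
        with open(path, "w") as f:
            for value in values:
                f.write("%s\n" % value)
        written.append(path)
    # Upload step (not shown): copy each path into the matching HDFS directory.
    return written
```

For example, calling write_partition_locally(3, [("a", 1), ("b", 2), ("a", 3)], tmp_dir) produces a/part-3.txt containing the lines 1 and 3, and b/part-3.txt containing 2.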
Another approach would be to split the RDD into multiple RDDs by key: get the
distinct keys, collect them on the driver, then loop over the keys and filter a
new RDD out of the original one for each key.
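// pairRdd: JavaPairRDD<String, String>; outputDir: your base path (assumed)
List<String> keys = pairRdd.keys().distinct().collect();
for (String key : keys) {
    pairRdd.filter(pair -> pair._1().equals(key))
        .values().saveAsTextFile(outputDir + "/" + key);
}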
It might help to cache the original RDD, since it is scanned once per key.
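The split-by-key loop can be sketched in plain Python, with a list of (key, value) pairs standing in for the RDD. split_by_key is a hypothetical name, and the comments note which Spark call each step imitates. Every key triggers a full pass over the data, which is why caching the original RDD pays off.

```python
def split_by_key(records):
    """Emulate the driver-side loop: collect distinct keys, then filter per key.

    `records` stands in for the (cached) RDD of (key, value) pairs.
    Each iteration scans all records again, once per key.
    """
    # Like rdd.keys().distinct().collect() on the driver.
    keys = sorted({key for key, _ in records})
    outputs = {}
    for key in keys:
        # Like rdd.filter(pair -> key matches).values().saveAsTextFile(base/key).
        outputs[key] = [value for k, value in records if k == key]
    return outputs
```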
On 16 Jul 2015, at 12:21, Yiannis Gkoufas wrote:
> Hi Eugene,
>
> thanks for your response!
> Your recommendation makes sense, that's what I more or less tried.
> The problem that I am facing is that inside foreachPartition() I cannot
> create a new RDD and use saveAsTextFile.
> It would probably make sense to write directly to HDFS using the Java API.
> When I tried that I was getting errors similar to this:
>
> Failed on local exception: java.io.InterruptedIOException: Interruped while
> waiting for IO on channel java.nio.channels.SocketChannel
>
> Probably it's hitting a race condition.
>
> Has anyone else faced this situation? Any suggestions?
>
> Thanks a lot!
>
> On 15 July 2015 at 14:04, Eugene Morozov wrote:
> Yiannis,
>
> You might want to explore another approach:
>
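> import org.apache.spark.HashPartitioner
>
> sc.textFile("input/path")
>   .map(line => (extractKey(line), line)) // extractKey: your own implementation
>   .partitionBy(new HashPartitioner(num))
>   .groupByKey() // PairRDD of key vs Iterable of values; reuses the partitioner
>   .foreachPartition(_.foreach { case (key, values) =>
>     // sort values and write them to one file for this key
>   })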
>
> In the last step you could sort all values for each key and store them in a
> separate file, even in the same directory as the files for the other keys.
> HashPartitioner guarantees that all values for a specific key reside in
> exactly one partition, but one partition may contain more than one key (with
> its values). That shouldn’t be a big deal, as you would iterate over
> Tuple2<key, Iterable<value>> and store each key in its own file.
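Here is a sketch of the partition, group and sort steps, in plain Python rather than Spark code. It also shows one way to control the order of the values in each file. The names hash_partition and files_for_partition are hypothetical. crc32 stands in for the key's hashCode(), which Spark's HashPartitioner uses. sort_key is an assumed hook for a custom ordering.

```python
import zlib
from collections import defaultdict


def stable_hash(key):
    # Python's built-in hash() of str is salted per process, so use a
    # deterministic hash; Spark's HashPartitioner uses key.hashCode() instead.
    return zlib.crc32(str(key).encode("utf-8"))


def hash_partition(pairs, num_partitions):
    """Assign each (key, value) pair to partition hash(key) mod num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[stable_hash(key) % num_partitions].append((key, value))
    return partitions


def files_for_partition(partition, sort_key=None):
    """Group one partition by key and sort each key's values.

    Returns {key: sorted values}, i.e. the content of one output file per key.
    `sort_key` (hypothetical hook) customizes the order of values in each file.
    """
    grouped = defaultdict(list)
    for key, value in partition:
        grouped[key].append(value)
    return {key: sorted(values, key=sort_key) for key, values in grouped.items()}
```

With three keys and two partitions, at least one partition necessarily holds more than one key. That is why the per-partition step groups by key before writing.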
>
> On 15 Jul 2015, at 03:23, Yiannis Gkoufas wrote:
>
>> Hi there,
>>
>> I have been using the approach described here:
>>
>> http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
>>
>> In addition to that, I was wondering if there is a way to customize the
>> order of the values contained in each file.
>>
>> Thanks a lot!
>
> Eugene Morozov
>
Eugene Morozov