Hi Surendra,

Thanks for your suggestion. I tried MultipleOutputFormat and
MultipleTextOutputFormat, but the result was the same: the folder always
contains a single file, part-r-00000, and this file gets overwritten every
time.

This is how I am invoking the API:

    dataToWrite.saveAsHadoopFile(directory, String.class,
        String.class, MultipleOutputFormat.class);
Am I missing something?
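
One thing I suspect after re-reading the Hadoop docs: MultipleOutputFormat is
abstract and does not pick output file names on its own, so it has to be
subclassed. This is the subclass I plan to try next (class name is mine,
untested); note that the old-API MultipleTextOutputFormat pairs with
saveAsHadoopFile rather than saveAsNewAPIHadoopFile:

    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

    public class FolderPerKeyOutputFormat
            extends MultipleTextOutputFormat<String, String> {
        @Override
        protected String generateFileNameForKeyValue(String key, String value,
                String name) {
            // key is the target folder (e.g. "1200"); name is the default
            // leaf file name (e.g. part-00000). Prefixing with the key routes
            // each key's records into its own subfolder, and the timestamp
            // keeps successive micro-batches from overwriting part files.
            return key + "/" + System.currentTimeMillis() + "-" + name;
        }
    }

    dataToWrite.saveAsHadoopFile(directory, String.class, String.class,
        FolderPerKeyOutputFormat.class);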

Regards,
Shridhar


On Wed, Mar 23, 2016 at 11:55 AM, Surendra , Manchikanti <
surendra.manchika...@gmail.com> wrote:

> Hi Vetal,
>
> You may try MultipleOutputFormat instead of TextOutputFormat in
> saveAsNewAPIHadoopFile().
>
> Regards,
> Surendra M
>
> -- Surendra Manchikanti
>
> On Tue, Mar 22, 2016 at 10:26 AM, vetal king <greenve...@gmail.com> wrote:
>
>> We are using Spark 1.4 for Spark Streaming. Kafka is the data source for
>> the Spark stream.
>>
>> Records are published to Kafka every second. Our requirement is to store
>> records published to Kafka in a single folder per minute. The stream reads
>> records every five seconds. For instance, records published between 12:00
>> PM and 12:01 PM are stored in folder "1200"; those published between 12:01
>> PM and 12:02 PM in folder "1201", and so on.
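>>
>> For reference, the minute-based folder name (targetHadoopFolder in the code
>> below) is derived from the wall clock, roughly like this (simplified):
>>
>>     // needs java.text.SimpleDateFormat and java.util.Date
>>     SimpleDateFormat fmt = new SimpleDateFormat("HHmm");
>>     String targetHadoopFolder = fmt.format(new Date()); // e.g. "1200"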
>>
>> The code I wrote is as follows:
>>
>> // First, group records in the RDD by the folder they belong to
>> stream.foreachRDD(rddWithinStream -> {
>>     JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>>         rddWithinStream.mapToPair(t -> {
>>             // targetHadoopFolder is the minute-based folder name ("1200", ...)
>>             return new Tuple2<String, String>(targetHadoopFolder, t._2());
>>         }).groupByKey();
>>     // All records are now grouped by the folder they will be stored in
>>
>>     // Create an RDD for each target folder
>>     for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>>         JavaPairRDD<String, Iterable<String>> rddByKey =
>>             rddGroupedByDirectory.filter(groupedTuples -> {
>>                 return groupedTuples._1().equals(hadoopFolder);
>>             });
>>
>>         // And store it in Hadoop
>>         rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
>>             String.class, TextOutputFormat.class);
>>     }
>>     return null; // foreachRDD's Function returns Void in Spark 1.4
>> });
>>
>> Since the stream processes data every five seconds, saveAsNewAPIHadoopFile
>> gets invoked multiple times per minute. This causes the part-00000 file to
>> be overwritten every time.
>>
>> I was expecting that, in the directory specified by the "directory"
>> parameter, saveAsNewAPIHadoopFile would keep creating new part-0000N files
>> even when I have a single worker node.
>>
>> Any help/alternatives are greatly appreciated.
>>
>> Thanks.
>>
>
>
