Hi Surendra,

Thanks for your suggestion. I tried MultipleOutputFormat and MultipleTextOutputFormat, but the result was the same: the folder always contains a single file, part-r-00000, and this file gets overwritten every time.
This is how I am invoking the API:

    dataToWrite.saveAsHadoopFile(directory, String.class, String.class,
        MultipleOutputFormat.class);

Am I missing something? (I have also put a sketch of what I plan to try next at the bottom of this mail, below the quoted thread.)

Regards,
Shridhar

On Wed, Mar 23, 2016 at 11:55 AM, Surendra, Manchikanti <surendra.manchika...@gmail.com> wrote:

> Hi Vetal,
>
> You may try MultipleOutputFormat instead of TextOutputFormat in
> saveAsNewAPIHadoopFile().
>
> Regards,
> Surendra M
>
> -- Surendra Manchikanti
>
> On Tue, Mar 22, 2016 at 10:26 AM, vetal king <greenve...@gmail.com> wrote:
>
>> We are using Spark 1.4 for Spark Streaming. Kafka is the data source for
>> the Spark stream.
>>
>> Records are published to Kafka every second. Our requirement is to store
>> the records published during each minute in a separate folder. The stream
>> reads records every five seconds. For instance, records published between
>> 12:00 PM and 12:01 PM are stored in folder "1200"; between 12:01 PM and
>> 12:02 PM in folder "1201"; and so on.
>>
>> The code I wrote is as follows:
>>
>> // First, group the records in the RDD by target folder
>> stream.foreachRDD(rddWithinStream -> {
>>     JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>>         rddWithinStream.mapToPair(t -> {
>>             return new Tuple2<String, String>(targetHadoopFolder, t._2());
>>         }).groupByKey();
>>     // All records are now grouped by the folder they will be stored in
>>
>>     // Create an RDD for each target folder
>>     for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>>         JavaPairRDD<String, Iterable<String>> rddByKey =
>>             rddGroupedByDirectory.filter(groupedTuples -> {
>>                 return groupedTuples._1().equals(hadoopFolder);
>>             });
>>
>>         // And store it in Hadoop
>>         rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
>>             String.class, TextOutputFormat.class);
>>     }
>> });
>>
>> Since the stream processes data every five seconds,
>> saveAsNewAPIHadoopFile gets invoked multiple times per minute. This causes
>> the "part-00000" file to be overwritten every time.
>>
>> I was expecting that, in the directory specified by the "directory"
>> parameter, saveAsNewAPIHadoopFile would keep creating part-0000N files even
>> when I have a single worker node.
>>
>> Any help/alternatives are greatly appreciated.
>>
>> Thanks.
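P.S. Re-reading the Javadoc, MultipleOutputFormat is an abstract class, so I suspect that passing MultipleOutputFormat.class straight into saveAsHadoopFile() cannot work as-is, and that it has to be subclassed with generateFileNameForKeyValue() overridden. Below is the untested sketch I plan to try next. Note it uses the old org.apache.hadoop.mapred API, so it pairs with saveAsHadoopFile rather than saveAsNewAPIHadoopFile. The class name and the per-task timestamp suffix are my own inventions; the suffix is meant to stop part files from successive micro-batches overwriting each other.

    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

    // Untested sketch: routes each (minuteFolder, record) pair into a
    // sub-directory named after its key, e.g. <outputDir>/1200/part-00000-...
    public class MinuteFolderTextOutputFormat
            extends MultipleTextOutputFormat<String, String> {

        // One timestamp per output-format instance (i.e. per task), so all
        // records written by a task share one part file, while tasks from
        // different micro-batches produce differently named files.
        private final long batchStamp = System.currentTimeMillis();

        @Override
        protected String generateActualKey(String key, String value) {
            // A null key makes the underlying TextOutputFormat write only
            // the value, keeping the folder name out of the records.
            return null;
        }

        @Override
        protected String generateFileNameForKeyValue(String key, String value,
                                                     String name) {
            // key is the minute folder ("1200"); name is Hadoop's part-file
            // name ("part-00000").
            return key + "/" + name + "-" + batchStamp;
        }
    }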
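If that works, the groupByKey()/collect()/filter loop from the original code should collapse into a single save per batch, with the minute folder carried as the pair's key. Again only a sketch, assuming dataToWrite is a JavaPairRDD<String, String> of (minuteFolder, record) built inside foreachRDD:

    // Needed imports: java.text.SimpleDateFormat, java.util.Date,
    // org.apache.spark.api.java.JavaPairRDD, scala.Tuple2.

    // Inside stream.foreachRDD(rddWithinStream -> { ... }):

    // Derive the minute-folder key ("1200", "1201", ...) from the wall clock
    // at batch time; a timestamp carried inside each record would work the
    // same way.
    String minuteFolder = new SimpleDateFormat("HHmm").format(new Date());

    JavaPairRDD<String, String> dataToWrite =
        rddWithinStream.mapToPair(t -> new Tuple2<>(minuteFolder, t._2()));

    // One save per micro-batch; the output format fans the records out into
    // per-minute sub-directories of `directory`.
    dataToWrite.saveAsHadoopFile(directory, String.class, String.class,
        MinuteFolderTextOutputFormat.class);

One caveat I am aware of: with the old API, Spark validates the output specification, and FileOutputFormat refuses to write into a directory that already exists, so repeated batches writing under the same top-level directory will probably also need spark.hadoop.validateOutputSpecs=false.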