I don't understand the race condition you mention. Have you seen this happen somewhere? The timestamp will be the same on every worker for a given RDD, and each worker handles a different partition, which is reflected in the file name, so no data will be overwritten. In fact, as far as I recall, this is what saveAsNewAPIHadoopFiles on a DStream does.
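
To make the naming argument concrete, here is a minimal sketch in plain Java (no Spark dependency) of how a per-batch output path could be derived. The class BatchOutputPaths, its method names, the "batch-" prefix, the "/data" base directory and the use of UTC are all assumptions made for illustration, not anything Spark provides. In a real job the batch time would presumably come from the two-argument form of foreachRDD, via time.milliseconds().

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper: not part of Spark or Hadoop.
class BatchOutputPaths {
    // Assumption: minute folders are named in UTC, e.g. "1200".
    private static final DateTimeFormatter MINUTE_FOLDER =
        DateTimeFormatter.ofPattern("HHmm").withZone(ZoneOffset.UTC);

    // Folder for the minute the batch falls in.
    static String minuteFolder(long batchTimeMs) {
        return MINUTE_FOLDER.format(Instant.ofEpochMilli(batchTimeMs));
    }

    // One directory per batch. The batch time is identical on every
    // worker, so all partitions of one RDD agree on this path.
    static String batchDirectory(String baseDir, long batchTimeMs) {
        return baseDir + "/" + minuteFolder(batchTimeMs) + "/batch-" + batchTimeMs;
    }

    // Each partition writes its own part file inside the batch directory,
    // so two workers never target the same file.
    static String partFile(String baseDir, long batchTimeMs, int partition) {
        return String.format(Locale.ROOT, "%s/part-r-%05d",
            batchDirectory(baseDir, batchTimeMs), partition);
    }

    public static void main(String[] args) {
        long firstBatch = 1458907200000L;        // 2016-03-25 12:00:00 UTC
        long secondBatch = firstBatch + 5_000L;  // next five-second batch
        System.out.println(partFile("/data", firstBatch, 0));
        System.out.println(partFile("/data", firstBatch, 1));
        System.out.println(partFile("/data", secondBatch, 0));
    }
}
```

Both batches land under the same minute folder, but each gets its own batch directory, so the second batch's part-r-00000 does not replace the first one's.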

On Fri, 25 Mar 2016, 11:22 vetal king, <greenve...@gmail.com> wrote:

> Hi Surendra,
>
> Thanks for your suggestion. I tried MultipleOutputFormat and
> MultipleTextOutputFormat, but the result was the same. The folder would
> always contain a single file, part-r-00000, and this file gets overwritten
> every time.
>
> This is how I am invoking the API:
>
>     dataToWrite.saveAsHadoopFile(directory, String.class,
>         String.class, MultipleOutputFormat.class);
>
> Am I missing something?
>
> Regards,
> Shridhar
>
> On Wed, Mar 23, 2016 at 11:55 AM, Surendra , Manchikanti <
> surendra.manchika...@gmail.com> wrote:
>
>> Hi Vetal,
>>
>> You may try with MultiOutPutFormat instead of TextOutPutFormat in
>> saveAsNewAPIHadoopFile().
>>
>> Regards,
>> Surendra M
>>
>> -- Surendra Manchikanti
>>
>> On Tue, Mar 22, 2016 at 10:26 AM, vetal king <greenve...@gmail.com>
>> wrote:
>>
>>> We are using Spark 1.4 for Spark Streaming. Kafka is the data source
>>> for the Spark stream.
>>>
>>> Records are published on Kafka every second. Our requirement is to store
>>> the records published on Kafka in a single folder per minute. The stream
>>> will read records every five seconds. For instance, records published
>>> between 12:00 PM and 12:01 PM are stored in folder "1200", those between
>>> 12:01 PM and 12:02 PM in folder "1201", and so on.
>>>
>>> The code I wrote is as follows:
>>>
>>> // First, group records in the RDD by date
>>> stream.foreachRDD (rddWithinStream -> {
>>>     JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>>>         rddWithinStream.mapToPair(t -> {
>>>             return new Tuple2<String, String> (targetHadoopFolder, t._2());
>>>         }).groupByKey();
>>>     // All records grouped by the folders they will be stored in
>>>
>>>     // Create an RDD for each target folder.
>>>     for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>>>         JavaPairRDD <String, Iterable<String>> rddByKey =
>>>             rddGroupedByDirectory.filter(groupedTuples -> {
>>>                 return groupedTuples._1().equals(hadoopFolder);
>>>             });
>>>
>>>         // And store it in Hadoop
>>>         rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class,
>>>             TextOutputFormat.class);
>>>     }
>>>
>>> Since the stream processes data every five seconds,
>>> saveAsNewAPIHadoopFile gets invoked multiple times in a minute. This causes
>>> the "Part-00000" file to be overwritten every time.
>>>
>>> I was expecting that, in the directory specified by the "directory"
>>> parameter, saveAsNewAPIHadoopFile would keep creating part-0000N files even
>>> when I have a single worker node.
>>>
>>> Any help/alternatives are greatly appreciated.
>>>
>>> Thanks.