As you said, create a folder for each minute; you can use the batch time (rdd.time) as the timestamp for the folder name. Also, you might want to have a look at the window operation for the batching. Rough sketches of both ideas follow.
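A minimal sketch of the timestamp idea, assuming stream is the JavaPairDStream<String, String> read from Kafka and baseDir is a placeholder for the HDFS root path; the per-batch subdirectory is one possible way, not the only one, of keeping the five-second batches from overwriting each other's part files:

import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

stream.foreachRDD((rdd, time) -> {
    // "time" is the batch time Spark assigns to this micro-batch (rdd.time);
    // truncated to the minute, it gives a stable folder name such as "1200".
    long minuteMs = (time.milliseconds() / 60000) * 60000;
    String minuteFolder = new SimpleDateFormat("HHmm").format(new Date(minuteMs));
    if (!rdd.isEmpty()) {
        // One subdirectory per batch under the minute folder, so repeated
        // writes within the same minute cannot clobber each other.
        rdd.saveAsNewAPIHadoopFile(
            baseDir + "/" + minuteFolder + "/" + time.milliseconds(),
            String.class, String.class, TextOutputFormat.class);
    }
    return null; // foreachRDD(Function2<R, Time, Void>) in Spark 1.4
});

Note that this still produces many small files; a periodic compaction job is the usual workaround on HDFS.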
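And a sketch of the window suggestion: with a one-minute window that also slides by one minute, any foreachRDD attached downstream fires once per minute with the whole minute's data, so each invocation writes exactly one folder. Both durations must be multiples of the batch interval, which one minute is for a five-second batch.

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// Collect one minute of records and emit them once per minute.
JavaPairDStream<String, String> perMinute =
    stream.window(Durations.minutes(1), Durations.minutes(1));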
On Tue, 22 Mar 2016, 17:43 vetal king, <greenve...@gmail.com> wrote:

> Hi Cody,
>
> Thanks for your reply.
>
> The five-second batch and one-minute publishing interval is just a
> representative example. What we want is to group data over a certain
> frequency, and that frequency is configurable. One way we think it can
> be achieved is that a "directory" will be created per this frequency,
> and in this directory we will create folders when the stream receives
> data. Something like:
>
> rddByKey.saveAsNewAPIHadoopFile(directory + "-" + <current time in
> milliseconds OR some random number>, String.class, String.class,
> TextOutputFormat.class);
>
> But I think that is too deeply nested a directory structure, and it
> sounds too inefficient as well, since there will be a lot of small files.
>
> Shridhar
>
> On Tue, Mar 22, 2016 at 11:00 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> If you want 1 minute granularity, why not use a 1 minute batch time?
>>
>> Also, HDFS is not a great match for this kind of thing, because of the
>> small files issue.
>>
>> On Tue, Mar 22, 2016 at 12:26 PM, vetal king <greenve...@gmail.com>
>> wrote:
>> > We are using Spark 1.4 for Spark Streaming, with Kafka as the data
>> > source for the stream.
>> >
>> > Records are published on Kafka every second. Our requirement is to
>> > store the records published on Kafka in a single folder per minute.
>> > The stream will read records every five seconds. For instance,
>> > records published between 12:00 PM and 12:01 PM are stored in folder
>> > "1200", those between 12:01 PM and 12:02 PM in folder "1201", and
>> > so on.
>> >
>> > The code I wrote is as follows:
>> >
>> > // First, group records in the RDD by target folder
>> > stream.foreachRDD(rddWithinStream -> {
>> >     JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>> >         rddWithinStream.mapToPair(t -> {
>> >             return new Tuple2<String, String>(targetHadoopFolder, t._2());
>> >         }).groupByKey();
>> >     // All records grouped by the folders they will be stored in
>> >
>> >     // Create an RDD for each target folder.
>> >     for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>> >         JavaPairRDD<String, Iterable<String>> rddByKey =
>> >             rddGroupedByDirectory.filter(groupedTuples -> {
>> >                 return groupedTuples._1().equals(hadoopFolder);
>> >             });
>> >
>> >         // And store it in Hadoop
>> >         rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
>> >             String.class, TextOutputFormat.class);
>> >     }
>> > });
>> >
>> > Since the stream processes data every five seconds,
>> > saveAsNewAPIHadoopFile gets invoked multiple times in a minute. This
>> > causes the "part-00000" file to be overwritten every time.
>> >
>> > I was expecting that, in the directory specified by the "directory"
>> > parameter, saveAsNewAPIHadoopFile would keep creating part-0000N
>> > files even when I have a single worker node.
>> >
>> > Any help/alternatives are greatly appreciated.
>> >
>> > Thanks.
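For reference, here is Cody's first suggestion from the thread above as a minimal sketch (the app name is a placeholder): a one-minute batch interval makes each micro-batch hold exactly one minute of records, so a single save per batch yields one folder per minute with no windowing or regrouping.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// One-minute batches: each batch RDD maps one-to-one onto an output folder.
SparkConf sparkConf = new SparkConf().setAppName("KafkaToHdfsPerMinute");
JavaStreamingContext jssc =
    new JavaStreamingContext(sparkConf, Durations.minutes(1));

This does not remove Cody's small-files caveat: one folder per minute is still 1,440 folders per day on HDFS.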