As you said, create a folder for each minute. You can also use the batch
time (the time value passed to foreachRDD alongside the RDD) as a timestamp.
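
To make the per-minute folder idea concrete, here is a minimal sketch in
plain Java (not Spark code). It assumes the batch time is available as epoch
milliseconds; in Spark Streaming that would come from the Time value handed
to foreachRDD, via milliseconds(). The names MinuteFolders, minuteFolder,
batchOutputPath and the base directory /data/records are hypothetical and
only for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class MinuteFolders {
    // "HHmm" yields names like "1200" and "1201", as in the original question.
    // Assumption: add a date part (e.g. "yyyyMMdd/HHmm") if days must not collide.
    private static final DateTimeFormatter MINUTE = DateTimeFormatter.ofPattern("HHmm");

    // Name of the folder for the minute that contains the given batch time.
    static String minuteFolder(long batchTimeMs, ZoneId zone) {
        return MINUTE.format(Instant.ofEpochMilli(batchTimeMs).atZone(zone));
    }

    // One distinct sub-path per batch inside the minute folder, so a later
    // batch in the same minute does not write to the same output directory.
    static String batchOutputPath(String baseDir, long batchTimeMs, ZoneId zone) {
        return baseDir + "/" + minuteFolder(batchTimeMs, zone) + "/batch-" + batchTimeMs;
    }

    public static void main(String[] args) {
        long noon = Instant.parse("2016-03-22T12:00:00Z").toEpochMilli();
        // Three consecutive five-second batches share the "1200" folder.
        for (long t = noon; t < noon + 15_000; t += 5_000) {
            System.out.println(batchOutputPath("/data/records", t, ZoneOffset.UTC));
        }
    }
}
```

The resulting string would be what gets passed as the path argument to
saveAsNewAPIHadoopFile. Each batch still produces its own small files, so
the small files concern raised below still applies.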

Also, you might want to have a look at the window function for the batching.


On Tue, 22 Mar 2016, 17:43 vetal king, <greenve...@gmail.com> wrote:

> Hi Cody,
>
> Thanks for your reply.
>
> The five-second batch and one-minute publishing interval are just a
> representative example. What we want is to group data over a certain
> frequency, and that frequency is configurable. One way we think it can be
> achieved is that a "directory" will be created per this frequency, and in
> this directory we will create folders whenever the stream receives data.
> Something like
>
> rddByKey.saveAsNewAPIHadoopFile(directory + "-" + <current time in
> milliseconds OR some random number>, String.class, String.class,
> TextOutputFormat.class).
>
> But I think it will be too much of a nested directory structure, and it
> sounds inefficient as well, since there will be a lot of small files.
>
> Shridhar
>
>
>
>
> On Tue, Mar 22, 2016 at 11:00 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> If you want 1 minute granularity, why not use a 1 minute batch time?
>>
>> Also, HDFS is not a great match for this kind of thing, because of the
>> small files issue.
>>
>> On Tue, Mar 22, 2016 at 12:26 PM, vetal king <greenve...@gmail.com>
>> wrote:
>> > We are using Spark 1.4 for Spark Streaming. Kafka is the data source for
>> > the Spark Stream.
>> >
>> > Records are published on Kafka every second. Our requirement is to store
>> > records published on Kafka in a single folder per minute. The stream will
>> > read records every five seconds. For instance, records published between
>> > 12:00 PM and 12:01 PM are stored in folder "1200"; between 12:01 PM and
>> > 12:02 PM in folder "1201", and so on.
>> >
>> > The code I wrote is as follows:
>> >
>> > // First, group records in the RDD by date
>> > stream.foreachRDD(rddWithinStream -> {
>> >     JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory =
>> >         rddWithinStream.mapToPair(t -> {
>> >             return new Tuple2<String, String>(targetHadoopFolder, t._2());
>> >         }).groupByKey();
>> >     // All records are now grouped by the folder they will be stored in
>> >
>> >     // Create an RDD for each target folder.
>> >     for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
>> >         JavaPairRDD<String, Iterable<String>> rddByKey =
>> >             rddGroupedByDirectory.filter(groupedTuples -> {
>> >                 return groupedTuples._1().equals(hadoopFolder);
>> >             });
>> >
>> >         // And store it in Hadoop
>> >         rddByKey.saveAsNewAPIHadoopFile(directory, String.class,
>> >             String.class, TextOutputFormat.class);
>> >     }
>> > });
>> >
>> > Since the stream processes data every five seconds, saveAsNewAPIHadoopFile
>> > gets invoked multiple times in a minute. This causes the "Part-00000" file
>> > to be overwritten every time.
>> >
>> > I was expecting that, in the directory specified by the "directory"
>> > parameter, saveAsNewAPIHadoopFile would keep creating part-0000N files
>> > even when I have a single worker node.
>> >
>> > Any help/alternatives are greatly appreciated.
>> >
>> > Thanks.
>>
>
>
