Ah! That sounds very much like what I need. A very basic question (most
likely): why is "rdd.cache()" critical? Isn't it already the case that in
Spark Streaming DStreams are cached in memory anyway?

Also, do you have any experience with minutes-long batch intervals?

Thanks for the quick answer!

On Sun, Dec 14, 2014 at 11:17 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
> Hi Jean-Pascal,
>
> At Virdata we do a similar thing to 'bucketize' our data to different
> keyspaces in Cassandra.
>
> The basic construction would be to filter the DStream (or the underlying
> RDD) for each key and then apply the usual storage operations on that new
> data set.
> Given that, in your case, the keys come from the data in the stream itself,
> you will first need to collect those keys in order to create the buckets.
>
> Something like this:
>
> val kafkaStream = ???  // your Kafka input DStream
> kafkaStream.foreachRDD { rdd =>
>     rdd.cache() // very important!
>     // key(...) is a function that extracts the desired key from each record
>     val keys = rdd.map(elem => key(elem)).distinct.collect
>     keys.foreach { k =>  // named k so it doesn't shadow the key(...) function
>         rdd.filter(elem => key(elem) == k).saveAsObjectFile(...)
>     }
>     rdd.unpersist()
> }
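>
> Applied to your date-to-S3 case, the same pattern would look roughly like
> this (just a sketch: Log stands for your Thrift class and the S3 path is a
> placeholder):
>
> // assumes a DStream of (date, log) pairs like the one you build in your map step
> val datedStream: DStream[(String, Log)] = ???
> datedStream.foreachRDD { rdd =>
>     rdd.cache()
>     val dates = rdd.map(_._1).distinct.collect
>     dates.foreach { date =>
>         rdd.filter(_._1 == date)
>            .map(_._2)
>            .saveAsObjectFile(s"s3n://your-bucket/logs/dt=$date") // placeholder path
>     }
>     rdd.unpersist()
> }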
>
> -kr, Gerard.
>
>
>
>
> On Sun, Dec 14, 2014 at 7:50 PM, Jean-Pascal Billaud <j...@tellapart.com>
> wrote:
>>
>> Hey,
>>
>> I am doing an experiment with Spark Streaming consisting of moving data
>> from Kafka to S3 locations while partitioning by date. I have already
>> looked into LinkedIn's Camus and Pinterest's Secor, and while both are
>> workable solutions, it feels like Spark Streaming should be on par with
>> them without us having to manage yet another application in our stack,
>> since we already have a Spark Streaming cluster in production.
>>
>> So what I am trying to do is very simple really. Each message in Kafka is
>> Thrift-serialized, and the corresponding Thrift objects have a timestamp
>> field. What I'd like to do is something like this:
>>
>> JavaPairDStream<Void, Log> stream = KafkaUtils.createRawStream(...);
>> JavaPairDStream<String, Log> datedStream = stream.mapToPair(
>>   new PairFunction<Tuple2<Void, Log>, String, Log>() {
>>     public Tuple2<String, Log> call(Tuple2<Void, Log> tuple) {
>>       return new Tuple2<>(tuple._2().getDate(), tuple._2());
>>     }
>>   });
>>
>> At this point, I'd like to partition the resulting DStream into multiple
>> DStreams, each sharing a single common date string. So for instance one
>> DStream would have all the entries from 12/01 and another the entries from
>> 12/02. Once I have this list of DStreams, I would basically call
>> saveAsObjectFiles() on each of them. Unfortunately I did not find a way to
>> demultiplex a DStream based on a key. Obviously the family of reduce
>> operations does some of that, but the result is still a single DStream.
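>>
>> If the set of dates were known up front, I could imagine demultiplexing by
>> filtering once per date (sketch in Scala for brevity, treating datedStream
>> as the (date, log) stream built above; the dates and paths are made up):
>>
>> val dates = Seq("2014-12-01", "2014-12-02")  // hypothetical, known in advance
>> val perDateStreams = dates.map { date =>
>>   date -> datedStream.filter { case (d, _) => d == date }.map(_._2)
>> }
>> perDateStreams.foreach { case (date, s) =>
>>   s.saveAsObjectFiles(s"s3n://some-bucket/logs/dt=$date")  // placeholder path
>> }
>>
>> The problem is that this only works if the dates are known ahead of time,
>> which is not really the case here.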
>>
>> An alternative approach would be to call foreachRDD() on the DStream and
>> demultiplex the entries into multiple new RDDs based on the timestamp, so
>> that entries with the same day end up in the same RDD, and finally call
>> saveAsObjectFile() on each of them. I am not sure whether I can use
>> parallelize() to create those RDDs?
>>
>> Another thing I am going to experiment with is a much longer batch
>> interval. I am talking minutes, because I don't want to end up with a
>> bunch of tiny files. I might simply use a bigger Duration or one of the
>> window operations. I am not sure if anybody has tried running Spark
>> Streaming that way.
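>>
>> Roughly what I have in mind for both options (Scala sketch; sparkConf and
>> datedStream are assumed from above and the durations are arbitrary):
>>
>> import org.apache.spark.streaming.{Minutes, StreamingContext}
>>
>> // option 1: simply make the batch interval itself several minutes long
>> val ssc = new StreamingContext(sparkConf, Minutes(10))
>>
>> // option 2: keep a small batch interval but save over a larger,
>> // non-overlapping window on the (date, log) stream
>> val windowed = datedStream.window(Minutes(10), Minutes(10))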
>>
>> Any thoughts on that would be much appreciated,
>>
>> Thanks!
>>
>
