I haven't done anything other than performance tuning on Spark Streaming for the
past few weeks. rdd.cache() makes a huge difference, and it's a must in a case like
this where you want to iterate over the same RDD several times.
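The pattern is just: cache() before the first action, run all the passes, then
unpersist(). A minimal sketch, with dstream and isInteresting standing in for
whatever you have:

dstream.foreachRDD { rdd =>
  rdd.cache()                          // materialized on the first action below
  val total    = rdd.count()           // first pass: computes and caches the RDD
  val matching = rdd.filter(isInteresting).count()  // second pass: served from memory
  println(s"$matching out of $total")
  rdd.unpersist()
}

Without the cache() call, the second pass would recompute the RDD from its
lineage instead of reading it back from memory.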
Intuitively, I also thought that all data was in memory already, so caching
wouldn't make a difference, and I was very surprised to see stage times drop from
seconds to milliseconds when cache() was present.

Our intervals are 10-12 seconds long. I've not tried batches of minutes yet; for
that, the window functions are probably the best way, although something in the
1-5 minute range should be doable as well.

-kr, Gerard.

On Sun, Dec 14, 2014 at 8:25 PM, Jean-Pascal Billaud <j...@tellapart.com> wrote:
>
> Ah! That sounds very much like what I need. A very basic question (most
> likely): why is "rdd.cache()" critical? Isn't it already true that in Spark
> Streaming DStreams are cached in memory anyway?
>
> Also, any experience with minutes-long batch intervals?
>
> Thanks for the quick answer!
>
> On Sun, Dec 14, 2014 at 11:17 AM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>>
>> Hi Jean-Pascal,
>>
>> At Virdata we do a similar thing to 'bucketize' our data to different
>> keyspaces in Cassandra.
>>
>> The basic construction is to filter the DStream (or the underlying RDD)
>> for each key and then apply the usual storage operations on that new data
>> set. Given that, in your case, you need the data within the stream to
>> apply the filter, you will first have to collect those keys in order to
>> create the buckets.
>>
>> Something like this:
>>
>> val kafkaStream = ???
>> kafkaStream.foreachRDD { rdd =>
>>   rdd.cache() // very important!
>>   // key(...) is a function that extracts the desired key from each record
>>   val keys = rdd.map(elem => key(elem)).distinct.collect
>>   keys.foreach { k =>
>>     rdd.filter(elem => key(elem) == k).saveAsObjectFile(...)
>>   }
>>   rdd.unpersist()
>> }
>>
>> -kr, Gerard.
>>
>> On Sun, Dec 14, 2014 at 7:50 PM, Jean-Pascal Billaud <j...@tellapart.com>
>> wrote:
>>>
>>> Hey,
>>>
>>> I am doing an experiment with Spark Streaming consisting of moving data
>>> from Kafka to S3 locations while partitioning by date. I have already
>>> looked into LinkedIn's Camus and Pinterest's Secor, and while both are
>>> workable solutions, it just feels that Spark Streaming should be able to
>>> be on par with them without having to manage yet another application in
>>> our stack, since we already have a Spark Streaming cluster in production.
>>>
>>> What I am trying to do is very simple, really. Each message in Kafka is
>>> Thrift-serialized, and the corresponding Thrift objects have a timestamp
>>> field. What I'd like to do is something like this:
>>>
>>> JavaPairDStream<String, Log> stream = KafkaUtils.createRawStream(...)
>>>     .mapToPair(new PairFunction<Tuple2<Void, Log>, String, Log>() {
>>>       public Tuple2<String, Log> call(Tuple2<Void, Log> tuple) {
>>>         return new Tuple2<>(tuple._2().getDate(), tuple._2());
>>>       }
>>>     });
>>>
>>> At this point, I'd like to partition the resulting DStream so that I end
>>> up with multiple DStreams, each holding a single date string: for
>>> instance, one DStream with all the entries from 12/01 and another with
>>> the entries from 12/02. Once I have this list of DStreams, I would
>>> basically call saveAsObjectFiles() on each of them. Unfortunately, I did
>>> not find a way to demultiplex a DStream based on a key. The reduce family
>>> of operations does some of that, but the result is still a single
>>> DStream.
>>>
>>> An alternative approach would be to call foreachRDD() on the DStream and
>>> demultiplex the entries into multiple new RDDs based on the timestamp, so
>>> that entries with the same day end up in the same RDD, and finally call
>>> saveAsObjectFiles(). I am not sure whether I can use parallelize() to
>>> create those RDDs?
>>>
>>> Another thing I am going to experiment with is a much longer batch
>>> interval. I am talking minutes, because I don't want to end up with a
>>> bunch of tiny files. I might simply use a bigger Duration or one of the
>>> window operations. Not sure if anybody runs Spark Streaming that way.
>>>
>>> Any thoughts on that would be much appreciated,
>>>
>>> Thanks!
>>>
>>
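PS: coming back to the foreachRDD idea above: you shouldn't need parallelize()
at all. Inside foreachRDD you can filter the batch RDD once per date, exactly
like the keyspace example, and write each bucket to its own path. A rough,
untested sketch in Scala, assuming the stream has already been mapped down to
the Thrift Log values and that getDate() returns the day string (the output
path layout and the 5-minute window are just illustrative):

import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

def saveByDate(logs: DStream[Log], outputPrefix: String): Unit = {
  logs
    .window(Minutes(5), Minutes(5))   // optional: group several batches to get larger files
    .foreachRDD { (rdd, time) =>
      rdd.cache()                     // we traverse the same RDD once per date
      val dates = rdd.map(_.getDate).distinct().collect()
      dates.foreach { date =>
        // include the batch time in the path so consecutive batches don't collide
        rdd.filter(_.getDate == date)
           .saveAsObjectFile(s"$outputPrefix/date=$date/batch=${time.milliseconds}")
      }
      rdd.unpersist()
    }
}

The window() is one way to get minute-scale output files without touching the
batch interval itself; the window and slide durations just need to be multiples
of the batch interval. Whether that behaves better than a minutes-long batch
interval is something you'd have to measure.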