We have a similar case in which we don't want to save data to Cassandra if the data is empty. In our case, we filter the initial DStream to process messages that go to a given table.
To do so, we're using something like this:

    dstream.foreachRDD { (rdd, time) =>
      tables.foreach { table =>
        // belongsTo is a placeholder for the predicate that assigns records to tables
        val filteredRdd = rdd.filter(record => belongsTo(record, table))
        // cache so the count and the save don't compute the filter twice
        filteredRdd.cache()
        if (filteredRdd.count() > 0) {
          filteredRdd.saveAsFoo(...) // we do here saveToCassandra, you could do saveAsTextFile(s"$path/$time")
        }
        filteredRdd.unpersist()
      }
    }

Using the 'time' parameter, you can build a unique name based on the timestamp for the saveAsTextFile(path) call, which is what DStream.saveAsTextFiles(...) gives you (so it boils down to what Sean said: you implement the saveAs yourself).

-kr, Gerard.

@maasg

On Tue, Dec 9, 2014 at 1:56 PM, Sean Owen <so...@cloudera.com> wrote:

> I don't believe you can do this unless you implement the save to HDFS
> logic yourself. To keep the semantics consistent, these saveAs*
> methods will always output a file per partition.
>
> On Mon, Dec 8, 2014 at 11:53 PM, Hafiz Mujadid <hafizmujadi...@gmail.com>
> wrote:
> > Hi Experts!
> >
> > I want to save DStream to HDFS only if it is not empty such that it contains
> > some kafka messages to be stored. What is an efficient way to do this.
> >
> > var data = KafkaUtils.createStream[Array[Byte], Array[Byte],
> >   DefaultDecoder, DefaultDecoder] (ssc, params, topicMap,
> >   StorageLevel.MEMORY_ONLY).map(_._2)
> >
> > val streams = data.window(Seconds(interval*4),
> >   Seconds(interval*2)).map(x => new String(x))
> > //streams.foreachRDD(rdd=>rdd.foreach(println))
> >
> > //what condition can be applied here to store only non empty DStream
> > streams.saveAsTextFiles(sink, "msg")
> > Thanks
> >
> > --
> > View this message in context:
> > http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Data-only-if-Dstream-is-not-empty-tp20587.html
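For the plain-text case in the original question, the same idea can be sketched without the per-table filter. This is only a rough sketch under a few assumptions: the object and method names (SaveNonEmpty, batchPath, saveNonEmpty) are made up for illustration, the stream is taken to be a DStream[String], and take(1) is swapped in for count as the emptiness check so the job can stop at the first record it finds. The path format is meant to mirror the "prefix-TIME_IN_MS.suffix" naming of DStream.saveAsTextFiles.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; none of these names come from Spark itself.
object SaveNonEmpty {

  // Builds a per-batch output path of the form "prefix-TIME_IN_MS.suffix".
  def batchPath(prefix: String, time: Time, suffix: String): String =
    s"$prefix-${time.milliseconds}.$suffix"

  // Registers an output operation that writes a batch only when it holds
  // at least one record, so empty batches produce no output directory.
  def saveNonEmpty(stream: DStream[String], prefix: String, suffix: String): Unit =
    stream.foreachRDD { (rdd: RDD[String], time: Time) =>
      // take(1) returns as soon as one record is found; count would scan the whole batch.
      if (rdd.take(1).nonEmpty) {
        rdd.saveAsTextFile(batchPath(prefix, time, suffix))
      }
    }
}
```

With the names from the original question, the call would be SaveNonEmpty.saveNonEmpty(streams, sink, "msg") in place of streams.saveAsTextFiles(sink, "msg"). Note that the sketch does not cache the batch, so the partitions touched by take(1) may be computed again during the save.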