Another alternative is to register a StreamingListener and then reference BatchInfo.numRecords; there is a good example here: https://gist.github.com/akhld/b10dc491aad1a2007183.
After registering the listener, simply implement the appropriate "onEvent" method, where onEvent is onBatchStarted, onBatchCompleted, etc. For example:

    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        System.out.println("Batch completed, total records: "
            + batchCompleted.batchInfo().numRecords().get().toString());
    }

That should be very efficient and avoids any collect() calls just to obtain the count of records in the DStream.

HTH.

-Todd

On Wed, Dec 16, 2015 at 3:34 PM, Bryan Cutler <cutl...@gmail.com> wrote:

> To follow up on your other issue: if you are just trying to count
> elements in a DStream, you can do that without an Accumulator. foreachRDD
> is meant to be an output action; it does not return anything, and it is
> actually run in the driver program. Because Java (before 8) handles
> closures a little differently, it might be easiest to implement the
> function passed to foreachRDD as something like this:
>
> class MyFunc implements VoidFunction<JavaRDD<Integer>> {
>
>     public long total = 0;
>
>     @Override
>     public void call(JavaRDD<Integer> rdd) {
>         System.out.println("foo " + rdd.collect().toString());
>         total += rdd.count();
>     }
> }
>
> MyFunc f = new MyFunc();
>
> inputStream.foreachRDD(f);
>
> // f.total will have the count of all RDDs
>
> Hope that helps some!
>
> -bryan
>
> On Wed, Dec 16, 2015 at 8:37 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Hi Andy,
>>
>> Regarding the foreachRDD return value, this JIRA, which will be in 1.6,
>> should take care of that and make things a little simpler:
>> https://issues.apache.org/jira/browse/SPARK-4557
>>
>> On Dec 15, 2015 6:55 PM, "Andy Davidson" <a...@santacruzintegration.com>
>> wrote:
>>
>>> I am writing a JUnit test for some simple streaming code. I want to
>>> make assertions about how many things are in a given JavaDStream. I
>>> wonder if there is an easier way in Java to get the count?
>>>
>>> I think there are two points of friction:
>>>
>>> 1. It is easy to create an accumulator of type double or int; however,
>>>    Long is not supported.
>>> 2. We need to use javaDStream.foreachRDD. The Function interface must
>>>    return Void. I was not able to define an accumulator in my driver
>>>    and use a lambda function. (I am new to lambdas in Java.)
>>>
>>> Here is a little lambda example that logs my test objects. I was not
>>> able to figure out how to return a value or access an accumulator:
>>>
>>> data.foreachRDD(rdd -> {
>>>     logger.info("Begin data.foreachRDD");
>>>     for (MyPojo pojo : rdd.collect()) {
>>>         logger.info("\n{}", pojo.toString());
>>>     }
>>>     return null;
>>> });
>>>
>>> Any suggestions would be greatly appreciated.
>>>
>>> Andy
>>>
>>> The following code works in my driver, but it is a lot of code for such
>>> a trivial computation. Because it needs the JavaSparkContext, I do not
>>> think it would work inside a closure. I assume the workers do not have
>>> access to the context as a global and that shipping it in the closure
>>> is not a good idea?
>>>
>>> public class JavaDStreamCount<T> implements Serializable {
>>>     private static final long serialVersionUID = -3600586183332429887L;
>>>     public static Logger logger =
>>>             LoggerFactory.getLogger(JavaDStreamCount.class);
>>>
>>>     public Double hack(JavaSparkContext sc, JavaDStream<T> javaDStream) {
>>>         Count c = new Count(sc);
>>>         javaDStream.foreachRDD(c);
>>>         return c.getTotal().value();
>>>     }
>>>
>>>     class Count implements Function<JavaRDD<T>, Void> {
>>>         private static final long serialVersionUID = -5239727633710162488L;
>>>         Accumulator<Double> total;
>>>
>>>         public Count(JavaSparkContext sc) {
>>>             total = sc.accumulator(0.0);
>>>         }
>>>
>>>         @Override
>>>         public java.lang.Void call(JavaRDD<T> rdd) throws Exception {
>>>             List<T> data = rdd.collect();
>>>             int dataSize = data.size();
>>>             logger.error("data.size:{}", dataSize);
>>>             long num = rdd.count();
>>>             logger.error("num:{}", num);
>>>             total.add(new Double(num));
>>>             return null;
>>>         }
>>>
>>>         public Accumulator<Double> getTotal() {
>>>             return total;
>>>         }
>>>     }
>>> }
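[Editor's note: Bryan's named-class trick works because the driver keeps a reference to the function object, so its public `total` field is still visible after foreachRDD has run each batch. The following is a minimal plain-Java sketch of that pattern with no Spark dependency — the `VoidFunction` interface and the `List<Integer>` "batches" here are stand-ins for Spark's `VoidFunction<JavaRDD<T>>` and the per-interval RDDs, not real Spark API.]

```java
import java.util.Arrays;
import java.util.List;

public class ClosureCountDemo {

    // Stand-in for Spark's VoidFunction<JavaRDD<T>>; each List<Integer>
    // below plays the role of one micro-batch's RDD.
    interface VoidFunction<T> {
        void call(T t);
    }

    // Mirrors Bryan's MyFunc: mutable state lives on the function
    // instance, so it survives across calls even though call() itself
    // returns nothing.
    static class CountingFunc implements VoidFunction<List<Integer>> {
        public long total = 0;

        @Override
        public void call(List<Integer> batch) {
            total += batch.size(); // stands in for rdd.count()
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5));

        CountingFunc f = new CountingFunc();
        for (List<Integer> batch : batches) {
            f.call(batch); // stands in for inputStream.foreachRDD(f)
        }

        // A lambda could not mutate a local counter instead: locals
        // captured by a lambda (or anonymous class) must be effectively
        // final, which is why the named-class-with-a-field pattern is
        // the simple way out.
        System.out.println("total = " + f.total); // prints "total = 5"
    }
}
```

Note the usual Spark caveat still applies: this only works because foreachRDD's function body runs in the driver; state on a function object shipped to executors inside a transformation would not be visible back in the driver.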