Another possible alternative is to register a StreamingListener and then
reference BatchInfo.numRecords; there is a good example here:
https://gist.github.com/akhld/b10dc491aad1a2007183.
After registering the listener, simply implement the appropriate "onEvent"
method (onBatchStarted, onBatchCompleted, etc.), for example:
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
    System.out.println("Batch completed, total records: "
            + batchCompleted.batchInfo().numRecords().get().toString());
}

That should be very efficient and avoids any collect() calls just to obtain
the count of records in the DStream.
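As a rough sketch, wiring such a listener into a Java streaming job might look like the following. The class name `BatchCountListener` and the `jssc` variable are illustrative only, and depending on the Spark/Scala version you may need to override all of the StreamingListener callbacks as no-ops rather than just one:

```java
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;

// Sketch only: mirrors the snippet above; in some Spark versions
// numRecords is an Option and needs .get(), in others it is a plain Long.
class BatchCountListener implements StreamingListener {
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        System.out.println("Batch completed, total records: "
                + batchCompleted.batchInfo().numRecords());
    }
}

// In the driver, before starting the context (jssc is an assumed
// JavaStreamingContext):
// jssc.addStreamingListener(new BatchCountListener());
```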
HTH.
-Todd
On Wed, Dec 16, 2015 at 3:34 PM, Bryan Cutler <[email protected]> wrote:
> To follow up with your other issue, if you are just trying to count
> elements in a DStream, you can do that without an Accumulator. foreachRDD
> is meant to be an output action, it does not return anything and it is
> actually run in the driver program. Because Java (before 8) handles
> closures a little differently, it might be easiest to implement the
> function to pass to foreachRDD as something like this:
>
> class MyFunc implements VoidFunction<JavaRDD<Integer>> {
>     public long total = 0;
>
>     @Override
>     public void call(JavaRDD<Integer> rdd) {
>         System.out.println("foo " + rdd.collect().toString());
>         total += rdd.count();
>     }
> }
>
> MyFunc f = new MyFunc();
> inputStream.foreachRDD(f);
>
> // f.total will have the count of all RDDs
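With Java 8 lambdas, and once foreachRDD accepts a VoidFunction (the SPARK-4557 change mentioned later in the thread), the same driver-side count can be kept without a helper class. A minimal sketch, where `inputStream` is an assumed JavaDStream<Integer> and an AtomicLong stands in for the mutable counter because a local variable captured by a lambda must be effectively final:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: assumes Spark 1.6+, where foreachRDD takes a
// VoidFunction<JavaRDD<T>> and the lambda needs no return value.
final AtomicLong total = new AtomicLong(0L);

inputStream.foreachRDD(rdd -> {
    // The function passed to foreachRDD runs in the driver, so updating a
    // driver-side counter is safe; rdd.count() is the distributed action.
    total.addAndGet(rdd.count());
});

// After the streaming context stops, total.get() holds the overall count.
```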
>
> Hope that helps some!
>
> -bryan
>
> On Wed, Dec 16, 2015 at 8:37 AM, Bryan Cutler <[email protected]> wrote:
>
>> Hi Andy,
>>
>> Regarding the foreachRDD return value, this JIRA, which will be in 1.6,
>> should take care of that and make things a little simpler:
>> https://issues.apache.org/jira/browse/SPARK-4557
>> On Dec 15, 2015 6:55 PM, "Andy Davidson" <[email protected]>
>> wrote:
>>
>>> I am writing a JUnit test for some simple streaming code. I want to
>>> make assertions about how many things are in a given JavaDStream. I wonder
>>> if there is an easier way in Java to get the count?
>>>
>>> I think there are two points of friction.
>>>
>>>
>>> 1. It is easy to create an accumulator of type double or int; however,
>>> Long is not supported.
>>> 2. We need to use javaDStream.foreachRDD. The Function interface
>>> must return Void. I was not able to define an accumulator in my driver
>>> and use it from a lambda function. (I am new to lambdas in Java.)
>>>
>>> Here is a little lambda example that logs my test objects. I was not
>>> able to figure out how to return a value or access an accumulator:
>>>
>>> data.foreachRDD(rdd -> {
>>>     logger.info("Begin data.foreachRDD");
>>>     for (MyPojo pojo : rdd.collect()) {
>>>         logger.info("\n{}", pojo.toString());
>>>     }
>>>     return null;
>>> });
>>>
>>>
>>> Any suggestions would be greatly appreciated
>>>
>>> Andy
>>>
>>> The following code works in my driver but is a lot of code for such a
>>> trivial computation. Because it needs the JavaSparkContext, I do not
>>> think it would work inside a closure. I assume the workers do not have
>>> access to the context as a global, and that shipping it in the closure
>>> is not a good idea?
>>>
>>> public class JavaDStreamCount<T> implements Serializable {
>>>     private static final long serialVersionUID = -3600586183332429887L;
>>>     public static Logger logger =
>>>             LoggerFactory.getLogger(JavaDStreamCount.class);
>>>
>>>     public Double hack(JavaSparkContext sc, JavaDStream<T> javaDStream) {
>>>         Count c = new Count(sc);
>>>         javaDStream.foreachRDD(c);
>>>         return c.getTotal().value();
>>>     }
>>>
>>>     class Count implements Function<JavaRDD<T>, Void> {
>>>         private static final long serialVersionUID = -5239727633710162488L;
>>>         Accumulator<Double> total;
>>>
>>>         public Count(JavaSparkContext sc) {
>>>             total = sc.accumulator(0.0);
>>>         }
>>>
>>>         @Override
>>>         public java.lang.Void call(JavaRDD<T> rdd) throws Exception {
>>>             List<T> data = rdd.collect();
>>>             int dataSize = data.size();
>>>             logger.error("data.size:{}", dataSize);
>>>             long num = rdd.count();
>>>             logger.error("num:{}", num);
>>>             total.add(new Double(num));
>>>             return null;
>>>         }
>>>
>>>         public Accumulator<Double> getTotal() {
>>>             return total;
>>>         }
>>>     }
>>> }
>>>