Another alternative is to register a StreamingListener and reference
BatchInfo.numRecords; there is a good example here:
https://gist.github.com/akhld/b10dc491aad1a2007183

After registering the listener, simply implement the appropriate "onEvent"
method, where onEvent is onBatchStarted, onBatchCompleted, etc. For example:

public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
    System.out.println("Batch completed, total records: "
        + batchCompleted.batchInfo().numRecords().get().toString());
}

That should be very efficient and avoid any collect() calls just to obtain
the count of records in the DStream.
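Spark aside, the listener approach boils down to ordinary callback
registration. Here is a minimal, Spark-free sketch of the shape; note that
BatchListener, BatchInfo, and FakeContext are hypothetical stand-ins for
illustration, not Spark's actual classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Spark's StreamingListener machinery,
// illustrating only the callback shape, not the real API.
public class ListenerSketch {

    // Mirrors the idea of BatchInfo.numRecords
    static class BatchInfo {
        final long numRecords;
        BatchInfo(long numRecords) { this.numRecords = numRecords; }
    }

    interface BatchListener {
        void onBatchCompleted(BatchInfo info);
    }

    // Stand-in for the streaming context that holds registered listeners
    static class FakeContext {
        private final List<BatchListener> listeners = new ArrayList<>();

        void addStreamingListener(BatchListener l) { listeners.add(l); }

        // The framework would invoke this as each batch finishes
        void completeBatch(long records) {
            for (BatchListener l : listeners) {
                l.onBatchCompleted(new BatchInfo(records));
            }
        }
    }

    public static void main(String[] args) {
        FakeContext ctx = new FakeContext();
        final long[] total = {0};
        ctx.addStreamingListener(info -> total[0] += info.numRecords);
        ctx.completeBatch(3);
        ctx.completeBatch(5);
        System.out.println("Total records: " + total[0]); // prints 8
    }
}
```

In real code the registration point is StreamingContext.addStreamingListener;
the framework drives the onBatchCompleted calls for you.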

HTH.

-Todd

On Wed, Dec 16, 2015 at 3:34 PM, Bryan Cutler <cutl...@gmail.com> wrote:

> To follow up with your other issue, if you are just trying to count
> elements in a DStream, you can do that without an Accumulator.  foreachRDD
> is meant to be an output action; it does not return anything, and it is
> actually run in the driver program.  Because Java (before 8) handles
> closures a little differently, it might be easiest to implement the
> function to pass to foreachRDD as something like this:
>
> class MyFunc implements VoidFunction<JavaRDD<Integer>> {
>
>     public long total = 0;
>
>     @Override
>     public void call(JavaRDD<Integer> rdd) {
>         System.out.println("foo " + rdd.collect().toString());
>         total += rdd.count();
>     }
> }
>
> MyFunc f = new MyFunc();
>
> inputStream.foreachRDD(f);
>
> // f.total will have the count of all RDDs
>
> Hope that helps some!
>
> -bryan
>
> On Wed, Dec 16, 2015 at 8:37 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Hi Andy,
>>
>> Regarding the foreachRDD return value, this JIRA, which will be in 1.6,
>> should take care of that and make things a little simpler:
>> https://issues.apache.org/jira/browse/SPARK-4557
>> On Dec 15, 2015 6:55 PM, "Andy Davidson" <a...@santacruzintegration.com>
>> wrote:
>>
>>> I am writing a JUnit test for some simple streaming code. I want to
>>> make assertions about how many things are in a given JavaDStream. I wonder
>>> if there is an easier way in Java to get the count?
>>>
>>> I think there are two points of friction.
>>>
>>>
>>>    1. It is easy to create an accumulator of type double or int;
>>>    however, Long is not supported.
>>>    2. We need to use javaDStream.foreachRDD. The Function interface
>>>    must return Void, so the lambda has to return null. I was not able to
>>>    define an accumulator in my driver and use a lambda function (I am
>>>    new to lambdas in Java).
>>>
>>> Here is a little lambda example that logs my test objects. I was not
>>> able to figure out how to get it to return a value or access an
>>> accumulator:
>>>
>>>         data.foreachRDD(rdd -> {
>>>             logger.info("Begin data.foreachRDD");
>>>             for (MyPojo pojo : rdd.collect()) {
>>>                 logger.info("\n{}", pojo.toString());
>>>             }
>>>             return null;
>>>         });
>>>
>>>
>>> Any suggestions would be greatly appreciated
>>>
>>> Andy
>>>
>>> The following code works in my driver but is a lot of code for such a
>>> trivial computation. Because it needs the JavaSparkContext, I do not
>>> think it would work inside a closure. I assume the workers do not have
>>> access to the context as a global, and that shipping it in the closure
>>> is not a good idea?
>>>
>>> public class JavaDStreamCount<T> implements Serializable {
>>>
>>>     private static final long serialVersionUID = -3600586183332429887L;
>>>     public static Logger logger =
>>>             LoggerFactory.getLogger(JavaDStreamCount.class);
>>>
>>>     public Double hack(JavaSparkContext sc, JavaDStream<T> javaDStream) {
>>>         Count c = new Count(sc);
>>>         javaDStream.foreachRDD(c);
>>>         return c.getTotal().value();
>>>     }
>>>
>>>     class Count implements Function<JavaRDD<T>, Void> {
>>>
>>>         private static final long serialVersionUID = -5239727633710162488L;
>>>         Accumulator<Double> total;
>>>
>>>         public Count(JavaSparkContext sc) {
>>>             total = sc.accumulator(0.0);
>>>         }
>>>
>>>         @Override
>>>         public java.lang.Void call(JavaRDD<T> rdd) throws Exception {
>>>             List<T> data = rdd.collect();
>>>             int dataSize = data.size();
>>>             logger.error("data.size:{}", dataSize);
>>>             long num = rdd.count();
>>>             logger.error("num:{}", num);
>>>             total.add(new Double(num));
>>>             return null;
>>>         }
>>>
>>>         public Accumulator<Double> getTotal() {
>>>             return total;
>>>         }
>>>     }
>>> }

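For what it's worth, the reason Bryan's MyFunc pattern works is that
foreachRDD runs the function in the driver, so an ordinary mutable field
accumulates across batches with no Accumulator needed. Below is a Spark-free
sketch of that idea, simulating each micro-batch's RDD with a plain List;
CountFunc is a made-up name for illustration, not a Spark class:

```java
import java.util.Arrays;
import java.util.List;

// Spark-free illustration of the driver-side counting pattern: a function
// object whose mutable field accumulates counts across "batches"
// (plain Lists standing in for per-batch RDDs).
public class CountFunc {

    public long total = 0;

    // Plays the role of VoidFunction<JavaRDD<Integer>>.call(rdd)
    public void call(List<Integer> batch) {
        total += batch.size(); // the rdd.count() equivalent
    }

    public static void main(String[] args) {
        CountFunc f = new CountFunc();
        // Simulate three micro-batches arriving in the driver
        for (List<Integer> batch : Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5),
                Arrays.<Integer>asList())) {
            f.call(batch);
        }
        System.out.println("total = " + f.total); // prints total = 5
    }
}
```

The same field works in real Spark only because foreachRDD functions execute
in the driver; a field mutated inside a transformation running on executors
would not be visible back in the driver.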