I am writing a JUnit test for some simple streaming code. I want to make
assertions about how many elements are in a given JavaDStream. Is there an
easier way in Java to get the count?

I think there are two points of friction.

1. It is easy to create an accumulator of type double or int; however, long
is not supported.
2. We need to use javaDStream.foreachRDD. The Function interface must return
Void. I was not able to define an accumulator in my driver and use it from a
lambda function. (I am new to lambdas in Java.)

Here is a little lambda example that logs my test objects. I was not able to
figure out how to get it to return a value or access an accumulator.

        data.foreachRDD(rdd -> {
            logger.info("Begin data.foreachRDD");
            for (MyPojo pojo : rdd.collect()) {
                logger.info("\n{}", pojo.toString());
            }
            return null;
        });
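
For what it is worth, the lambda part of the question can be tried out in
plain Java with no Spark dependency. The sketch below is only an illustration
under stated assumptions: forEachBatch is a hypothetical stand-in for
foreachRDD, and an AtomicLong stands in for an accumulator. It shows that a
lambda may capture a local variable as long as the reference is effectively
final, even though the object it refers to is mutated inside the lambda.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

class LambdaCountSketch {

    // Hypothetical stand-in for foreachRDD: invokes the callback once per batch.
    static <T> void forEachBatch(List<List<T>> batches, Consumer<List<T>> callback) {
        for (List<T> batch : batches) {
            callback.accept(batch);
        }
    }

    // Counts the elements of all batches with a lambda that captures `total`.
    static <T> long countAll(List<List<T>> batches) {
        // `total` is never reassigned, so it is effectively final and can be
        // captured; the AtomicLong it refers to is still mutable.
        AtomicLong total = new AtomicLong();
        forEachBatch(batches, batch -> total.addAndGet(batch.size()));
        return total.get();
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"),
                Arrays.<String>asList());
        System.out.println(countAll(batches)); // prints 3
    }
}
```

The same capture rule should apply to an accumulator created in the driver
before the lambda is defined, although I have not verified that against
Spark itself.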



Any suggestions would be greatly appreciated

Andy

The following code works in my driver, but it is a lot of code for such a
trivial computation. Because it needs the JavaSparkContext, I do not think
it would work inside a closure. I assume the workers do not have access to the
context as a global, and that shipping it in the closure is not a good
idea?
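
The closure concern can be illustrated in plain Java. This is a minimal sketch,
assuming the context object is not serializable; DriverOnlyContext and
SerializableTask are hypothetical names, not Spark classes. A serializable
lambda that captures a non-serializable object fails to serialize, while one
that captures only a plain value serializes fine.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class ClosureCaptureSketch {

    // Hypothetical stand-in for a driver-only object; deliberately not Serializable.
    static class DriverOnlyContext {
        long next() { return 1L; }
    }

    // A function type that must be serializable, like a closure sent elsewhere.
    interface SerializableTask extends Serializable {
        long run();
    }

    // Returns true if Java serialization accepts the object, false otherwise.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The lambda captures the context itself, so serialization fails.
    static boolean capturingContextSerializes() {
        DriverOnlyContext ctx = new DriverOnlyContext();
        SerializableTask task = () -> ctx.next();
        return canSerialize(task);
    }

    // The lambda captures only a long computed beforehand, so it serializes.
    static boolean capturingValueSerializes() {
        long value = new DriverOnlyContext().next();
        SerializableTask task = () -> value;
        return canSerialize(task);
    }

    public static void main(String[] args) {
        System.out.println(capturingContextSerializes()); // prints false
        System.out.println(capturingValueSerializes());   // prints true
    }
}
```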

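import java.io.Serializable;

import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JavaDStreamCount<T> implements Serializable {
    private static final long serialVersionUID = -3600586183332429887L;

    public static Logger logger = LoggerFactory.getLogger(JavaDStreamCount.class);

    public Double hack(JavaSparkContext sc, JavaDStream<T> javaDStream) {
        Count c = new Count(sc);
        // foreachRDD only registers the callback; nothing runs until the
        // streaming context has been started.
        javaDStream.foreachRDD(c);
        // Reflects only the batches processed so far, so this is 0.0 if it is
        // read before any batch has run.
        return c.getTotal().value();
    }

    class Count implements Function<JavaRDD<T>, Void> {
        private static final long serialVersionUID = -5239727633710162488L;

        // Accumulators support Double and Integer out of the box, so the
        // count is kept as a Double.
        Accumulator<Double> total;

        public Count(JavaSparkContext sc) {
            total = sc.accumulator(0.0);
        }

        @Override
        public Void call(JavaRDD<T> rdd) throws Exception {
            // count() is enough here; collecting the RDD just to take its
            // size pulls every element back to the driver.
            long num = rdd.count();
            logger.error("num:{}", num);
            total.add((double) num);
            return null;
        }

        public Accumulator<Double> getTotal() {
            return total;
        }
    }
}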







