Hello Robert,

Yes, thank you. I used an array in place of the collection to get rid of that error.
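For readers following the thread, here is a minimal sketch of one way such a fix could look. The thread does not show the final code, so this is an assumption: it initializes the field with an `ArrayList` at declaration, since `Collection` is an interface and cannot be instantiated directly. To stay self-contained it does not depend on Flink; `CentroidBufferSketch`, `SelectNearestCenterSketch`, `addCentroid`, `nearestCentroidId`, and the fields of `Centroid` are hypothetical stand-ins for the classes in the quoted code, and the minimum-size check from the original is left out.

```java
import java.util.ArrayList;
import java.util.Collection;

public class CentroidBufferSketch {

    // Hypothetical stand-in for the thread's Centroid type (fields are assumptions).
    static final class Centroid {
        final String id;
        final double x;
        final double y;

        Centroid(String id, double x, double y) {
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    // Mirrors the shape of SelectNearestCenter without the Flink interfaces.
    static final class SelectNearestCenterSketch {
        // Collection is an interface, so `new Collection<Centroid>()` does not compile.
        // Initializing with a concrete implementation avoids the null field.
        private final Collection<Centroid> centroids = new ArrayList<>();

        // Counterpart of flatMap2: remember each centroid that arrives.
        void addCentroid(Centroid value) {
            centroids.add(value);
        }

        // Counterpart of the loop in flatMap1: returns the id of the nearest
        // centroid by squared Euclidean distance, or null if none has arrived yet.
        String nearestCentroidId(double px, double py) {
            String closestId = null;
            double best = Double.MAX_VALUE;
            for (Centroid c : centroids) {
                double dx = c.x - px;
                double dy = c.y - py;
                double dist = dx * dx + dy * dy;
                if (dist < best) {
                    best = dist;
                    closestId = c.id;
                }
            }
            return closestId;
        }
    }

    public static void main(String[] args) {
        SelectNearestCenterSketch selector = new SelectNearestCenterSketch();
        selector.addCentroid(new Centroid("a", 0.0, 0.0));
        selector.addCentroid(new Centroid("b", 10.0, 10.0));
        System.out.println(selector.nearestCentroidId(9.0, 9.0));
    }
}
```

Because the field is assigned where it is declared, the add in the `flatMap2` counterpart can no longer hit a null reference, and the `centroids != null` check in `flatMap1` becomes unnecessary.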

Best Regards,
Subash Basnet

On Thu, Jul 14, 2016 at 4:03 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Subash,
>
> The problem you are facing is not related to Flink. The problem is that
> the "centroids" field is not initialized, which is a general Java issue.
> Please keep in mind that this list is not the best forum for Java questions.
>
> Regards,
> Robert
>
> On Wed, Jul 13, 2016 at 6:45 PM, subash basnet <yasub...@gmail.com> wrote:
>
>> Hello all,
>>
>> I need to collect the centroids to find out the nearest center for each
>> point.
>>
>> DataStream<Point> points = newDataStream.map(new getPoints());
>> DataStream<Centroid> centroids =
>>     newCentroidDataStream.map(new TupleCentroidConverter());
>> ConnectedIterativeStreams<Point, Centroid> loop =
>>     points.iterate().withFeedbackType(Centroid.class);
>> DataStream<Centroid> newCentroids = loop.flatMap(new SelectNearestCenter())
>>     .map(new CountAppender()).keyBy(0)
>>     .reduce(new CentroidAccumulator()).map(new CentroidAverager());
>> DataStream<Centroid> finalCentroids =
>>     loop.closeWith(newCentroids.broadcast());
>>
>> But I am getting a null pointer exception for the collection of centroids
>> when trying to add a centroid in flatMap2. Below is my code. How could I
>> get rid of this null pointer exception? Any other ideas?
>>
>> public static final class SelectNearestCenter implements
>>         CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {
>>     private Collection<Centroid> centroids;
>>
>>     @Override
>>     public void flatMap1(Point p, Collector<Tuple2<String, Point>> out)
>>             throws Exception {
>>         if (centroids != null) {
>>             if (centroids.size() > 19) {
>>                 // let's assume minimum size 20 for now
>>                 for (Centroid centroid : centroids) {
>>                     .....
>>                 }
>>             }
>>         }
>>         // emit a new record with the center id and the data point.
>>         out.collect(new Tuple2<String, Point>(closestCentroidId, p));
>>     }
>>
>>     @Override
>>     public void flatMap2(Centroid value,
>>             Collector<Tuple2<String, Point>> out) throws Exception {
>>         centroids.add(value);
>>     }
>> }
>>
>> The instantiation below is not allowed, so it always throws a null
>> pointer exception.
>>
>> private Collection<Centroid> centroids = new Collection<Centroid>();
>>
>> Best Regards,
>> Subash Basnet