Hello all, I tried to check if it works for tuple but same problem, the collection still shows blank result. I took the id of centroid tuple and printed it, but the collection displays empty.
DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter()); DataStream<Tuple1<String>> centroidId = centroids.map(new TestMethod()); centroidId.print(); Iterator<Tuple1<String>> iter = DataStreamUtils.collect(centroidId); Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter); for (Tuple1<String> c : testCentroids) { System.out.println(c); } Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016) (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(), but no output for System.out.println(c); Best Regards, Subash Basnet On Tue, Jul 19, 2016 at 10:48 AM, subash basnet <yasub...@gmail.com> wrote: > Hello all, > > I am trying to convert datastream to collection, but it's shows blank > result. There is a stream of data which can be viewed on the console on > print(), but the collection of the same stream shows empty after > conversion. Below is the code: > > DataStream<Centroid> centroids = newCentroidDataStream.map(new > TupleCentroidConverter()); > centroids.print(); > Iterator<Centroid> iter = DataStreamUtils.collect(centroids); > Collection<Centroid> testCentroids = Lists.newArrayList(iter); > for(Centroid c: testCentroids){ > System.out.println(c); > } > > The above *centroids.print()* gives the following output in console: > > Mon Jul 18 21:29:01 CEST 2016 119.3701 119.4 119.3701 119.38 27400.0 > Mon Jul 18 21:23:00 CEST 2016 119.3463 119.37 119.315 119.37 48200.0 > Mon Jul 18 21:27:59 CEST 2016 119.3401 119.3401 119.26 119.265 50300.0 > Mon Jul 18 21:36:00 CEST 2016 119.48 119.505 119.47 119.4741 37400.0 > Mon Jul 18 21:33:00 CEST 2016 119.535 119.54 119.445 119.455 152900.0 > > But the next *System.out.println(c) *within the for loop prints nothing. > What could be the problem. > > My maven has following configuration for dataStreamUtils: > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-contrib_2.10</artifactId> > <version>${flink.version}</version> > </dependency> > > > Best Regards, > Subash Basnet > >