Hello all, I am trying to replicate the code in the Docs ( https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions )
But I keep getting the following exception:

    The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed.

This is what I have:

    import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction}
    import org.apache.flink.util.Collector

    class MyCombinableGroupReducer
      extends GroupReduceFunction[(Double, Double), Double]
      with GroupCombineFunction[(Double, Double), (Double, Double)] {

      import collection.JavaConverters._

      override def reduce(
          in: java.lang.Iterable[(Double, Double)],
          out: Collector[Double]): Unit = {
        val r = in.asScala.reduce ( (a, b) => ///ERROR HAPPENS HERE
          (a._1, a._2 + b._2)
        )
        out.collect(r._1 + r._2)
      }

      override def combine(
          in: java.lang.Iterable[(Double, Double)],
          out: Collector[(Double, Double)]): Unit = {
        ???
      }
    }

Where am I traversing `in` a second time? Could it be the call to `asScala`?

Best,

*-- Alejandro Alcalde - elbauldelprogramador.com <http://elbauldelprogramador.com>*
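
P.S. In case it helps to reproduce this outside Flink, here is a minimal sketch. `OneShotIterable` and `OneShotDemo` are hypothetical names made up for illustration, not Flink classes; the sketch only assumes that the Iterable handed to `reduce` throws on a second `iterator()` call, as the exception message says. It converts the Iterator rather than the Iterable, so `iterator()` is called exactly once. Whether `asScala.reduce` on the Iterable itself calls `iterator()` more than once probably depends on the Scala collections version, so treat this as a possible workaround rather than a confirmed diagnosis.

```scala
import java.{lang => jl, util => ju}
import scala.collection.JavaConverters._

// Hypothetical stand-in for the Iterable passed to reduce():
// it hands out its iterator once and throws on any later call.
class OneShotIterable[T](data: Seq[T]) extends jl.Iterable[T] {
  private var used = false

  override def iterator(): ju.Iterator[T] = {
    if (used) {
      throw new IllegalStateException("The Iterable can be iterated over only once.")
    }
    used = true
    data.iterator.asJava
  }
}

object OneShotDemo {
  // Same reduction as in the post, but asScala is applied to the Iterator,
  // so iterator() on the underlying Iterable is invoked a single time.
  def sumPairs(in: jl.Iterable[(Double, Double)]): Double = {
    val r = in.iterator().asScala.reduce((a, b) => (a._1, a._2 + b._2))
    r._1 + r._2
  }

  def main(args: Array[String]): Unit = {
    val pairs = Seq((1.0, 2.0), (1.0, 3.0))
    println(sumPairs(new OneShotIterable(pairs)))
  }
}
```

Swapping `in.iterator().asScala` back to `in.asScala` inside `sumPairs` should show whether the wrapper requests a second iterator on a given Scala version.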