Hi Alejandro,

asScala calls iterator() the first time, and reduce() calls it a second time. The Iterable passed to reduce() can only be iterated once, because it is possibly backed by multiple sorted files that have been spilled to disk and are merge-sorted while iterating.
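The restriction can be illustrated without Flink in a minimal plain-Scala sketch. `SingleUseIterable`, `SingleUseDemo` and `sumGroup` are hypothetical names. The class only mimics the behaviour described above (the first call to iterator() succeeds, later calls throw); it is not Flink's actual implementation. Calling iterator() exactly once and reducing over the resulting Scala Iterator walks the group in a single pass, while asking the same Iterable for a second iterator fails.

```scala
import java.util.{Arrays => JArrays}
// On Scala 2.13, scala.jdk.CollectionConverters._ is the non-deprecated equivalent.
import scala.collection.JavaConverters._

// Hypothetical stand-in for the Iterable Flink hands to reduce():
// it returns one iterator and throws on every later request.
class SingleUseIterable[T](items: java.util.List[T]) extends java.lang.Iterable[T] {
  private var used = false
  override def iterator(): java.util.Iterator[T] = {
    if (used) throw new IllegalStateException(
      "The Iterable can be iterated over only once.")
    used = true
    items.iterator()
  }
}

object SingleUseDemo {
  // Same reduce logic as in the thread, but iterator() is requested once
  // and the reduction runs over the Scala view of that single iterator.
  def sumGroup(in: java.lang.Iterable[(Double, Double)]): Double = {
    val r = in.iterator().asScala.reduce((a, b) => (a._1, a._2 + b._2))
    r._1 + r._2
  }

  def main(args: Array[String]): Unit = {
    val group = new SingleUseIterable(JArrays.asList((1.0, 2.0), (1.0, 3.0)))
    println(sumGroup(group)) // 1.0 + (2.0 + 3.0) = 6.0
    // A second pass over the same Iterable fails.
    println(scala.util.Try(group.iterator()).isFailure) // true
  }
}
```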
I'm actually surprised that you found this code in the documentation. Maybe this worked for Scala 2.10... Anyway, there are a few ways to work around this:

1) Use a ReduceFunction (after all, you are calling reduce() in the GroupReduceFunction).
2) Define the group reduce function as a function over native Scala iterators:

import org.apache.flink.util.Collector

val myReduceFunc = (in: Iterator[(Double, Double)], out: Collector[Double]) => {
  val r = in.reduce((a, b) => (a._1, a._2 + b._2))
  out.collect(r._1 + r._2)
}

I've created a Jira issue to report the broken docs. [1]

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-10359

2018-09-14 18:52 GMT+02:00 Alejandro <algu...@gmail.com>:

> Then how am I supposed to implement that function?
>
> On 09/14/2018 05:29 PM, 杨力 wrote:
> > A java.lang.Iterable is expected to provide iterators again and again.
> >
> > On Fri, Sep 14, 2018 at 10:53 PM Alejandro Alcalde <algu...@gmail.com>
> > wrote:
> >
> >> Hello all,
> >>
> >> I am trying to replicate the code in the Docs
> >> (https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions)
> >>
> >> But I keep getting the following exception:
> >>
> >> The Iterable can be iterated over only once. Only the first call to
> >> 'iterator()' will succeed.
> >>
> >> This is what I have:
> >>
> >> class MyCombinableGroupReducer
> >>   extends GroupReduceFunction[(Double, Double), Double]
> >>   with GroupCombineFunction[(Double, Double), (Double, Double)] {
> >>   import collection.JavaConverters._
> >>   override def reduce(
> >>     in: java.lang.Iterable[(Double, Double)],
> >>     out: Collector[Double]): Unit =
> >>   {
> >>     val r = in.asScala.reduce ( (a, b) => ///ERROR HAPPENS HERE
> >>       (a._1, a._2 + b._2)
> >>     )
> >>     out.collect(r._1 + r._2)
> >>   }
> >>
> >>   override def combine(
> >>     in: java.lang.Iterable[(Double, Double)],
> >>     out: Collector[(Double, Double)]): Unit = {
> >>     ???
> >>   }
> >> }
> >>
> >> Where am I traversing `in` a second time?
> >> Maybe it is the call to `asScala`?
> >>
> >> Bests
> >>
> >> -- Alejandro Alcalde - elbauldelprogramador.com
> >> <http://elbauldelprogramador.com>
> >>
> >
> >
> --
> elbauldelprogramador.com
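On the question of how to implement the function: the two workarounds from the reply above might be wired into a job roughly as follows. This is a sketch assuming Flink 1.6's Scala DataSet API is on the classpath; `WorkaroundSketch`, `run()`, `sumValues` and the input pairs are hypothetical. Option 1 passes a pairwise function to `groupBy(0).reduce`, which the Scala API is assumed here to wrap in a ReduceFunction. Option 2 passes `myReduceFunc` to `reduceGroup`, so the user code only ever sees a Scala Iterator. Both functions are bound to vals first so that overload resolution picks the Scala-function variants.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// Hypothetical job showing both workarounds (assumes Flink 1.6's Scala DataSet API).
object WorkaroundSketch {

  def run(): (Seq[Double], Seq[Double]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Hypothetical input: (key, value) pairs, grouped on the first field.
    val data: DataSet[(Double, Double)] =
      env.fromElements((1.0, 2.0), (1.0, 3.0), (2.0, 10.0))

    // Option 1: a pairwise reduce. It sees two elements at a time, so user
    // code never traverses a group Iterable. (Assumption: the Scala API
    // wraps this function in a ReduceFunction.)
    val sumValues = (a: (Double, Double), b: (Double, Double)) => (a._1, a._2 + b._2)
    val viaReduce: Seq[Double] =
      data.groupBy(0).reduce(sumValues).collect().map(r => r._1 + r._2).sorted

    // Option 2: a group function over a native Scala Iterator,
    // which reduce() consumes in a single pass.
    val myReduceFunc = (in: Iterator[(Double, Double)], out: Collector[Double]) => {
      val r = in.reduce((a, b) => (a._1, a._2 + b._2))
      out.collect(r._1 + r._2)
    }
    val viaReduceGroup: Seq[Double] =
      data.groupBy(0).reduceGroup(myReduceFunc).collect().sorted

    // collect() triggers execution; results are sorted because group order
    // is not guaranteed.
    (viaReduce, viaReduceGroup)
  }

  def main(args: Array[String]): Unit = {
    val (viaReduce, viaReduceGroup) = run()
    // Key 1.0: 1.0 + (2.0 + 3.0); key 2.0: 2.0 + 10.0
    println(viaReduce)
    println(viaReduceGroup)
  }
}
```

Flink can apply a ReduceFunction as a combiner on grouped data, so option 1 keeps the pre-aggregation that the combinable GroupReduceFunction in the docs was aiming for; this sketch makes no attempt to add a combiner to option 2.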