Hi,

I am trying to implement https://issues.apache.org/jira/browse/FLINK-5498 via "dataset.coGroup(another dataset)" with a generated CoGroupFunction (interface method: public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out)), but I could not get the right results.

After debugging, I found that the backing Iterable does not supply a new iterator instance each time "Iterable.iterator()" is invoked (see org.apache.flink.api.common.operators.util.ListKeyGroupedIterator). This differs from the usual Iterable collections in Java, whose iterator() method supplies a new iterator instance for the collection on every call. The behavior is not mentioned in either the code comments or the documentation.

IMO, supplying a new iterator instance on each iterator() call, as ordinary Java collections do, would probably be useful in other cases too, so would it make sense to add this feature? I would be grateful if someone could explain the motivation for ListKeyGroupedIterator not supplying a new iterator instance.

What do you think?

Best,
Lincoln
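
P.S. To make the behavior concrete, here is a minimal sketch in plain Java. It is not Flink code: SinglePassIterable and SinglePassDemo are hypothetical names I made up, and the class only imitates what I observed (iterator() handing back the same instance every time). The materialize helper shows one possible workaround, assuming a group is small enough to copy into memory and that its elements can be retained safely after iteration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in, NOT Flink code: iterator() hands back the same iterator every time. */
final class SinglePassIterable<T> implements Iterable<T> {
    private final Iterator<T> shared;

    SinglePassIterable(List<T> values) {
        this.shared = values.iterator();
    }

    @Override
    public Iterator<T> iterator() {
        // No fresh instance is created, so a second traversal
        // starts where the first one ended (that is, at the end).
        return shared;
    }
}

final class SinglePassDemo {
    /** Counts the elements seen in one full for-each pass. */
    static <T> int countPass(Iterable<T> values) {
        int n = 0;
        for (T ignored : values) {
            n++;
        }
        return n;
    }

    /** Possible workaround: copy the group into a list once, then traverse the copy freely. */
    static <T> List<T> materialize(Iterable<T> values) {
        List<T> copy = new ArrayList<>();
        for (T v : values) {
            copy.add(v);
        }
        return copy;
    }

    public static void main(String[] args) {
        Iterable<Integer> group = new SinglePassIterable<>(Arrays.asList(1, 2, 3));
        System.out.println(countPass(group)); // prints 3
        System.out.println(countPass(group)); // prints 0: the shared iterator is already exhausted

        List<Integer> copy = materialize(new SinglePassIterable<>(Arrays.asList(1, 2, 3)));
        System.out.println(countPass(copy) + " " + countPass(copy)); // prints "3 3"
    }
}
```

A generated CoGroupFunction that loops over "first" or "second" more than once would silently see an empty group on the second loop in this model, which matches the wrong results I got.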