Based on the pseudocode, it seems you are implementing the loop yourself rather than using the iterative.map() function [1].
I think you need to specify the "map" function in order to use the iterative stream, and there should be a clear definition of which data is iterated. In this case you have labels and vertices interleaved with each other, but no specific loop back.

I would suggest something close to the example in [1], like:

    labelsVerticesGroup = DataStream<initial_label, vertices>

    labels = labelsVerticesGroup.map(...)
                 .keyBy(VertexID)
                 .window(...)
                 .min(label);

    vertices = labelsVerticesGroup.map(...)

    updatedLabelsVerticesGroup = vertices.join(labels)
                 .where(VertexId).equalTo(VertexId)
                 .windowAll(...)
                 .agg(...)

    labelsVerticesGroup.closeWith(updatedLabelsVerticesGroup)

Is this what you are looking for?

--
Rong

Reference:
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#iterations

On Thu, May 10, 2018 at 9:50 AM, Henrique Colao Zanuz <henrique.co...@gmail.com> wrote:

> Hi,
>
> I am trying to implement a connected components algorithm using
> DataStream. For this algorithm, I'm separating the data by tumbling
> windows, so I'm trying to compute each window independently.
> The algorithm is iterative because the labels (colors) of the vertices
> need to be propagated. Basically, I need to iterate over the following
> steps:
>
> Input: vertices = DataStream of <VertexId, [list of neighbor vertices], label>
>
> Loop:
>     labels = vertices.flatMap(emitting a tuple <VertexID, label> for
>                  every vertices.f0 and every element of vertices.f1)
>              .keyBy(VertexID)
>              .window(...)
>              .min(label);
>
>     updatedVertices = vertices.join(labels).where(VertexId).equalTo(VertexId)
>              .windowAll(...)
>              .apply(re-emit the original vertices stream tuples,
>                     but keeping the new labels)
>
> End loop
>
> I am trying to use IterativeStreams to do so. However, despite
> successfully separating the tuples that need to be fed back to the loop (by
> using filters and closeWith), the subsequent iterations are not happening.
> So, what I get is only the first iteration.
> I suppose this might come from the fact that I'm creating a new stream
> (labels) based on the original IterativeStream, joining it with the
> original one (vertices), and only then closing the loop with it.
> Do you know whether Flink has some limitation in this respect? If so,
> would you have a hint about a different approach I could take for this
> algorithm to avoid this?
>
> Thank you in advance,
> Henrique Colao
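
For reference, here is a minimal sketch, in plain Java and outside Flink, of the label-propagation loop described in the quoted message. It only illustrates what each iteration is expected to compute (every vertex sends its label to itself and its neighbors, each vertex keeps the minimum, repeat until nothing changes). It does not use IterativeStream or windows and does not address the feedback problem itself. The class name LabelPropagation, the method connectedComponents, and the adjacency-map representation are assumptions made for illustration, and the neighbor lists are assumed to be symmetric.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LabelPropagation {

    /**
     * Hypothetical helper: repeats the "emit labels, keep the minimum" step
     * until no label changes. The adjacency map is assumed to be symmetric
     * (if a lists b as a neighbor, b lists a).
     */
    public static Map<Integer, Integer> connectedComponents(
            Map<Integer, List<Integer>> adjacency) {
        // Every vertex starts with its own ID as its label.
        Map<Integer, Integer> labels = new HashMap<>();
        for (Map.Entry<Integer, List<Integer>> e : adjacency.entrySet()) {
            labels.putIfAbsent(e.getKey(), e.getKey());
            for (Integer n : e.getValue()) {
                labels.putIfAbsent(n, n);
            }
        }

        boolean changed = true;
        while (changed) {
            // Corresponds to flatMap + keyBy + min: each vertex proposes its
            // current label to itself and to each neighbor, and each vertex
            // keeps the smallest label it received.
            Map<Integer, Integer> proposed = new HashMap<>(labels);
            for (Map.Entry<Integer, List<Integer>> e : adjacency.entrySet()) {
                int label = labels.get(e.getKey());
                for (Integer n : e.getValue()) {
                    proposed.merge(n, label, Integer::min);
                }
            }
            // Corresponds to the join: vertices take over the new labels.
            // The loop is fed back only while at least one label changed.
            changed = !proposed.equals(labels);
            labels = proposed;
        }
        return labels;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> graph = new HashMap<>();
        graph.put(1, Arrays.asList(2));
        graph.put(2, Arrays.asList(1, 3));
        graph.put(3, Arrays.asList(2));
        graph.put(4, Arrays.asList(5));
        graph.put(5, Arrays.asList(4));
        // Vertices that end with the same label belong to the same component.
        System.out.println(connectedComponents(graph));
    }
}
```

The "changed" check plays the role of the filter placed before closeWith in a streaming version: only tuples whose label was updated would need to be fed back.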