Yeah, that too. I agree with Paris here, especially since you are not only doing a windowed aggregation but also a join step.
Using a graph processing engine [1] might be the best idea here. Another possible way is to create a rich aggregate function over a combination of the <label, vertices, intermediate_results> tuple; this way you are completely in control of (1) what is stored as intermediate results, (2) when to emit output, and (3) how to loop.

Rong

[1] https://flink.apache.org/news/2015/08/24/introducing-flink-gelly.html

On Fri, May 11, 2018 at 3:32 AM, Paris Carbone <par...@kth.se> wrote:

> Hey!
>
> I would recommend against using iterations with windows for that problem
> at the moment.
> Alongside loop scoping and backpressure, which will be addressed by FLIP-15
> [1], I think you also need the notion of stream supersteps, which is
> experimental work in progress for now, from my side at least.
>
> Until these features are added to Flink I would recommend trying out
> gelly-streams [2], a Flink API for graph stream computation which supports
> connected components in a single pass.
> All you need to do is to convert your stream into edge additions. You can
> try it and let us know what you think [2].
>
> Paris
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132
> [2] https://github.com/vasia/gelly-streaming
>
>
> On 11 May 2018, at 10:08, Henrique Colao Zanuz <henrique.co...@gmail.com>
> wrote:
>
> Hi,
> thank you for your reply.
>
> Actually, I'm doing something very similar to your code. The problem I'm
> having is that this structure is not generating any loop. For instance, if
> I print *labelsVerticesGroup*, I only see the initial set of tuples, the
> one from *updatedLabelsVerticesGroup* (at the end of the first
> iteration), and nothing more. So, it means that the content of
> *updatedLabelsVerticesGroup* is indeed being correctly assigned to
> *labelsVerticesGroup*, but the loop itself is not happening.
>
> For simplicity's sake, here I'm omitting the logic behind the separation of
> the tuples that need to be fed back to the loop.
> I do understand that both
> codes we commented on here are expected to loop indefinitely. In the complete
> version of mine, I use a JoinFunction, a ProcessAllWindowFunction, a
> FilterFunction, and a Map: the Map creates a flag that indicates whether
> there was a change in the label of a vertex during the join function, then
> the ProcessAllWindowFunction spreads this flag to the whole window, in case
> any tuple had a change. Finally, I filter the tuples by this flag. This
> whole mechanism is separating the tuples as expected. However, even if I
> remove this logic from the code, in order to get an infinite loop of the
> tuples (as we get in the code we've written in the previous emails), the
> iteration does not work.
>
> PS. I've been using Flink 1.3.3
>
> Best,
> Henrique
>
> On Fri, May 11, 2018 at 00:01, Rong Rong <walter...@gmail.com>
> wrote:
>
>> Based on the pseudo code, it seems like you are trying to do the loop
>> yourself and not using the iterative stream's map() function [1].
>>
>> I think you would need to specify the "map" function in order to use the
>> iterative stream, and there should be a clear definition of
>> which data is iterative. In this case you have labels & vertices
>> interlacing each other but no specific loop back.
>>
>> I would suggest something close to the example in [1], like:
>>
>> *labelsVerticesGroup* = DataStream<initial_label, *vertices*>
>>
>> *labels* = *labelsVerticesGroup*.map(...)
>>                                 .keyBy(VertexID)
>>                                 .window(...)
>>                                 .min(label);
>>
>> *vertices* = *labelsVerticesGroup*.map(...)
>>
>> *updatedLabelsVerticesGroup* = *vertices*.join(*labels*)
>>                                          .where(VertexId).equalTo(VertexId)
>>                                          .windowAll(...)
>>                                          .agg(...)
>>
>> *labelsVerticesGroup*.closeWith(*updatedLabelsVerticesGroup*)
>>
>> Is this what you are looking for?
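[Editor's note: setting the Flink plumbing aside for a moment, the computation the pseudocode above is trying to drive, where each vertex repeatedly takes the minimum label among itself and its neighbors until nothing changes, can be sketched in a few lines of plain Python. All names here are illustrative; this is not Flink API.]

```python
# Minimal, Flink-free sketch of min-label propagation for one window:
# every vertex takes min(own label, neighbors' labels) until a fixpoint,
# which yields connected components.

def propagate_min_labels(vertices):
    """vertices: dict VertexId -> (list of neighbor ids, initial label)."""
    labels = {v: lbl for v, (_, lbl) in vertices.items()}
    changed = True
    while changed:  # the loop the IterativeStream is meant to drive
        changed = False
        for v, (neighbors, _) in vertices.items():
            # min(label) over the vertex itself and its direct neighbors
            new_label = min([labels[v]] + [labels[n] for n in neighbors])
            if new_label != labels[v]:
                labels[v] = new_label
                changed = True  # a change means one more iteration is needed
    return labels

# Two components: {1, 2, 3} and {4, 5}; labels start as the vertex ids.
graph = {1: ([2], 1), 2: ([1, 3], 2), 3: ([2], 3), 4: ([5], 4), 5: ([4], 5)}
print(propagate_min_labels(graph))  # {1: 1, 2: 1, 3: 1, 4: 4, 5: 4}
```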
>>
>> --
>> Rong
>>
>> Reference:
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#iterations
>>
>> On Thu, May 10, 2018 at 9:50 AM, Henrique Colao Zanuz <
>> henrique.co...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to implement a connected components algorithm using
>>> DataStream. For this algorithm, I'm separating the data by tumbling
>>> windows, so for each window, I'm trying to compute it independently.
>>> This algorithm is iterative because the labels (colors) of the vertices
>>> need to be propagated. Basically, I need to iterate over the following
>>> steps:
>>>
>>> Input: *vertices* = DataStream of <VertexId, [list of neighbor
>>> vertices], label>
>>>
>>> Loop:
>>>   *labels* = *vertices*.flatMap(emitting a tuple <VertexID, label>
>>>                                 for vertices.f0 and every element of vertices.f1)
>>>                        .keyBy(VertexID)
>>>                        .window(...)
>>>                        .min(label);
>>>
>>>   *updatedVertices* = *vertices*.join(labels).where(VertexId).equalTo(VertexId)
>>>                                 .windowAll(...)
>>>                                 .apply(re-emit the original *vertices* stream
>>>                                        tuples, but keeping the new labels)
>>> End loop
>>>
>>> I am trying to use IterativeStreams to do so. However, despite
>>> successfully separating the tuples that need to be fed back to the loop
>>> (by using filters and closeWith), the subsequent iterations are not
>>> happening. So, what I get is only the first iteration.
>>> I suppose this might come from the fact that I'm creating a new stream
>>> (labels) based on the original IterativeStream, joining it with the
>>> original one (vertices), and only then closing the loop with it.
>>> Do you know whether Flink has some limitation in this respect? And if
>>> so, would you have a hint about a different approach I could take for this
>>> algorithm to avoid this?
>>>
>>> Thank you in advance,
>>> Henrique Colao
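[Editor's note: for comparison, the single-pass, edge-addition approach Paris recommends above can be sketched with a simple union-find (disjoint-set) structure. This is a plain-Python illustration of the idea behind processing a graph as a stream of edge additions; it is not the gelly-streaming API.]

```python
# Single-pass connected components over a stream of edge additions,
# using union-find. Each edge merges the components of its endpoints;
# the smaller vertex id is kept as the component label.

class UnionFind:
    def __init__(self):
        self.parent = {}

    def find(self, x):
        self.parent.setdefault(x, x)
        while self.parent[x] != x:  # path halving for near-constant finds
            self.parent[x] = self.parent[self.parent[x]]
            x = self.parent[x]
        return x

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            self.parent[max(ra, rb)] = min(ra, rb)  # smaller id becomes root

def components(edge_stream):
    uf = UnionFind()
    for a, b in edge_stream:  # each stream element is one edge addition
        uf.union(a, b)
    return {v: uf.find(v) for v in uf.parent}

edges = [(1, 2), (2, 3), (4, 5)]
print(components(edges))  # {1: 1, 2: 1, 3: 1, 4: 4, 5: 4}
```

Unlike the windowed iteration, this needs only one pass over the edges, which is why it maps well onto a plain DataStream pipeline.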