Yeah, that too. I agree with Paris here, especially since you are not only
doing a windowed aggregation but also a join step.

Using a graph processing engine [1] might be the best idea here.

Another possible approach is to create a rich aggregate function over a
combination of the <label, vertices, intermediate_results> tuple; this way
you are in complete control of (1) what is stored as intermediate results,
(2) when to emit output, and (3) how to loop.
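As an illustration of that idea, here is a minimal plain-Python sketch (no
Flink involved; all names are hypothetical) of an accumulator that owns its
intermediate results, decides when the result is final, and controls the
propagation loop itself:

```python
# Sketch of the "rich aggregate" idea in plain Python: the accumulator
# keeps the per-vertex labels as its intermediate result, and we decide
# ourselves what to store, when to emit, and how to loop.

def add_edge(acc, edge):
    """Fold one edge into the accumulator (the intermediate result)."""
    u, v = edge
    acc.setdefault(u, u)  # a vertex's initial label is its own id
    acc.setdefault(v, v)
    # Propagate the smaller label across the edge immediately.
    low = min(acc[u], acc[v])
    acc[u] = acc[v] = low
    return acc

def emit(acc, edges):
    """'How to loop': keep propagating labels until a fixed point."""
    changed = True
    while changed:
        changed = False
        for u, v in edges:
            low = min(acc[u], acc[v])
            if acc[u] != low or acc[v] != low:
                acc[u] = acc[v] = low
                changed = True
    return acc

edges = [(1, 2), (2, 3), (5, 6)]
acc = {}
for e in edges:
    add_edge(acc, e)
labels = emit(acc, edges)
# Vertices 1, 2, 3 share one component; 5 and 6 form another.
```

In a real job the same three hooks would map onto an accumulator type's
add/merge/getResult methods; the point is only that the loop lives inside
the aggregate, not in the topology.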

Rong

[1] https://flink.apache.org/news/2015/08/24/introducing-flink-gelly.html

On Fri, May 11, 2018 at 3:32 AM, Paris Carbone <par...@kth.se> wrote:

> Hey!
>
> I would recommend against using iterations with windows for that problem
> at the moment.
> Alongside loop scoping and backpressure, which will be addressed by FLIP-15
> [1], I think you also need the notion of stream supersteps, which is
> experimental work in progress for now (from my side, at least).
>
> Until these features are added to Flink, I would recommend trying out
> gelly-streams [2], a Flink API for graph stream computation that supports
> connected components in a single pass.
> All you need to do is convert your stream into edge additions. Give it a
> try and let us know what you think [2].
>
> Paris
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132
> [2] https://github.com/vasia/gelly-streaming
>
>
> On 11 May 2018, at 10:08, Henrique Colao Zanuz <henrique.co...@gmail.com>
> wrote:
>
> Hi,
> thank you for your reply.
>
> Actually, I'm doing something very similar to your code. The problem I'm
> having is that this structure is not generating any loop. For instance, if
> I print *labelsVerticesGroup*, I only see the initial set of tuples, the
> one from *updatedLabelsVerticesGroup* (at the end of the first iteration),
> and nothing more. So the content of *updatedLabelsVerticesGroup* is indeed
> being correctly assigned to *labelsVerticesGroup*, but the loop itself is
> not happening.
>
> For simplicity's sake, I'm omitting here the logic behind the separation of
> the tuples that need to be fed back into the loop. I do understand that
> both versions of the code we discussed here are expected to loop
> indefinitely. In the complete version of mine, I use a Map after the
> JoinFunction to create a flag that indicates whether the label of a vertex
> changed during the join, then a ProcessAllWindowFunction to spread this
> flag to the whole window in case any tuple changed, and finally a Filter on
> this flag. This whole mechanism separates the tuples as expected. However,
> even if I remove this logic from the code, in order to get an infinite loop
> of the tuples (as in the code we wrote in the previous emails), the
> iteration does not work.
>
> PS. I've been using Flink 1.3.3
>
> Best,
> Henrique
>
> On Fri, May 11, 2018 at 00:01, Rong Rong <walter...@gmail.com>
> wrote:
>
>> Based on the pseudo code, it seems like you are trying to do the loop
>> yourself rather than using the IterativeStream's map() function [1].
>>
>> I think you would need to specify the "map" function in order to use the
>> iterative stream, and there should be a clear definition of which data is
>> iterative. In this case you have labels & vertices interleaved with each
>> other, but no explicit feedback loop.
>>
>> I would suggest something close to the example in [1], like:
>>
>>      *labelsVerticesGroup* = DataStream<initial_label, *vertices*>.iterate()
>>
>>      *labels* = *labelsVerticesGroup*.map(...)
>>                   .keyBy(VertexID)
>>                   .window(...)
>>                   .min(label);
>>
>>      *vertices* = *labelsVerticesGroup*.map(...)
>>
>>      *updatedLabelsVerticesGroup* = *vertices*.join(*labels*)
>>                   .where(VertexId).equalTo(VertexId)
>>                   .windowAll(...)
>>                   .agg(...)
>>
>>      *labelsVerticesGroup*.closeWith(*updatedLabelsVerticesGroup*)
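For what it's worth, the feedback mechanics of iterate()/closeWith() can be
modeled in plain Python roughly as follows (a sketch only, under the
assumption that part of each iteration's output is fed back into the loop
while the rest is emitted downstream; no Flink involved, and all names are
hypothetical):

```python
# Minimal model of the iterate()/closeWith() pattern from the DataStream
# iteration docs: each round maps the elements, feeds some of them back
# into the loop, and emits the rest downstream.

def run_iteration(stream, step, still_iterating):
    output = []
    while stream:
        mapped = [step(x) for x in stream]                    # loop body map()
        feedback = [x for x in mapped if still_iterating(x)]  # closeWith(...)
        output += [x for x in mapped if not still_iterating(x)]
        stream = feedback  # the fed-back elements re-enter the loop
    return output

# The docs' style of example: subtract 1 until each value reaches zero.
result = run_iteration([5, 3, 1], step=lambda x: x - 1,
                       still_iterating=lambda x: x > 0)
```

The symptom described above corresponds to `feedback` re-entering the loop
only once: the topology compiles, but elements never make a second pass.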
>>
>> Is this what you are looking for?
>>
>> --
>> Rong
>>
>> Reference:
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#iterations
>>
>> On Thu, May 10, 2018 at 9:50 AM, Henrique Colao Zanuz <
>> henrique.co...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to implement a connected components algorithm using
>>> DataStream. For this algorithm, I'm separating the data into tumbling
>>> windows, so that each window is computed independently.
>>> This algorithm is iterative because the labels (colors) of the vertices
>>> need to be propagated. Basically, I need to iterate over the following
>>> steps:
>>>
>>> Input: *vertices *= Datastream of <VertexId, [list of neighbor
>>> vertices], label>
>>>
>>> Loop:
>>>      *labels* = *vertices*.flatMap(emitting a tuple <VertexID, label>
>>> for vertices.f0 and for every element of vertices.f1)
>>>                   .keyBy(VertexID)
>>>                   .window(...)
>>>                   .min(label);
>>>
>>>      *updatedVertices* = *vertices*.join(*labels*)
>>>                                    .where(VertexId).equalTo(VertexId)
>>>                                    .windowAll(...)
>>>                                    .apply(re-emit the original *vertices*
>>> stream tuples, but keeping the new labels)
>>>
>>> End loop
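The two steps in the loop above can be sketched in plain Python for a single
window (a hypothetical illustration only, with no Flink involved): flatMap
the vertex tuples into <VertexID, label> pairs, take the minimum label per
vertex, then "join" the new labels back onto the original tuples.

```python
# One pass of the loop body over a single window's vertex tuples,
# each tuple being (vertex_id, [neighbor ids], label).

def propagate_once(vertices):
    # flatMap: emit <VertexID, label> for the vertex and each neighbor
    pairs = []
    for vid, neighbors, label in vertices:
        pairs.append((vid, label))
        pairs.extend((n, label) for n in neighbors)
    # keyBy(VertexID).min(label)
    labels = {}
    for vid, label in pairs:
        labels[vid] = min(labels.get(vid, label), label)
    # join on VertexId: re-emit the tuples with the updated labels
    return [(vid, nbrs, labels[vid]) for vid, nbrs, _ in vertices]

window = [(1, [2], 1), (2, [1, 3], 2), (3, [2], 3)]
while True:  # the loop: stop once no label changes
    updated = propagate_once(window)
    if updated == window:
        break
    window = updated
```

After convergence every vertex in a connected component carries that
component's minimum label, which is the fixed point the iteration is
supposed to reach.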
>>>
>>> I am trying to use IterativeStreams to do so. However, despite
>>> successfully separating the tuples that need to be fed back into the loop
>>> (using filters and closeWith), the subsequent iterations are not
>>> happening, so all I get is the first iteration.
>>> I suppose this might come from the fact that I'm creating a new stream
>>> (labels) based on the original IterativeStream, joining it with the
>>> original one (vertices) and only then closing the loop with it.
>>> Do you know whether Flink has some limitation in this respect? And if
>>> so, would you have a hint about a different approach I could take for
>>> this algorithm to avoid it?
>>>
>>> thank you in advance,
>>> Henrique Colao
>>>
>>>
>>>
>>
>
