Based on the pseudocode, it seems like you are trying to do the loop by
yourself instead of using the IterativeStream map() function [1].

I think you need to specify the "map" function in order to use the
iterative stream, and there should be a clear definition of which data
is iterative. In this case, labels and vertices are interlaced with each
other, but there is no specific loop back.
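To illustrate the loop-back idea without any Flink dependency, here is a minimal plain-Java simulation of the pattern in [1]. Every record goes through the iteration body (the "map"), records that still need work are fed back to the head (the role closeWith plays in Flink), and the rest leave the loop. This is not the Flink API: the class and method names are hypothetical, a queue stands in for Flink's feedback edge, and the decrement-until-zero body is only a toy stand-in for real iteration logic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/** Plain-Java simulation of an iterate/closeWith loop (hypothetical, not the Flink API). */
public class FeedbackLoopSketch {

    /**
     * Runs every input record through `body`; results matching `feedBack`
     * re-enter the loop head, all others are emitted as output.
     */
    public static <T> List<T> iterate(List<T> input, UnaryOperator<T> body, Predicate<T> feedBack) {
        Deque<T> head = new ArrayDeque<>(input); // stands in for the iteration head
        List<T> output = new ArrayList<>();
        while (!head.isEmpty()) {
            T result = body.apply(head.poll());  // iteration body, like iteration.map(...)
            if (feedBack.test(result)) {
                head.add(result);                // feedback edge, like closeWith(...)
            } else {
                output.add(result);              // record leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Decrement each value; keep feeding it back while it is still positive.
        List<Long> out = iterate(List.of(3L, 1L, 5L), v -> v - 1, v -> v > 0);
        System.out.println(out); // prints [0, 0, 0]
    }
}
```

The key point is that only what is explicitly routed back to the head gets iterated again; anything derived from the head but never fed back is computed exactly once.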

I would suggest something close to the example in [1], for example:

     labelsVerticesGroup = input.iterate();   // input: DataStream<initial_label, vertices>

     labels = labelsVerticesGroup.map(...)
                  .keyBy(VertexID)
                  .window(...)
                  .min(label);

     vertices = labelsVerticesGroup.map(...);

     updatedLabelsVerticesGroup = vertices.join(labels)
                  .where(VertexId).equalTo(VertexId)
                  .window(...)
                  .apply(...);

     labelsVerticesGroup.closeWith(updatedLabelsVerticesGroup);
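To make the semantics of one window of that loop concrete, here is a plain-Java simulation of min-label propagation for connected components, with no Flink involved. Assumptions: the graph is undirected (neighbor lists are symmetric), all vertices of one window fit in memory, a HashMap replaces keyBy/window/min, and the "stop when no label changed" test is a hypothetical stand-in for whatever feedback filter you would use in a streaming iteration. The Vertex record and all method names are illustrative only; records need Java 16+.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of min-label propagation for one window (hypothetical, not the Flink API). */
public class LabelPropagationSketch {

    /** A vertex with its neighbor list and current label (hypothetical record type). */
    public record Vertex(int id, List<Integer> neighbors, int label) {}

    /** Runs propagation rounds until no label changes; returns the final vertices. */
    public static Map<Integer, Vertex> connectedComponents(Map<Integer, Vertex> vertices) {
        boolean changed = true;
        while (changed) {                  // one pass = one trip around the feedback loop
            // "labels" step: emit <id, label> for the vertex itself and every neighbor,
            // then keep the minimum label per vertex id
            Map<Integer, Integer> minLabel = new HashMap<>();
            for (Vertex v : vertices.values()) {
                minLabel.merge(v.id(), v.label(), Math::min);
                for (int n : v.neighbors()) {
                    minLabel.merge(n, v.label(), Math::min);
                }
            }
            // "join" step: re-emit each vertex, keeping its neighbors but taking the new label
            changed = false;
            Map<Integer, Vertex> updated = new TreeMap<>();
            for (Vertex v : vertices.values()) {
                int newLabel = minLabel.get(v.id());
                if (newLabel != v.label()) {
                    changed = true;
                }
                updated.put(v.id(), new Vertex(v.id(), v.neighbors(), newLabel));
            }
            vertices = updated;            // fed back as the next round's input, like closeWith(...)
        }
        return vertices;
    }

    /** Two components, {1, 2, 3} and {4, 5}; each vertex starts labeled with its own id. */
    public static Map<Integer, Vertex> sampleGraph() {
        Map<Integer, Vertex> graph = new TreeMap<>();
        graph.put(1, new Vertex(1, List.of(2), 1));
        graph.put(2, new Vertex(2, List.of(1, 3), 2));
        graph.put(3, new Vertex(3, List.of(2), 3));
        graph.put(4, new Vertex(4, List.of(5), 4));
        graph.put(5, new Vertex(5, List.of(4), 5));
        return graph;
    }

    public static void main(String[] args) {
        // prints "1 -> 1", "2 -> 1", "3 -> 1", "4 -> 4", "5 -> 4", one per line
        connectedComponents(sampleGraph()).values()
            .forEach(v -> System.out.println(v.id() + " -> " + v.label()));
    }
}
```

Note that vertex 3 only reaches label 1 in the second round, which is exactly why the updated vertices (not just the labels) have to be what goes back into the loop.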

Is this what you are looking for?

--
Rong

Reference:
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#iterations

On Thu, May 10, 2018 at 9:50 AM, Henrique Colao Zanuz <henrique.co...@gmail.com> wrote:

> Hi,
>
> I am trying to implement a connected components algorithm using the
> DataStream API. For this algorithm, I'm separating the data into tumbling
> windows and computing each window independently.
> The algorithm is iterative because the labels (colors) of the vertices
> need to be propagated. Basically, I need to iterate over the following
> steps:
>
> Input: vertices = DataStream of <VertexId, [list of neighbor vertices], label>
>
> Loop:
>      labels = vertices.flatMap(emit a tuple <VertexID, label> for vertices.f0
>                                and for every element of vertices.f1)
>                   .keyBy(VertexID)
>                   .window(...)
>                   .min(label);
>
>      updatedVertices = vertices.join(labels).where(VertexId).equalTo(VertexId)
>                                    .windowAll(...)
>                                    .apply(re-emit the original vertices stream
>                                           tuples, but keeping the new labels)
>
> End loop
>
> I am trying to use IterativeStreams to do so. However, despite
> successfully separating the tuples that need to be fed back to the loop (by
> using filters and closeWith), the subsequent iterations never happen,
> so I only get the result of the first iteration.
> I suppose this might come from the fact that I'm creating a new stream
> (labels) based on the original IterativeStream, joining it with the
> original one (vertices), and only then closing the loop with it.
> Do you know whether Flink has some limitation in this respect? If so,
> would you have a hint about a different approach I could take for this
> algorithm to avoid it?
>
> Thank you in advance,
> Henrique Colao
