I still suspect that iterations might work for your use case. Note that
in the streaming API, iterations are currently nothing more than a
back-edge in the topology, i.e., a low-level tool for creating a cyclic
topology, much like what you describe with your hypothetical setter
syntax. (This is quite different from the iterations of the batch API.)

The tricky part for your use case is that you would want a ConnectedStream
as your iteration head, which would receive the elements from the back-edge
separately from the normal input. You could simulate this by using not
ConnectedStream.flatMap, but just a simple Stream.flatMap whose input
element type is an Either type, with its two alternatives being the normal
input and the back-edge input. (You would add one map before the closeWith
and another on your input1, each wrapping its elements into the
appropriate alternative of the Either type.)
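
To make the wrapping idea concrete, here is a minimal sketch in plain Java.
It deliberately uses no Flink classes: the Either class, the integer
"model", the run() driver, and the queue standing in for the back-edge are
all hypothetical names made up for illustration, and the feedback is
simplified to the operator's own output rather than going through your
second operator. It only shows how a single flatMap can tell the two
inputs apart by their tag.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of the idea; this is NOT Flink code.
final class EitherFeedbackSketch {

    // Hypothetical tagged union: exactly one of left/right is meaningful.
    static final class Either<L, R> {
        final L left;
        final R right;
        final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> left(L value) {
            return new Either<L, R>(value, null, true);
        }

        static <L, R> Either<L, R> right(R value) {
            return new Either<L, R>(null, value, false);
        }
    }

    // State shared by both branches (stands in for the operator's state).
    private int model = 0;

    // A single flatMap over the merged stream.
    // Left  = normal input  -> emit a "prediction".
    // Right = back-edge input -> update the model, emit nothing.
    List<Integer> flatMap(Either<Integer, Integer> element) {
        List<Integer> out = new ArrayList<Integer>();
        if (element.isLeft) {
            out.add(element.left + model);
        } else {
            model = element.right;
        }
        return out;
    }

    // Driver: wraps each input as Left and feeds every output back as Right.
    static List<Integer> run(List<Integer> input) {
        EitherFeedbackSketch op = new EitherFeedbackSketch();
        Deque<Either<Integer, Integer>> queue =
                new ArrayDeque<Either<Integer, Integer>>();
        List<Integer> emitted = new ArrayList<Integer>();
        for (Integer x : input) {
            // Corresponds to the map placed on input1.
            queue.add(Either.<Integer, Integer>left(x));
            while (!queue.isEmpty()) {
                for (Integer p : op.flatMap(queue.poll())) {
                    emitted.add(p);
                    // Corresponds to the map placed before closeWith.
                    queue.add(Either.<Integer, Integer>right(p));
                }
            }
        }
        return emitted;
    }
}
```

With this toy model, EitherFeedbackSketch.run(Arrays.asList(1, 2, 3))
returns [1, 3, 6], because each prediction is fed back and becomes the
model used for the next input. In a real job the two wrapping steps would
be ordinary map operators, and the ordering between normal and back-edge
elements would not be deterministic as it is here.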

Best,
Gábor



2017-01-29 15:39 GMT+01:00 Matt <dromitl...@gmail.com>:

> Check this image for clarification, this is what I'm trying to do:
> http://i.imgur.com/iZxPv04.png
>
> The rectangles are the two CoFlatMapFunction, sharing a state between
> process and update (map1 and map2). It's clear from the image that I need
> input1 and the green box to create the blue box, and input2 and the blue
> box to create the green one.
>
> ---
> *blue*  = *input1*.connect(*green*).keyBy(...).flatMap(...);
> *green* = *input2*.connect(*blue*).keyBy(...).flatMap(...);
> ---
>
> As you can see there's no cycle in the flow of data so I guess this
> topology is valid. The problem is not having a way to define such flow.
>
> For instance, with the appropriate setters we would be able to do this:
>
> ---
> *blue*  = *input1*.connect();
> *green* = *input2*.connect();
>
> *blue.*setConnection(*green*);
> *green*.setConnection(*blue*);
>
> *blue*.keyBy(...).flatMap(...);
> *green*.keyBy(...).flatMap(...);
> ---
>
> Any idea is welcome.
>
> Matt
>
> On Sat, Jan 28, 2017 at 5:31 PM, Matt <dromitl...@gmail.com> wrote:
>
>> I'm aware of IterativeStream but I don't think it's useful in this case.
>>
>> As shown in the example above, my use case is "cyclic" in that the same
>> object goes from *Input* to *predictionStream* (flatMap1), then to
>> *statsStream* (flatMap2, where it's updated with an object from *Input2*)
>> and finally to *predictionStream* (flatMap2).
>>
>> The same operator is never applied twice to the object, thus I would say
>> this dataflow is cyclic only in the dependencies of the stream
>> (predictionStream depends on statsStream, but it depends on
>> predictionStream in the first place).
>>
>> I hope it is clear now.
>>
>> Matt
>>
>> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Cyclic dataflows can be built using iterations:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations
>>>
>>> Best,
>>> Gábor
>>>
>>>
>>>
>>>
>>> 2017-01-28 18:39 GMT+01:00 Matt <dromitl...@gmail.com>:
>>> > I have a ConnectedStream (A) that depends on another ConnectedStream (B),
>>> > which depends on the first one (A).
>>> >
>>> > Simplified code:
>>> >
>>> > predictionStream = input
>>> >   .connect(statsStream)
>>> >   .keyBy(...)
>>> >   .flatMap(CoFlatMapFunction {
>>> >      flatMap1(obj, output) {
>>> >          p = prediction(obj)
>>> >          output.collect(p)
>>> >      }
>>> >      flatMap2(stat, output) {
>>> >          updateModel(stat)
>>> >      }
>>> >   })
>>> >
>>> > statsStream = input2
>>> >   .connect(predictionStream)
>>> >   .keyBy(...)
>>> >   .flatMap(CoFlatMapFunction {
>>> >      flatMap1(obj2, output) {
>>> >         s = getStats(obj2, p)
>>> >         output.collect(s)
>>> >      }
>>> >      flatMap2(prediction, output) {
>>> >         p = prediction
>>> >      }
>>> >   })
>>> >
>>> > I'm guessing this should be possible to achieve, one way would be to add a
>>> > sink on statsStream to save the elements into Kafka and read from that topic
>>> > on predictionStream instead of initializing it with a reference of
>>> > statsStream. But I would rather avoid writing unnecessarily into kafka.
>>> >
>>> > Is there any other way to achieve this?
>>> >
>>> > Thanks,
>>> > Matt
>>>
>>
>>
>
