I still suspect that iterations might work for your use case. Note that in the streaming API, iterations are currently nothing more than a back-edge in the topology, i.e., a low-level tool for creating a cyclic topology, much like the hypothetical setter syntax you describe. (This is quite different from iterations in the batch API.)
The tricky part for your use case is that you would want a ConnectedStream as your iteration head, which would receive the elements from the back-edge separately from the normal input. You could simulate this by using not ConnectedStream.flatMap but a simple Stream.flatMap whose input element type is an Either type, whose two components are the normal input and the back-edge input. (You would also add maps before the closeWith and on your input1 that wrap the elements into the appropriate alternative of the Either type.)

Best,
Gábor

2017-01-29 15:39 GMT+01:00 Matt <dromitl...@gmail.com>:

> Check this image for clarification, this is what I'm trying to do:
> http://i.imgur.com/iZxPv04.png
>
> The rectangles are the two CoFlatMapFunction, sharing a state between
> process and update (map1 and map2). It's clear from the image that I need
> input1 and the green box to create the blue box, and input2 and the blue
> box to create the green one.
>
> ---
> blue = input1.connect(green).keyBy(...).flatMap(...);
> green = input2.connect(blue).keyBy(...).flatMap(...);
> ---
>
> As you can see there's no cycle in the flow of data so I guess this
> topology is valid. The problem is not having a way to define such flow.
>
> For instance, with the appropriate setters we would be able to do this:
>
> ---
> blue = input1.connect();
> green = input2.connect();
>
> blue.setConnection(green);
> green.setConnection(blue);
>
> blue.keyBy(...).flatMap(...);
> green.keyBy(...).flatMap(...);
> ---
>
> Any idea is welcome.
>
> Matt
>
> On Sat, Jan 28, 2017 at 5:31 PM, Matt <dromitl...@gmail.com> wrote:
>
>> I'm aware of IterativeStream but I don't think it's useful in this case.
>>
>> As shown in the example above, my use case is "cyclic" in that the same
>> object goes from *Input* to *predictionStream* (flatMap1), then to
>> *statsStream* (flatMap2, where it's updated with an object from *Input2*)
>> and finally to *predictionStream* (flatMap2).
>>
>> The same operator is never applied twice to the object, thus I would say
>> this dataflow is cyclic only in the dependencies of the stream
>> (predictionStream depends on statsStream, but it depends on
>> predictionStream in the first place).
>>
>> I hope it is clear now.
>>
>> Matt
>>
>> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Cyclic dataflows can be built using iterations:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations
>>>
>>> Best,
>>> Gábor
>>>
>>> 2017-01-28 18:39 GMT+01:00 Matt <dromitl...@gmail.com>:
>>> > I have a ConnectedStream (A) that depends on another ConnectedStream (B),
>>> > which depends on the first one (A).
>>> >
>>> > Simplified code:
>>> >
>>> > predictionStream = input
>>> >   .connect(statsStream)
>>> >   .keyBy(...)
>>> >   .flatMap(CoFlatMapFunction {
>>> >     flatMap1(obj, output) {
>>> >       p = prediction(obj)
>>> >       output.collect(p)
>>> >     }
>>> >     flatMap2(stat, output) {
>>> >       updateModel(stat)
>>> >     }
>>> >   })
>>> >
>>> > statsStream = input2
>>> >   .connect(predictionStream)
>>> >   .keyBy(...)
>>> >   .flatMap(CoFlatMapFunction {
>>> >     flatMap1(obj2, output) {
>>> >       s = getStats(obj2, p)
>>> >       output.collect(s)
>>> >     }
>>> >     flatMap2(prediction, output) {
>>> >       p = prediction
>>> >     }
>>> >   })
>>> >
>>> > I'm guessing this should be possible to achieve, one way would be to add a
>>> > sink on statsStream to save the elements into Kafka and read from that topic
>>> > on predictionStream instead of initializing it with a reference of
>>> > statsStream. But I would rather avoid writing unnecessarily into kafka.
>>> >
>>> > Is there any other way to achieve this?
>>> >
>>> > Thanks,
>>> > Matt
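
A rough sketch of the Either-based workaround described at the top of this mail, in plain Java. It deliberately uses no Flink classes, so it only illustrates the shape of the idea: the Either class, PredictOperator, the integer "model", and the queue standing in for the back-edge are all hypothetical names made up for this example. The feedback is also drained in a fixed order here, which a real streaming job would not guarantee.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java simulation of "one flatMap over an Either type"; not Flink code. */
final class EitherFeedbackSketch {

    /** Hypothetical minimal tagged union: holds either a Left or a Right value. */
    static final class Either<L, R> {
        private final L left;
        private final R right;
        private final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> ofLeft(L value) { return new Either<>(value, null, true); }
        static <L, R> Either<L, R> ofRight(R value) { return new Either<>(null, value, false); }

        boolean isLeft() { return isLeft; }
        L left() { return left; }
        R right() { return right; }
    }

    /** Single-input operator that plays the role of both flatMap1 and flatMap2. */
    static final class PredictOperator {
        private int model = 0; // state shared by both branches (assumed for illustration)

        void flatMap(Either<Integer, Integer> in, Consumer<Integer> out) {
            if (in.isLeft()) {
                // Normal input (what flatMap1 would see): emit a "prediction".
                out.accept(in.left() + model);
            } else {
                // Back-edge input (what flatMap2 would see): update state, emit nothing.
                model = in.right();
            }
        }
    }

    /** Feeds input1 through the operator, routing each output back as feedback. */
    static List<Integer> run(List<Integer> input1) {
        PredictOperator op = new PredictOperator();
        List<Integer> predictions = new ArrayList<>();
        Deque<Either<Integer, Integer>> backEdge = new ArrayDeque<>();

        for (Integer x : input1) {
            // Drain pending feedback before the next normal element.
            while (!backEdge.isEmpty()) {
                op.flatMap(backEdge.poll(), predictions::add);
            }
            // The "map on input1": wrap the normal element as Left.
            op.flatMap(Either.<Integer, Integer>ofLeft(x), p -> {
                predictions.add(p);
                // The "map before closeWith": wrap the fed-back element as Right.
                backEdge.add(Either.<Integer, Integer>ofRight(p));
            });
        }
        return predictions;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3)));
    }
}
```

For the input 1, 2, 3 this prints [1, 3, 6]: each prediction travels over the simulated back-edge and becomes the model used for the next element. In an actual job the two wrapping steps would be map operators on input1 and on the stream passed to closeWith, as described above.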