In a streaming program when we create an IterativeDataStream, we practically mark the union point of some later feedback stream (the one passed in to closeWith(..)).
The operators applied on this IterativeDataStream will receive the feedback input as well. We call the operators applied on the iterative dataStream head operators. We call the operators that produce the streams passed into closeWith tail operators. With this terminology we can have many heads and tails with varying parallelism. Stephan Ewen <se...@apache.org> ezt írta (időpont: 2015. aug. 2., V, 20:16): > I don't get the discussion here, can you help me with what you mean by > "different iteration heads and tails" ? > > An iteration does not have one parallel head and one parallel tail? > > On Fri, Jul 31, 2015 at 6:52 PM, Gyula Fóra <gyula.f...@gmail.com> wrote: > > > Maybe you can reuse some of the logic that is currently there on the > > StreamGraph, with building StreamLoops first which will be used to > generate > > the sources and sinks right before building the JobGraph. This avoids the > > need of knowing everything beforehand. > > > > I actually added this to avoid the complexities that you are probably > > facing now. > > > > Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2015. júl. > 31., > > P, 17:28): > > > > > Sure it can be done, it's just more complex if you try to do it in a > sane > > > way without having the code that builds the StreamGraph all over the > > place. > > > :D > > > > > > I'll try to come up with something. This is my current work in > progress, > > by > > > the way: https://github.com/aljoscha/flink/tree/stream-api-rework > > > > > > I managed to ban the StreamGraph from StreamExecutionEnvironment and > the > > > API classes such as DataStream. The API methods construct a Graph of > > > Transformation Nodes and don't contain any information themselves. Then > > > there is a StreamGraphGenerator that builds a StreamGraph from the > > > transformations. The abstraction is very nice and simple, the only > > problem > > > that remains are the differing-parallelism-iterations but I'll figure > > them > > > out. > > > > > > P.S. The code is not well documented yet, but the base class for > > > transformations is StreamTransformation. From there anyone who want's > to > > > check it out can find the other transformations. > > > > > > On Fri, 31 Jul 2015 at 17:17 Gyula Fóra <gyula.f...@gmail.com> wrote: > > > > > > > There might be reasons why a user would want different parallelism at > > the > > > > head operators (depending on what else that head operator might > > process) > > > so > > > > restricting them to the same parallelism is a little bit weird don't > > you > > > > think? It kind of goes against the whole opeartors-parallelism idea. > > > > > > > > I don't think its a huge complexity to group head operators together > by > > > > parallelism and add a source/sink per each group like we do now. What > > do > > > > you say? > > > > > > > > Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2015. júl. > > > 31., > > > > P, 17:10): > > > > > > > > > Yes, I'm not saying that it makes sense to do it, I'm just saying > > that > > > it > > > > > does translate and run. Your observation is true. :D > > > > > > > > > > I'm wondering whether it makes sense to allow users to have > iteration > > > > heads > > > > > with differing parallelism, in fact. > > > > > > > > > > On Fri, 31 Jul 2015 at 16:40 Gyula Fóra <gyula.f...@gmail.com> > > wrote: > > > > > > > > > > > I still don't get how it could possibly work, let me tell you > how I > > > see > > > > > and > > > > > > correct me in my logic :) > > > > > > > > > > > > You have this program: > > > > > > ids.map1().setParallelism(2) > > > > > > ids.map2().setParallelism(4) > > > > > > > > > > > > //... > > > > > > > > > > > > ids.closeWith(feedback.groupBy(0)) > > > > > > > > > > > > You are suggesting that we only have one iteration source/sink > pair > > > > with > > > > > > parallelism of either 2 or 4. I will assume that the parallelism > > is 2 > > > > for > > > > > > the sake of the argument. > > > > > > > > > > > > The iteration source is connected to map1 and map2 with Forward > > > > > > partitioning and the sink is connected with groupBy(0). > > > > > > Each sink instance will receive all tuples of a given key which > > also > > > > > means > > > > > > that each iteration source instance (2) will too. > > > > > > > > > > > > Now here comes the problem: the source will forward the tuples to > > > map 1 > > > > > and > > > > > > since we have forward connection we maintiain the groupby > semantics > > > > (this > > > > > > is perfect.) the sources will also forward to map 2 which has > > higher > > > > > > parallelism so the tuple sending turns into round robin, which > > screws > > > > up > > > > > > the groupby. > > > > > > > > > > > > What did I miss? > > > > > > Gyula > > > > > > > > > > > > Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2015. > > júl. > > > > > 31., > > > > > > P, 14:59): > > > > > > > > > > > > > Yes, this would still work. For example, I have this crazy > graph: > > > > > > > http://postimg.org/image/xtv8ay8hv/full/ That results from > this > > > > > program: > > > > > > > https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5 > > > > > > > > > > > > > > It works, and the implementation is very simple, actually. > > > > > > > > > > > > > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra <gyula.f...@gmail.com> > > > > wrote: > > > > > > > > > > > > > > > I mean that the head operators have different parallelism: > > > > > > > > > > > > > > > > IterativeDataStream ids = ... > > > > > > > > > > > > > > > > ids.map().setParallelism(2) > > > > > > > > ids.map().setParallelism(4) > > > > > > > > > > > > > > > > //... > > > > > > > > > > > > > > > > ids.closeWith(feedback) > > > > > > > > > > > > > > > > Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: > > 2015. > > > > júl. > > > > > > > 31., > > > > > > > > P, 14:23): > > > > > > > > > > > > > > > > > I thought about having some tighter restrictions here. My > > idea > > > > was > > > > > to > > > > > > > > > enforce that the feedback edges must have the same > > parallelism > > > as > > > > > the > > > > > > > > > original input stream, otherwise shipping strategies such > as > > > > > "keyBy", > > > > > > > > > "shuffle", "rebalance" don't seem to make sense because > they > > > > would > > > > > > > differ > > > > > > > > > from the distribution of the original elements (at least > > IMHO). > > > > > Maybe > > > > > > > I'm > > > > > > > > > wrong there, though. > > > > > > > > > > > > > > > > > > To me it seems intuitive that I get the feedback at the > head > > > they > > > > > > way I > > > > > > > > > specify it at the tail. But maybe that's also just me... :D > > > > > > > > > > > > > > > > > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra <gyf...@apache.org > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hey, > > > > > > > > > > > > > > > > > > > > I am not sure what is the intuitive behaviour here. As > you > > > are > > > > > not > > > > > > > > > applying > > > > > > > > > > a transformation on the feedback stream but pass it to a > > > > > closeWith > > > > > > > > > method, > > > > > > > > > > I thought it was somehow nature that it gets the > > partitioning > > > > of > > > > > > the > > > > > > > > > > iteration input, but maybe its not intuitive. > > > > > > > > > > > > > > > > > > > > If others also think that preserving feedback > partitioning > > > > should > > > > > > be > > > > > > > > the > > > > > > > > > > default I am not against it :) > > > > > > > > > > > > > > > > > > > > Btw, this still won't make it very simple. We still need > as > > > > many > > > > > > > > > > source/sink pairs as we have different parallelism among > > the > > > > head > > > > > > > > > > operators. Otherwise the forwarding logic wont work. > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > Gyula > > > > > > > > > > > > > > > > > > > > Aljoscha Krettek <aljos...@apache.org> ezt írta > (időpont: > > > > 2015. > > > > > > júl. > > > > > > > > > 31., > > > > > > > > > > P, 11:52): > > > > > > > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > I'm currently working on making the StreamGraph > > generation > > > > more > > > > > > > > > > centralized > > > > > > > > > > > (i.e. not spread across the different API classes). The > > > > > question > > > > > > is > > > > > > > > now > > > > > > > > > > why > > > > > > > > > > > we need to switch to preserve partitioning? Could we > not > > > make > > > > > > > > > "preserve" > > > > > > > > > > > partitioning the default and if users want to have > > shuffle > > > > > > > > partitioning > > > > > > > > > > or > > > > > > > > > > > anything they have to specify it manually when adding > the > > > > > > feedback > > > > > > > > > edge? > > > > > > > > > > > > > > > > > > > > > > This would make for a very simple scheme where the > > > iteration > > > > > > > sources > > > > > > > > > are > > > > > > > > > > > always connected to the heads using "forward" and the > > tails > > > > are > > > > > > > > > connected > > > > > > > > > > > to the iteration sinks using whatever partitioner was > set > > > by > > > > > the > > > > > > > > user. > > > > > > > > > > This > > > > > > > > > > > would make it more transparent than the current default > > of > > > > the > > > > > > > > > "shuffle" > > > > > > > > > > > betweens tails and iteration sinks. > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > Aljoscha > > > > > > > > > > > > > > > > > > > > > > P.S. I now we had quite some discussion about > introducing > > > > > > "preserve > > > > > > > > > > > partitioning" but now, when I think of it it should be > > the > > > > > > > default... > > > > > > > > > :D > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >