Re: Question About "Preserve Partitioning" in Stream Iteration

2015-08-03 Thread Gyula Fóra
Okay, sounds reasonable :) Stephan Ewen wrote (on Mon, Aug 3, 2015, 10:24): > I don't think there is a fundamental limitation to the simpler approach. > The only real difference is that DOPs are adjusted before the tail, so only > one head/tail pair is needed. > > Nested iterations shou

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-08-03 Thread Stephan Ewen
I don't think there is a fundamental limitation to the simpler approach. The only real difference is that DOPs are adjusted before the tail, so only one head/tail pair is needed. Nested iterations should still be possible... On Mon, Aug 3, 2015 at 10:21 AM, Gyula Fóra wrote: > It is critical fo

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-08-03 Thread Gyula Fóra
It is critical for many applications (such as SAMOA or Storm compatibility) to build arbitrary cyclic flows. If your suggestion covers all cases (for instance nested iterations) then I am not against it. The current implementation is just one way to do it, but it allows arbitrary cycles. From the

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-08-03 Thread Aljoscha Krettek
Yes, that's what I was proposing in my second mail: I thought about having some tighter restrictions here. My idea was to enforce that the feedback edges must have the same parallelism as the original input stream, otherwise shipping strategies such as "keyBy", "shuffle", "rebalance" don't seem to

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-08-02 Thread Stephan Ewen
This model strikes me as pretty complicated. Imagine the extra logic and code path necessary for proper checkpointing as well. Why not do a simple approach: - There is one parallel head, one parallel tail, both with the same parallelism - Any computation in between may have its own parallelism
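
The rule proposed in the message above (one head, one tail, equal parallelism, free parallelism in between) can be illustrated with a minimal sketch. This is a toy model and not Flink code: the class `IterationParallelismCheck` and the method `isValidLoop` are hypothetical names introduced only for illustration, and the real StreamGraph translation is not shown in the thread.

```java
// Toy model of the "simple approach": NOT part of Flink, all names are hypothetical.
final class IterationParallelismCheck {

    /**
     * Returns true if the loop satisfies the proposed rule: the head and the
     * tail run with the same positive parallelism, while operators between
     * them may use any positive parallelism.
     */
    static boolean isValidLoop(int headParallelism, int tailParallelism, int... bodyParallelisms) {
        if (headParallelism <= 0 || tailParallelism <= 0) {
            return false;
        }
        for (int p : bodyParallelisms) {
            if (p <= 0) {
                return false; // every operator needs at least one parallel instance
            }
        }
        // Only the head and tail are required to match.
        return headParallelism == tailParallelism;
    }

    public static void main(String[] args) {
        // Head and tail both at 4, body operators at 2 and 8: accepted.
        System.out.println(isValidLoop(4, 4, 2, 8));
        // Head at 2 but tail at 4: rejected under this rule.
        System.out.println(isValidLoop(2, 4, 2));
    }
}
```

Under this rule, a program whose body operators have differing parallelism would need its parallelism adjusted before the tail, which is the point Stephan Ewen makes in his follow-up message.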

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-08-02 Thread Aljoscha Krettek
To answer the question plain and simple: No, there are several different parallel heads and tails. For example in this:
    val iter = ds.iteration()
    val head_tail1 = iter.map().parallelism(2)
    val head_tail2 = iter.map().parallelism(4)
    iter.closeWith(head_tail1.union(head_tail2))
We have one head/t

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-08-02 Thread Gyula Fóra
In a streaming program when we create an IterativeDataStream, we practically mark the union point of some later feedback stream (the one passed in to closeWith(..)). The operators applied on this IterativeDataStream will receive the feedback input as well. We call the operators applied on the iter

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-08-02 Thread Stephan Ewen
I don't get the discussion here. Can you help me with what you mean by "different iteration heads and tails"? Does an iteration not have one parallel head and one parallel tail? On Fri, Jul 31, 2015 at 6:52 PM, Gyula Fóra wrote: > Maybe you can reuse some of the logic that is currently there o

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Gyula Fóra
Maybe you can reuse some of the logic that is currently there on the StreamGraph, with building StreamLoops first which will be used to generate the sources and sinks right before building the JobGraph. This avoids the need of knowing everything beforehand. I actually added this to avoid the compl

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Aljoscha Krettek
Sure it can be done, it's just more complex if you try to do it in a sane way without having the code that builds the StreamGraph all over the place. :D I'll try to come up with something. This is my current work in progress, by the way: https://github.com/aljoscha/flink/tree/stream-api-rework I

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Gyula Fóra
There might be reasons why a user would want different parallelism at the head operators (depending on what else that head operator might process), so restricting them to the same parallelism is a little bit weird, don't you think? It kind of goes against the whole operators-parallelism idea. I don'

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Aljoscha Krettek
Yes, I'm not saying that it makes sense to do it, I'm just saying that it does translate and run. Your observation is true. :D I'm wondering whether it makes sense to allow users to have iteration heads with differing parallelism, in fact. On Fri, 31 Jul 2015 at 16:40 Gyula Fóra wrote: > I stil

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Gyula Fóra
I still don't get how it could possibly work. Let me tell you how I see it, and correct me if my logic is wrong :) You have this program:
    ids.map1().setParallelism(2)
    ids.map2().setParallelism(4)
    //...
    ids.closeWith(feedback.groupBy(0))
You are suggesting that we only have one iteration source/sink pair wit

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Aljoscha Krettek
Yes, this would still work. For example, I have this crazy graph: http://postimg.org/image/xtv8ay8hv/full/ That results from this program: https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5 It works, and the implementation is very simple, actually. On Fri, 31 Jul 2015 at 14:30 Gyula Fóra wrot

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Gyula Fóra
I mean that the head operators have different parallelism:
    IterativeDataStream ids = ...
    ids.map().setParallelism(2)
    ids.map().setParallelism(4)
    //...
    ids.closeWith(feedback)
Aljoscha Krettek wrote (on Fri, Jul 31, 2015, 14:23): > I thought about having some tighter restrictions her

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Aljoscha Krettek
I thought about having some tighter restrictions here. My idea was to enforce that the feedback edges must have the same parallelism as the original input stream, otherwise shipping strategies such as "keyBy", "shuffle", "rebalance" don't seem to make sense because they would differ from the distri

Re: Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Gyula Fóra
Hey, I am not sure what the intuitive behaviour is here. As you are not applying a transformation on the feedback stream but pass it to a closeWith method, I thought it was somehow natural that it gets the partitioning of the iteration input, but maybe it's not intuitive. If others also think that

Question About "Preserve Partitioning" in Stream Iteration

2015-07-31 Thread Aljoscha Krettek
Hi, I'm currently working on making the StreamGraph generation more centralized (i.e. not spread across the different API classes). The question now is why we need to switch to preserve partitioning. Could we not make "preserve" partitioning the default and if users want to have shuffle partitionin