Thanks for the detailed explanation, Stephan! Seeing Spargel as a Gather-Scatter model makes much more sense :)
I think we should be more careful not to present it as a "Pregel equivalent", to avoid confusing users coming from systems like Giraph. Maybe I could put together a Pregel-Spargel-GSA comparison table to make clear what each model can support and what the (dis-)advantages are. I wouldn't kill Spargel either, but I would like to add to Gelly a more generic iteration model that could support algorithms beyond simplistic graph propagation. Pregel could be a start, even though it also has its issues. Having to return 2 different types and the size of the workset are the two problems I also faced when trying to map Pregel to a Flink plan.

Also, I am mostly convinced that representing the graph as two separate datasets of vertices and edges is not optimal for the Pregel model. I'll share my findings as soon as I have an adjacency list version running.

Thanks again!

Cheers,
-Vasia.

On 3 November 2015 at 12:59, Martin Neumann <mneum...@sics.se> wrote:

> I tried out Spargel during my work with Spotify and have implemented several algorithms using it. In all implementations I ended up storing additional data and flags on the vertex to carry them over from one UDF to the next one. It definitely makes the code harder to write and maintain.

> I wonder how much overhead these additional constructs cost in computation and memory consumption. Maybe going for a less optimized single-UDF version would not be much of a performance hit for most applications.

> On Tue, Nov 3, 2015 at 8:43 AM, Stephan Ewen <se...@apache.org> wrote:

> > Actually, GAS was not known when we did the iterations work (and Spargel), but the intuition that led to Spargel is similar to the intuition that led to GAS.

> > On Mon, Nov 2, 2015 at 4:35 PM, Stephan Ewen <se...@apache.org> wrote:

> > > When creating the original version of Spargel I was pretty much thinking in GSA terms, more than in Pregel terms. There are some fundamental differences between Spargel and Pregel. Spargel is in between GAS and Pregel in some way; that is how I have always thought about it.

> > > The main reason for the form is that it fits the dataflow paradigm more easily:

> > > - If one function emits the new state of the vertex and the messages, it has two different return types, which means you need a union type and a filter/split kind of operation on the result, which also adds overhead. In the current model, each function has one return type, which makes it easy.

> > > - The workset is also the feedback channel, which is materialized at the superstep boundaries, so keeping it small, at O(vertices) rather than O(edges), is a win for performance.

> > > There is no reason not to add a Pregel model, but I would not kill Spargel for it. It will be tough to get the Pregel variant to the same efficiency. Unless you want to say: for efficiency, go with GSA; for convenience, go with Pregel.

> > > There are some nice things about the Spargel model. The fact that messages are first generated and then consumed makes the generation of initial messages simpler in many cases, I think. It was always a bit weird to me in Pregel that you had to check whether you are in superstep one, in which case you would expect no message, and generate initial value messages.
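To make the union-type/split overhead Stephan mentions concrete: if a single compute-style function had to emit both the updated vertex and the outgoing messages, the operator output would have to be a flag-tagged record that is split again afterwards. A minimal sketch with the plain DataSet API (the record layout and class names are made up for illustration, not actual Spargel code; note that Flink tuples do not serialize null fields, so the unused side of the union needs a dummy value):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

public class TaggedUnionSplit {

    // One output type for both kinds of records:
    // f0 == true  -> f1 carries an updated vertex (f2 holds a dummy message)
    // f0 == false -> f2 carries a message as (targetId, payload) (f1 holds a dummy vertex)

    public static class ExtractDelta implements
            FlatMapFunction<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>, Vertex<Long, Double>> {
        @Override
        public void flatMap(Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>> value,
                            Collector<Vertex<Long, Double>> out) {
            if (value.f0) {
                out.collect(value.f1);   // keep only the vertex updates (solution-set delta)
            }
        }
    }

    public static class ExtractMessages implements
            FlatMapFunction<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>, Tuple2<Long, Double>> {
        @Override
        public void flatMap(Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>> value,
                            Collector<Tuple2<Long, Double>> out) {
            if (!value.f0) {
                out.collect(value.f2);   // keep only the messages (next workset)
            }
        }
    }
}

// usage (schematic): delta = superstepOutput.flatMap(new ExtractDelta());
//                    nextWorkset = superstepOutput.flatMap(new ExtractMessages());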
> > > On Fri, Oct 30, 2015 at 1:28 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> > >> We can of course inject an optional ReduceFunction (or GroupReduce, or combinable GroupReduce) to reduce the size of the workset.
> > >> I suggested removing the GroupReduce function because it only collected all messages into a single record by emitting the input iterator, which is quite dangerous. Applying a combinable reduce function could improve the performance considerably.

> > >> The good news is that it would come "for free" because the necessary partitioning and sorting can be reused (given the forwardField annotations are correctly set):
> > >> - The partitioning of the reduce can be reused for the join with the solution set
> > >> - The sort of the reduce is preserved by the join with the in-memory hash-table of the solution set and can be reused for the coGroup.

> > >> Best,
> > >> Fabian

> > >> 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

> > >> > Hi Fabian,

> > >> > thanks so much for looking into this so quickly :-)

> > >> > One update I have to make is that I tried running a few experiments with this on a 6-node cluster. The current implementation gets stuck at "Rebuilding Workset Properties" and never finishes a single iteration. Running the plan of one superstep without a delta iteration terminates fine. I didn't have access to the cluster today, so I couldn't debug this further, but I will as soon as I have access again.

> > >> > The rest of my comments are inline:

> > >> > On 30 October 2015 at 17:53, Fabian Hueske <fhue...@gmail.com> wrote:

> > >> > > Hi Vasia,

> > >> > > I had a look at your new implementation and have a few ideas for improvements.
> > >> > > 1) Sending out the input iterator as you do in the last GroupReduce is quite dangerous and does not give a benefit compared to collecting all elements. Even though it is an iterator, it needs to be completely materialized in-memory whenever the record is touched by Flink or user code.
> > >> > > I would propose to skip the reduce step completely, handle all messages separately, and only collect them in the CoGroup function before giving them to the VertexComputeFunction. Be careful to only do that with objectReuse disabled, or take care to properly copy the messages. If you collect the messages in the CoGroup, you don't need the GroupReduce, you have smaller records, and you can remove the MessageIterator class completely.

> > >> > I see. The idea was to expose a message combiner that the user could implement if the messages are combinable, e.g. min, sum. This is a common case and reduces the message load significantly. Is there a way I could do something similar before the coGroup?

> > >> > > 2) Add this annotation to the AppendVertexState function: @ForwardedFieldsFirst("*->f0"). This indicates that the complete element of the first input becomes the first field of the output.
> > >> > > Since the input is partitioned on "f0" (it comes out of the partitioned solution set), the result of ApplyVertexState will be partitioned on "f0.f0", which is (accidentally :-D) the join key of the following coGroup function -> no partitioning :-)

> > >> > Great! I totally missed that ;)

> > >> > > 3) Adding the two flatMap functions behind the CoGroup prevents chaining and therefore causes some serialization overhead, but it shouldn't be too bad.

> > >> > > So in total I would make this program as follows:

> > >> > > iVertices<K, VV>
> > >> > > iMessages<K, Message> = iVertices.map(new InitWorkSet());
> > >> > >
> > >> > > iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> > >> > > verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
> > >> > >   .join(iteration.getWorkset())
> > >> > >   .where(0)   // solution set is local and build side
> > >> > >   .equalTo(0) // workset is shuffled and probe side of hash join
> > >> > > superstepComp<Vertex, Tuple2<K, Message>, Bool> = verticesWithMessage.coGroup(edgesWithValue)
> > >> > >   .where("f0.f0") // vwm is locally forwarded and sorted
> > >> > >   .equalTo(0)     // edges are already partitioned and sorted (if cached correctly)
> > >> > >   .with(...)      // the coGroup collects all messages in a collection and gives it to the ComputeFunction
> > >> > > delta<Vertex> = superstepComp.flatMap(...)       // partitioned when merged into solution set
> > >> > > workSet<K, Message> = superstepComp.flatMap(...) // partitioned for the join
> > >> > > iteration.closeWith(delta, workSet)

> > >> > > So, if I am correct, the program will
> > >> > > - partition the workset
> > >> > > - sort the vertices with messages
> > >> > > - partition the delta

> > >> > > One observation I have is that this program requires that all messages fit into memory. Was that also the case before?

> > >> > I believe not. The plan has one coGroup that produces the messages and a following coGroup that groups the messages by "target ID" and consumes them in an iterator. That doesn't require them to fit in memory, right?

> > >> > I'm also working on a version where the graph is represented as an adjacency list, instead of two separate datasets of vertices and edges. The disadvantage is that the graph has to fit in memory, but I think the advantages are many. We'll be able to support edge value updates, edge mutations and different edge access order guarantees. I'll get back to this thread when I have a working prototype.

> > >> > > Cheers,
> > >> > > Fabian

> > >> > Thanks again!

> > >> > Cheers,
> > >> > -Vasia.

> > >> > > 2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

> > >> > > > @Martin: thanks for your input! If you ran into any other issues that I didn't mention, please let us know. Obviously, even with my proposal, there are still features we cannot support, e.g. updating edge values and graph mutations.
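To make the plan Fabian sketches above concrete, here is a hedged Java rendering with Long ids and Double values. The UDF names (InitWorkSet, ComputeCoGroup, ExtractDelta, ExtractMessages, MinMessageCombiner) are hypothetical: ComputeCoGroup would emit the flag-tagged records sketched earlier, ExtractDelta/ExtractMessages split them, and the final reduce is the optional combinable step discussed above. The DataSet/DeltaIteration calls themselves are the standard API. If the join is implemented with a custom function like Fabian's AppendVertexState instead of the default pair-building join, that function should carry the @ForwardedFieldsFirst("*->f0") annotation mentioned above so the partitioning is preserved.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;

public class PregelPlanSketch {

    public static DataSet<Vertex<Long, Double>> run(
            DataSet<Vertex<Long, Double>> iVertices,
            DataSet<Edge<Long, Double>> edgesWithValue,
            int maxIterations) {

        // initial workset: one (vertexId, message) pair per vertex
        DataSet<Tuple2<Long, Double>> iMessages = iVertices.map(new InitWorkSet());

        // delta iteration: solution set = vertices, workset = messages, solution set keyed on field 0
        DeltaIteration<Vertex<Long, Double>, Tuple2<Long, Double>> iteration =
                iVertices.iterateDelta(iMessages, maxIterations, 0);

        // join the vertex state with its incoming messages
        // (solution set is the local build side, the workset is shuffled)
        DataSet<Tuple2<Vertex<Long, Double>, Tuple2<Long, Double>>> verticesWithMessage =
                iteration.getSolutionSet()
                        .join(iteration.getWorkset())
                        .where(0)
                        .equalTo(0);

        // coGroup with the edges; the UDF sees the vertex, its messages and its edges,
        // and emits flag-tagged records holding either a vertex update or a new message
        DataSet<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>> superstepComp =
                verticesWithMessage
                        .coGroup(edgesWithValue)
                        .where("f0.f0")   // vertex id inside the (vertex, message) pair
                        .equalTo(0)       // edge source id
                        .with(new ComputeCoGroup());

        // split the tagged output into the solution-set delta and the next workset
        DataSet<Vertex<Long, Double>> delta = superstepComp.flatMap(new ExtractDelta());
        DataSet<Tuple2<Long, Double>> nextWorkset = superstepComp
                .flatMap(new ExtractMessages())
                .groupBy(0)                         // group messages by target vertex
                .reduce(new MinMessageCombiner());  // optional combinable reduce (e.g. min for SSSP)

        return iteration.closeWith(delta, nextWorkset);
    }
}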
> > >> > > > We'll need to re-think the underlying iteration and/or graph representation for those.

> > >> > > > @Fabian: thanks a lot, no rush :)
> > >> > > > Let me give you some more information that might make it easier to reason about performance:

> > >> > > > Currently, in Spargel the SolutionSet (SS) keeps the vertex state and the workset (WS) keeps the active vertices. The iteration is composed of 2 coGroups. The first one takes the WS and the edges and produces messages. The second one takes the messages and the SS and produces the new WS and the SS-delta.

> > >> > > > In my proposal, the SS has the vertex state and the WS has <vertexId, MessageIterator> pairs, i.e. the inbox of each vertex. The plan is more complicated because compute() needs to have two iterators: over the edges and over the messages.
> > >> > > > First, I join SS and WS to get the active vertices (those that have received a msg) and their current state. Then I coGroup the result with the edges to access the neighbors. Now the main problem is that this coGroup needs to have 2 outputs: the new messages and the new vertex value. I couldn't really find a nice way to do this, so I'm emitting a Tuple that contains both types and I have a flag to separate them later with 2 flatMaps. From the vertex flatMap I create the SS-delta, and from the messages flatMap I apply a reduce to group the messages by vertex and send them to the new WS. One optimization would be to expose a combiner here to reduce message size.

> > >> > > > tl;dr:
> > >> > > > 1. 2 coGroups vs. join + coGroup + flatMap + reduce
> > >> > > > 2. how can we efficiently emit 2 different types of records from a coGroup?
> > >> > > > 3. does it make any difference if we group/combine the messages before updating the workset or after?

> > >> > > > Cheers,
> > >> > > > -Vasia.

> > >> > > > On 27 October 2015 at 18:39, Fabian Hueske <fhue...@gmail.com> wrote:

> > >> > > > > I'll try to have a look at the proposal from a performance point of view in the next days.
> > >> > > > > Please ping me if I don't follow up on this thread.

> > >> > > > > Cheers, Fabian

> > >> > > > > 2015-10-27 18:28 GMT+01:00 Martin Junghanns <m.jungha...@mailbox.org>:

> > >> > > > > > Hi,

> > >> > > > > > At our group, we also moved several algorithms from Giraph to Gelly and ran into some confusing issues (first in understanding, second during implementation) caused by the conceptual differences you described.

> > >> > > > > > If there are no concrete advantages (performance mainly) in the Spargel implementation, we would be very happy to see the Gelly API aligned to Pregel-like systems.
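A minimal sketch of the message combiner Vasia mentions above, assuming SSSP-style messages represented as Tuple2<Long, Double> of (targetId, candidate distance); the class name is made up for illustration:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Combines all messages addressed to the same target vertex into a single one,
// keeping the minimum distance. Applied as: messages.groupBy(0).reduce(new MinMessageCombiner())
public class MinMessageCombiner implements ReduceFunction<Tuple2<Long, Double>> {
    @Override
    public Tuple2<Long, Double> reduce(Tuple2<Long, Double> m1, Tuple2<Long, Double> m2) {
        return m1.f1 <= m2.f1 ? m1 : m2;
    }
}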
> > >> > > > > > Your SSSP example speaks for itself. Straightforward, if the reader is familiar with Pregel/Giraph/...

> > >> > > > > > Best,
> > >> > > > > > Martin

> > >> > > > > > On 27.10.2015 17:40, Vasiliki Kalavri wrote:

> > >> > > > > >> Hello squirrels,

> > >> > > > > >> I want to discuss with you a few concerns I have about our current vertex-centric model implementation, Spargel, now fully subsumed by Gelly.

> > >> > > > > >> Spargel is our implementation of Pregel [1], but it violates some fundamental properties of the model, as described in the paper and as implemented in e.g. Giraph, GPS, Hama. I often find myself confused both when trying to explain it to current Giraph users and when porting my Giraph algorithms to it.

> > >> > > > > >> More specifically:
> > >> > > > > >> - in the Pregel model, messages produced in superstep n are received in superstep n+1. In Spargel, they are produced and consumed in the same iteration.
> > >> > > > > >> - in Pregel, vertices are active during a superstep if they have received a message in the previous superstep. In Spargel, a vertex is active during a superstep if it has changed its value.

> > >> > > > > >> These two differences require a lot of rethinking when porting applications and can easily cause bugs.

> > >> > > > > >> The most important problem, however, is that we require the user to split the computation into 2 phases (2 UDFs):
> > >> > > > > >> - messaging: has access to the vertex state and can produce messages
> > >> > > > > >> - update: has access to incoming messages and can update the vertex value

> > >> > > > > >> Pregel/Giraph only expose one UDF to the user:
> > >> > > > > >> - compute: has access to both the vertex state and the incoming messages, can produce messages and update the vertex value.

> > >> > > > > >> This might not seem like a big deal, but apart from forcing the user to split their program logic into 2 phases, Spargel also makes some common computation patterns non-intuitive or impossible to write. A very simple example is propagating a message based on its value or sender ID. To do this with Spargel, one has to store all the incoming messages in the vertex value (which might be of a different type, btw) during the messaging phase, so that they can be accessed during the update phase.
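A schematic of the workaround described above (the type and field names are invented for illustration): with two separate UDFs, the vertex value has to be widened so it can carry the stashed messages across the phase boundary, e.g. something like

import java.util.ArrayList;
import java.util.List;

// Vertex value that carries, besides the actual state, the messages received in the
// last update phase, only so that the separate messaging UDF can read them later.
// With a single compute() UDF this stash (and the extra serialization) disappears.
public class FatVertexValue {
    public double distance = Double.POSITIVE_INFINITY;      // the actual vertex state
    public List<Double> storedMessages = new ArrayList<>();  // stashed inbox for the messaging phase

    public FatVertexValue() {}  // no-arg constructor so Flink treats this as a POJO
}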
> > >> > > > > >> So, my first question is: when implementing Spargel, were other alternatives considered and maybe rejected in favor of performance or because of some other reason? If someone knows, I would love to hear about them!

> > >> > > > > >> Second, I wrote a prototype implementation [2] that only exposes one UDF, compute(), by keeping the vertex state in the solution set and the messages in the workset. This way all previously mentioned limitations go away and the API (see "SSSPComputeFunction" in the example [3]) looks a lot more like Giraph (see [4]).

> > >> > > > > >> I have not run any experiments yet and the prototype has some ugly hacks, but if you think any of this makes sense, then I'd be willing to follow up and try to optimize it. If we see that it performs well, we can consider either replacing Spargel or adding it as an alternative.

> > >> > > > > >> Thanks for reading this long e-mail and looking forward to your input!

> > >> > > > > >> Cheers,
> > >> > > > > >> -Vasia.

> > >> > > > > >> [1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
> > >> > > > > >> [2]: https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
> > >> > > > > >> [3]: https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
> > >> > > > > >> [4]: https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java
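For readers who do not want to open the links: a schematic of what the single-UDF SSSP in [3] boils down to. The plumbing methods below (getEdges(), setNewVertexValue(), sendMessageTo()) are placeholders standing in for whatever the prototype's compute-function base class provides; the exact signatures are in the linked branch.

import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;

// Single compute() UDF: sees the vertex state and the incoming messages in one place,
// and may both update the value and send messages in the same call (Pregel/Giraph style).
public abstract class SSSPComputeSketch {

    private final long srcId;

    protected SSSPComputeSketch(long srcId) {
        this.srcId = srcId;
    }

    // placeholders for runtime-provided plumbing (schematic, not the prototype's exact API)
    protected abstract Iterable<Edge<Long, Double>> getEdges();
    protected abstract void setNewVertexValue(double newValue);
    protected abstract void sendMessageTo(long target, double message);

    public void compute(Vertex<Long, Double> vertex, Iterable<Double> messages) {
        double minDistance = (vertex.getId() == srcId) ? 0.0 : Double.POSITIVE_INFINITY;
        for (double msg : messages) {
            minDistance = Math.min(minDistance, msg);
        }
        // only update and propagate if a shorter path was found; otherwise the vertex stays inactive
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
            }
        }
    }
}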