Hi Vasia,

I had a look at your new implementation and have a few ideas for improvements.

1) Sending out the input iterator as you do in the last GroupReduce is quite dangerous and gives no benefit over collecting all elements. Even though it is an iterator, it needs to be completely materialized in memory whenever the record is touched by Flink or user code. I would propose to skip the reduce step completely, handle all messages separately, and only collect them in the CoGroup function before handing them to the VertexComputeFunction. Be careful to only do that with objectReuse disabled, or take care to properly copy the messages. If you collect the messages in the CoGroup, you don't need the GroupReduce, you have smaller records, and you can remove the MessageIterator class completely.

2) Add this annotation to the AppendVertexState function: @ForwardedFieldsFirst("*->f0"). It indicates that the complete element of the first input becomes the first field of the output. Since the input is partitioned on "f0" (it comes out of the partitioned solution set), the result of AppendVertexState will be partitioned on "f0.f0", which is (accidentally :-D) the join key of the following coGroup function -> no partitioning needed :-)

3) Adding the two flatMap functions behind the CoGroup prevents chaining and therefore causes some serialization overhead, but that shouldn't be too bad.
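To make points 1) and 2) a bit more concrete, here is a rough sketch of what I have in mind. All class names, type parameters, and the record layout are just placeholders for illustration, not the code from your branch:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

// Point 2: the whole first input (the vertex from the solution set) is forwarded into
// output field f0, so the partitioning on "f0" carries over to "f0.f0" of the result.
@ForwardedFieldsFirst("*->f0")
class AppendVertexState<K, VV, Message> implements
        JoinFunction<Tuple2<K, VV>, Tuple2<K, Message>, Tuple2<Tuple2<K, VV>, Message>> {

    @Override
    public Tuple2<Tuple2<K, VV>, Message> join(Tuple2<K, VV> vertex, Tuple2<K, Message> message) {
        return new Tuple2<Tuple2<K, VV>, Message>(vertex, message.f1);
    }
}

// Point 1: collect the messages into a plain list inside the coGroup and hand that
// list to the compute function; no GroupReduce and no MessageIterator record needed.
// Output: (vertex, message, isVertexUpdate) records, split by the two flatMaps later.
class SuperstepCoGroup<K, VV, EV, Message> implements
        CoGroupFunction<Tuple2<Tuple2<K, VV>, Message>, Tuple3<K, K, EV>,
                Tuple3<Tuple2<K, VV>, Tuple2<K, Message>, Boolean>> {

    @Override
    public void coGroup(Iterable<Tuple2<Tuple2<K, VV>, Message>> verticesWithMessages,
                        Iterable<Tuple3<K, K, EV>> edges,
                        Collector<Tuple3<Tuple2<K, VV>, Tuple2<K, Message>, Boolean>> out) {

        Tuple2<K, VV> vertex = null;
        // materialize the inbox; with object reuse enabled, vertex and messages must be copied
        List<Message> inbox = new ArrayList<Message>();
        for (Tuple2<Tuple2<K, VV>, Message> vwm : verticesWithMessages) {
            vertex = vwm.f0;
            inbox.add(vwm.f1);
        }
        if (vertex == null) {
            return; // no messages for this vertex in this superstep
        }
        // the compute function would be invoked here with 'vertex', 'inbox' and the
        // 'edges' iterable, emitting flagged vertex updates and messages via 'out'
    }
}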
So in total, I would structure the program as follows:

iVertices<K, VV>
iMessages<K, Message> = iVertices.map(new InitWorkSet());

iteration = iVertices.iterateDelta(iMessages, maxIt, 0)

verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
  .join(iteration.getWorkset())
  .where(0)    // solution set is local and build side
  .equalTo(0)  // workset is shuffled and probe side of the hash join

superstepComp<Vertex, Tuple2<K, Message>, Bool> = verticesWithMessage.coGroup(edgesWithValue)
  .where("f0.f0")  // verticesWithMessage is locally forwarded and sorted
  .equalTo(0)      // edges are already partitioned and sorted (if cached correctly)
  .with(...)       // the coGroup collects all messages in a collection and gives it to the ComputeFunction

delta<Vertex> = superstepComp.flatMap(...)        // partitioned when merged into the solution set
workSet<K, Message> = superstepComp.flatMap(...)  // partitioned for the join

iteration.closeWith(delta, workSet)

So, if I am correct, the program will
- partition the workset
- sort the vertices with messages
- partition the delta

One observation I have is that this program requires that all messages fit into memory. Was that also the case before?
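And just to illustrate what I mean by the two flatMaps behind the coGroup: roughly something like the following, assuming the coGroup emits (vertex, message, isVertexUpdate) records as sketched above (again, all names are placeholders, not code from your branch):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

// keeps only the vertex updates -> becomes the solution set delta
class DeltaSplitter<K, VV, Message> implements
        FlatMapFunction<Tuple3<Tuple2<K, VV>, Tuple2<K, Message>, Boolean>, Tuple2<K, VV>> {

    @Override
    public void flatMap(Tuple3<Tuple2<K, VV>, Tuple2<K, Message>, Boolean> value,
                        Collector<Tuple2<K, VV>> out) {
        if (value.f2) {
            out.collect(value.f0);
        }
    }
}

// keeps only the messages -> becomes the next workset
class WorksetSplitter<K, VV, Message> implements
        FlatMapFunction<Tuple3<Tuple2<K, VV>, Tuple2<K, Message>, Boolean>, Tuple2<K, Message>> {

    @Override
    public void flatMap(Tuple3<Tuple2<K, VV>, Tuple2<K, Message>, Boolean> value,
                        Collector<Tuple2<K, Message>> out) {
        if (!value.f2) {
            out.collect(value.f1);
        }
    }
}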
Cheers,
Fabian

2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

> @Martin: thanks for your input! If you ran into any other issues that I
> didn't mention, please let us know. Obviously, even with my proposal, there
> are still features we cannot support, e.g. updating edge values and graph
> mutations. We'll need to re-think the underlying iteration and/or graph
> representation for those.
>
> @Fabian: thanks a lot, no rush :)
> Let me give you some more information that might make it easier to reason
> about performance:
>
> Currently, in Spargel the solution set (SS) keeps the vertex state and the
> workset (WS) keeps the active vertices. The iteration is composed of 2
> coGroups. The first one takes the WS and the edges and produces messages.
> The second one takes the messages and the SS and produces the new WS and
> the SS-delta.
>
> In my proposal, the SS has the vertex state and the WS has <vertexId,
> MessageIterator> pairs, i.e. the inbox of each vertex. The plan is more
> complicated because compute() needs to have two iterators: over the edges
> and over the messages.
> First, I join SS and WS to get the active vertices (those that have
> received a msg) and their current state. Then I coGroup the result with
> the edges to access the neighbors. Now the main problem is that this
> coGroup needs to have 2 outputs: the new messages and the new vertex
> value. I couldn't really find a nice way to do this, so I'm emitting a
> Tuple that contains both types and I have a flag to separate them later
> with 2 flatMaps. From the vertex flatMap, I create the SS-delta, and from
> the message flatMap I apply a reduce to group the messages by vertex and
> send them to the new WS. One optimization would be to expose a combiner
> here to reduce message size.
>
> tl;dr:
> 1. 2 coGroups vs. join + coGroup + flatMap + reduce
> 2. how can we efficiently emit 2 different types of records from a coGroup?
> 3. does it make any difference if we group/combine the messages before
> updating the workset or after?
>
> Cheers,
> -Vasia.
>
>
> On 27 October 2015 at 18:39, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > I'll try to have a look at the proposal from a performance point of view
> > in the next days.
> > Please ping me, if I don't follow up this thread.
> >
> > Cheers, Fabian
> >
> > 2015-10-27 18:28 GMT+01:00 Martin Junghanns <m.jungha...@mailbox.org>:
> >
> > > Hi,
> > >
> > > At our group, we also moved several algorithms from Giraph to Gelly and
> > > ran into some confusing issues (first in understanding, second during
> > > implementation) caused by the conceptual differences you described.
> > >
> > > If there are no concrete advantages (mainly performance) in the Spargel
> > > implementation, we would be very happy to see the Gelly API aligned
> > > with Pregel-like systems.
> > >
> > > Your SSSP example speaks for itself: straightforward, if the reader is
> > > familiar with Pregel/Giraph/...
> > >
> > > Best,
> > > Martin
> > >
> > >
> > > On 27.10.2015 17:40, Vasiliki Kalavri wrote:
> > >
> > >> Hello squirrels,
> > >>
> > >> I want to discuss with you a few concerns I have about our current
> > >> vertex-centric model implementation, Spargel, now fully subsumed by
> > >> Gelly.
> > >>
> > >> Spargel is our implementation of Pregel [1], but it violates some
> > >> fundamental properties of the model, as described in the paper and as
> > >> implemented in e.g. Giraph, GPS, and Hama. I often find myself confused
> > >> both when trying to explain it to current Giraph users and when porting
> > >> my Giraph algorithms to it.
> > >>
> > >> More specifically:
> > >> - in the Pregel model, messages produced in superstep n are received in
> > >> superstep n+1. In Spargel, they are produced and consumed in the same
> > >> iteration.
> > >> - in Pregel, vertices are active during a superstep if they have received
> > >> a message in the previous superstep. In Spargel, a vertex is active during
> > >> a superstep if it has changed its value.
> > >>
> > >> These two differences require a lot of rethinking when porting
> > >> applications and can easily cause bugs.
> > >>
> > >> The most important problem, however, is that we require the user to split
> > >> the computation into 2 phases (2 UDFs):
> > >> - messaging: has access to the vertex state and can produce messages
> > >> - update: has access to incoming messages and can update the vertex value
> > >>
> > >> Pregel/Giraph only expose one UDF to the user:
> > >> - compute: has access to both the vertex state and the incoming messages,
> > >> can produce messages and update the vertex value.
> > >>
> > >> This might not seem like a big deal, but apart from forcing the user to
> > >> split their program logic into 2 phases, Spargel also makes some common
> > >> computation patterns non-intuitive or impossible to write. A very simple
> > >> example is propagating a message based on its value or sender ID. To do
> > >> this with Spargel, one has to store all the incoming messages in the
> > >> vertex value (which might be of a different type, btw) during the
> > >> messaging phase, so that they can be accessed during the update phase.
> > >>
> > >> So, my first question is: when implementing Spargel, were other
> > >> alternatives considered and maybe rejected in favor of performance or
> > >> for some other reason? If someone knows, I would love to hear about them!
> > >>
> > >> Second, I wrote a prototype implementation [2] that only exposes one UDF,
> > >> compute(), by keeping the vertex state in the solution set and the
> > >> messages in the workset.
> > >> This way, all previously mentioned limitations go away and the API
> > >> (see "SSSPComputeFunction" in the example [3]) looks a lot more like
> > >> Giraph (see [4]).
> > >>
> > >> I have not run any experiments yet and the prototype has some ugly hacks,
> > >> but if you think any of this makes sense, then I'd be willing to follow up
> > >> and try to optimize it. If we see that it performs well, we can consider
> > >> either replacing Spargel or adding it as an alternative.
> > >>
> > >> Thanks for reading this long e-mail and looking forward to your input!
> > >>
> > >> Cheers,
> > >> -Vasia.
> > >>
> > >> [1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
> > >> [2]:
> > >> https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
> > >> [3]:
> > >> https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
> > >> [4]:
> > >> https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java
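For reference, the single-UDF model discussed above boils down to something like the following SSSP sketch. The ComputeFunction base class and its helper methods are invented here for illustration only; they are not the actual API of the prototype in [2]/[3]:

import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;

// invented base class standing in for whatever the prototype exposes
abstract class ComputeFunction {
    abstract void compute(Vertex<Long, Double> vertex, Iterable<Double> messages);

    // hooks a runtime would provide; stubbed here so the sketch is self-contained
    Iterable<Edge<Long, Double>> getEdges() { throw new UnsupportedOperationException("runtime hook"); }
    void setNewVertexValue(Double newValue) { throw new UnsupportedOperationException("runtime hook"); }
    void sendMessageTo(Long target, Double message) { throw new UnsupportedOperationException("runtime hook"); }
}

class SSSPComputeFunction extends ComputeFunction {
    @Override
    void compute(Vertex<Long, Double> vertex, Iterable<Double> messages) {
        // the candidate distance is the minimum over all incoming messages
        double minDistance = Double.MAX_VALUE;
        for (double msg : messages) {
            minDistance = Math.min(minDistance, msg);
        }
        // update the vertex and notify the neighbors only if the distance improved
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
            }
        }
    }
}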