Hello squirrels and happy new year! I'm reviving this thread to share some results and discuss next steps.
Using the Either type I was able to get rid of the redundant messages and vertex state. During the past few weeks, I have been running experiments, which show that the performance of this "Pregel" model has improved a lot :) In [1], you can see the speedup of GSA and Pregel over Spargel, for SSSP and Connected Components (CC), on the LiveJournal (68m edges), Orkut (117m edges) and Wikipedia (340m edges) datasets.

Regarding next steps, if there are no objections, I will open a JIRA for adding a Pregel iteration abstraction to Gelly. The Gelly guide has to be updated to reflect the spectrum of iteration abstractions that we have discussed in this thread, i.e. Pregel -> Spargel (Scatter-Gather) -> GSA. I think it might also be a good idea to do some renaming. Currently, we call the Spargel iteration "vertex-centric", a term which fits the Pregel abstraction better. I propose we rename the Spargel iteration to "scatter-gather" or "signal-collect" (the name under which it was first introduced [2]). Any other ideas?

Thanks,
-Vasia.

[1]: https://drive.google.com/file/d/0BzQJrI2eGlyYRTRjMkp1d3R6eVE/view?usp=sharing
[2]: http://link.springer.com/chapter/10.1007/978-3-642-17746-0_48

On 11 November 2015 at 11:05, Stephan Ewen <se...@apache.org> wrote:

See: https://issues.apache.org/jira/browse/FLINK-3002

On Wed, Nov 11, 2015 at 10:54 AM, Stephan Ewen <se...@apache.org> wrote:

"Either" and "Optional" types are quite useful.

Let's add them to the core Java API.

On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

Thanks Fabian! I'll try that :)

On 10 November 2015 at 22:31, Fabian Hueske <fhue...@gmail.com> wrote:

You could implement a Java Either type (similar to Scala's Either) that holds either a Message or the VertexState, together with a corresponding TypeInformation and TypeSerializer that serialize a byte flag to indicate which of the two types is used.
It might actually make sense to add a generic Either type to the Java API in general (similar to the Java Tuples, which resemble the Scala Tuples).

Cheers, Fabian
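For illustration, a minimal Either along these lines could look like the sketch below (hypothetical code, not the actual FLINK-3002 implementation; the byte-flag serializer is only hinted at in the comment):

// Holds exactly one of two values. A matching TypeSerializer would write
// a byte flag (e.g. 0 = left, 1 = right) followed by only the value that
// is actually present, so no dummy fields are needed.
public abstract class Either<L, R> {

    public static <L, R> Either<L, R> left(L value) {
        return new Left<>(value);
    }

    public static <L, R> Either<L, R> right(R value) {
        return new Right<>(value);
    }

    public abstract boolean isLeft();

    private static final class Left<L, R> extends Either<L, R> {
        private final L value;
        Left(L value) { this.value = value; }
        @Override
        public boolean isLeft() { return true; }
    }

    private static final class Right<L, R> extends Either<L, R> {
        private final R value;
        Right(R value) { this.value = value; }
        @Override
        public boolean isLeft() { return false; }
    }
}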
2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

Hi,

after running a few experiments, I can confirm that putting the combiner after the flatMap is indeed more efficient.

I ran SSSP and Connected Components with Spargel, GSA, and the Pregel model, and the results are the following:

- for SSSP, Spargel is always the slowest, GSA is ~1.2x faster, and Pregel is ~1.1x faster without a combiner and ~1.3x faster with one.
- for Connected Components, Spargel and GSA perform similarly, while Pregel is 1.4-1.6x slower.

To start with, this is much better than I expected :)
However, there is a main shortcoming in my current implementation that negatively impacts performance: since the compute-function coGroup needs to output both new vertex values and new messages, I emit a wrapping tuple that contains both vertex state and messages and then filter them out based on a boolean field. The problem is that, since I cannot emit null fields, I emit a dummy message for each new vertex state and a dummy vertex state for each new message. That essentially means that the intermediate messages result doubles in size if, say, the vertex values are of the same type as the messages (it can be worse if the vertex values are more complex).
So my question is: is there a way to avoid this redundancy, by either emitting null fields or by creating an operator that can emit 2 different types of tuples?

Thanks!
-Vasia.

On 9 November 2015 at 15:20, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vasia,

sorry for the late reply.
I don't think there is a big difference. In both cases, the partitioning and sorting happen at the end of the iteration.
If the groupReduce is applied before the workset is returned, the sorting happens on the filtered result (after the flatMap), which might be a little bit more efficient (depending on the ratio of messages to solution set updates). It also does not require that the initial workset be sorted for the first groupReduce.

I would put it at the end.

Cheers, Fabian

2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

@Fabian

Is there any advantage in putting the reducer-combiner before updating the workset vs. after (i.e. right before the join with the solution set)?

If it helps, here are the plans of these 2 alternatives:

https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing

Thanks a lot for the help!

-Vasia.

On 30 October 2015 at 21:28, Fabian Hueske <fhue...@gmail.com> wrote:

We can of course inject an optional ReduceFunction (or GroupReduce, or combinable GroupReduce) to reduce the size of the work set.
I suggested to remove the GroupReduce function because it only collected all messages into a single record by emitting the input iterator, which is quite dangerous. Applying a combinable reduce function could improve the performance considerably.

The good news is that it would come "for free", because the necessary partitioning and sorting can be reused (given the forwardedFields annotations are correctly set):
- The partitioning of the reduce can be reused for the join with the solution set.
- The sort of the reduce is preserved by the join with the in-memory hash-table of the solution set and can be reused for the coGroup.

Best,
Fabian
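For SSSP-style messages, such a combinable reduce could look like the following sketch (assuming <targetVertexId, candidateDistance> message tuples; the class name is illustrative):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Keeps only the minimum candidate distance per target vertex, shrinking
// the workset before it is joined with the solution set. ReduceFunctions
// are combinable, so most of the reduction happens before the shuffle.
public class MinMessageCombiner implements ReduceFunction<Tuple2<Long, Double>> {

    @Override
    public Tuple2<Long, Double> reduce(Tuple2<Long, Double> first,
                                       Tuple2<Long, Double> second) {
        return first.f1 <= second.f1 ? first : second;
    }
}

// Usage sketch: messages.groupBy(0).reduce(new MinMessageCombiner())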
2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

Hi Fabian,

thanks so much for looking into this so quickly :-)

One update I have to make is that I tried running a few experiments with this on a 6-node cluster. The current implementation gets stuck at "Rebuilding Workset Properties" and never finishes a single iteration. Running the plan of one superstep without a delta iteration terminates fine. I didn't have access to the cluster today, so I couldn't debug this further, but I will as soon as I have access again.

The rest of my comments are inline:

On 30 October 2015 at 17:53, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vasia,
>
> I had a look at your new implementation and have a few ideas for improvements.
> 1) Sending out the input iterator as you do in the last GroupReduce is quite dangerous and does not give a benefit compared to collecting all elements. Even though it is an iterator, it needs to be completely materialized in-memory whenever the record is touched by Flink or user code.
> I would propose to skip the reduce step completely, handle all messages separately, and only collect them in the CoGroup function before giving them to the VertexComputeFunction. Be careful to only do that with objectReuse disabled, or take care to properly copy the messages. If you collect the messages in the CoGroup, you don't need the GroupReduce, you have smaller records, and you can remove the MessageIterator class completely.

I see. The idea was to expose a message combiner that users could implement if the messages are combinable, e.g. min or sum. This is a common case and reduces the message load significantly. Is there a way I could do something similar before the coGroup?

> 2) Add this annotation to the AppendVertexState function: @ForwardedFieldsFirst("*->f0"). This indicates that the complete element of the first input becomes the first field of the output. Since the input is partitioned on "f0" (it comes out of the partitioned solution set), the result of AppendVertexState will be partitioned on "f0.f0", which is (accidentally :-D) the join key of the following coGroup function -> no partitioning :-)

Great! I totally missed that ;)
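To make the annotation concrete, the join function could look roughly like this (a sketch with assumed concrete types; the real function would be generic over the key, vertex value and message types):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Vertex;

// "*->f0" tells the optimizer that the whole first input (the vertex from
// the solution set, partitioned on its id) becomes field f0 of the output,
// so the output is known to be partitioned on "f0.f0" and the following
// coGroup on that key needs no extra shuffle.
@ForwardedFieldsFirst("*->f0")
public class AppendVertexState implements
        JoinFunction<Vertex<Long, Double>,              // solution set entry
                     Tuple2<Long, Double>,              // <targetId, message>
                     Tuple2<Vertex<Long, Double>, Double>> {

    @Override
    public Tuple2<Vertex<Long, Double>, Double> join(
            Vertex<Long, Double> vertex, Tuple2<Long, Double> message) {
        return new Tuple2<>(vertex, message.f1);
    }
}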
> 3) Adding the two flatMap functions behind the CoGroup prevents chaining and therefore causes some serialization overhead, but it shouldn't be too bad.
>
> So in total I would make this program as follows:
>
> iVertices<K, VV>
> iMessages<K, Message> = iVertices.map(new InitWorkSet());
>
> iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
>   .join(iteration.getWorkset())
>   .where(0)   // solution set is local and build side
>   .equalTo(0) // workset is shuffled and probe side of the hash join
> superstepComp<Vertex, Tuple2<K, Message>, Bool> =
>   verticesWithMessage.coGroup(edgesWithValue)
>   .where("f0.f0") // vwm is locally forwarded and sorted
>   .equalTo(0)     // edges are already partitioned and sorted (if cached correctly)
>   .with(...)      // the coGroup collects all messages in a collection and gives it to the ComputeFunction
> delta<Vertex> = superstepComp.flatMap(...)       // partitioned when merged into the solution set
> workSet<K, Message> = superstepComp.flatMap(...) // partitioned for the join
> iteration.closeWith(delta, workSet)
>
> So, if I am correct, the program will
> - partition the workset
> - sort the vertices with messages
> - partition the delta
>
> One observation I have is that this program requires that all messages fit into memory. Was that also the case before?

I believe not. The plan has one coGroup that produces the messages and a following coGroup that groups by the messages' target ID and consumes them in an iterator. That doesn't require them to fit in memory, right?
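Spelled out, the two flatMaps behind the coGroup could look like the sketch below (assuming K = Long, VV = Double, Message = Double, and a boolean flag marking vertex updates; the class names are illustrative):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

// Extracts the solution-set delta: records whose flag is true carry a
// vertex-state update in field f0.
public class ExtractVertexUpdates implements
        FlatMapFunction<Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean>,
                        Vertex<Long, Double>> {

    @Override
    public void flatMap(Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean> value,
                        Collector<Vertex<Long, Double>> out) {
        if (value.f2) {
            out.collect(value.f0);
        }
    }
}

// Extracts the new workset: records whose flag is false carry a
// <targetId, message> pair in field f1.
public class ExtractMessages implements
        FlatMapFunction<Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean>,
                        Tuple2<Long, Double>> {

    @Override
    public void flatMap(Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean> value,
                        Collector<Tuple2<Long, Double>> out) {
        if (!value.f2) {
            out.collect(value.f1);
        }
    }
}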
I'm also working on a version where the graph is represented as an adjacency list, instead of two separate datasets of vertices and edges. The disadvantage is that the graph has to fit in memory, but I think the advantages are many. We'll be able to support edge value updates, edge mutations and different edge access order guarantees. I'll get back to this thread when I have a working prototype.

> Cheers,
> Fabian

Thanks again!

Cheers,
-Vasia.

2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

@Martin: thanks for your input! If you ran into any other issues that I didn't mention, please let us know. Obviously, even with my proposal, there are still features we cannot support, e.g. updating edge values and graph mutations. We'll need to re-think the underlying iteration and/or graph representation for those.

@Fabian: thanks a lot, no rush :)
Let me give you some more information that might make it easier to reason about performance:

Currently, in Spargel the SolutionSet (SS) keeps the vertex state and the workset (WS) keeps the active vertices. The iteration is composed of 2 coGroups. The first one takes the WS and the edges and produces messages. The second one takes the messages and the SS and produces the new WS and the SS-delta.

In my proposal, the SS has the vertex state and the WS has <vertexId, MessageIterator> pairs, i.e. the inbox of each vertex. The plan is more complicated because compute() needs to have two iterators: over the edges and over the messages.
First, I join SS and WS to get the active vertices (those that have received a msg) and their current state. Then I coGroup the result with the edges to access the neighbors. Now the main problem is that this coGroup needs to have 2 outputs: the new messages and the new vertex value. I couldn't really find a nice way to do this, so I'm emitting a Tuple that contains both types and I have a flag to separate them later with 2 flatMaps. From the vertex flatMap I create the SS-delta, and from the messages flatMap I apply a reduce to group the messages by vertex and send them to the new WS. One optimization would be to expose a combiner here to reduce message size.
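To make the 2-outputs problem concrete, the coGroup playing the role of compute() could look roughly like this for SSSP (a sketch with assumed types; note the dummy fields padding the unused half of each record, which is exactly the redundancy discussed above):

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

// Input 1: (vertex, message) pairs for one vertex id; input 2: its out-edges.
// Output records are tagged: true -> vertex-state update, false -> message.
public class SsspCompute implements
        CoGroupFunction<Tuple2<Vertex<Long, Double>, Double>, Edge<Long, Double>,
                        Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean>> {

    @Override
    public void coGroup(Iterable<Tuple2<Vertex<Long, Double>, Double>> verticesWithMessages,
                        Iterable<Edge<Long, Double>> edges,
                        Collector<Tuple3<Vertex<Long, Double>, Tuple2<Long, Double>, Boolean>> out) {

        Vertex<Long, Double> vertex = null;
        double minDistance = Double.POSITIVE_INFINITY;

        // Consume the inbox (with object reuse enabled, copies would be needed).
        for (Tuple2<Vertex<Long, Double>, Double> vm : verticesWithMessages) {
            vertex = vm.f0;
            minDistance = Math.min(minDistance, vm.f1);
        }

        if (vertex != null && minDistance < vertex.getValue()) {
            // Vertex update: flag = true, dummy message field.
            out.collect(new Tuple3<>(new Vertex<>(vertex.getId(), minDistance),
                    new Tuple2<>(-1L, 0.0), true));
            // Messages to neighbors: flag = false, dummy vertex field.
            for (Edge<Long, Double> edge : edges) {
                out.collect(new Tuple3<>(new Vertex<>(-1L, 0.0),
                        new Tuple2<>(edge.getTarget(), minDistance + edge.getValue()), false));
            }
        }
    }
}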
tl;dr:
1. 2 coGroups vs. join + coGroup + flatMap + reduce
2. how can we efficiently emit 2 different types of records from a coGroup?
3. does it make any difference if we group/combine the messages before updating the workset or after?

Cheers,
-Vasia.

On 27 October 2015 at 18:39, Fabian Hueske <fhue...@gmail.com> wrote:

I'll try to have a look at the proposal from a performance point of view in the next days.
Please ping me if I don't follow up on this thread.

Cheers, Fabian

2015-10-27 18:28 GMT+01:00 Martin Junghanns <m.jungha...@mailbox.org>:

Hi,

At our group, we also moved several algorithms from Giraph to Gelly and ran into some confusing issues (first in understanding, second during implementation) caused by the conceptual differences you described.

If there are no concrete advantages (performance mainly) in the Spargel implementation, we would be very happy to see the Gelly API aligned with Pregel-like systems.

Your SSSP example speaks for itself. Straightforward, if the reader is familiar with Pregel/Giraph/...

Best,
Martin

On 27.10.2015 17:40, Vasiliki Kalavri wrote:

Hello squirrels,

I want to discuss with you a few concerns I have about our current vertex-centric model implementation, Spargel, now fully subsumed by Gelly.

Spargel is our implementation of Pregel [1], but it violates some fundamental properties of the model, as described in the paper and as implemented in e.g. Giraph, GPS, Hama.
I often find myself confused both when trying to explain it to current Giraph users and when porting my Giraph algorithms to it.

More specifically:
- in the Pregel model, messages produced in superstep n are received in superstep n+1. In Spargel, they are produced and consumed in the same iteration.
- in Pregel, vertices are active during a superstep if they have received a message in the previous superstep. In Spargel, a vertex is active during a superstep if it has changed its value.

These two differences require a lot of rethinking when porting applications and can easily cause bugs.

The most important problem, however, is that we require the user to split the computation into 2 phases (2 UDFs):
- messaging: has access to the vertex state and can produce messages
- update: has access to incoming messages and can update the vertex value

Pregel/Giraph only expose one UDF to the user:
- compute: has access to both the vertex state and the incoming messages, can produce messages and update the vertex value.

This might not seem like a big deal, but apart from forcing the user to split their program logic into 2 phases, Spargel also makes some common computation patterns non-intuitive or impossible to write. A very simple example is propagating a message based on its value or sender ID. To do this with Spargel, one has to store all the incoming messages in the vertex value (which might be of a different type, btw) during the messaging phase, so that they can be accessed during the update phase.
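For comparison, this is roughly what a single-UDF SSSP compute function looks like in a Pregel-style API (a sketch only; ComputeFunction, MessageIterator, setNewVertexValue, getEdges and sendMessageTo are assumed names here, not an existing Flink API):

import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;

// One UDF sees both the vertex state and the inbox, and can both update
// the value and send messages, as in Pregel/Giraph.
public class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {

    private final long srcId;

    public SSSPComputeFunction(long srcId) {
        this.srcId = srcId;
    }

    public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
        // Messages sent in superstep n are received here in superstep n+1.
        double minDistance = vertex.getId() == srcId ? 0.0 : Double.POSITIVE_INFINITY;
        for (Double msg : messages) {
            minDistance = Math.min(minDistance, msg);
        }
        // Update and propagate only if the value improved.
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
            }
        }
    }
}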
So, my first question is: when implementing Spargel, were other alternatives considered and maybe rejected in favor of performance or because of some other reason? If someone knows, I would love to hear about them!

Second, I wrote a prototype implementation [2] that only exposes one UDF, compute(), by keeping the vertex state in the solution set and the messages in the workset. This way all previously mentioned limitations go away and the API (see "SSSPComputeFunction" in the example [3]) looks a lot more like Giraph's (see [4]).

I have not run any experiments yet and the prototype has some ugly hacks, but if you think any of this makes sense, then I'd be willing to follow up and try to optimize it. If we see that it performs well, we can consider either replacing Spargel or adding it as an alternative.

Thanks for reading this long e-mail and looking forward to your input!

Cheers,
-Vasia.

[1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
[2]: https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
[3]: https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
[4]: https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java