issue created: https://issues.apache.org/jira/browse/FLINK-3207
If anyone has any other suggestions about the renaming, let me know :)
-V.

On 5 January 2016 at 11:52, Aljoscha Krettek <aljos...@apache.org> wrote:

Nice to hear. :D

I think you can go ahead and add the Jira. About the renaming: I also think that it would make sense to do it.

On 04 Jan 2016, at 19:48, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

Hello squirrels and happy new year!

I'm reviving this thread to share some results and discuss next steps.

Using the Either type I was able to get rid of redundant messages and vertex state. During the past few weeks, I have been running experiments, which show that the performance of this "Pregel" model has improved a lot :) In [1], you can see the speedup of GSA and Pregel over Spargel, for SSSP and Connected Components (CC), on the LiveJournal (68m edges), Orkut (117m edges) and Wikipedia (340m edges) datasets.

Regarding next steps, if there are no objections, I will open a Jira for adding a Pregel iteration abstraction to Gelly. The Gelly guide has to be updated to reflect the spectrum of iteration abstractions that we have discussed in this thread, i.e. Pregel -> Spargel (Scatter-Gather) -> GSA.

I think it might also be a good idea to do some renaming. Currently, we call the Spargel iteration "vertex-centric", a term which fits the Pregel abstraction better. I propose we rename the Spargel iteration to "scatter-gather" or "signal-collect" (where it was first introduced [2]). Any other ideas?

Thanks,
-Vasia.

[1]: https://drive.google.com/file/d/0BzQJrI2eGlyYRTRjMkp1d3R6eVE/view?usp=sharing
[2]: http://link.springer.com/chapter/10.1007/978-3-642-17746-0_48

On 11 November 2015 at 11:05, Stephan Ewen <se...@apache.org> wrote:

See: https://issues.apache.org/jira/browse/FLINK-3002

On Wed, Nov 11, 2015 at 10:54 AM, Stephan Ewen <se...@apache.org> wrote:

"Either" and "Optional" types are quite useful.

Let's add them to the core Java API.

On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

Thanks Fabian! I'll try that :)

On 10 November 2015 at 22:31, Fabian Hueske <fhue...@gmail.com> wrote:

You could implement a Java Either type (similar to Scala's Either) that holds either a Message or the VertexState, plus a corresponding TypeInformation and TypeSerializer that serialize a byte flag to indicate which of the two types is used.
It might actually make sense to add a generic Either type to the Java API in general (similar to the Java Tuples, which resemble the Scala Tuples).

Cheers, Fabian
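For reference, a minimal sketch of the kind of Either type discussed here (the effort is tracked in FLINK-3002 above; the class below is only an illustration with a simplified API, not that implementation). A matching TypeSerializer would write a single byte flag and then delegate to the Left or Right serializer, as Fabian suggests.

    // Minimal illustrative Either type: a value that holds exactly one of two alternatives.
    public abstract class Either<L, R> {

        public static <L, R> Either<L, R> left(L value)  { return new Left<>(value); }
        public static <L, R> Either<L, R> right(R value) { return new Right<>(value); }

        public abstract boolean isLeft();
        public abstract L left();
        public abstract R right();

        static final class Left<L, R> extends Either<L, R> {
            private final L value;
            Left(L value) { this.value = value; }
            public boolean isLeft() { return true; }
            public L left()  { return value; }
            public R right() { throw new IllegalStateException("Right value requested on a Left"); }
        }

        static final class Right<L, R> extends Either<L, R> {
            private final R value;
            Right(R value) { this.value = value; }
            public boolean isLeft() { return false; }
            public L left()  { throw new IllegalStateException("Left value requested on a Right"); }
            public R right() { return value; }
        }
    }

With such a type, the compute coGroup can emit Either<Vertex, Message> records instead of a padded tuple with dummy fields, which is exactly the redundancy discussed below.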
2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

Hi,

after running a few experiments, I can confirm that putting the combiner after the flatMap is indeed more efficient.

I ran SSSP and Connected Components with Spargel, GSA, and the Pregel model, and the results are the following:

- for SSSP, Spargel is always the slowest, GSA is ~1.2x faster, and Pregel is ~1.1x faster without a combiner, ~1.3x faster with a combiner.
- for Connected Components, Spargel and GSA perform similarly, while Pregel is 1.4-1.6x slower.

To start with, this is much better than I expected :)
However, there is a main shortcoming in my current implementation that negatively impacts performance:
Since the compute-function coGroup needs to output both new vertex values and new messages, I emit a wrapping tuple that contains both vertex state and messages and then filter them out based on a boolean field. The problem is that since I cannot emit null fields, I emit a dummy message for each new vertex state and a dummy vertex state for each new message. That essentially means that the intermediate messages result doubles in size, if, say, the vertex values are of the same type as the messages (it can be worse if the vertex values are more complex).
So my question is, is there a way to avoid this redundancy, by either emitting null fields or by creating an operator that can emit 2 different types of tuples?

Thanks!
-Vasia.
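To make the workaround concrete, this is roughly what the padded-tuple approach looks like. Everything here is illustrative: the record layout (Tuple3 of boolean tag, vertex, message), the element types, and the variable names are placeholders, and the snippet assumes Flink's DataSet API plus Gelly's Vertex type (FlatMapFunction and Collector imports are implied). The dummy fields are what an Either type removes.

    // The compute coGroup emits one record type for both kinds of output; a boolean tag
    // tells the downstream flatMaps which half of the record is real and which is a dummy.
    DataSet<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>> superstep = ...;

    // true -> a vertex-state update; the message field (f2) carries a dummy value
    DataSet<Vertex<Long, Double>> vertexUpdates = superstep.flatMap(
        new FlatMapFunction<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>,
                            Vertex<Long, Double>>() {
            @Override
            public void flatMap(Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>> value,
                                Collector<Vertex<Long, Double>> out) {
                if (value.f0) {
                    out.collect(value.f1);
                }
            }
        });

    // the messages side is symmetric: keep records with value.f0 == false and emit value.f2,
    // where the vertex field (f1) carries a dummy value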
On 9 November 2015 at 15:20, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vasia,

sorry for the late reply.
I don't think there is a big difference. In both cases, the partitioning and sorting happen at the end of the iteration.
If the groupReduce is applied before the workset is returned, the sorting happens on the filtered result (after the flatMap), which might be a little more efficient (depending on the ratio of messages to solution set updates). It also does not require that the initial workset be sorted for the first groupReduce.

I would put it at the end.

Cheers, Fabian

2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

@Fabian

Is there any advantage in putting the reducer-combiner before updating the workset vs. after (i.e. right before the join with the solution set)?

If it helps, here are the plans of these 2 alternatives:

https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing

Thanks a lot for the help!

-Vasia.

On 30 October 2015 at 21:28, Fabian Hueske <fhue...@gmail.com> wrote:

We can of course inject an optional ReduceFunction (or GroupReduce, or combinable GroupReduce) to reduce the size of the work set.
I suggested removing the GroupReduce function because it only collected all messages into a single record by emitting the input iterator, which is quite dangerous. Applying a combinable reduce function could improve the performance considerably.

The good news is that it would come "for free", because the necessary partitioning and sorting can be reused (given the forwarded-fields annotations are correctly set):
- The partitioning of the reduce can be reused for the join with the solution set.
- The sort of the reduce is preserved by the join with the in-memory hash table of the solution set and can be reused for the coGroup.

Best,
Fabian
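To illustrate the combiner idea: if messages are combinable (min for SSSP, sum for PageRank-style updates), the message data set can be reduced per target vertex before it becomes the new workset. A minimal sketch, assuming messages are Tuple2<targetVertexId, value> as in the other snippets; a grouped ReduceFunction like this is executed with a combiner, so message volume shrinks before the shuffle.

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Keep only the smallest candidate distance per target vertex (SSSP-style combiner).
    public class MinMessageCombiner implements ReduceFunction<Tuple2<Long, Double>> {
        @Override
        public Tuple2<Long, Double> reduce(Tuple2<Long, Double> a, Tuple2<Long, Double> b) {
            return a.f1 <= b.f1 ? a : b;
        }
    }

    // usage: messages.groupBy(0).reduce(new MinMessageCombiner())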
2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

Hi Fabian,

thanks so much for looking into this so quickly :-)

One update I have to make is that I tried running a few experiments with this on a 6-node cluster. The current implementation gets stuck at "Rebuilding Workset Properties" and never finishes a single iteration. Running the plan of one superstep without a delta iteration terminates fine. I didn't have access to the cluster today, so I couldn't debug this further, but I will do so as soon as I have access again.

The rest of my comments are inline:

On 30 October 2015 at 17:53, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vasia,
>
> I had a look at your new implementation and have a few ideas for improvements.
> 1) Sending out the input iterator as you do in the last GroupReduce is quite dangerous and does not give a benefit compared to collecting all elements. Even though it is an iterator, it needs to be completely materialized in-memory whenever the record is touched by Flink or user code.
> I would propose to skip the reduce step completely, handle all messages separately, and only collect them in the CoGroup function before giving them to the VertexComputeFunction. Be careful to only do that with objectReuse disabled, or take care to properly copy the messages. If you collect the messages in the CoGroup, you don't need the GroupReduce, you have smaller records, and you can remove the MessageIterator class completely.

I see. The idea was to expose a message combiner that the user could implement if the messages are combinable, e.g. min, sum. This is a common case and reduces the message load significantly. Is there a way I could do something similar before the coGroup?

> 2) Add this annotation to the AppendVertexState function: @ForwardedFieldsFirst("*->f0"). This indicates that the complete element of the first input becomes the first field of the output. Since the input is partitioned on "f0" (it comes out of the partitioned solution set), the result of AppendVertexState will be partitioned on "f0.f0", which is (accidentally :-D) the join key of the following coGroup function -> no partitioning :-)

Great! I totally missed that ;)
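For readers unfamiliar with forwarded-fields annotations, a sketch of what point 2) could look like. The exact signature of AppendVertexState in the prototype is not shown in this thread, so the element types and the choice of a JoinFunction here are assumptions; only the annotation itself is what Fabian describes.

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.graph.Vertex;

    // "*->f0": the whole element of the first input (the vertex from the solution set)
    // becomes field 0 of the output, so the partitioning on the vertex id is known to
    // survive into "f0.f0" of the result and the following coGroup needs no re-partitioning.
    @ForwardedFieldsFirst("*->f0")
    public class AppendVertexState implements
            JoinFunction<Vertex<Long, Double>, Tuple2<Long, Double>,
                         Tuple2<Vertex<Long, Double>, Tuple2<Long, Double>>> {

        @Override
        public Tuple2<Vertex<Long, Double>, Tuple2<Long, Double>> join(
                Vertex<Long, Double> vertex, Tuple2<Long, Double> message) {
            return new Tuple2<>(vertex, message);
        }
    }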
> 3) Adding the two flatMap functions behind the CoGroup prevents chaining and therefore causes some serialization overhead, but it shouldn't be too bad.
>
> So in total I would make this program as follows:
>
> iVertices<K, VV>
> iMessages<K, Message> = iVertices.map(new InitWorkSet());
>
> iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
>   .join(iteration.workSet())
>   .where(0)    // solution set is local and build side
>   .equalTo(0)  // workset is shuffled and probe side of hash join
> superstepComp<Vertex, Tuple2<K, Message>, Bool> = verticesWithMessage.coGroup(edgesWithValue)
>   .where("f0.f0")  // vwm is locally forwarded and sorted
>   .equalTo(0)      // edges are already partitioned and sorted (if cached correctly)
>   .with(...)       // the coGroup collects all messages in a collection and gives it to the ComputeFunction
> delta<Vertex> = superstepComp.flatMap(...)        // partitioned when merged into solution set
> workSet<K, Message> = superstepComp.flatMap(...)  // partitioned for the join
> iteration.closeWith(delta, workSet)
>
> So, if I am correct, the program will
> - partition the workset
> - sort the vertices with messages
> - partition the delta
>
> One observation I have is that this program requires that all messages fit into memory. Was that also the case before?

I believe not. The plan has one coGroup that produces the messages and a following coGroup that groups by the messages' target ID and consumes them in an iterator. That doesn't require them to fit in memory, right?

I'm also working on a version where the graph is represented as an adjacency list, instead of two separate datasets of vertices and edges. The disadvantage is that the graph has to fit in memory, but I think the advantages are many: we'll be able to support edge value updates, edge mutations and different edge access order guarantees. I'll get back to this thread when I have a working prototype.

> Cheers,
> Fabian

Thanks again!

Cheers,
-Vasia.
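One way Fabian's outline above might be written against the DataSet API is sketched below. The concrete types (K = Long, VV = Message = Double) and the user functions InitWorkSet, AppendVertexState, ComputeFunction, ExtractVertexUpdates and ExtractMessages are placeholders for the code discussed in this thread, not actual classes from it; DataSet, DeltaIteration, tuple, Vertex and Edge imports from Flink/Gelly are implied.

    int maxIterations = 100;  // placeholder
    DataSet<Vertex<Long, Double>> iVertices = ...;
    DataSet<Edge<Long, Double>> edgesWithValue = ...;

    DataSet<Tuple2<Long, Double>> iMessages = iVertices.map(new InitWorkSet());

    DeltaIteration<Vertex<Long, Double>, Tuple2<Long, Double>> iteration =
            iVertices.iterateDelta(iMessages, maxIterations, 0);

    DataSet<Tuple2<Vertex<Long, Double>, Tuple2<Long, Double>>> verticesWithMessage =
            iteration.getSolutionSet()
                     .join(iteration.getWorkset())
                     .where(0)         // solution set stays local (build side)
                     .equalTo(0)       // workset is shuffled (probe side)
                     .with(new AppendVertexState());

    DataSet<Tuple3<Boolean, Vertex<Long, Double>, Tuple2<Long, Double>>> superstep =
            verticesWithMessage.coGroup(edgesWithValue)
                     .where("f0.f0")   // partitioning forwarded from the solution set
                     .equalTo(0)       // edges stay partitioned/sorted across supersteps
                     .with(new ComputeFunction());

    DataSet<Vertex<Long, Double>> delta   = superstep.flatMap(new ExtractVertexUpdates());
    DataSet<Tuple2<Long, Double>> workSet = superstep.flatMap(new ExtractMessages());

    DataSet<Vertex<Long, Double>> result = iteration.closeWith(delta, workSet);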
2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

@Martin: thanks for your input! If you ran into any other issues that I didn't mention, please let us know. Obviously, even with my proposal, there are still features we cannot support, e.g. updating edge values and graph mutations. We'll need to re-think the underlying iteration and/or graph representation for those.

@Fabian: thanks a lot, no rush :)
Let me give you some more information that might make it easier to reason about performance:

Currently, in Spargel the solution set (SS) keeps the vertex state and the workset (WS) keeps the active vertices. The iteration is composed of 2 coGroups. The first one takes the WS and the edges and produces messages. The second one takes the messages and the SS and produces the new WS and the SS-delta.

In my proposal, the SS has the vertex state and the WS has <vertexId, MessageIterator> pairs, i.e. the inbox of each vertex. The plan is more complicated because compute() needs to have two iterators: over the edges and over the messages. First, I join SS and WS to get the active vertices (those that have received a message) and their current state. Then I coGroup the result with the edges to access the neighbors. Now the main problem is that this coGroup needs to have 2 outputs: the new messages and the new vertex value. I couldn't really find a nice way to do this, so I'm emitting a Tuple that contains both types, and I have a flag to separate them later with 2 flatMaps. From the vertex flatMap I create the SS-delta, and from the messages flatMap I apply a reduce to group the messages by vertex and send them to the new WS. One optimization would be to expose a combiner here to reduce message size.

tl;dr:
1. 2 coGroups vs. Join + coGroup + flatMap + reduce
2. how can we efficiently emit 2 different types of records from a coGroup?
3. does it make any difference if we group/combine the messages before updating the workset or after?

Cheers,
-Vasia.

On 27 October 2015 at 18:39, Fabian Hueske <fhue...@gmail.com> wrote:

I'll try to have a look at the proposal from a performance point of view in the next days.
Please ping me if I don't follow up on this thread.

Cheers, Fabian

2015-10-27 18:28 GMT+01:00 Martin Junghanns <m.jungha...@mailbox.org>:

Hi,

At our group, we also moved several algorithms from Giraph to Gelly and ran into some confusing issues (first in understanding, second during implementation) caused by the conceptual differences you described.

If there are no concrete advantages (performance, mainly) in the Spargel implementation, we would be very happy to see the Gelly API aligned with Pregel-like systems.

Your SSSP example speaks for itself. Straightforward, if the reader is familiar with Pregel/Giraph/...

Best,
Martin
On 27.10.2015 17:40, Vasiliki Kalavri wrote:

Hello squirrels,

I want to discuss with you a few concerns I have about our current vertex-centric model implementation, Spargel, now fully subsumed by Gelly.

Spargel is our implementation of Pregel [1], but it violates some fundamental properties of the model, as described in the paper and as implemented in e.g. Giraph, GPS, Hama. I often find myself confused both when trying to explain it to current Giraph users and when porting my Giraph algorithms to it.

More specifically:
- in the Pregel model, messages produced in superstep n are received in superstep n+1. In Spargel, they are produced and consumed in the same iteration.
- in Pregel, vertices are active during a superstep if they have received a message in the previous superstep. In Spargel, a vertex is active during a superstep if it has changed its value.

These two differences require a lot of rethinking when porting applications and can easily cause bugs.

The most important problem, however, is that we require the user to split the computation into 2 phases (2 UDFs):
- messaging: has access to the vertex state and can produce messages
- update: has access to incoming messages and can update the vertex value

Pregel/Giraph only expose one UDF to the user:
- compute: has access to both the vertex state and the incoming messages, can produce messages and update the vertex value.

This might not seem like a big deal, but apart from forcing the user to split their program logic into 2 phases, Spargel also makes some common computation patterns non-intuitive or impossible to write. A very simple example is propagating a message based on its value or sender ID. To do this with Spargel, one has to store all the incoming messages in the vertex value (which might be of a different type, btw) during the messaging phase, so that they can be accessed during the update phase.

So, my first question is: when implementing Spargel, were other alternatives considered and maybe rejected in favor of performance or for some other reason? If someone knows, I would love to hear about them!

Second, I wrote a prototype implementation [2] that only exposes one UDF, compute(), by keeping the vertex state in the solution set and the messages in the workset. This way all the previously mentioned limitations go away, and the API (see "SSSPComputeFunction" in the example [3]) looks a lot more like Giraph (see [4]).

I have not run any experiments yet and the prototype has some ugly hacks, but if you think any of this makes sense, then I'd be willing to follow up and try to optimize it. If we see that it performs well, we can consider either replacing Spargel or adding it as an alternative.

Thanks for reading this long e-mail and looking forward to your input!

Cheers,
-Vasia.
[1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
[2]: https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
[3]: https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
[4]: https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java
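For comparison with the two-phase Spargel API, this is roughly what an SSSP kernel looks like when a single compute() UDF sees the vertex state, its out-edges and the incoming messages, in the spirit of [3] and [4]. The ComputeFunction base class and the setNewVertexValue()/sendMessageTo() helpers shown here are paraphrased for illustration, not the prototype's exact API; only Gelly's Vertex and Edge types are real.

    import org.apache.flink.graph.Edge;
    import org.apache.flink.graph.Vertex;

    // Hypothetical single-UDF SSSP: type parameters stand for vertex id, vertex value,
    // edge value and message type. Vertex values are assumed to start at +infinity.
    public class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {

        private final long srcVertexId;

        public SSSPComputeFunction(long srcVertexId) {
            this.srcVertexId = srcVertexId;
        }

        @Override
        public void compute(Vertex<Long, Double> vertex,
                            Iterable<Edge<Long, Double>> outEdges,
                            Iterable<Double> messages) {

            // smallest candidate distance: 0 for the source, otherwise the minimum message
            double minDistance = vertex.getId().equals(srcVertexId) ? 0d : Double.POSITIVE_INFINITY;
            for (double msg : messages) {
                minDistance = Math.min(minDistance, msg);
            }

            // adopt the improvement and propagate it along all out-edges
            if (minDistance < vertex.getValue()) {
                setNewVertexValue(minDistance);
                for (Edge<Long, Double> edge : outEdges) {
                    sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
                }
            }
        }
    }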