"Either" an "Optional" types are quite useful. Let's add them to the core Java API.
On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

Thanks Fabian! I'll try that :)

On 10 November 2015 at 22:31, Fabian Hueske <fhue...@gmail.com> wrote:

You could implement a Java Either type (similar to Scala's Either) that holds either a Message or the VertexState, plus a corresponding TypeInformation and TypeSerializer that serializes a byte flag to indicate which of the two types is used.
It might actually make sense to add a generic Either type to the Java API in general (similar to the Java Tuples, which resemble the Scala Tuples).

Cheers, Fabian

2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

Hi,

after running a few experiments, I can confirm that putting the combiner after the flatMap is indeed more efficient.

I ran SSSP and Connected Components with Spargel, GSA, and the Pregel model, and the results are the following:

- for SSSP, Spargel is always the slowest, GSA is ~1.2x faster, and Pregel is ~1.1x faster without a combiner, ~1.3x faster with a combiner.
- for Connected Components, Spargel and GSA perform similarly, while Pregel is 1.4-1.6x slower.

To start with, this is much better than I expected :)
However, there is a main shortcoming in my current implementation that negatively impacts performance. Since the compute-function coGroup needs to output both new vertex values and new messages, I emit a wrapping tuple that contains both vertex state and messages, and then filter them apart based on a boolean field. The problem is that since I cannot emit null fields, I emit a dummy message for each new vertex state and a dummy vertex state for each new message. That essentially means that the intermediate messages result is double in size if, say, the vertex values are of the same type as the messages (it can be worse if the vertex values are more complex).
So my question is: is there a way to avoid this redundancy, either by emitting null fields or by creating an operator that can emit 2 different types of tuples?

Thanks!
-Vasia.
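The byte-flag serialization Fabian suggests above would be the core of such an Either serializer. A rough sketch of just the two central methods (a real TypeSerializer implements several more; leftSerializer and rightSerializer are the nested serializers of the two sides):

    // Sketch: write one byte to mark which side is present, then
    // delegate to the serializer of that side.
    public void serialize(Either<L, R> record, DataOutputView target) throws IOException {
        if (record.isLeft()) {
            target.writeByte(0);
            leftSerializer.serialize(record.left(), target);
        } else {
            target.writeByte(1);
            rightSerializer.serialize(record.right(), target);
        }
    }

    public Either<L, R> deserialize(DataInputView source) throws IOException {
        if (source.readByte() == 0) {
            return Either.left(leftSerializer.deserialize(source));
        } else {
            return Either.right(rightSerializer.deserialize(source));
        }
    }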
On 9 November 2015 at 15:20, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vasia,

sorry for the late reply.
I don't think there is a big difference. In both cases, the partitioning and sorting happen at the end of the iteration.
If the groupReduce is applied before the workset is returned, the sorting happens on the filtered result (after the flatMap), which might be a little more efficient (depending on the ratio of messages to solution set updates). Also, it does not require the initial workset to be sorted for the first groupReduce.

I would put it at the end.

Cheers, Fabian

2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

@Fabian

Is there any advantage in putting the reducer-combiner before updating the workset vs. after (i.e. right before the join with the solution set)?

If it helps, here are the plans of these 2 alternatives:

https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing

Thanks a lot for the help!

-Vasia.

On 30 October 2015 at 21:28, Fabian Hueske <fhue...@gmail.com> wrote:

We can of course inject an optional ReduceFunction (or GroupReduce, or combinable GroupReduce) to reduce the size of the workset.
I suggested removing the GroupReduce function because it only collected all messages into a single record by emitting the input iterator, which is quite dangerous. Applying a combinable reduce function could improve the performance considerably.

The good news is that it would come "for free", because the necessary partitioning and sorting can be reused (given the forwardedField annotations are correctly set):
- The partitioning of the reduce can be reused for the join with the solution set.
- The sort of the reduce is preserved by the join with the in-memory hash table of the solution set and can be reused for the coGroup.

Best, Fabian

2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

Hi Fabian,

thanks so much for looking into this so quickly :-)

One update I have to make: I tried running a few experiments with this on a 6-node cluster. The current implementation gets stuck at "Rebuilding Workset Properties" and never finishes a single iteration. Running the plan of one superstep without a delta iteration terminates fine. I didn't have access to the cluster today, so I couldn't debug this further, but I will as soon as I have access again.

The rest of my comments are inline:

On 30 October 2015 at 17:53, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vasia,
>
> I had a look at your new implementation and have a few ideas for
> improvements.
> 1) Sending out the input iterator as you do in the last GroupReduce is
> quite dangerous and does not give a benefit compared to collecting all
> elements. Even though it is an iterator, it needs to be completely
> materialized in memory whenever the record is touched by Flink or user
> code.
> I would propose to skip the reduce step completely, handle all messages
> separately, and only collect them in the CoGroup function before giving
> them to the VertexComputeFunction. Be careful to only do that with
> objectReuse disabled, or take care to properly copy the messages. If you
> collect the messages in the CoGroup, you don't need the GroupReduce, you
> have smaller records, and you can remove the MessageIterator class
> completely.

I see. The idea was to expose a message combiner that users could implement when the messages are combinable, e.g. min or sum. This is a common case and reduces the message load significantly. Is there a way I could do something similar before the coGroup?
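For SSSP, for example, such a combiner is just a min-reduce over the candidate distances. A rough sketch, assuming messages are Tuple2<targetId, distance> records (types are illustrative):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Sketch: keep only the smallest candidate distance per target vertex,
    // shrinking the workset before it is shuffled and joined.
    public class MinDistanceCombiner implements ReduceFunction<Tuple2<Long, Double>> {

        @Override
        public Tuple2<Long, Double> reduce(Tuple2<Long, Double> msg1,
                                           Tuple2<Long, Double> msg2) {
            return msg1.f1 <= msg2.f1 ? msg1 : msg2;
        }
    }

applied as messages.groupBy(0).reduce(new MinDistanceCombiner()).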
> 2) Add this annotation to the AppendVertexState function:
> @ForwardedFieldsFirst("*->f0"). It indicates that the complete element of
> the first input becomes the first field of the output. Since the first
> input is partitioned on "f0" (it comes out of the partitioned solution
> set), the result of AppendVertexState will be partitioned on "f0.f0",
> which is (accidentally :-D) the join key of the following coGroup
> function -> no partitioning :-)

Great! I totally missed that ;)
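For reference, the annotated function might look roughly like this (a sketch with illustrative types; the actual AppendVertexState may differ):

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.graph.Vertex;

    // Sketch: "*->f0" declares that the whole first input (the vertex) is
    // forwarded unmodified into field f0 of the output, so the partitioning
    // on the vertex id survives as a partitioning on f0.f0 downstream.
    @ForwardedFieldsFirst("*->f0")
    public class AppendVertexState<K, VV, Message> implements
            JoinFunction<Vertex<K, VV>, Tuple2<K, Message>, Tuple2<Vertex<K, VV>, Message>> {

        @Override
        public Tuple2<Vertex<K, VV>, Message> join(Vertex<K, VV> vertex,
                                                   Tuple2<K, Message> message) {
            return new Tuple2<>(vertex, message.f1);
        }
    }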
> 3) Adding the two flatMap functions behind the CoGroup prevents chaining
> and therefore causes some serialization overhead, but that shouldn't be
> too bad.
>
> So in total, I would make this program as follows:
>
>     iVertices<K, VV>
>     iMessages<K, Message> = iVertices.map(new InitWorkSet());
>
>     iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
>     verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
>       .join(iteration.getWorkset())
>       .where(0)   // solution set is local and build side
>       .equalTo(0) // workset is shuffled and probe side of hash join
>     superstepComp<Vertex, Tuple2<K, Message>, Bool> =
>       verticesWithMessage.coGroup(edgesWithValue)
>       .where("f0.f0") // vwm is locally forwarded and sorted
>       .equalTo(0)     // edges are already partitioned and sorted (if cached correctly)
>       .with(...)      // collects all messages in a collection and gives it to the ComputeFunction
>     delta<Vertex> = superstepComp.flatMap(...)       // partitioned when merged into the solution set
>     workSet<K, Message> = superstepComp.flatMap(...) // partitioned for the join
>     iteration.closeWith(delta, workSet)
>
> So, if I am correct, the program will
> - partition the workset
> - sort the vertices with messages
> - partition the delta
>
> One observation I have is that this program requires that all messages
> fit into memory. Was that also the case before?

I believe not. The plan has one coGroup that produces the messages and a following coGroup that groups by the messages' target ID and consumes them in an iterator. That doesn't require them to fit in memory, right?

I'm also working on a version where the graph is represented as an adjacency list, instead of two separate datasets of vertices and edges. The disadvantage is that the graph has to fit in memory, but I think the advantages are many: we'll be able to support edge value updates, edge mutations, and different edge access order guarantees. I'll get back to this thread when I have a working prototype.

> Cheers,
> Fabian

Thanks again!

Cheers,
-Vasia.

2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

@Martin: thanks for your input! If you ran into any other issues that I didn't mention, please let us know. Obviously, even with my proposal, there are still features we cannot support, e.g. updating edge values and graph mutations. We'll need to re-think the underlying iteration and/or the graph representation for those.

@Fabian: thanks a lot, no rush :)
Let me give you some more information that might make it easier to reason about performance:

Currently in Spargel, the solution set (SS) keeps the vertex state and the workset (WS) keeps the active vertices. The iteration is composed of 2 coGroups. The first one takes the WS and the edges and produces messages. The second one takes the messages and the SS and produces the new WS and the SS-delta.

In my proposal, the SS has the vertex state and the WS has <vertexId, MessageIterator> pairs, i.e. the inbox of each vertex. The plan is more complicated because compute() needs to have two iterators: over the edges and over the messages.
First, I join SS and WS to get the active vertices (those that have received a msg) and their current state. Then I coGroup the result with the edges to access the neighbors. Now the main problem is that this coGroup needs to have 2 outputs: the new messages and the new vertex value. I couldn't really find a nice way to do this, so I'm emitting a Tuple that contains both types and has a flag to separate them later with 2 flatMaps. From the vertex flatMap I create the SS-delta, and from the messages flatMap I apply a reduce to group the messages by vertex and send them to the new WS. One optimization would be to expose a combiner here to reduce the message size.
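Concretely, the two splitting flatMaps look roughly like this (a sketch; Tuple3<vertexState, message, isVertexUpdate> stands in for the actual wrapping type, and the dummy fields are omitted for brevity):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.util.Collector;

    // Sketch: route vertex-state updates to the solution set delta ...
    class ExtractDelta<K, VV, Message> implements
            FlatMapFunction<Tuple3<Vertex<K, VV>, Tuple2<K, Message>, Boolean>, Vertex<K, VV>> {

        @Override
        public void flatMap(Tuple3<Vertex<K, VV>, Tuple2<K, Message>, Boolean> value,
                            Collector<Vertex<K, VV>> out) {
            if (value.f2) {
                out.collect(value.f0);
            }
        }
    }

    // ... and messages to the new workset.
    class ExtractMessages<K, VV, Message> implements
            FlatMapFunction<Tuple3<Vertex<K, VV>, Tuple2<K, Message>, Boolean>, Tuple2<K, Message>> {

        @Override
        public void flatMap(Tuple3<Vertex<K, VV>, Tuple2<K, Message>, Boolean> value,
                            Collector<Tuple2<K, Message>> out) {
            if (!value.f2) {
                out.collect(value.f1);
            }
        }
    }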
tl;dr:
1. 2 coGroups vs. join + coGroup + flatMap + reduce
2. how can we efficiently emit 2 different types of records from a coGroup?
3. does it make any difference if we group/combine the messages before updating the workset or after?

Cheers,
-Vasia.

On 27 October 2015 at 18:39, Fabian Hueske <fhue...@gmail.com> wrote:

I'll try to have a look at the proposal from a performance point of view in the next days.
Please ping me if I don't follow up on this thread.

Cheers, Fabian

2015-10-27 18:28 GMT+01:00 Martin Junghanns <m.jungha...@mailbox.org>:

Hi,

At our group, we also moved several algorithms from Giraph to Gelly and ran into some confusing issues (first in understanding, second during implementation) caused by the conceptual differences you described.

If there are no concrete advantages (mainly performance) in the Spargel implementation, we would be very happy to see the Gelly API aligned with Pregel-like systems.

Your SSSP example speaks for itself: straightforward, if the reader is familiar with Pregel/Giraph/...

Best,
Martin

On 27.10.2015 17:40, Vasiliki Kalavri wrote:

Hello squirrels,

I want to discuss with you a few concerns I have about our current vertex-centric model implementation, Spargel, now fully subsumed by Gelly.

Spargel is our implementation of Pregel [1], but it violates some fundamental properties of the model, as described in the paper and as implemented in e.g. Giraph, GPS, and Hama. I often find myself confused both when trying to explain it to current Giraph users and when porting my Giraph algorithms to it.
More specifically:
- In the Pregel model, messages produced in superstep n are received in superstep n+1. In Spargel, they are produced and consumed in the same iteration.
- In Pregel, vertices are active during a superstep if they have received a message in the previous superstep. In Spargel, a vertex is active during a superstep if it has changed its value.

These two differences require a lot of rethinking when porting applications and can easily cause bugs.

The most important problem, however, is that we require the user to split the computation into 2 phases (2 UDFs):
- messaging: has access to the vertex state and can produce messages
- update: has access to the incoming messages and can update the vertex value

Pregel/Giraph only expose one UDF to the user:
- compute: has access to both the vertex state and the incoming messages, can produce messages, and can update the vertex value.

This might not seem like a big deal, but apart from forcing the user to split their program logic into 2 phases, Spargel also makes some common computation patterns non-intuitive or impossible to write. A very simple example is propagating a message based on its value or its sender ID. To do this with Spargel, one has to store all the incoming messages in the vertex value (which might be of a different type, btw) during the messaging phase, so that they can be accessed during the update phase.

So, my first question is: when implementing Spargel, were other alternatives considered and maybe rejected in favor of performance or for some other reason? If someone knows, I would love to hear about them!
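To make the split concrete, SSSP in Spargel is expressed as two separate UDFs, roughly like this (a sketch; signatures approximate the Spargel API):

    import org.apache.flink.graph.Edge;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.graph.spargel.MessageIterator;
    import org.apache.flink.graph.spargel.MessagingFunction;
    import org.apache.flink.graph.spargel.VertexUpdateFunction;

    // Update phase: sees only the incoming messages.
    final class SSSPUpdater extends VertexUpdateFunction<Long, Double, Double> {
        @Override
        public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
            double minDistance = Double.MAX_VALUE;
            for (double msg : messages) {
                minDistance = Math.min(minDistance, msg);
            }
            if (minDistance < vertex.getValue()) {
                setNewVertexValue(minDistance); // vertex stays active only if the value changed
            }
        }
    }

    // Messaging phase: sees only the vertex state and its edges.
    final class SSSPMessenger extends MessagingFunction<Long, Double, Double, Double> {
        @Override
        public void sendMessages(Vertex<Long, Double> vertex) {
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
            }
        }
    }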
Second, I wrote a prototype implementation [2] that only exposes one UDF, compute(), by keeping the vertex state in the solution set and the messages in the workset. This way, all the previously mentioned limitations go away, and the API (see "SSSPComputeFunction" in the example [3]) looks a lot more like Giraph (see [4]).

I have not run any experiments yet and the prototype has some ugly hacks, but if you think any of this makes sense, then I'd be willing to follow up and try to optimize it. If we see that it performs well, we can consider either replacing Spargel or adding it as an alternative.

Thanks for reading this long e-mail, and looking forward to your input!

Cheers,
-Vasia.

[1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
[2]: https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
[3]: https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
[4]: https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java
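For comparison with the two-UDF version above, SSSP with a single compute() UDF looks roughly like this (a sketch following the description above; ComputeFunction and the method names are approximations, see [2] and [3] for the actual prototype code):

    // Sketch: one UDF sees both the incoming messages and the vertex state,
    // and can update the value and send messages in the same place.
    final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {

        @Override
        public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
            double minDistance = Double.MAX_VALUE;
            for (double msg : messages) {
                minDistance = Math.min(minDistance, msg);
            }
            if (minDistance < vertex.getValue()) {
                setNewVertexValue(minDistance);
                for (Edge<Long, Double> edge : getEdges()) {
                    sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
                }
            }
        }
    }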