issue created: https://issues.apache.org/jira/browse/FLINK-3207

If anyone has any other suggestion about the renaming, let me know :)

-V.

On 5 January 2016 at 11:52, Aljoscha Krettek <aljos...@apache.org> wrote:

> Nice to hear. :D
>
> I think you can go ahead and add the Jira. About the renaming: I also
> think that it would make sense to do it.
> > On 04 Jan 2016, at 19:48, Vasiliki Kalavri <vasilikikala...@gmail.com>
> wrote:
> >
> > Hello squirrels and happy new year!
> >
> > I'm reviving this thread to share some results and discuss next steps.
> >
> > Using the Either type I was able to get rid of redundant messages and
> > vertex state. During the past few weeks, I have been running experiments,
> > which show that the performance of this "Pregel" model has improved a
> lot :)
> > In [1], you can see the speedup of GSA and Pregel over Spargel, for SSSP
> > and Connected Components (CC), for the Livejournal (68m edges), Orkut
> (117m
> > edges) and Wikipedia (340m edges) datasets.
> >
> > Regarding next steps, if no objections, I will open a Jira for adding a
> > Pregel iteration abstraction to Gelly. The Gelly guide has to be updated
> to
> > reflect the spectrum of iteration abstractions that we have discussed in
> > this thread, i.e. Pregel -> Spargel (Scatter-Gather) -> GSA.
> >
> > I think it might also be a good idea to do some renaming. Currently, we
> > call the Spargel iteration "vertex-centric", which fits better to the
> > Pregel abstraction. I propose we rename the spargel iteration into
> > "scatter-gather" or "signal-collect" (where it was first introduced [2]).
> > Any other ideas?
> >
> > Thanks,
> > -Vasia.
> >
> > [1]:
> >
> https://drive.google.com/file/d/0BzQJrI2eGlyYRTRjMkp1d3R6eVE/view?usp=sharing
> > [2]: http://link.springer.com/chapter/10.1007/978-3-642-17746-0_48
> >
> > On 11 November 2015 at 11:05, Stephan Ewen <se...@apache.org> wrote:
> >
> >> See: https://issues.apache.org/jira/browse/FLINK-3002
> >>
> >> On Wed, Nov 11, 2015 at 10:54 AM, Stephan Ewen <se...@apache.org>
> wrote:
> >>
> >>> "Either" an "Optional" types are quite useful.
> >>>
> >>> Let's add them to the core Java API.
> >>>
> >>> On Wed, Nov 11, 2015 at 10:00 AM, Vasiliki Kalavri <
> >>> vasilikikala...@gmail.com> wrote:
> >>>
> >>>> Thanks Fabian! I'll try that :)
> >>>>
> >>>> On 10 November 2015 at 22:31, Fabian Hueske <fhue...@gmail.com>
> wrote:
> >>>>
> >>>>> You could implement a Java Either type (similar to Scala's Either)
> >> that
> >>>>> either has a Message or the VertexState and a corresponding
> >>>> TypeInformation
> >>>>> and TypeSerializer that serializes a byte flag to indicate which both
> >>>> types
> >>>>> is used.
> >>>>> It might actually make sense, to add a generic Either type to the
> Java
> >>>> API
> >>>>> in general (similar to the Java Tuples with resemble the Scala
> >> Tuples).
> >>>>>
> >>>>> Cheers, Fabian
> >>>>>
> >>>>> 2015-11-10 22:16 GMT+01:00 Vasiliki Kalavri <
> >> vasilikikala...@gmail.com
> >>>>> :
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> after running a few experiments, I can confirm that putting the
> >>>> combiner
> >>>>>> after the flatMap is indeed more efficient.
> >>>>>>
> >>>>>> I ran SSSP and Connected Components with Spargel, GSA, and the
> >> Pregel
> >>>>> model
> >>>>>> and the results are the following:
> >>>>>>
> >>>>>> - for SSSP, Spargel is always the slowest, GSA is a ~1.2x faster and
> >>>>> Pregel
> >>>>>> is ~1.1x faster without combiner, ~1.3x faster with combiner.
> >>>>>> - for Connected Components, Spargel and GSA perform similarly, while
> >>>>> Pregel
> >>>>>> is 1.4-1.6x slower.
> >>>>>>
> >>>>>> To start with, this is much better than I expected :)
> >>>>>> However, there is a main shortcoming in my current implementation
> >> that
> >>>>>> negatively impacts performance:
> >>>>>> Since the compute function coGroup needs to output both new vertex
> >>>> values
> >>>>>> and new messages, I emit a wrapping tuple that contains both vertex
> >>>> state
> >>>>>> and messages and then filter them out based on a boolean field. The
> >>>>> problem
> >>>>>> is that since I cannot emit null fields, I emit a dummy message for
> >>>> each
> >>>>>> new vertex state and a dummy vertex state for each new message. That
> >>>>>> essentially means that the intermediate messages result is double in
> >>>>> size,
> >>>>>> if say the vertex values are of the same type as the messages (can
> >> be
> >>>>> worse
> >>>>>> if the vertex values are more complex).
> >>>>>> So my question is, is there a way to avoid this redundancy, by
> >> either
> >>>>>> emitting null fields or by creating an operator that could emit 2
> >>>>> different
> >>>>>> types of tuples?
> >>>>>>
> >>>>>> Thanks!
> >>>>>> -Vasia.
> >>>>>>
> >>>>>> On 9 November 2015 at 15:20, Fabian Hueske <fhue...@gmail.com>
> >> wrote:
> >>>>>>
> >>>>>>> Hi Vasia,
> >>>>>>>
> >>>>>>> sorry for the late reply.
> >>>>>>> I don't think there is a big difference. In both cases, the
> >>>>> partitioning
> >>>>>>> and sorting happens at the end of the iteration.
> >>>>>>> If the groupReduce is applied before the workset is returned, the
> >>>>> sorting
> >>>>>>> happens on the filtered result (after the flatMap) which might be
> >> a
> >>>>>> little
> >>>>>>> bit more efficient (depending on the ratio of messages and
> >> solution
> >>>> set
> >>>>>>> updates). Also it does not require that the initial workset is
> >>>> sorted
> >>>>> for
> >>>>>>> the first groupReduce.
> >>>>>>>
> >>>>>>> I would put it at the end.
> >>>>>>>
> >>>>>>> Cheers, Fabian
> >>>>>>>
> >>>>>>> 2015-11-05 17:19 GMT+01:00 Vasiliki Kalavri <
> >>>> vasilikikala...@gmail.com
> >>>>>> :
> >>>>>>>
> >>>>>>>> @Fabian
> >>>>>>>>
> >>>>>>>> Is there any advantage in putting the reducer-combiner before
> >>>>> updating
> >>>>>>> the
> >>>>>>>> workset vs. after (i.e. right before the join with the solution
> >>>> set)?
> >>>>>>>>
> >>>>>>>> If it helps, here are the plans of these 2 alternatives:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://drive.google.com/file/d/0BzQJrI2eGlyYcFV2RFo5dUFNXzg/view?usp=sharing
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://drive.google.com/file/d/0BzQJrI2eGlyYN014NXp6OEZUdGs/view?usp=sharing
> >>>>>>>>
> >>>>>>>> Thanks a lot for the help!
> >>>>>>>>
> >>>>>>>> -Vasia.
> >>>>>>>>
> >>>>>>>> On 30 October 2015 at 21:28, Fabian Hueske <fhue...@gmail.com>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> We can of course inject an optional ReduceFunction (or
> >>>> GroupReduce,
> >>>>>> or
> >>>>>>>>> combinable GroupReduce) to reduce the size of the work set.
> >>>>>>>>> I suggested to remove the GroupReduce function, because it did
> >>>> only
> >>>>>>>> collect
> >>>>>>>>> all messages into a single record by emitting the input
> >> iterator
> >>>>>> which
> >>>>>>> is
> >>>>>>>>> quite dangerous. Applying a combinable reduce function is
> >> could
> >>>>>> improve
> >>>>>>>> the
> >>>>>>>>> performance considerably.
> >>>>>>>>>
> >>>>>>>>> The good news is that it would come "for free" because the
> >>>>> necessary
> >>>>>>>>> partitioning and sorting can be reused (given the forwardField
> >>>>>>>> annotations
> >>>>>>>>> are correctly set):
> >>>>>>>>> - The partitioning of the reduce can be reused for the join
> >> with
> >>>>> the
> >>>>>>>>> solution set
> >>>>>>>>> - The sort of the reduce is preserved by the join with the
> >>>>> in-memory
> >>>>>>>>> hash-table of the solution set and can be reused for the
> >>>> coGroup.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Fabian
> >>>>>>>>>
> >>>>>>>>> 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <
> >>>>>> vasilikikala...@gmail.com
> >>>>>>>> :
> >>>>>>>>>
> >>>>>>>>>> Hi Fabian,
> >>>>>>>>>>
> >>>>>>>>>> thanks so much for looking into this so quickly :-)
> >>>>>>>>>>
> >>>>>>>>>> One update I have to make is that I tried running a few
> >>>>> experiments
> >>>>>>>> with
> >>>>>>>>>> this on a 6-node cluster. The current implementation gets
> >>>> stuck
> >>>>> at
> >>>>>>>>>> "Rebuilding Workset Properties" and never finishes a single
> >>>>>>> iteration.
> >>>>>>>>>> Running the plan of one superstep without a delta iteration
> >>>>>>> terminates
> >>>>>>>>>> fine. I didn't have access to the cluster today, so I
> >> couldn't
> >>>>>> debug
> >>>>>>>> this
> >>>>>>>>>> further, but I will do as soon as I have access again.
> >>>>>>>>>>
> >>>>>>>>>> The rest of my comments are inline:
> >>>>>>>>>>
> >>>>>>>>>> On 30 October 2015 at 17:53, Fabian Hueske <
> >> fhue...@gmail.com
> >>>>>
> >>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Vasia,
> >>>>>>>>>>>
> >>>>>>>>>>> I had a look at your new implementation and have a few
> >> ideas
> >>>>> for
> >>>>>>>>>>> improvements.
> >>>>>>>>>>> 1) Sending out the input iterator as you do in the last
> >>>>>> GroupReduce
> >>>>>>>> is
> >>>>>>>>>>> quite dangerous and does not give a benefit compared to
> >>>>>> collecting
> >>>>>>>> all
> >>>>>>>>>>> elements. Even though it is an iterator, it needs to be
> >>>>>> completely
> >>>>>>>>>>> materialized in-memory whenever the record is touched by
> >>>> Flink
> >>>>> or
> >>>>>>>> user
> >>>>>>>>>>> code.
> >>>>>>>>>>> I would propose to skip the reduce step completely and
> >>>> handle
> >>>>> all
> >>>>>>>>>> messages
> >>>>>>>>>>> separates and only collect them in the CoGroup function
> >>>> before
> >>>>>>> giving
> >>>>>>>>>> them
> >>>>>>>>>>> into the VertexComputeFunction. Be careful, to only do
> >> that
> >>>>> with
> >>>>>>>>>>> objectReuse disabled or take care to properly copy the
> >>>>> messages.
> >>>>>> If
> >>>>>>>> you
> >>>>>>>>>>> collect the messages in the CoGroup, you don't need the
> >>>>>>> GroupReduce,
> >>>>>>>>> have
> >>>>>>>>>>> smaller records and you can remove the MessageIterator
> >> class
> >>>>>>>>> completely.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> ​I see. The idea was to expose to message combiner that user
> >>>>> could
> >>>>>>>>>> ​implement if the messages are combinable, e.g. min, sum.
> >> This
> >>>>> is a
> >>>>>>>>> common
> >>>>>>>>>> case and reduces the message load significantly. Is there a
> >>>> way I
> >>>>>>> could
> >>>>>>>>> do
> >>>>>>>>>> something similar before the coGroup?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> 2) Add this annotation to the AppendVertexState function:
> >>>>>>>>>>> @ForwardedFieldsFirst("*->f0"). This indicates that the
> >>>>> complete
> >>>>>>>>> element
> >>>>>>>>>> of
> >>>>>>>>>>> the first input becomes the first field of the output.
> >> Since
> >>>>> the
> >>>>>>>> input
> >>>>>>>>> is
> >>>>>>>>>>> partitioned on "f0" (it comes out of the partitioned
> >>>> solution
> >>>>>> set)
> >>>>>>>> the
> >>>>>>>>>>> result of ApplyVertexState will be partitioned on "f0.f0"
> >>>> which
> >>>>>> is
> >>>>>>>>>>> (accidentially :-D) the join key of the following coGroup
> >>>>>> function
> >>>>>>> ->
> >>>>>>>>> no
> >>>>>>>>>>> partitioning :-)
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> ​Great! I totally missed that ;)​
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> 3) Adding the two flatMap functions behind the CoGroup
> >>>> prevents
> >>>>>>>>> chaining
> >>>>>>>>>>> and causes therefore some serialization overhead but
> >>>> shouldn't
> >>>>> be
> >>>>>>> too
> >>>>>>>>>> bad.
> >>>>>>>>>>>
> >>>>>>>>>>> So in total I would make this program as follows:
> >>>>>>>>>>>
> >>>>>>>>>>> iVertices<K,VV>
> >>>>>>>>>>> iMessage<K, Message> = iVertices.map(new InitWorkSet());
> >>>>>>>>>>>
> >>>>>>>>>>> iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> >>>>>>>>>>> verticesWithMessage<Vertex, Message> =
> >>>>> iteration.getSolutionSet()
> >>>>>>>>>>>  .join(iteration.workSet())
> >>>>>>>>>>>  .where(0) // solution set is local and build side
> >>>>>>>>>>>  .equalTo(0) // workset is shuffled and probe side of
> >>>> hashjoin
> >>>>>>>>>>> superstepComp<Vertex,Tuple2<K, Message>,Bool> =
> >>>>>>>>>>> verticesWithMessage.coGroup(edgessWithValue)
> >>>>>>>>>>>  .where("f0.f0") // vwm is locally forward and sorted
> >>>>>>>>>>>  .equalTo(0) //  edges are already partitioned and sorted
> >>>> (if
> >>>>>>> cached
> >>>>>>>>>>> correctly)
> >>>>>>>>>>>  .with(...) // The coGroup collects all messages in a
> >>>>> collection
> >>>>>>> and
> >>>>>>>>>> gives
> >>>>>>>>>>> it to the ComputeFunction
> >>>>>>>>>>> delta<Vertex> = superStepComp.flatMap(...) // partitioned
> >>>> when
> >>>>>>> merged
> >>>>>>>>>> into
> >>>>>>>>>>> solution set
> >>>>>>>>>>> workSet<K, Message> = superStepComp.flatMap(...) //
> >>>> partitioned
> >>>>>> for
> >>>>>>>>> join
> >>>>>>>>>>> iteration.closeWith(delta, workSet)
> >>>>>>>>>>>
> >>>>>>>>>>> So, if I am correct, the program will
> >>>>>>>>>>> - partition the workset
> >>>>>>>>>>> - sort the vertices with messages
> >>>>>>>>>>> - partition the delta
> >>>>>>>>>>>
> >>>>>>>>>>> One observation I have is that this program requires that
> >>>> all
> >>>>>>>> messages
> >>>>>>>>>> fit
> >>>>>>>>>>> into memory. Was that also the case before?
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> ​I believe not. The plan has one coGroup that produces the
> >>>>> messages
> >>>>>>>> and a
> >>>>>>>>>> following coGroup that groups by the messages "target ID"
> >> and
> >>>>>>> consumes
> >>>>>>>>>> them​ in an iterator. That doesn't require them to fit in
> >>>> memory,
> >>>>>>>> right?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> ​I'm also working on a version where the graph is
> >> represented
> >>>> as
> >>>>> an
> >>>>>>>>>> adjacency list, instead of two separate datasets of vertices
> >>>> and
> >>>>>>> edges.
> >>>>>>>>> The
> >>>>>>>>>> disadvantage is that the graph has to fit in memory, but I
> >>>> think
> >>>>>> the
> >>>>>>>>>> advantages are many​. We'll be able to support edge value
> >>>>> updates,
> >>>>>>> edge
> >>>>>>>>>> mutations and different edge access order guarantees. I'll
> >> get
> >>>>> back
> >>>>>>> to
> >>>>>>>>> this
> >>>>>>>>>> thread when I have a working prototype.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Fabian
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> ​Thanks again!
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> -Vasia.
> >>>>>>>>>> ​
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <
> >>>>>>>> vasilikikala...@gmail.com
> >>>>>>>>>> :
> >>>>>>>>>>>
> >>>>>>>>>>>> @Martin: thanks for your input! If you ran into any
> >> other
> >>>>>> issues
> >>>>>>>>> that I
> >>>>>>>>>>>> didn't mention, please let us know. Obviously, even with
> >>>> my
> >>>>>>>> proposal,
> >>>>>>>>>>> there
> >>>>>>>>>>>> are still features we cannot support, e.g. updating edge
> >>>>> values
> >>>>>>> and
> >>>>>>>>>> graph
> >>>>>>>>>>>> mutations. We'll need to re-think the underlying
> >> iteration
> >>>>>> and/or
> >>>>>>>>> graph
> >>>>>>>>>>>> representation for those.
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Fabian: thanks a lot, no rush :)
> >>>>>>>>>>>> Let me give you some more information that might make it
> >>>>> easier
> >>>>>>> to
> >>>>>>>>>> reason
> >>>>>>>>>>>> about performance:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Currently, in Spargel the SolutionSet (SS) keeps the
> >>>> vertex
> >>>>>> state
> >>>>>>>> and
> >>>>>>>>>> the
> >>>>>>>>>>>> workset (WS) keeps the active vertices. The iteration is
> >>>>>> composed
> >>>>>>>> of
> >>>>>>>>> 2
> >>>>>>>>>>>> coGroups. The first one takes the WS and the edges and
> >>>>> produces
> >>>>>>>>>> messages.
> >>>>>>>>>>>> The second one takes the messages and the SS and
> >> produced
> >>>> the
> >>>>>> new
> >>>>>>>> WS
> >>>>>>>>>> and
> >>>>>>>>>>>> the SS-delta.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In my proposal, the SS has the vertex state and the WS
> >> has
> >>>>>>>> <vertexId,
> >>>>>>>>>>>> MessageIterator> pairs, i.e. the inbox of each vertex.
> >> The
> >>>>> plan
> >>>>>>> is
> >>>>>>>>> more
> >>>>>>>>>>>> complicated because compute() needs to have two
> >> iterators:
> >>>>> over
> >>>>>>> the
> >>>>>>>>>> edges
> >>>>>>>>>>>> and over the messages.
> >>>>>>>>>>>> First, I join SS and WS to get the active vertices (have
> >>>>>>> received a
> >>>>>>>>>> msg)
> >>>>>>>>>>>> and their current state. Then I coGroup the result with
> >>>> the
> >>>>>> edges
> >>>>>>>> to
> >>>>>>>>>>> access
> >>>>>>>>>>>> the neighbors. Now the main problem is that this coGroup
> >>>>> needs
> >>>>>> to
> >>>>>>>>> have
> >>>>>>>>>> 2
> >>>>>>>>>>>> outputs: the new messages and the new vertex value. I
> >>>>> couldn't
> >>>>>>>> really
> >>>>>>>>>>> find
> >>>>>>>>>>>> a nice way to do this, so I'm emitting a Tuple that
> >>>> contains
> >>>>>> both
> >>>>>>>>> types
> >>>>>>>>>>> and
> >>>>>>>>>>>> I have a flag to separate them later with 2 flatMaps.
> >> From
> >>>>> the
> >>>>>>>> vertex
> >>>>>>>>>>>> flatMap, I crete the SS-delta and from the messaged
> >>>> flatMap I
> >>>>>>>> apply a
> >>>>>>>>>>>> reduce to group the messages by vertex and send them to
> >>>> the
> >>>>> new
> >>>>>>> WS.
> >>>>>>>>> One
> >>>>>>>>>>>> optimization would be to expose a combiner here to
> >> reduce
> >>>>>> message
> >>>>>>>>> size.
> >>>>>>>>>>>>
> >>>>>>>>>>>> tl;dr:
> >>>>>>>>>>>> 1. 2 coGroups vs. Join + coGroup + flatMap + reduce
> >>>>>>>>>>>> 2. how can we efficiently emit 2 different types of
> >>>> records
> >>>>>> from
> >>>>>>> a
> >>>>>>>>>>> coGroup?
> >>>>>>>>>>>> 3. does it make any difference if we group/combine the
> >>>>> messages
> >>>>>>>>> before
> >>>>>>>>>>>> updating the workset or after?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> -Vasia.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 27 October 2015 at 18:39, Fabian Hueske <
> >>>>> fhue...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> I'll try to have a look at the proposal from a
> >>>> performance
> >>>>>>> point
> >>>>>>>> of
> >>>>>>>>>>> view
> >>>>>>>>>>>> in
> >>>>>>>>>>>>> the next days.
> >>>>>>>>>>>>> Please ping me, if I don't follow up this thread.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers, Fabian
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-10-27 18:28 GMT+01:00 Martin Junghanns <
> >>>>>>>>> m.jungha...@mailbox.org
> >>>>>>>>>>> :
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> At our group, we also moved several algorithms from
> >>>>> Giraph
> >>>>>> to
> >>>>>>>>> Gelly
> >>>>>>>>>>> and
> >>>>>>>>>>>>>> ran into some confusing issues (first in
> >>>> understanding,
> >>>>>>> second
> >>>>>>>>>> during
> >>>>>>>>>>>>>> implementation) caused by the conceptional
> >> differences
> >>>>> you
> >>>>>>>>>> described.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> If there are no concrete advantages (performance
> >>>> mainly)
> >>>>> in
> >>>>>>> the
> >>>>>>>>>>> Spargel
> >>>>>>>>>>>>>> implementation, we would be very happy to see the
> >>>> Gelly
> >>>>> API
> >>>>>>> be
> >>>>>>>>>>> aligned
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>> Pregel-like systems.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Your SSSP example speaks for itself.
> >> Straightforward,
> >>>> if
> >>>>>> the
> >>>>>>>>> reader
> >>>>>>>>>>> is
> >>>>>>>>>>>>>> familiar with Pregel/Giraph/...
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Martin
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 27.10.2015 17:40, Vasiliki Kalavri wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hello squirrels,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I want to discuss with you a few concerns I have
> >>>> about
> >>>>> our
> >>>>>>>>> current
> >>>>>>>>>>>>>>> vertex-centric model implementation, Spargel, now
> >>>> fully
> >>>>>>>> subsumed
> >>>>>>>>>> by
> >>>>>>>>>>>>> Gelly.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Spargel is our implementation of Pregel [1], but it
> >>>>>> violates
> >>>>>>>>> some
> >>>>>>>>>>>>>>> fundamental properties of the model, as described
> >> in
> >>>> the
> >>>>>>> paper
> >>>>>>>>> and
> >>>>>>>>>>> as
> >>>>>>>>>>>>>>> implemented in e.g. Giraph, GPS, Hama. I often find
> >>>>> myself
> >>>>>>>>>> confused
> >>>>>>>>>>>> both
> >>>>>>>>>>>>>>> when trying to explain it to current Giraph users
> >> and
> >>>>> when
> >>>>>>>>> porting
> >>>>>>>>>>> my
> >>>>>>>>>>>>>>> Giraph algorithms to it.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> More specifically:
> >>>>>>>>>>>>>>> - in the Pregel model, messages produced in
> >>>> superstep n,
> >>>>>> are
> >>>>>>>>>>> received
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>> superstep n+1. In Spargel, they are produced and
> >>>>> consumed
> >>>>>> in
> >>>>>>>> the
> >>>>>>>>>>> same
> >>>>>>>>>>>>>>> iteration.
> >>>>>>>>>>>>>>> - in Pregel, vertices are active during a
> >> superstep,
> >>>> if
> >>>>>> they
> >>>>>>>>> have
> >>>>>>>>>>>>> received
> >>>>>>>>>>>>>>> a message in the previous superstep. In Spargel, a
> >>>>> vertex
> >>>>>> is
> >>>>>>>>>> active
> >>>>>>>>>>>>> during
> >>>>>>>>>>>>>>> a superstep if it has changed its value.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> These two differences require a lot of rethinking
> >>>> when
> >>>>>>> porting
> >>>>>>>>>>>>>>> applications
> >>>>>>>>>>>>>>> and can easily cause bugs.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The most important problem however is that we
> >> require
> >>>>> the
> >>>>>>> user
> >>>>>>>>> to
> >>>>>>>>>>>> split
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> computation in 2 phases (2 UDFs):
> >>>>>>>>>>>>>>> - messaging: has access to the vertex state and can
> >>>>>> produce
> >>>>>>>>>> messages
> >>>>>>>>>>>>>>> - update: has access to incoming messages and can
> >>>> update
> >>>>>> the
> >>>>>>>>>> vertex
> >>>>>>>>>>>>> value
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Pregel/Giraph only expose one UDF to the user:
> >>>>>>>>>>>>>>> - compute: has access to both the vertex state and
> >>>> the
> >>>>>>>> incoming
> >>>>>>>>>>>>> messages,
> >>>>>>>>>>>>>>> can produce messages and update the vertex value.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This might not seem like a big deal, but except
> >> from
> >>>>>> forcing
> >>>>>>>> the
> >>>>>>>>>>> user
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>> split their program logic into 2 phases, Spargel
> >> also
> >>>>>> makes
> >>>>>>>> some
> >>>>>>>>>>>> common
> >>>>>>>>>>>>>>> computation patterns non-intuitive or impossible to
> >>>>>> write. A
> >>>>>>>>> very
> >>>>>>>>>>>> simple
> >>>>>>>>>>>>>>> example is propagating a message based on its value
> >>>> or
> >>>>>>> sender
> >>>>>>>>> ID.
> >>>>>>>>>> To
> >>>>>>>>>>>> do
> >>>>>>>>>>>>>>> this with Spargel, one has to store all the
> >> incoming
> >>>>>>> messages
> >>>>>>>> in
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> vertex
> >>>>>>>>>>>>>>> value (might be of different type btw) during the
> >>>>>> messaging
> >>>>>>>>> phase,
> >>>>>>>>>>> so
> >>>>>>>>>>>>> that
> >>>>>>>>>>>>>>> they can be accessed during the update phase.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> So, my first question is, when implementing
> >> Spargel,
> >>>>> were
> >>>>>>>> other
> >>>>>>>>>>>>>>> alternatives considered and maybe rejected in favor
> >>>> of
> >>>>>>>>> performance
> >>>>>>>>>>> or
> >>>>>>>>>>>>>>> because of some other reason? If someone knows, I
> >>>> would
> >>>>>> love
> >>>>>>>> to
> >>>>>>>>>> hear
> >>>>>>>>>>>>> about
> >>>>>>>>>>>>>>> them!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Second, I wrote a prototype implementation [2] that
> >>>> only
> >>>>>>>> exposes
> >>>>>>>>>> one
> >>>>>>>>>>>>> UDF,
> >>>>>>>>>>>>>>> compute(), by keeping the vertex state in the
> >>>> solution
> >>>>> set
> >>>>>>> and
> >>>>>>>>> the
> >>>>>>>>>>>>>>> messages
> >>>>>>>>>>>>>>> in the workset. This way all previously mentioned
> >>>>>>> limitations
> >>>>>>>> go
> >>>>>>>>>>> away
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>> the API (see "SSSPComputeFunction" in the example
> >>>> [3])
> >>>>>>> looks a
> >>>>>>>>> lot
> >>>>>>>>>>>> more
> >>>>>>>>>>>>>>> like Giraph (see [4]).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have not run any experiments yet and the
> >> prototype
> >>>> has
> >>>>>>> some
> >>>>>>>>> ugly
> >>>>>>>>>>>>> hacks,
> >>>>>>>>>>>>>>> but if you think any of this makes sense, then I'd
> >> be
> >>>>>>> willing
> >>>>>>>> to
> >>>>>>>>>>>> follow
> >>>>>>>>>>>>> up
> >>>>>>>>>>>>>>> and try to optimize it. If we see that it performs
> >>>> well,
> >>>>>> we
> >>>>>>>> can
> >>>>>>>>>>>> consider
> >>>>>>>>>>>>>>> either replacing Spargel or adding it as an
> >>>> alternative.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for reading this long e-mail and looking
> >>>> forward
> >>>>> to
> >>>>>>>> your
> >>>>>>>>>>> input!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> -Vasia.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1]:
> >>>> https://kowshik.github.io/JPregel/pregel_paper.pdf
> >>>>>>>>>>>>>>> [2]:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
> >>>>>>>>>>>>>>> [3]:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
> >>>>>>>>>>>>>>> [4]:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
>
>

Reply via email to