Thanks for the detailed explanation Stephan!
Seeing Spargel as a Gather-Scatter model makes much more sense :)

I think we should be more careful not to present it as a "Pregel
equivalent" to avoid confusion of users coming from systems like Giraph.
Maybe I could put together a comparison table Pregel-Spargel-GSA to make
clear what each model can support and what are the (dis-)advantages.

I wouldn't kill Spargel either, but I would like to add to Gelly a more
generic iteration model that could support algorithms beyond simplistic
graph propagation. Pregel could be a start, even though it also has its
issues.

Having to return 2 different types and the size of the workset are the two
problems I also faced, when trying to map Pregel to a Flink plan. Also, I
am mostly convinced that representing the Graph as two separate datasets of
vertices and edges is not optimal for the Pregel model. I'll share my
findings as soon as I have an adjacency list version running.

Thanks again!

Cheers,
-Vasia.

On 3 November 2015 at 12:59, Martin Neumann <mneum...@sics.se> wrote:

> I tried out Spargel during my work with Spotify and have implemented
> several algorithms using it. In all implementations I ended up storing
> additional Data and Flags on the Vertex to carry them over from one UDF to
> the next one. It definitely makes the code harder to write and maintain.
>
> I wonder how much overhead these additional constructs cost in computation
> and memory consumption. Maybe going for a less optimized 1 UDF version will
> be not so much of a performance hit for most applications.
>
>
>
> On Tue, Nov 3, 2015 at 8:43 AM, Stephan Ewen <se...@apache.org> wrote:
>
> > Actually GAS was not known when we did the iterations work (and Spargel),
> > but the intuition that led to Spargel is similar then the intuition that
> > led to GAS.
> >
> > On Mon, Nov 2, 2015 at 4:35 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > When creating the original version of Spargel I was pretty much
> thinking
> > > in GSA terms, more than in Pregel terms. There are some fundamental
> > > differences between Spargel and Pregel. Spargel is in between GAS and
> > > Pregel in some way, that is how I have always thought about it.
> > >
> > > The main reason for the form is that it fits the dataflow paradigm
> > easier:
> > >
> > >   - If one function emits the new state of the vertex and the messages,
> > it
> > > has two different return types, which means you need a union type and
> > > filer/split type of operation on the result, which also adds overhead.
> In
> > > the current model, each function has one return type, which makes it
> > easy.
> > >
> > >  - The workset is also the feedback channel, which is materialized at
> the
> > > superstep boundaries, so keeping it small at O(vertices), rather than
> > > O(edges) is a win for performance.
> > >
> > > There is no reason to not add a Pregel model, but I would not kill
> > Spargel
> > > for it. It will be tough to get the Pregel variant to the same
> > efficiency.
> > > Unless you want to say, for efficiency, go with GSA, for convenience
> with
> > > Pregel.
> > >
> > > There are some nice things about the Spargel model. The fact that
> > messages
> > > are first generated then consumes makes the generation of initial
> > messages
> > > simpler in many cases, I think. It was always a bit weird to me in
> Pregel
> > > that you had to check whether you are in superstep one, in which case
> you
> > > would expect no message, and generate initial value messages.
> > >
> > >
> > >
> > > On Fri, Oct 30, 2015 at 1:28 PM, Fabian Hueske <fhue...@gmail.com>
> > wrote:
> > >
> > >> We can of course inject an optional ReduceFunction (or GroupReduce, or
> > >> combinable GroupReduce) to reduce the size of the work set.
> > >> I suggested to remove the GroupReduce function, because it did only
> > >> collect
> > >> all messages into a single record by emitting the input iterator which
> > is
> > >> quite dangerous. Applying a combinable reduce function is could
> improve
> > >> the
> > >> performance considerably.
> > >>
> > >> The good news is that it would come "for free" because the necessary
> > >> partitioning and sorting can be reused (given the forwardField
> > annotations
> > >> are correctly set):
> > >> - The partitioning of the reduce can be reused for the join with the
> > >> solution set
> > >> - The sort of the reduce is preserved by the join with the in-memory
> > >> hash-table of the solution set and can be reused for the coGroup.
> > >>
> > >> Best,
> > >> Fabian
> > >>
> > >> 2015-10-30 18:38 GMT+01:00 Vasiliki Kalavri <
> vasilikikala...@gmail.com
> > >:
> > >>
> > >> > Hi Fabian,
> > >> >
> > >> > thanks so much for looking into this so quickly :-)
> > >> >
> > >> > One update I have to make is that I tried running a few experiments
> > with
> > >> > this on a 6-node cluster. The current implementation gets stuck at
> > >> > "Rebuilding Workset Properties" and never finishes a single
> iteration.
> > >> > Running the plan of one superstep without a delta iteration
> terminates
> > >> > fine. I didn't have access to the cluster today, so I couldn't debug
> > >> this
> > >> > further, but I will do as soon as I have access again.
> > >> >
> > >> > The rest of my comments are inline:
> > >> >
> > >> > On 30 October 2015 at 17:53, Fabian Hueske <fhue...@gmail.com>
> wrote:
> > >> >
> > >> > > Hi Vasia,
> > >> > >
> > >> > > I had a look at your new implementation and have a few ideas for
> > >> > > improvements.
> > >> > > 1) Sending out the input iterator as you do in the last
> GroupReduce
> > is
> > >> > > quite dangerous and does not give a benefit compared to collecting
> > all
> > >> > > elements. Even though it is an iterator, it needs to be completely
> > >> > > materialized in-memory whenever the record is touched by Flink or
> > user
> > >> > > code.
> > >> > > I would propose to skip the reduce step completely and handle all
> > >> > messages
> > >> > > separates and only collect them in the CoGroup function before
> > giving
> > >> > them
> > >> > > into the VertexComputeFunction. Be careful, to only do that with
> > >> > > objectReuse disabled or take care to properly copy the messages.
> If
> > >> you
> > >> > > collect the messages in the CoGroup, you don't need the
> GroupReduce,
> > >> have
> > >> > > smaller records and you can remove the MessageIterator class
> > >> completely.
> > >> > >
> > >> >
> > >> > ​I see. The idea was to expose to message combiner that user could
> > >> > ​implement if the messages are combinable, e.g. min, sum. This is a
> > >> common
> > >> > case and reduces the message load significantly. Is there a way I
> > could
> > >> do
> > >> > something similar before the coGroup?
> > >> >
> > >> >
> > >> >
> > >> > > 2) Add this annotation to the AppendVertexState function:
> > >> > > @ForwardedFieldsFirst("*->f0"). This indicates that the complete
> > >> element
> > >> > of
> > >> > > the first input becomes the first field of the output. Since the
> > >> input is
> > >> > > partitioned on "f0" (it comes out of the partitioned solution set)
> > the
> > >> > > result of ApplyVertexState will be partitioned on "f0.f0" which is
> > >> > > (accidentially :-D) the join key of the following coGroup function
> > ->
> > >> no
> > >> > > partitioning :-)
> > >> > >
> > >> >
> > >> > ​Great! I totally missed that ;)​
> > >> >
> > >> >
> > >> >
> > >> > > 3) Adding the two flatMap functions behind the CoGroup prevents
> > >> chaining
> > >> > > and causes therefore some serialization overhead but shouldn't be
> > too
> > >> > bad.
> > >> > >
> > >> > > So in total I would make this program as follows:
> > >> > >
> > >> > > iVertices<K,VV>
> > >> > > iMessage<K, Message> = iVertices.map(new InitWorkSet());
> > >> > >
> > >> > > iteration = iVertices.iterateDelta(iMessages, maxIt, 0)
> > >> > > verticesWithMessage<Vertex, Message> = iteration.getSolutionSet()
> > >> > >   .join(iteration.workSet())
> > >> > >   .where(0) // solution set is local and build side
> > >> > >   .equalTo(0) // workset is shuffled and probe side of hashjoin
> > >> > > superstepComp<Vertex,Tuple2<K, Message>,Bool> =
> > >> > > verticesWithMessage.coGroup(edgessWithValue)
> > >> > >   .where("f0.f0") // vwm is locally forward and sorted
> > >> > >   .equalTo(0) //  edges are already partitioned and sorted (if
> > cached
> > >> > > correctly)
> > >> > >   .with(...) // The coGroup collects all messages in a collection
> > and
> > >> > gives
> > >> > > it to the ComputeFunction
> > >> > > delta<Vertex> = superStepComp.flatMap(...) // partitioned when
> > merged
> > >> > into
> > >> > > solution set
> > >> > > workSet<K, Message> = superStepComp.flatMap(...) // partitioned
> for
> > >> join
> > >> > > iteration.closeWith(delta, workSet)
> > >> > >
> > >> > > So, if I am correct, the program will
> > >> > > - partition the workset
> > >> > > - sort the vertices with messages
> > >> > > - partition the delta
> > >> > >
> > >> > > One observation I have is that this program requires that all
> > messages
> > >> > fit
> > >> > > into memory. Was that also the case before?
> > >> > >
> > >> >
> > >> > ​I believe not. The plan has one coGroup that produces the messages
> > and
> > >> a
> > >> > following coGroup that groups by the messages "target ID" and
> consumes
> > >> > them​ in an iterator. That doesn't require them to fit in memory,
> > right?
> > >> >
> > >> >
> > >> > ​I'm also working on a version where the graph is represented as an
> > >> > adjacency list, instead of two separate datasets of vertices and
> > edges.
> > >> The
> > >> > disadvantage is that the graph has to fit in memory, but I think the
> > >> > advantages are many​. We'll be able to support edge value updates,
> > edge
> > >> > mutations and different edge access order guarantees. I'll get back
> to
> > >> this
> > >> > thread when I have a working prototype.
> > >> >
> > >> >
> > >> > >
> > >> > > Cheers,
> > >> > > Fabian
> > >> > >
> > >> >
> > >> > ​Thanks again!
> > >> >
> > >> > Cheers,
> > >> > -Vasia.
> > >> > ​
> > >> >
> > >> >
> > >> > >
> > >> > >
> > >> > > 2015-10-27 19:10 GMT+01:00 Vasiliki Kalavri <
> > >> vasilikikala...@gmail.com>:
> > >> > >
> > >> > > > @Martin: thanks for your input! If you ran into any other issues
> > >> that I
> > >> > > > didn't mention, please let us know. Obviously, even with my
> > >> proposal,
> > >> > > there
> > >> > > > are still features we cannot support, e.g. updating edge values
> > and
> > >> > graph
> > >> > > > mutations. We'll need to re-think the underlying iteration
> and/or
> > >> graph
> > >> > > > representation for those.
> > >> > > >
> > >> > > > @Fabian: thanks a lot, no rush :)
> > >> > > > Let me give you some more information that might make it easier
> to
> > >> > reason
> > >> > > > about performance:
> > >> > > >
> > >> > > > Currently, in Spargel the SolutionSet (SS) keeps the vertex
> state
> > >> and
> > >> > the
> > >> > > > workset (WS) keeps the active vertices. The iteration is
> composed
> > >> of 2
> > >> > > > coGroups. The first one takes the WS and the edges and produces
> > >> > messages.
> > >> > > > The second one takes the messages and the SS and produced the
> new
> > WS
> > >> > and
> > >> > > > the SS-delta.
> > >> > > >
> > >> > > > In my proposal, the SS has the vertex state and the WS has
> > >> <vertexId,
> > >> > > > MessageIterator> pairs, i.e. the inbox of each vertex. The plan
> is
> > >> more
> > >> > > > complicated because compute() needs to have two iterators: over
> > the
> > >> > edges
> > >> > > > and over the messages.
> > >> > > > First, I join SS and WS to get the active vertices (have
> received
> > a
> > >> > msg)
> > >> > > > and their current state. Then I coGroup the result with the
> edges
> > to
> > >> > > access
> > >> > > > the neighbors. Now the main problem is that this coGroup needs
> to
> > >> have
> > >> > 2
> > >> > > > outputs: the new messages and the new vertex value. I couldn't
> > >> really
> > >> > > find
> > >> > > > a nice way to do this, so I'm emitting a Tuple that contains
> both
> > >> types
> > >> > > and
> > >> > > > I have a flag to separate them later with 2 flatMaps. From the
> > >> vertex
> > >> > > > flatMap, I crete the SS-delta and from the messaged flatMap I
> > apply
> > >> a
> > >> > > > reduce to group the messages by vertex and send them to the new
> > WS.
> > >> One
> > >> > > > optimization would be to expose a combiner here to reduce
> message
> > >> size.
> > >> > > >
> > >> > > > tl;dr:
> > >> > > > 1. 2 coGroups vs. Join + coGroup + flatMap + reduce
> > >> > > > 2. how can we efficiently emit 2 different types of records
> from a
> > >> > > coGroup?
> > >> > > > 3. does it make any difference if we group/combine the messages
> > >> before
> > >> > > > updating the workset or after?
> > >> > > >
> > >> > > > Cheers,
> > >> > > > -Vasia.
> > >> > > >
> > >> > > >
> > >> > > > On 27 October 2015 at 18:39, Fabian Hueske <fhue...@gmail.com>
> > >> wrote:
> > >> > > >
> > >> > > > > I'll try to have a look at the proposal from a performance
> point
> > >> of
> > >> > > view
> > >> > > > in
> > >> > > > > the next days.
> > >> > > > > Please ping me, if I don't follow up this thread.
> > >> > > > >
> > >> > > > > Cheers, Fabian
> > >> > > > >
> > >> > > > > 2015-10-27 18:28 GMT+01:00 Martin Junghanns <
> > >> m.jungha...@mailbox.org
> > >> > >:
> > >> > > > >
> > >> > > > > > Hi,
> > >> > > > > >
> > >> > > > > > At our group, we also moved several algorithms from Giraph
> to
> > >> Gelly
> > >> > > and
> > >> > > > > > ran into some confusing issues (first in understanding,
> second
> > >> > during
> > >> > > > > > implementation) caused by the conceptional differences you
> > >> > described.
> > >> > > > > >
> > >> > > > > > If there are no concrete advantages (performance mainly) in
> > the
> > >> > > Spargel
> > >> > > > > > implementation, we would be very happy to see the Gelly API
> be
> > >> > > aligned
> > >> > > > to
> > >> > > > > > Pregel-like systems.
> > >> > > > > >
> > >> > > > > > Your SSSP example speaks for itself. Straightforward, if the
> > >> reader
> > >> > > is
> > >> > > > > > familiar with Pregel/Giraph/...
> > >> > > > > >
> > >> > > > > > Best,
> > >> > > > > > Martin
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On 27.10.2015 17:40, Vasiliki Kalavri wrote:
> > >> > > > > >
> > >> > > > > >> Hello squirrels,
> > >> > > > > >>
> > >> > > > > >> I want to discuss with you a few concerns I have about our
> > >> current
> > >> > > > > >> vertex-centric model implementation, Spargel, now fully
> > >> subsumed
> > >> > by
> > >> > > > > Gelly.
> > >> > > > > >>
> > >> > > > > >> Spargel is our implementation of Pregel [1], but it
> violates
> > >> some
> > >> > > > > >> fundamental properties of the model, as described in the
> > paper
> > >> and
> > >> > > as
> > >> > > > > >> implemented in e.g. Giraph, GPS, Hama. I often find myself
> > >> > confused
> > >> > > > both
> > >> > > > > >> when trying to explain it to current Giraph users and when
> > >> porting
> > >> > > my
> > >> > > > > >> Giraph algorithms to it.
> > >> > > > > >>
> > >> > > > > >> More specifically:
> > >> > > > > >> - in the Pregel model, messages produced in superstep n,
> are
> > >> > > received
> > >> > > > in
> > >> > > > > >> superstep n+1. In Spargel, they are produced and consumed
> in
> > >> the
> > >> > > same
> > >> > > > > >> iteration.
> > >> > > > > >> - in Pregel, vertices are active during a superstep, if
> they
> > >> have
> > >> > > > > received
> > >> > > > > >> a message in the previous superstep. In Spargel, a vertex
> is
> > >> > active
> > >> > > > > during
> > >> > > > > >> a superstep if it has changed its value.
> > >> > > > > >>
> > >> > > > > >> These two differences require a lot of rethinking when
> > porting
> > >> > > > > >> applications
> > >> > > > > >> and can easily cause bugs.
> > >> > > > > >>
> > >> > > > > >> The most important problem however is that we require the
> > user
> > >> to
> > >> > > > split
> > >> > > > > >> the
> > >> > > > > >> computation in 2 phases (2 UDFs):
> > >> > > > > >> - messaging: has access to the vertex state and can produce
> > >> > messages
> > >> > > > > >> - update: has access to incoming messages and can update
> the
> > >> > vertex
> > >> > > > > value
> > >> > > > > >>
> > >> > > > > >> Pregel/Giraph only expose one UDF to the user:
> > >> > > > > >> - compute: has access to both the vertex state and the
> > incoming
> > >> > > > > messages,
> > >> > > > > >> can produce messages and update the vertex value.
> > >> > > > > >>
> > >> > > > > >> This might not seem like a big deal, but except from
> forcing
> > >> the
> > >> > > user
> > >> > > > to
> > >> > > > > >> split their program logic into 2 phases, Spargel also makes
> > >> some
> > >> > > > common
> > >> > > > > >> computation patterns non-intuitive or impossible to write.
> A
> > >> very
> > >> > > > simple
> > >> > > > > >> example is propagating a message based on its value or
> sender
> > >> ID.
> > >> > To
> > >> > > > do
> > >> > > > > >> this with Spargel, one has to store all the incoming
> messages
> > >> in
> > >> > the
> > >> > > > > >> vertex
> > >> > > > > >> value (might be of different type btw) during the messaging
> > >> phase,
> > >> > > so
> > >> > > > > that
> > >> > > > > >> they can be accessed during the update phase.
> > >> > > > > >>
> > >> > > > > >> So, my first question is, when implementing Spargel, were
> > other
> > >> > > > > >> alternatives considered and maybe rejected in favor of
> > >> performance
> > >> > > or
> > >> > > > > >> because of some other reason? If someone knows, I would
> love
> > to
> > >> > hear
> > >> > > > > about
> > >> > > > > >> them!
> > >> > > > > >>
> > >> > > > > >> Second, I wrote a prototype implementation [2] that only
> > >> exposes
> > >> > one
> > >> > > > > UDF,
> > >> > > > > >> compute(), by keeping the vertex state in the solution set
> > and
> > >> the
> > >> > > > > >> messages
> > >> > > > > >> in the workset. This way all previously mentioned
> limitations
> > >> go
> > >> > > away
> > >> > > > > and
> > >> > > > > >> the API (see "SSSPComputeFunction" in the example [3])
> looks
> > a
> > >> lot
> > >> > > > more
> > >> > > > > >> like Giraph (see [4]).
> > >> > > > > >>
> > >> > > > > >> I have not run any experiments yet and the prototype has
> some
> > >> ugly
> > >> > > > > hacks,
> > >> > > > > >> but if you think any of this makes sense, then I'd be
> willing
> > >> to
> > >> > > > follow
> > >> > > > > up
> > >> > > > > >> and try to optimize it. If we see that it performs well, we
> > can
> > >> > > > consider
> > >> > > > > >> either replacing Spargel or adding it as an alternative.
> > >> > > > > >>
> > >> > > > > >> Thanks for reading this long e-mail and looking forward to
> > your
> > >> > > input!
> > >> > > > > >>
> > >> > > > > >> Cheers,
> > >> > > > > >> -Vasia.
> > >> > > > > >>
> > >> > > > > >> [1]: https://kowshik.github.io/JPregel/pregel_paper.pdf
> > >> > > > > >> [2]:
> > >> > > > > >>
> > >> > > > > >>
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/vasia/flink/tree/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew
> > >> > > > > >> [3]:
> > >> > > > > >>
> > >> > > > > >>
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/example/SSSPCompute.java
> > >> > > > > >> [4]:
> > >> > > > > >>
> > >> > > > > >>
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/SingleSourceShortestPaths.java
> > >> > > > > >>
> > >> > > > > >>
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>

Reply via email to