Hi Xingcan,

FLINK-1885 looked into adding a bulk mode to Gelly's iterative models.

As an alternative, you could implement your algorithm with Flink operators and a bulk iteration. Most of the Gelly library is written with native operators.

Greg

On Fri, Feb 10, 2017 at 5:02 AM, Xingcan Cui <[email protected]> wrote:

> Hi Vasia,
>
> b) As I said, when some vertices have finished their work in the current
> phase, they have nothing to do (no value updates, no messages received, as
> if asleep) but wait for the other vertices that have not yet finished the
> current phase. In the next phase, all the vertices should go back to work
> again, and if some vertices became inactive in the last phase, it could be
> hard to reactivate them by message, since we don't even know which
> vertices to send to. The only solution is to keep all vertices active,
> either by updating vertex values in each superstep or by sending heartbeat
> messages to the vertices themselves (which will bring a lot of extra work
> to the MessageCombiner).
>
> c) I know it's not elegant, perhaps even an awful idea, to store the edge
> info in vertex values. However, we cannot change edge values or maintain
> state (even a pick or unpick mark) in edges during a vertex-centric
> iteration. Then what can we do if an algorithm really needs that?
>
> Thanks for your patience.
>
> Best,
> Xingcan
>
> On Fri, Feb 10, 2017 at 4:50 PM, Vasiliki Kalavri <[email protected]> wrote:
>
>> Hi Xingcan,
>>
>> On 9 February 2017 at 18:16, Xingcan Cui <[email protected]> wrote:
>>
>>> Hi Vasia,
>>>
>>> thanks for your reply. It helped a lot and I got some new ideas.
>>>
>>> a) As you said, I did use the getPreviousIterationAggregate() method in
>>> preSuperstep() of the next superstep.
>>> However, if the (only?) global (aggregate) results cannot be guaranteed
>>> to be consistent, what should we do with the postSuperstep() method?
>>>
>>
>> The postSuperstep() method is analogous to the close() method in a
>> RichFunction, which is typically used for cleanup.
>>
>>> b) Though we can activate vertices by the update method or messages, IMO
>>> it may be more appropriate for users themselves to decide when to halt a
>>> vertex's iteration. Consider a complex algorithm that contains different
>>> phases inside a vertex-centric iteration. Before moving to the next
>>> phase (which should be synchronized), there may be some vertices that
>>> have already finished their work in the current phase and just wait for
>>> the others. Users may choose to idle the finished vertices until the next
>>> phase rather than halt them.
>>> Can we consider adding a voteToHalt() method and some internal
>>> variables to the Vertex/Edge class (or just create an "advanced" version
>>> of them) to make the halting more controllable?
>>>
>>
>> I suppose adding a voteToHalt() method is possible, but I'm not sure I
>> see how that would make halting more controllable. If a vertex hasn't
>> changed value or hasn't received a message, it has no work to do in the
>> next iteration, so why keep it active? If in a later superstep, a
>> previously inactive vertex receives a message, it will become active again.
>> Is this what you're looking for or am I missing something?
>>
>>> c) Sorry that I didn't make it clear before. Here the initialization
>>> means a "global" one that executes once before the iteration. For
>>> example, users may want to initialize the vertices' values from their
>>> adjacent edges before the iteration starts. Maybe we can add an extra
>>> coGroupFunction to the configuration parameters and apply it before the
>>> iteration?
>>>
>>
>> You can initialize the graph by using any Gelly transformation methods
>> before starting the iteration, e.g. mapVertices, mapEdges, reduceOnEdges,
>> etc.
>> Btw, a vertex can iterate over its edges inside the ComputeFunction using
>> the getEdges() method.
>> Initializing the vertex values with neighboring edges might not be a
>> good idea if you have vertices with high degrees.
>>
>> Cheers,
>> -Vasia.
>>
>>> What do you think?
>>>
>>> (BTW, I started a PR on FLINK-1526 (MST Lib & Example). Considering the
>>> complexity, the example is not provided.)
>>>
>>> I really appreciate all your help.
>>>
>>> Best,
>>> Xingcan
>>>
>>> On Thu, Feb 9, 2017 at 5:36 PM, Vasiliki Kalavri <[email protected]> wrote:
>>>
>>>> Hi Xingcan,
>>>>
>>>> On 7 February 2017 at 10:10, Xingcan Cui <[email protected]> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have some questions about the vertex-centric iteration in Gelly.
>>>>>
>>>>> a) It seems the postSuperstep method is called before the superstep
>>>>> barrier (I got different aggregate values for the same superstep in this
>>>>> method). Is this a bug, or is it designed that way?
>>>>>
>>>>
>>>> The postSuperstep() method is called inside the close() method of a
>>>> RichCoGroupFunction that wraps the ComputeFunction. The close() method
>>>> is called after the last call to coGroup() in each iteration superstep.
>>>> The aggregate values are not guaranteed to be consistent during the
>>>> same superstep in which they are computed. To retrieve an aggregate
>>>> value for superstep i, you should use the getPreviousIterationAggregate()
>>>> method in superstep i+1.
>>>>
>>>>> b) There is no setHalt method for vertices. When no message is received,
>>>>> a vertex just quits the next iteration. Should I manually send messages
>>>>> (like heartbeats) to keep the vertices active?
>>>>>
>>>>
>>>> That's because vertex halting is implicitly controlled by the
>>>> underlying delta iterations of Flink. A vertex will remain active as long
>>>> as it receives a message or it updates its value, otherwise it will become
>>>> inactive. The documentation on Gelly iterations [1] and DataSet iterations
>>>> [2] might be helpful.
>>>>
>>>>> c) I think we may need an initialization method in the ComputeFunction.
>>>>>
>>>>
>>>> There exists a preSuperstep() method for initialization. This one will
>>>> be executed once per superstep before the compute function is invoked for
>>>> every vertex. Would this work for you?
>>>>
>>>>> Any opinions? Thanks.
>>>>>
>>>>> Best,
>>>>> Xingcan
>>>>>
>>>>
>>>> I hope this helps,
>>>> -Vasia.
>>>>
>>>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/gelly/iterative_graph_processing.html#vertex-centric-iterations
>>>> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/iterations.html
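
For anyone following point (b), the message-driven activation described in the thread can be illustrated with a small toy model in plain Java. This is only a sketch under stated assumptions, not Flink or Gelly code: the class HaltingSketch and its run() method are hypothetical names, the graph is a hard-coded edge array, and only the "a vertex is active when it receives a message" half of the rule is modeled. It runs single-source shortest paths and records which vertices are active in each superstep.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy single-JVM model of message-driven vertex activation (hypothetical, not Gelly API). */
final class HaltingSketch {

    /**
     * Runs single-source shortest paths in supersteps.
     * Each edge is {source, target, weight}. Fills dist and returns,
     * for every superstep, the sorted list of vertices that were active.
     */
    static List<List<Integer>> run(int[][] edges, int source, long[] dist) {
        Arrays.fill(dist, Long.MAX_VALUE);

        // Inbox of the current superstep: target vertex -> smallest candidate distance.
        // Keeping only the minimum per target plays the role of a message combiner.
        Map<Integer, Long> inbox = new TreeMap<>();
        inbox.put(source, 0L);

        List<List<Integer>> activePerSuperstep = new ArrayList<>();
        while (!inbox.isEmpty()) {
            // Only vertices with a pending message do any work in this superstep.
            activePerSuperstep.add(new ArrayList<>(inbox.keySet()));

            Map<Integer, Long> nextInbox = new TreeMap<>();
            for (Map.Entry<Integer, Long> message : inbox.entrySet()) {
                int vertex = message.getKey();
                long candidate = message.getValue();
                if (candidate < dist[vertex]) {
                    // The value changed, so notify the out-neighbors.
                    dist[vertex] = candidate;
                    for (int[] edge : edges) {
                        if (edge[0] == vertex) {
                            nextInbox.merge(edge[1], candidate + edge[2], Long::min);
                        }
                    }
                }
            }
            // Messages sent now are delivered in the next superstep.
            inbox = nextInbox;
        }
        return activePerSuperstep;
    }

    public static void main(String[] args) {
        int[][] edges = { {0, 1, 1}, {0, 2, 10}, {1, 3, 1}, {3, 2, 1} };
        long[] dist = new long[4];
        List<List<Integer>> active = run(edges, 0, dist);
        System.out.println(Arrays.toString(dist)); // [0, 1, 3, 2]
        System.out.println(active);                // [[0], [1, 2], [3], [2]]
    }
}
```

In this run vertex 2 is active in the second superstep, idle in the third, and becomes active again in the fourth when a shorter path arrives through vertex 3, without any heartbeat messages. A multi-phase algorithm in which the next phase is not triggered by a message would still need some other mechanism, which is the case raised in (b).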
