Hi Shi,

Naiad/Timely Dataflow and other projects use global coordination, which is very convenient for asynchronous progress tracking in general, but it has some downsides in production systems that rely on in-flight transactional control mechanisms and rollback recovery guarantees. This is why we generally prefer decentralized approaches (despite their own downsides).
Regarding synchronous/structured iterations: as you already know, this is a bit off topic and a different story. We maintain a graph streaming (gelly-streams) library on Flink that you might find interesting [1]. Vasia, another Flink committer, is also working on it, among others. You can keep an eye on it, since we are planning to use this project as a showcase for a new way of doing structured and fixpoint iterations on streams in the future.

P.S. Many thanks for sharing your publication, it was an interesting read. Is your source code public? We could most certainly use it in a benchmark soon.

[1] https://github.com/vasia/gelly-streaming

On 11 Nov 2016, at 19:18, SHI Xiaogang <shixiaoga...@gmail.com> wrote:

Hi, Fouad

Thank you for the explanation. Now the centralized method seems correct to me. The passing of StatusUpdate events will lead to synchronous iterations, and we are using the information in each iteration to terminate the computation.

Actually, I prefer the centralized method because in many applications the convergence may depend on some global statistics. For example, a PageRank program may terminate the computation when 99% of the vertices have converged. I think learning programs that cannot reach the fixed point (oscillating around it) can benefit a lot from such features. The decentralized method makes it hard to support such convergence conditions.

Another concern is that Flink cannot produce periodic results in an iteration over infinite data streams. Take a concrete example. Given an edge stream constructing a graph, the user may need the PageRank weight of each vertex in the graphs formed at certain instants. Currently Flink does not provide any input or iteration information to users, which makes it hard for users to implement such real-time iterative applications. Such features are supported in both Naiad and Tornado. I think Flink should support them as well.
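Xiaogang's PageRank example (stop once 99% of the vertices have converged) is a global statistic: the loop can only evaluate it after every vertex has finished the same iteration. A minimal Python sketch of such a condition, assuming a synchronous (bulk) iteration over one fixed graph snapshot; the function name and the `eps`, `converged_fraction` and `max_iters` parameters are hypothetical and not part of any Flink API:

```python
def pagerank_until_mostly_converged(edges, damping=0.85, eps=1e-6,
                                    converged_fraction=0.99, max_iters=100):
    """Synchronous PageRank that stops on a *global* statistic: the share of
    vertices whose rank changed by less than eps in the last iteration.
    Hypothetical illustration, not a Flink API. Returns (ranks, iterations)."""
    vertices = sorted({v for edge in edges for v in edge})
    n = len(vertices)
    out_links = {v: [] for v in vertices}
    for src, dst in edges:
        out_links[src].append(dst)

    rank = {v: 1.0 / n for v in vertices}
    for iteration in range(1, max_iters + 1):
        new_rank = {v: (1.0 - damping) / n for v in vertices}
        for src in vertices:
            targets = out_links[src]
            if targets:
                share = damping * rank[src] / len(targets)
                for dst in targets:
                    new_rank[dst] += share
            else:
                # Dangling vertex: spread its rank uniformly over all vertices.
                for dst in vertices:
                    new_rank[dst] += damping * rank[src] / n
        # The global check: it needs the delta of *every* vertex for this
        # iteration, i.e. a complete view of the iteration's results.
        converged = sum(1 for v in vertices if abs(new_rank[v] - rank[v]) < eps)
        rank = new_rank
        if converged >= converged_fraction * n:
            return rank, iteration
    return rank, max_iters
```

Because the converged count is only meaningful once an iteration is complete everywhere, this kind of condition maps more naturally onto synchronous or centrally tracked iterations than onto a purely local, per-operator termination decision.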
What do you think?

Regards
Xiaogang

2016-11-11 19:27 GMT+08:00 Fouad ALi <fouad.alsay...@gmail.com>:

Hi Shi,

It seems that you are referring to the centralized algorithm, which is no longer the proposed version. In the decentralized version (check the latest doc) there is no master node or global coordination involved. Let us keep this discussion to the decentralized one if possible.

To answer your points on the previous approach, there is a catch in your trace at t7. Here is what is happening:
- Head, as well as RS, will receive a 'BroadcastStatusUpdate' from the runtime (see 2.1 in the steps).
- RS and Head will broadcast a StatusUpdate event and will not notify their status.
- When the StatusUpdate event gets back to the Head, it will notify its WORKING status.

Hope that answers your concern.

Best,
Fouad

On Nov 11, 2016, at 6:21 AM, SHI Xiaogang <shixiaoga...@gmail.com> wrote:

Hi Paris

I have several concerns about the correctness of the termination protocol. I think the termination protocol may put an end to the computation even when the computation has not converged.

Suppose there exists a loop context constructed by an OP operator, a Head operator and a Tail operator (illustrated in Figure 2 in the first draft). The stream only contains one record. OP will pass the record to its downstream operators 10 times. In other words, the loop should iterate 10 times.

If I understood the protocol correctly, the following event sequence may happen in the computation:
t1. RS emits Record to OP. Since RS has reached the "end-of-stream", the system enters the Speculative Phase.
t2. OP receives Record and emits it to TAIL.
t3. HEAD receives the UpdateStatus event and notifies with an IDLE state.
t4. OP receives the UpdateStatus event from HEAD and notifies with a WORKING state.
t5. TAIL receives Record and emits it to HEAD.
t6. TAIL receives the UpdateStatus event from OP and notifies with a WORKING state.
t7. The system starts a new attempt. HEAD receives the UpdateStatus event and notifies with an IDLE state. (Record is still in transit.)
t8. OP receives the UpdateStatus event from HEAD and notifies with an IDLE state.
t9. TAIL receives the UpdateStatus event from OP and notifies with an IDLE state.
t10. HEAD receives Record from TAIL and emits it to OP.
t11. The system puts an end to the computation.

Though the computation is expected to iterate 10 times, it ends earlier. The cause is that the communication channels MASTER=>HEAD and TAIL=>HEAD are not synchronized.

I think the protocol follows the idea of the Chandy-Lamport algorithm to determine a global state. But the information of whether a node has processed any record since the last request is not STABLE. Hence I doubt the correctness of the protocol.

To determine termination correctly, we need some information that is stable. In timely dataflow, Naiad collects the progress made in each iteration and terminates the loop when little progress is made in an iteration (identified by the timestamp vector). The information is stable because the result of an iteration cannot be changed by the execution of later iterations. A similar method is also adopted in Tornado. You may see my paper for more details about the termination of loops: http://net.pku.edu.cn/~cuibin/Papers/2016SIGMOD.pdf

Regards
Xiaogang

2016-11-11 3:19 GMT+08:00 Paris Carbone <par...@kth.se>:

Hi again Flink folks,

Here is our new proposal that addresses Job Termination - the loop fault tolerance proposal will follow shortly. As Stephan hinted, we need operators to be aware of their scope level. Thus, it is time we make loops great again! :)

Part of this FLIP basically introduces a new functional, compositional API for defining asynchronous loops for DataStreams.
This is coupled with a decentralized algorithm for job termination with loops - along the lines of what Stephan described. We are already working on the actual prototypes, as you can see from the links in the doc. Please let us know in this mail discussion whether you like (or don't like) it, and why.

https://docs.google.com/document/d/1nzTlae0AFimPCTIV1LB3Z2y-PfTHtq3173EhsAkpBoQ

cheers
Paris and Fouad

On 31 Oct 2016, at 12:53, Paris Carbone <par...@kth.se> wrote:

Hey Stephan,

Thanks for looking into it! +1 for breaking this up, will do that. I can see your point, and maybe it makes sense to introduce part of scoping to incorporate support for nested loops (otherwise it can’t work). Let us think about this a bit. We will share another draft with a more detailed description of the approach you are suggesting asap.

On 27 Oct 2016, at 10:55, Stephan Ewen <se...@apache.org> wrote:

How about we break this up into two FLIPs? There are, after all, two orthogonal problems (termination, fault tolerance) with quite different discussion states. Concerning fault tolerance, I like the ideas. For the termination proposal, I would like to iterate a bit more.

*Termination algorithm:*

My main concern here is the introduction of a termination coordinator and any involvement of RPC messages when deciding termination. That would be such a fundamental break with the current runtime architecture, and it would make the currently very elegant and simple model much more complicated and harder to maintain. Given that Flink's runtime is complex enough, I would really like to avoid that.

The current runtime paradigm coordinates between operators strictly via in-band events. RPC calls happen between operators and the master for triggering and acknowledging execution and checkpoints.
I was wondering whether we can keep following that paradigm and still get most of what you are proposing here. In some sense, all we need to do is replace RPC calls with in-band events, and "decentralize" the coordinator such that every operator can make its own termination decision by itself.

This is only a rough sketch; you probably need to flesh it out more.

- I assume that the OP in the diagram knows that it is in a loop and that it is the one connected to the head and tail.
- When OP receives an EndOfStream event from the regular source (RS), it emits an "AttemptTermination" event downstream to the operators involved in the loop. It attaches an attempt sequence number and memorizes it.
- Tail and Head forward these events.
- When OP receives the event back with the same attempt sequence number, and no records came in the meantime, it shuts down and emits EndOfStream downstream.
- When other records came back between emitting the AttemptTermination event and receiving it back, it emits a new AttemptTermination event with the next sequence number.
- This should terminate as soon as the loop is empty.

Might this model even generalize to nested loops, where the "AttemptTermination" event is scoped by the loop's nesting level?

Let me know what you think!

Best,
Stephan

On Thu, Oct 27, 2016 at 10:19 AM, Stephan Ewen <se...@apache.org> wrote:

Hi!

I am still scanning it and compiling some comments. Give me a bit ;-)

Stephan

On Wed, Oct 26, 2016 at 6:13 PM, Paris Carbone <par...@kth.se> wrote:

Hey all,

Now that many of you have already scanned the document (judging from the views), maybe it is time to give back some feedback! Did you like it? Would you suggest an improvement?
I would suggest not leaving this in the void. It has to do with important properties that the system promises to provide. Fouad and I will do our best to answer your questions and discuss this further.

cheers
Paris

On 21 Oct 2016, at 08:54, Paris Carbone <par...@kth.se> wrote:

Hello everyone,

Loops in Apache Flink have good potential to become much more powerful in future versions of Apache Flink. There is generally high demand to make them usable and, first of all, production-ready for upcoming releases. As a first commitment, we would like to propose FLIP-13 for consistent processing with loops. We are also working on scoped loops for Q1 2017, which we can share if there is enough interest.

For now, this is an improvement proposal that solves two major pending issues:
1) The (not so trivial) problem of correct termination of jobs with iterations
2) The applicability of the checkpointing algorithm to iterative dataflow graphs

We would really appreciate it if you went through the linked draft (motivation and proposed changes) for FLIP-13 and shared your comments, preferably publicly in this devlist discussion, before we go ahead and update the wiki.

https://docs.google.com/document/d/1M6ERj-TzlykMLHzPSwW5L9b0BhDbtoYucmByBjRBISs/edit?usp=sharing

cheers
Paris and Fouad
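Stephan's in-band AttemptTermination idea (27 Oct, quoted above) can be sanity-checked with a small single-process simulation. This is a sketch under stated assumptions, not the FLIP-13 implementation: a single loop OP -> TAIL -> HEAD -> OP, FIFO channels, RS delivering EndOfStream after its records on the same channel, a random scheduler choosing which channel delivers next, and Xiaogang's workload where every record must pass through the loop a fixed number of times. All names (`run_loop`, `records_since_attempt`, the tuple-encoded events) are hypothetical.

```python
import random
from collections import deque


def run_loop(num_records=1, iterations=10, seed=0):
    """Simulate the decentralized AttemptTermination sketch for one loop.

    Returns (outputs, loop_passes, last_attempt_seq, loop_empty_at_shutdown).
    Hypothetical model: events are (kind, value) tuples on FIFO deques.
    """
    rng = random.Random(seed)
    # RS sends its records, then EndOfStream, on one FIFO channel to OP.
    rs_to_op = deque([("record", iterations)] * num_records + [("eos", None)])
    op_to_tail, tail_to_head, head_to_op = deque(), deque(), deque()
    outputs, loop_passes = [], 0
    attempt_seq = None             # sequence number OP memorized
    records_since_attempt = False  # did a record come back since that attempt?
    terminated = False

    def op_receive(kind, value, from_loop):
        nonlocal attempt_seq, records_since_attempt, terminated, loop_passes
        if kind == "record":
            if from_loop:
                records_since_attempt = True
            if value > 0:   # more iterations left: feed it back into the loop
                op_to_tail.append(("record", value - 1))
                loop_passes += 1
            else:           # done iterating: leaves the loop
                outputs.append(value)
        elif kind == "eos":  # EndOfStream from RS: start attempt 1
            attempt_seq, records_since_attempt = 1, False
            op_to_tail.append(("attempt", attempt_seq))
        elif kind == "attempt" and value == attempt_seq:
            if records_since_attempt:  # loop was busy: try again
                attempt_seq += 1
                records_since_attempt = False
                op_to_tail.append(("attempt", attempt_seq))
            else:  # nothing came back: shut down (emit EndOfStream downstream)
                terminated = True

    while not terminated:
        ready = [c for c in (rs_to_op, op_to_tail, tail_to_head, head_to_op) if c]
        channel = rng.choice(ready)
        kind, value = channel.popleft()
        if channel is rs_to_op:
            op_receive(kind, value, from_loop=False)
        elif channel is op_to_tail:
            tail_to_head.append((kind, value))  # TAIL forwards unchanged
        elif channel is tail_to_head:
            head_to_op.append((kind, value))    # HEAD forwards unchanged
        else:
            op_receive(kind, value, from_loop=True)

    in_flight = len(op_to_tail) + len(tail_to_head) + len(head_to_op)
    return outputs, loop_passes, attempt_seq, in_flight == 0
```

In this model OP shuts down only after every record has completed its loop passes and the loop channels are empty, while the number of attempts varies with the interleaving. The property it relies on is that the AttemptTermination event shares FIFO channels with the records, so a record cannot be "in transit" behind it unnoticed; that is the synchronization the separate MASTER=>HEAD and TAIL=>HEAD channels lacked in Xiaogang's trace.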