Okay, sounds very reasonable :)

On Thu, Apr 30, 2015 at 10:15 PM, Stephan Ewen <se...@apache.org> wrote:

> For the variant with the "run()" method, this requires strong assumptions
> about the internal behavior of the source.
>
> Unless I am overlooking something, the source needs to guarantee the
> following:
>
> - It needs to lock internally and perform the state update and the
> record-emit call inside the locked scope.
>
> - It needs to use the same state object all the time; otherwise the driver
> and the source may lock different objects.
>
> - The second point makes it very hard to support sources that return
> copies (or shadow copies) of the state to enable asynchronous
> snapshotting.
>
> - A per-element lock is an overhead that we could avoid with the "next()"
> approach.
>
>
> On Thu, Apr 30, 2015 at 10:04 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Hi,
> >
> > The only thing we need is to guarantee that the source will not output any
> > records or update the state while we take the snapshot and send the
> > barrier. There are multiple ways of doing this, I guess. We could simply
> > lock on these objects, for instance, or add the methods you wrote. If we
> > lock, we can ensure that no user thread finds a way around next() and
> > hasNext() (which would otherwise cause problems), and we can also keep
> > the current interface.
> >
> > I think we just need to figure out which user interface is preferable
> > for sources: simple run() and cancel() methods, or next(), hasNext(),
> > etc. Or we could just support both.
> >
> > Gyula
> >
> > On Thu, Apr 30, 2015 at 8:30 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > Hi all!
> > >
> > > I think we need to change the interface of the streaming source function.
> > >
> > > The function currently has simply a run() method where it does its work
> > > until canceled.
> > >
> > > With this, it is hard to write sources where the state and the snapshot
> > > barriers are exactly aligned.
> > > When performing the checkpoint, the vertex will grab the state from the
> > > source and inject a checkpoint barrier. It is not clear that the injected
> > > barrier aligns with the state, because the source may have emitted more
> > > records since the state was grabbed, or may not yet have emitted the
> > > record whose offset is already reflected in the state.
> > >
> > > If we change the interface to a more iterator-like one (hasNext() and
> > > next()), then the vertex calls these methods and can checkpoint
> > > in between the calls.
> > > The point right after hasNext() is well defined: there the state can be
> > > grabbed and the barrier emitted.
> > >
> > >
> > > Any opinions on that?
> > >
> > >
> > > Stephan
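The two designs debated in this thread can be contrasted with a small, self-contained Java sketch. It is not Flink code: IteratorSource, CounterSource, LockingCounterSource, driveIterator, driveWithLock and barriersAligned are hypothetical names, the source state is assumed to be a single read offset, and a checkpoint barrier is modeled as a "barrier@<offset>" string in the output. In the iterator style the driver owns the loop and snapshots right after hasNext(), so no lock is needed. In the run() style the source must take the same lock object as the driver around every emit plus state update, which is the per-element overhead raised above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch only: hypothetical types contrasting the two source styles (not Flink's API). */
final class SourceInterfaceSketch {

    /** Hypothetical iterator-style source interface. */
    interface IteratorSource<T> {
        boolean hasNext();
        T next();
        long snapshotState(); // assumed state: the offset of the next record
    }

    /** Emits 0, 1, ..., limit-1; its state is the offset of the next record. */
    static final class CounterSource implements IteratorSource<Long> {
        private final long limit;
        private long offset = 0;
        CounterSource(long limit) { this.limit = limit; }
        public boolean hasNext() { return offset < limit; }
        public Long next() { return offset++; }
        public long snapshotState() { return offset; }
    }

    /** Iterator-style driver: it owns the loop, so no lock is needed. */
    static List<String> driveIterator(IteratorSource<Long> src, int checkpointEvery) {
        List<String> out = new ArrayList<>();
        int emitted = 0;
        while (src.hasNext()) {
            // Right after hasNext() no record is in flight, so the snapshot
            // and the barrier position agree by construction.
            if (emitted > 0 && emitted % checkpointEvery == 0) {
                out.add("barrier@" + src.snapshotState());
            }
            out.add(String.valueOf(src.next()));
            emitted++;
        }
        return out;
    }

    /** run()-style source: must hold the driver's lock around emit + state update. */
    static final class LockingCounterSource {
        private final Object checkpointLock; // must be the SAME object the driver locks
        private final long limit;
        private long offset = 0;             // state, only touched under the lock
        private volatile boolean running = true;
        LockingCounterSource(Object checkpointLock, long limit) {
            this.checkpointLock = checkpointLock;
            this.limit = limit;
        }
        void run(Consumer<String> collector) {
            while (running) {
                synchronized (checkpointLock) { // per-element lock
                    if (offset >= limit) return;
                    collector.accept(String.valueOf(offset)); // emit the record ...
                    offset++;                                 // ... and update the state atomically
                }
            }
        }
        void cancel() { running = false; }
        long snapshotState() { return offset; } // caller must hold checkpointLock
    }

    /** run()-style driver: snapshots from another thread while holding the lock. */
    static List<String> driveWithLock(long limit, int checkpoints) {
        Object lock = new Object();
        List<String> out = new ArrayList<>(); // only modified while holding `lock`
        LockingCounterSource src = new LockingCounterSource(lock, limit);
        Thread worker = new Thread(() -> src.run(out::add));
        worker.start();
        for (int i = 0; i < checkpoints; i++) {
            synchronized (lock) {
                out.add("barrier@" + src.snapshotState());
            }
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out;
    }

    /** True if every "barrier@k" is preceded by exactly k records. */
    static boolean barriersAligned(List<String> out) {
        long records = 0;
        for (String s : out) {
            if (s.startsWith("barrier@")) {
                if (Long.parseLong(s.substring("barrier@".length())) != records) return false;
            } else {
                records++;
            }
        }
        return true;
    }
}
```

For example, driveIterator(new CounterSource(5), 2) yields [0, 1, barrier@2, 2, 3, barrier@4, 4]. In both variants barriersAligned() checks the property the thread is after: every barrier carries exactly the offset of the records emitted before it. The locking variant keeps this property only because the source and the driver agree on one lock object; a source that locked a fresh copy of its state on each step would break it.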