For the variant with the "run()" method, this requires strong assumptions about the internal behavior of the source.
Unless I am overlooking something, the source needs to guarantee this:

 - It needs to lock internally and perform the state update and record emit
   call inside the locked scope
 - It needs to use the same state object all the time, otherwise the driver
   and the source may lock different objects
 - The second point makes it very hard to support sources that return copies
   (or shadow copies) of the state, to support asynchronous snapshotting.
 - A per-element lock is an overhead that we could avoid with the "next()"
   approach.

On Thu, Apr 30, 2015 at 10:04 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi,
>
> The only thing we need is to guarantee that the source will not output any
> records or update the state while we take the snapshot and send the
> barrier. There are multiple ways of doing this I guess. We could simply
> lock on these objects for instance or add the methods you wrote. If we
> lock, we can assure that no user thread will find a way around the next()
> and hasNext() (which would otherwise cause problems), and we can also keep
> the current interface.
>
> I think we just need to figure out what is the preferable user interface
> for sources, having a simple run and cancel methods or going with the
> next(), hasNext etc. Or we could just support both.
>
> Gyula
>
> On Thu, Apr 30, 2015 at 8:30 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Hi all!
> >
> > I think we need to change the interface of the streaming source function.
> >
> > The function currently has simply a run() method where it does its work,
> > until canceled.
> >
> > With this, it is hard to write sources, where the state and the snapshot
> > barriers are exactly aligned.
> > When performing the checkpoint, the vertex will grab the state from the
> > source and inject a checkpoint barrier. It is not clear that the injected
> > barrier aligns with the state, because the source may have emitted more
> > records since grabbing the state, or not emitted the record that is
> > reflected in the state (offset).
> >
> > If we change the interface to a more iterator-like interface (hasNext()
> > and next()), then the vertex calls these methods and can checkpoint
> > in-between calling the methods.
> > After hasNext() is a well defined point, where the state can be grabbed
> > and the barrier be emitted.
> >
> >
> > Any opinions on that?
> >
> >
> > Stephan
> >
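
To make the iterator-style variant from this thread a bit more concrete, here
is a rough sketch. It is only an illustration under assumptions: the names
IteratorSource, CountingSource, snapshotState() and drive() are hypothetical
and are not the actual streaming source interface. The point it tries to show
is that when the driver calls hasNext()/next() itself, it can grab the state
and emit the barrier between two calls without any per-element lock, so the
snapshotted offset matches exactly the records emitted before the barrier.

```java
import java.util.ArrayList;
import java.util.List;

public class IteratorSourceSketch {

    /** Hypothetical iterator-style source; NOT the real source interface. */
    public interface IteratorSource<T> {
        boolean hasNext();
        T next();
        /** Returns the current state; here, the offset of the next record. */
        long snapshotState();
    }

    /** Toy source that emits 0..limit-1; its state is the next offset. */
    public static final class CountingSource implements IteratorSource<Long> {
        private final long limit;
        private long offset;

        public CountingSource(long limit) {
            this.limit = limit;
        }

        @Override
        public boolean hasNext() {
            return offset < limit;
        }

        @Override
        public Long next() {
            return offset++;
        }

        @Override
        public long snapshotState() {
            return offset;
        }
    }

    /**
     * Hypothetical driver loop. It is the only caller of the source, so the
     * point right after hasNext() is a safe place to snapshot: no record can
     * be emitted between grabbing the state and emitting the barrier.
     */
    public static List<String> drive(IteratorSource<Long> source, int checkpointEvery) {
        List<String> output = new ArrayList<>();
        int recordsSinceBarrier = 0;
        while (source.hasNext()) {
            if (recordsSinceBarrier == checkpointEvery) {
                // State and barrier are taken together, between two records.
                output.add("barrier@" + source.snapshotState());
                recordsSinceBarrier = 0;
            }
            output.add("record:" + source.next());
            recordsSinceBarrier++;
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(drive(new CountingSource(5), 2));
    }
}
```

In this sketch every barrier carries the offset of the first record that
follows it, which is the alignment that is hard to guarantee with a free
running run() method unless the source locks around each state update and
emit.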