For the variant with the "run()" method, locking requires strong assumptions
about the internal behavior of the source.

Unless I am overlooking something, the source would need to guarantee the
following:

 - It needs to lock internally and perform both the state update and the
record emit call inside the same locked scope.

 - It needs to use the same state object all the time; otherwise the driver
and the source may lock different objects.

This has two drawbacks:

 - The second requirement makes it very hard to support sources that return
copies (or shadow copies) of the state in order to support asynchronous
snapshotting.

 - A per-element lock is an overhead that we could avoid with the "next()"
approach.
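
To make the comparison concrete, here is a rough sketch of the two variants.
None of these classes exist in the code base; CountingSource,
IteratorDriver, RunStyleSource and snapshotState() are hypothetical names
used only for illustration, and the "barrier" is just a marker string in the
output list. In the iterator variant the driver thread owns the loop, so it
can take the snapshot and emit the barrier between hasNext() and next()
without any lock. In the run() variant the source has to share a lock object
with the driver and hold it for every element.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical iterator-style source: emits 0, 1, 2, ...; its state is the next offset.
class CountingSource implements Iterator<Long> {
    private final long end;
    private long offset;

    CountingSource(long end) { this.end = end; }

    @Override public boolean hasNext() { return offset < end; }

    @Override public Long next() {
        if (!hasNext()) throw new NoSuchElementException();
        return offset++;
    }

    long snapshotState() { return offset; }
}

// Hypothetical driver standing in for the vertex. It is the only thread touching
// the source, so no record can slip in between the snapshot and the barrier.
class IteratorDriver {
    static List<String> drive(CountingSource source, int checkpointEvery) {
        List<String> out = new ArrayList<>();
        int sinceBarrier = 0;
        while (source.hasNext()) {
            if (sinceBarrier == checkpointEvery) {
                // Well-defined point: state and barrier are exactly aligned.
                out.add("barrier:" + source.snapshotState());
                sinceBarrier = 0;
            }
            out.add("record:" + source.next());
            sinceBarrier++;
        }
        return out;
    }
}

// Hypothetical run()-style source. It must lock on the SAME object as the driver
// and keep the emit and the state update inside one locked scope.
class RunStyleSource {
    private final Object lock;
    private long offset;

    RunStyleSource(Object lock) { this.lock = lock; }

    void run(long end, List<String> out) {
        while (true) {
            synchronized (lock) {              // per-element lock
                if (offset >= end) return;
                out.add("record:" + offset);   // emit ...
                offset++;                      // ... and update state atomically
            }
        }
    }

    // What the driver would call from its checkpoint thread.
    void checkpoint(List<String> out) {
        synchronized (lock) {
            out.add("barrier:" + offset);
        }
    }
}
```

With the iterator variant, drive(new CountingSource(5), 2) yields two records,
then a barrier carrying offset 2, two more records, a barrier carrying
offset 4, and the last record. The run() variant only stays correct as long
as every source implementation follows the locking discipline above.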


On Thu, Apr 30, 2015 at 10:04 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi,
>
> The only thing we need is to guarantee that the source will not output any
> records or update the state while we take the snapshot and send the
> barrier. There are multiple ways of doing this I guess. We could simply
> lock on these objects for instance or add the methods you wrote. If we
> lock, we can assure that no user thread will find a way around the next()
> and hasNext() (which would otherwise cause problems), and we can also keep
> the current interface.
>
> I think we just need to figure out which user interface is preferable
> for sources: simple run() and cancel() methods, or next(), hasNext(),
> etc. Or we could just support both.
>
> Gyula
>
> On Thu, Apr 30, 2015 at 8:30 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Hi all!
> >
> > I think we need to change the interface of the streaming source function.
> >
> > The function currently has only a run() method, in which it does its
> > work until canceled.
> >
> > With this, it is hard to write sources where the state and the snapshot
> > barriers are exactly aligned.
> > When performing the checkpoint, the vertex will grab the state from the
> > source and inject a checkpoint barrier. It is not clear that the injected
> > barrier aligns with the state, because the source may have emitted more
> > records since grabbing the state, or not emitted the record that is
> > reflected in the state (offset).
> >
> > If we change the interface to a more iterator-like interface (hasNext()
> > and next()), then the vertex calls these methods and can checkpoint
> > in between the calls.
> > The point right after hasNext() is well defined: there the state can be
> > grabbed and the barrier emitted.
> >
> >
> > Any opinions on that?
> >
> >
> > Stephan
> >
>
