Okay, sounds very reasonable :)

On Thu, Apr 30, 2015 at 10:15 PM, Stephan Ewen <se...@apache.org> wrote:

> For the variant with the "run()" method, this requires strong assumptions
> about the internal behavior of the source.
>
> Unless I am overlooking something, the source needs to guarantee this:
>
>  - It needs to lock internally and perform the state update and record emit
> call inside the locked scope
>
>  - It needs to use the same state object all the time, otherwise the driver
> and the source may lock different objects
>
>  - The second point makes it very hard to support sources that return
> copies (or shadow copies) of the state, to support asynchronous
> snapshotting.
>
>  - A per-element lock is an overhead that we could avoid with the "next()"
> approach.
>
>
> On Thu, Apr 30, 2015 at 10:04 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Hi,
> >
> > The only thing we need is to guarantee that the source will not output
> any
> > records or update the state while we take the snapshot and send the
> > barrier. There are multiple ways of doing this I guess. We could simply
> > lock on these objects for instance or add the methods you wrote. If we
> > lock, we can assure that no user thread will find a way around the next()
> > and hasNext() (which would otherwise cause problems), and we can also
> keep
> > the current interface.
> >
> > I think we just need to figure out what is the preferable user interface
> > for sources, having a simple run and cancel methods or going with the
> > next(), hasNext etc. Or we could just support both.
> >
> > Gyula
> >
> > On Thu, Apr 30, 2015 at 8:30 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > Hi all!
> > >
> > > I think we need to change the interface of the streaming source
> function.
> > >
> > > The function currently has simply a run() method where it does its
> work,
> > > until canceled.
> > >
> > > With this, it is hard to write sources, where the state and the
> snapshot
> > > barriers are exactly aligned.
> > > When performing the checkpoint, the vertex will grab the state from the
> > > source and inject a checkpoint barrier. It is not clear that the
> injected
> > > barrier aligns with the state, because the source may have emitted more
> > > records since grabbing the state, or not emitted the record that is
> > > reflected in the state (offset).
> > >
> > > If we change the interface to a more iterator-like interface (hasNext()
> > and
> > > next()), then the vertex calls these methods and can checkpoint
> > in-between
> > > calling the methods.
> > > After hasNext() is a well defined point, where the state can be grabbed
> > and
> > > the barrier be emitted.
> > >
> > >
> > > Any opinions on that?
> > >
> > >
> > > Stephan
> > >
> >
>

Reply via email to