I think your assumption (and the current Kafka source implementation) is
that there is one state object that you update/mutate all the time.

If you take a snapshot of the state object at the time of the checkpoint,
the source can continue, and that particular offset is remembered as the
state of this checkpoint and can be committed to Kafka/ZooKeeper later.
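
Roughly like this, as a minimal sketch (the method names and the ZooKeeper
helper are made up for illustration, not the actual interfaces):

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetTrackingSource {

        private long currentOffset;                  // advances while consuming
        private final Map<Long, Long> pendingOffsets = new HashMap<>();

        // At checkpoint time: freeze the current offset under the
        // checkpoint id. The source can keep consuming right away.
        public void snapshotState(long checkpointId) {
            pendingOffsets.put(checkpointId, currentOffset);
        }

        // When the checkpoint is confirmed: commit the frozen offset,
        // not the (already advanced) current one.
        public void notifyCheckpointComplete(long checkpointId) {
            Long offset = pendingOffsets.remove(checkpointId);
            if (offset != null) {
                commitOffsetToZooKeeper(offset);     // hypothetical helper
            }
        }

        private void commitOffsetToZooKeeper(long offset) {
            // e.g., write to the consumer group's offset node
        }
    }

That is also the answer to the question below: the "local copy" is exactly
the snapshot, keyed by the checkpoint id.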

On Thu, Apr 30, 2015 at 10:09 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Regarding the commits (for instance kafka offset):
>
> I don't exactly get how you mean to do this. If the source continues
> processing after the checkpoint and before the commit, it will not know
> exactly what state has been committed, so it would need to know the time
> of the checkpoint and store a local copy.
>
> Gyula
>
>
> On Thu, Apr 30, 2015 at 10:04 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Thanks for the comments!
> >
> > Concerning acknowledging the checkpoint:
> >
> >    The sinks definitely need to acknowledge it.
> >    If we write the state of an operator asynchronously (and emit
> >    downstream barriers before that write is complete), then I think
> >    those operators also need to acknowledge the checkpoint.
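> >
> >    In other words, the coordinator only tracks acknowledgements from
> >    the tasks that actually must confirm. A small sketch (illustrative
> >    only, not the actual JobManager code):
> >
> >     import java.util.HashMap;
> >     import java.util.HashSet;
> >     import java.util.Map;
> >     import java.util.Set;
> >
> >     public class CheckpointAckTracker {
> >
> >         // per checkpoint: tasks that still have to acknowledge
> >         private final Map<Long, Set<Integer>> pendingAcks = new HashMap<>();
> >
> >         // Register only the tasks that must confirm: the sinks, plus
> >         // operators that write their state asynchronously.
> >         public void startCheckpoint(long checkpointId, Set<Integer> tasks) {
> >             pendingAcks.put(checkpointId, new HashSet<>(tasks));
> >         }
> >
> >         // Returns true when the last required acknowledgement arrives.
> >         public boolean acknowledge(long checkpointId, int taskId) {
> >             Set<Integer> pending = pendingAcks.get(checkpointId);
> >             if (pending == null) {
> >                 return false;              // unknown or already complete
> >             }
> >             pending.remove(taskId);
> >             if (pending.isEmpty()) {
> >                 pendingAcks.remove(checkpointId);
> >                 return true;               // checkpoint is now complete
> >             }
> >             return false;
> >         }
> >     }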
> >
> >
> > For the commit messages:
> >
> >    My first thought was to send commit messages simply as actor messages
> > from the JobManager
> >    to the vertices that require these messages. That way, they are not
> > stuck in the data flow with its possible latency.
> >    Also, in the data flow, messages get duplicated (at all-to-all
> >    connections).
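> >
> >    As a rough sketch (a made-up message type, not the real RPC layer):
> >
> >     // A plain message the JobManager would send, as an actor message,
> >     // to the task vertices that asked for commit notifications.
> >     public class NotifyCheckpointComplete implements java.io.Serializable {
> >
> >         public final long checkpointId;
> >
> >         public NotifyCheckpointComplete(long checkpointId) {
> >             this.checkpointId = checkpointId;
> >         }
> >     }
> >
> >    On the JobManager side this avoids the data-flow latency and the
> >    duplication at all-to-all connections, since it is one direct
> >    message per target vertex.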
> >
> >
> > For iterative flows:
> >
> > Does the JobManager need to be aware of this, or can the IterationHead
> > handle that transparently for the JobManager?
> > From our last conversation, I recall:
> >  - Receive barriers, push out barriers
> >  - snapshot its state
> >  - wait for the barriers to come back through the backchannel
> >  - write the state snapshot plus the backchannel buffers
> >  - only then acknowledge the checkpoint
> >
> > My first impression is that this way the JobManager would not handle
> > the IterationHead any differently from all other stateful operators.
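> >
> > In (made-up) code, the IterationHead's part of that protocol would look
> > roughly like this:
> >
> >     import java.util.ArrayList;
> >     import java.util.List;
> >     import java.util.concurrent.BlockingQueue;
> >
> >     public abstract class IterationHeadCheckpointing {
> >
> >         // the loop-back edge carrying records (and the returning barrier)
> >         protected BlockingQueue<Object> backchannel;
> >
> >         // Called when the checkpoint barrier reaches the iteration head.
> >         public void onCheckpointBarrier(long checkpointId) throws Exception {
> >             forwardBarrierDownstream(checkpointId);  // 1. push out barriers
> >
> >             byte[] ownState = snapshotState();       // 2. snapshot own state
> >
> >             // 3. records still circling in the loop belong to this
> >             //    checkpoint: buffer what arrives on the backchannel
> >             //    until the barrier comes back around.
> >             List<Object> inFlight = new ArrayList<>();
> >             Object record;
> >             while (!isBarrier(record = backchannel.take(), checkpointId)) {
> >                 inFlight.add(record);
> >             }
> >
> >             // 4. persist state plus buffered loop records, 5. then ack
> >             persistCheckpoint(checkpointId, ownState, inFlight);
> >             acknowledgeCheckpoint(checkpointId);
> >         }
> >
> >         protected abstract void forwardBarrierDownstream(long checkpointId);
> >         protected abstract byte[] snapshotState();
> >         protected abstract boolean isBarrier(Object record, long checkpointId);
> >         protected abstract void persistCheckpoint(
> >                 long checkpointId, byte[] state, List<Object> records);
> >         protected abstract void acknowledgeCheckpoint(long checkpointId);
> >     }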
> >
> > Greetings,
> > Stephan
> >
> >
> >
> > On Thu, Apr 30, 2015 at 9:27 PM, Paris Carbone <par...@kth.se> wrote:
> >
> > > I agree with all suggestions, thanks for summing it up, Stephan.
> > >
> > > A few more points I have in mind at the moment:
> > >
> > > - Regarding the acknowledgements, indeed we don’t need to make all
> > > operators commit back; we just have to make sure that all sinks have
> > > acknowledged a checkpoint to consider it complete back at the
> > > coordinator.
> > >
> > > - Do you think we should broadcast commit responses to sources that
> > > need them after every successful checkpoint? The checkpoint interval
> > > does not always match the frequency at which we want to initiate a
> > > compaction, for example on Kafka. One alternative would be to make
> > > sources request a successful checkpoint id via a future, on demand.
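> > >
> > > Something in that direction (sketch only, names invented):
> > >
> > >     import java.util.concurrent.CompletableFuture;
> > >
> > >     public interface CheckpointQueryService {
> > >
> > >         // Completes with the id of the most recent successful
> > >         // checkpoint. A source calls this on demand (e.g., right
> > >         // before triggering a compaction on Kafka) instead of
> > >         // receiving a broadcast after every checkpoint.
> > >         CompletableFuture<Long> lastCompletedCheckpoint();
> > >     }
> > >
> > > A source would then do something like
> > > service.lastCompletedCheckpoint().thenAccept(id -> commitUpTo(id)),
> > > where commitUpTo is whatever commit action the source needs.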
> > >
> > > - We have to update the current checkpointing approach to cover
> > > iterative streams. We need to make sure we don’t send checkpoint
> > > requests to iteration heads and handle downstream backup for records
> > > in transit during checkpoints accordingly.
> > >
> > > cheers
> > > Paris
> > >
> > > > On 30 Apr 2015, at 20:47, Stephan Ewen <se...@apache.org> wrote:
> > > >
> > > > I was looking into the handling of state in streaming operators, and
> > > > it is a bit hidden from the system.
> > > >
> > > > Right now, functions can (if they want) put some state into their
> > > > context. At runtime, state may or may not be present. Before runtime,
> > > > the system cannot tell which operators are going to be stateful and
> > > > which are going to be stateless.
> > > >
> > > > I think it is a good idea to expose that. We can use it for
> > > > optimizations, and we would know which operators need to checkpoint
> > > > state and acknowledge the asynchronous checkpoint.
> > > >
> > > > At this point, we have to assume that all operators need to send a
> > > > confirmation message, which is unnecessary for the stateless ones.
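> > > >
> > > > For example (purely illustrative, not a concrete API proposal):
> > > >
> > > >     // A marker the system could inspect before runtime, to decide
> > > >     // which operators take part in checkpoints and acknowledgements.
> > > >     public interface StatefulOperator {
> > > >
> > > >         // Serialize the operator's state for a checkpoint.
> > > >         byte[] snapshotState(long checkpointId) throws Exception;
> > > >
> > > >         // Restore the state after a failure.
> > > >         void restoreState(byte[] state) throws Exception;
> > > >     }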
> > > >
> > > > Also, I think we should expose which operators want a "commit"
> > > > notification after the checkpoint has completed. Good examples are:
> > > >
> > > >  - the KafkaConsumer source, which can then commit the offset that
> > > > is safe to ZooKeeper
> > > >
> > > >  - a transactional KafkaProducer sink, which can commit a batch of
> > > > messages to the Kafka partition once the checkpoint is done (to get
> > > > exactly-once guarantees that include the sink)
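> > > >
> > > > Such a notification interface could look roughly like this (a sketch
> > > > only, the names are invented):
> > > >
> > > >     // Operators implementing this would get a call once the
> > > >     // coordinator has collected all required acknowledgements.
> > > >     public interface CheckpointCommitter {
> > > >
> > > >         // Everything up to this checkpoint is durable, so a source
> > > >         // can commit its offsets and a sink its pending batch.
> > > >         void commitCheckpoint(long checkpointId) throws Exception;
> > > >     }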
> > > >
> > > > Comments welcome!
> > > >
> > > > Greetings,
> > > > Stephan
> > >
> > >
> >
>
