Okay, so the commit would be something like:

commitState(OperatorState state)

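As a rough sketch of how such a commit hook could look (all names here, including OperatorState, are made up for discussion, not actual Flink API):

```java
// Illustrative sketch only: OperatorState and CommittableOperator are
// placeholder names for the commit hook discussed in this thread.
final class OperatorState {
    final long checkpointId;

    OperatorState(long checkpointId) {
        this.checkpointId = checkpointId;
    }
}

interface CommittableOperator {
    // Called by the coordinator once the checkpoint that produced
    // `state` has been globally confirmed.
    void commitState(OperatorState state);
}

// Example implementation: a source that remembers which checkpoint
// was last committed (e.g. before pushing an offset to ZooKeeper).
final class CommitLoggingSource implements CommittableOperator {
    long lastCommittedCheckpoint = -1;

    @Override
    public void commitState(OperatorState state) {
        lastCommittedCheckpoint = state.checkpointId;
    }
}
```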

On Thu, Apr 30, 2015 at 10:17 PM, Stephan Ewen <se...@apache.org> wrote:

> I think your assumption (and the current Kafka source implementation) is
> that there is one state object that you update/mutate all the time.
>
> If you draw a snapshot state object at the time of the checkpoint, the
> source can continue, and that particular offset is remembered as the state
> of this checkpoint and can be committed to Kafka/ZooKeeper later.
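That "snapshot now, commit later" pattern might be sketched roughly like this (hypothetical names and structure, not how the real Kafka source is written):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: the source freezes its offset under the checkpoint id at
// snapshot time, keeps reading, and only pushes that frozen offset to
// Kafka/ZooKeeper when the commit notification for that id arrives.
final class OffsetTrackingSource {
    private long currentOffset;
    private final Map<Long, Long> pendingOffsets = new HashMap<>();

    void emitRecord() {
        currentOffset++; // advances as records are read
    }

    // called when a checkpoint barrier is drawn
    void snapshotState(long checkpointId) {
        pendingOffsets.put(checkpointId, currentOffset);
    }

    // called when the JobManager confirms the checkpoint; returns the
    // offset that is safe to commit (frozen at checkpoint time, not
    // the current one)
    long notifyCheckpointComplete(long checkpointId) {
        Long committed = pendingOffsets.remove(checkpointId);
        return committed == null ? -1 : committed;
    }

    long offset() {
        return currentOffset;
    }
}
```

This also answers the concern below: the source does not need to stop, because the committed offset is the local copy taken at checkpoint time.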
>
> On Thu, Apr 30, 2015 at 10:09 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Regarding the commits (for instance kafka offset):
> >
> > I don't exactly get how you mean to do this: if the source continues
> > processing after the checkpoint and before the commit, it will not know
> > exactly what state has been committed, so it would need to know the time
> > of the checkpoint and store a local copy.
> >
> > Gyula
> >
> >
> > On Thu, Apr 30, 2015 at 10:04 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > Thanks for the comments!
> > >
> > > Concerning acknowledging the checkpoint:
> > >
> > >    The sinks definitely need to acknowledge it.
> > >    If we write an operator's state asynchronously (and emit downstream
> > >    barriers before that write is complete),
> > >    then I think those operators also need to acknowledge the
> > >    checkpoint.
> > >
> > >
> > > For the commit messages:
> > >
> > >    My first thought was to send commit messages simply as actor
> > >    messages from the JobManager to the vertices that require them.
> > >    That way, they are not stuck in the data flow with its possible
> > >    latency. Also, in the data flow, messages get duplicated (at
> > >    all-to-all connections).
> > >
> > >
> > > For iterative flows:
> > >
> > > Does the JobManager need to be aware of this, or can the IterationHead
> > > handle it transparently for the JobManager?
> > > From our last conversation, I recall:
> > >  - Receive barriers, push out barriers
> > >  - snapshot its state
> > >  - wait for the barriers to come back through the backchannel
> > >  - write the state snapshot plus the backchannel buffers
> > >  - then only acknowledge the checkpoint
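The steps recalled above could be sketched as follows (an illustrative, synchronous toy model; the real implementation would be event-driven, and all names are invented):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the iteration-head checkpoint protocol from the list
// above: forward barriers, snapshot, buffer backchannel records until
// the barrier returns, persist snapshot plus buffers, then acknowledge.
final class IterationHeadSketch {
    final List<String> log = new ArrayList<>();
    final List<String> backchannelBuffer = new ArrayList<>();

    // barrier arrives on the regular input
    void onBarrier(long checkpointId) {
        log.add("forward-barrier-" + checkpointId); // push barrier downstream
        log.add("snapshot-state-" + checkpointId);  // snapshot own state
    }

    // records in transit on the backchannel must be part of the checkpoint
    void onBackchannelRecord(String record) {
        backchannelBuffer.add(record);
    }

    // the barrier comes back through the backchannel
    void onBackchannelBarrier(long checkpointId) {
        log.add("write-snapshot+buffers-" + checkpointId
                + "(" + backchannelBuffer.size() + " records)");
        backchannelBuffer.clear();
        log.add("ack-" + checkpointId); // only now acknowledge
    }
}
```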
> > >
> > > My first impression is that this way the JobManager would not handle
> > > the IterationHead any differently from other stateful operators.
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > >
> > > On Thu, Apr 30, 2015 at 9:27 PM, Paris Carbone <par...@kth.se> wrote:
> > >
> > > > I agree with all suggestions, thanks for summing it up Stephan.
> > > >
> > > > A few more points I have in mind at the moment:
> > > >
> > > > - Regarding the acknowledgements: indeed, we don't need all
> > > > operators to commit back; we just have to make sure that all sinks
> > > > have acknowledged a checkpoint to consider it complete back at the
> > > > coordinator.
> > > >
> > > > - Do you think we should broadcast commit responses to the sources
> > > > that need them after every successful checkpoint? The checkpoint
> > > > interval does not always match the frequency at which we want to
> > > > initiate a compaction, for example on Kafka. One alternative would
> > > > be to make sources request a successful checkpoint id via a future,
> > > > on demand.
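The "future on demand" alternative might look something like this (a hypothetical coordinator-side service, sketched with `CompletableFuture`; none of this is existing API):

```java
import java.util.concurrent.CompletableFuture;

// Sketch: instead of broadcasting every commit, a source asks for the
// latest successful checkpoint id only when it actually needs one
// (e.g. before triggering a compaction on Kafka).
final class CheckpointIdService {
    private final CompletableFuture<Long> first = new CompletableFuture<>();
    private volatile long latest = -1;

    // source side: completes immediately if a checkpoint has already
    // succeeded, otherwise when the first one does
    CompletableFuture<Long> requestSuccessfulCheckpointId() {
        return latest >= 0
                ? CompletableFuture.completedFuture(latest)
                : first;
    }

    // coordinator side: called whenever a checkpoint completes
    void onCheckpointComplete(long checkpointId) {
        latest = checkpointId;
        first.complete(checkpointId);
    }
}
```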
> > > >
> > > > - We have to update the current checkpointing approach to cover
> > > > iterative streams. We need to make sure we don't send checkpoint
> > > > requests to iteration heads, and handle the downstream backup of
> > > > records in transit during checkpoints accordingly.
> > > >
> > > > cheers
> > > > Paris
> > > >
> > > > > On 30 Apr 2015, at 20:47, Stephan Ewen <se...@apache.org> wrote:
> > > > >
> > > > > I was looking into the handling of state in streaming operators,
> > > > > and it is a bit hidden from the system.
> > > > >
> > > > > Right now, functions can (if they want) put some state into their
> > > > > context. At runtime, state may or may not be present. Before
> > > > > runtime, the system cannot tell which operators are going to be
> > > > > stateful and which are going to be stateless.
> > > > >
> > > > > I think it is a good idea to expose that. We can use it for
> > > > > optimizations, and we would know which operators need to
> > > > > checkpoint state and acknowledge the asynchronous checkpoint.
> > > > >
> > > > > At this point, we have to assume that all operators need to send a
> > > > > confirmation message, which is often unnecessary.
> > > > >
> > > > > Also, I think we should expose which operations want a "commit"
> > > > > notification after the checkpoint has completed. Good examples are:
> > > > >
> > > > >  - the KafkaConsumer source, which can then commit the offset
> > > > >    that is safe to ZooKeeper
> > > > >
> > > > >  - a transactional KafkaProducer sink, which can commit a batch of
> > > > >    messages to the Kafka partition once the checkpoint is done (to
> > > > >    get exactly-once guarantees that include the sink)
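The transactional-sink example could be sketched like this (purely illustrative, with in-memory lists standing in for Kafka; names are invented):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: the sink stages records, freezes the staged batch under a
// checkpoint id at snapshot time, and "publishes" a batch only when
// that checkpoint is confirmed, so a replay never duplicates records.
final class TransactionalSinkSketch {
    private final List<String> staged = new ArrayList<>();
    private final Map<Long, List<String>> pending = new HashMap<>();
    final List<String> published = new ArrayList<>();

    void write(String record) {
        staged.add(record);
    }

    // called when a checkpoint barrier reaches the sink
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>(staged));
        staged.clear();
    }

    // called when the checkpoint is globally complete
    void notifyCheckpointComplete(long checkpointId) {
        List<String> batch = pending.remove(checkpointId);
        if (batch != null) {
            published.addAll(batch); // commit the batch to the partition
        }
    }
}
```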
> > > > >
> > > > > Comments welcome!
> > > > >
> > > > > Greetings,
> > > > > Stephan
> > > >
> > > >
> > >
> >
>
