That would be one way of doing it, yes...

On Thu, Apr 30, 2015 at 10:23 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Okay, so the commit would be something like:
>
> commitState(OperatorState state)
>
>
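A rough sketch of how such a commit hook could look (purely illustrative; `OperatorState` here just stands in for whatever state abstraction the runtime hands out, none of these names are an existing API):

    import java.io.Serializable;

    /** Placeholder for whatever state abstraction the runtime hands out. */
    interface OperatorState extends Serializable {}

    /**
     * Hypothetical commit hook for stateful operators: called once the
     * checkpoint that produced the given snapshot has been acknowledged
     * everywhere, so the state can be committed to an external system.
     */
    interface StateCommitter {

        void commitState(OperatorState state) throws Exception;
    }
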
> On Thu, Apr 30, 2015 at 10:17 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > I think your assumption (and the current Kafka source implementation) is
> > that there is one state object that you update/mutate all the time.
> >
> > If you draw a snapshot of the state object at the time of the checkpoint,
> > the source can continue; that particular offset is remembered as the
> > state of this checkpoint and can be committed to Kafka/ZooKeeper later.
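A minimal sketch of that idea, assuming a Kafka-like source that keeps one mutable offset and draws an immutable per-checkpoint copy (all names are made up for illustration):

    import java.util.HashMap;
    import java.util.Map;

    /** Illustrative source that snapshots its current offset per checkpoint. */
    class OffsetTrackingSource {

        // mutated continuously while records are emitted
        private long currentOffset;

        // immutable per-checkpoint copies, kept until the checkpoint is committed
        private final Map<Long, Long> pendingOffsets = new HashMap<>();

        /** Called when a checkpoint barrier arrives: draw a snapshot and go on. */
        synchronized void snapshotState(long checkpointId) {
            pendingOffsets.put(checkpointId, currentOffset);
        }

        /** Called when the checkpoint is globally complete: commit that offset. */
        synchronized void commitCheckpoint(long checkpointId) {
            Long offset = pendingOffsets.remove(checkpointId);
            if (offset != null) {
                commitToZooKeeper(offset); // external commit, e.g. to ZooKeeper
            }
        }

        private void commitToZooKeeper(long offset) {
            // the actual external commit would go here
        }

        void emitNextRecord() {
            currentOffset++; // the source keeps advancing while checkpoints are pending
        }
    }
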
> >
> > > On Thu, Apr 30, 2015 at 10:09 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > > Regarding the commits (for instance the Kafka offset):
> > >
> > > I don't exactly get how you mean to do this. If the source continues
> > > processing after the checkpoint and before the commit, it will not know
> > > exactly what state has been committed, so it would need to know the
> > > time of the checkpoint and store a local copy.
> > >
> > > Gyula
> > >
> > >
> > > On Thu, Apr 30, 2015 at 10:04 PM, Stephan Ewen <se...@apache.org> wrote:
> > >
> > > > Thanks for the comments!
> > > >
> > > > Concerning acknowledging the checkpoint:
> > > >
> > > >    The sinks definitely need to acknowledge it.
> > > >    If we write the state of an operator asynchronously (and emit the
> > > >    downstream barriers before that write is complete), then I think
> > > >    those operators also need to acknowledge the checkpoint.
> > > >
> > > >
> > > > For the commit messages:
> > > >
> > > >    My first thought was to send commit messages simply as actor
> > > >    messages from the JobManager to the vertices that require them.
> > > >    That way, they are not stuck in the data flow with its possible
> > > >    latency. Also, in the data flow, messages get duplicated (at
> > > >    all-to-all connections).
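One way to picture such a message, if it travels directly from the JobManager to the affected task vertices rather than through the data streams (a sketch; the class and field names are assumptions):

    import java.io.Serializable;

    /**
     * Illustrative "checkpoint committed" notification sent by the JobManager
     * straight to the tasks that asked for it, so it is neither delayed by
     * backpressure nor duplicated at all-to-all edges.
     */
    class ConfirmCheckpoint implements Serializable {

        private static final long serialVersionUID = 1L;

        private final long jobId;        // assumed identifier of the job
        private final long checkpointId; // the checkpoint that is now complete

        ConfirmCheckpoint(long jobId, long checkpointId) {
            this.jobId = jobId;
            this.checkpointId = checkpointId;
        }

        long getJobId() { return jobId; }
        long getCheckpointId() { return checkpointId; }
    }
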
> > > >
> > > >
> > > > For iterative flows:
> > > >
> > > > Does the JobManager need to be aware of this, or can the
> > > > IterationHead handle that transparently for the JobManager?
> > > > From our last conversation, I recall:
> > > >  - Receive barriers, push out barriers
> > > >  - snapshot its state
> > > >  - wait for the barriers to come back through the backchannel
> > > >  - write the state snapshot plus the backchannel buffers
> > > >  - then only acknowledge the checkpoint
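Read as code, that recalled protocol might look roughly like the following; the `BackChannel` handle and all method names are assumptions for the sake of illustration, not an existing API:

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative checkpoint handling inside an iteration head. */
    class IterationHeadCheckpoint {

        /** Assumed handle to the feedback edge of the loop. */
        interface BackChannel {
            /** Blocks until the barrier for this checkpoint returns, collecting
             *  all records that were in flight on the feedback edge. */
            List<byte[]> drainUntilBarrier(long checkpointId);
        }

        void onBarrier(long checkpointId, BackChannel backChannel) {
            // 1. push the barrier downstream so the rest of the dataflow proceeds
            emitBarrierDownstream(checkpointId);

            // 2. snapshot the head's own state
            byte[] ownState = snapshotOwnState();

            // 3. wait for the barrier to come back through the backchannel,
            //    logging every record that was in flight inside the loop
            List<byte[]> loopRecords =
                    new ArrayList<>(backChannel.drainUntilBarrier(checkpointId));

            // 4. persist the state snapshot plus the backchannel buffers
            persist(checkpointId, ownState, loopRecords);

            // 5. only then acknowledge the checkpoint to the JobManager
            acknowledge(checkpointId);
        }

        private void emitBarrierDownstream(long checkpointId) { /* ... */ }
        private byte[] snapshotOwnState() { return new byte[0]; }
        private void persist(long id, byte[] state, List<byte[]> records) { /* ... */ }
        private void acknowledge(long checkpointId) { /* ... */ }
    }
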
> > > >
> > > > My first impression is that this way the JobManager would not handle
> > > > the IterationHead any differently from the other stateful operators.
> > > >
> > > > Greetings,
> > > > Stephan
> > > >
> > > >
> > > >
> > > > On Thu, Apr 30, 2015 at 9:27 PM, Paris Carbone <par...@kth.se> wrote:
> > > >
> > > > > I agree with all the suggestions; thanks for summing it up, Stephan.
> > > > >
> > > > > A few more points I have in mind at the moment:
> > > > >
> > > > > - Regarding the acknowledgements: indeed, we don't need to make all
> > > > > operators commit back; we just have to make sure that all sinks have
> > > > > acknowledged a checkpoint before considering it complete at the
> > > > > coordinator.
> > > > >
> > > > > - Do you think we should broadcast commit responses to the sources
> > > > > that need them after every successful checkpoint? The checkpoint
> > > > > interval does not always match the frequency at which we want to
> > > > > initiate a compaction, for example on Kafka. One alternative would
> > > > > be to let sources request a successful checkpoint id via a future,
> > > > > on demand.
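That pull-based alternative could look something like the sketch below, where the coordinator gateway and every name are assumptions rather than an existing interface:

    import java.util.concurrent.CompletableFuture;

    /** Illustrative pull-based variant: the source asks for the latest complete checkpoint. */
    class CheckpointQueryingSource {

        /** Assumed handle to the checkpoint coordinator. */
        interface CoordinatorGateway {
            CompletableFuture<Long> getLastCompletedCheckpointId();
        }

        private final CoordinatorGateway coordinator;

        CheckpointQueryingSource(CoordinatorGateway coordinator) {
            this.coordinator = coordinator;
        }

        /** Called on the source's own schedule, e.g. before triggering a Kafka compaction. */
        void maybeCommitOffsets() {
            coordinator.getLastCompletedCheckpointId()
                    .thenAccept(checkpointId -> commitOffsetsUpTo(checkpointId));
        }

        private void commitOffsetsUpTo(long checkpointId) {
            // commit the offsets that were snapshotted for this checkpoint
        }
    }
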
> > > > >
> > > > > - We have to update the current checkpointing approach to cover
> > > > > iterative streams. We need to make sure we don't send checkpoint
> > > > > requests to iteration heads, and we need to handle the downstream
> > > > > backup of records in transit during checkpoints accordingly.
> > > > >
> > > > > cheers
> > > > > Paris
> > > > >
> > > > > > On 30 Apr 2015, at 20:47, Stephan Ewen <se...@apache.org> wrote:
> > > > > >
> > > > > > I was looking into the handling of state in streaming operators,
> > > > > > and it is a bit hidden from the system.
> > > > > >
> > > > > > Right now, functions can (if they want) put some state into their
> > > > > > context. At runtime, state may or may not be present. Before
> > > > > > runtime, the system cannot tell which operators are going to be
> > > > > > stateful and which are going to be stateless.
> > > > > >
> > > > > > I think it is a good idea to expose that. We can use it for
> > > > > > optimizations, and we would know which operators need to
> > > > > > checkpoint state and acknowledge the asynchronous checkpoint.
> > > > > >
> > > > > > At the moment, we have to assume that all operators need to send
> > > > > > a confirmation message, which is unnecessary.
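One way to expose this statically would be a declared interface that stateful functions implement, so the system knows before execution which tasks have to snapshot and acknowledge. Purely an illustration of the idea, with made-up names:

    import java.io.Serializable;

    /**
     * Illustrative interface: a user function declares up front that it
     * carries state and provides the snapshot/restore hooks for checkpoints.
     */
    interface StatefulFunction<T extends Serializable> {

        /** Draw a snapshot of the current state for the given checkpoint. */
        T snapshotState(long checkpointId) throws Exception;

        /** Restore the state from the latest completed snapshot after a failure. */
        void restoreState(T state) throws Exception;
    }
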
> > > > > >
> > > > > > Also, I think we should expose which operations want a "commit"
> > > > > > notification after the checkpoint has completed. Good examples are:
> > > > > >
> > > > > >  - the KafkaConsumer source, which can then commit the offset that
> > > > > >    is safe to ZooKeeper
> > > > > >
> > > > > >  - a transactional KafkaProducer sink, which can commit a batch of
> > > > > >    messages to the Kafka partition once the checkpoint is done (to
> > > > > >    get exactly-once guarantees that include the sink)
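Such a notification could be a second, opt-in interface next to the state snapshotting one. A sketch of what it might look like (neither the name nor the signature is taken from an existing API):

    /**
     * Illustrative opt-in interface for operators that want to be told when a
     * checkpoint is globally complete, e.g. a Kafka source committing offsets
     * to ZooKeeper, or a transactional Kafka sink publishing a pending batch.
     */
    interface CheckpointCommitNotification {

        /** Invoked after the checkpoint with the given id has been
         *  acknowledged by all participating operators. */
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }
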
> > > > > >
> > > > > > Comments welcome!
> > > > > >
> > > > > > Greetings,
> > > > > > Stephan
> > > > >
> > > > >
> > > >
> > >
> >
>
