Thanks for the comments!

Concerning acknowledging the checkpoint:

The sinks definitely need to acknowledge it. If we write an operator's
state asynchronously (and emit downstream barriers before that write is
complete), then I think those operators also need to acknowledge the
checkpoint.

For the commit messages: My first thought was to send commit messages
simply as actor messages from the JobManager to the vertices that require
them. That way, they are not stuck in the data flow with its possible
latency. Also, in the data flow, messages get duplicated (at all-to-all
connections).

For iterative flows: Does the JobManager need to be aware of this, or can
the IterationHead handle it transparently for the JobManager? From our
last conversation, I recall that the IterationHead would:

 - receive barriers, push out barriers
 - snapshot its state
 - wait for the barriers to come back through the backchannel
 - write the state snapshot plus the backchannel buffers
 - only then acknowledge the checkpoint

My first impression is that this way the JobManager would not handle the
IterationHead any differently from all other stateful operators. (Rough
sketches of this protocol, the acknowledgement bookkeeping, and the commit
notification are appended at the end of this mail.)

Greetings,
Stephan

On Thu, Apr 30, 2015 at 9:27 PM, Paris Carbone <par...@kth.se> wrote:

> I agree with all suggestions, thanks for summing it up Stephan.
>
> A few more points I have in mind at the moment:
>
> - Regarding the acknowledgements, indeed we don't need to make all
> operators commit back; we just have to make sure that all sinks have
> acknowledged a checkpoint to consider it complete back at the coordinator.
>
> - Do you think we should broadcast commit responses to sources that need
> them after every successful checkpoint? The checkpoint interval does not
> always match the frequency at which we want to initiate a compaction, for
> example on Kafka. One alternative would be to make sources request a
> successful checkpoint id via a future, on demand.
>
> - We have to update the current checkpointing approach to cover iterative
> streams. We need to make sure we don't send checkpoint requests to
> iteration heads, and handle the downstream backup of records in transit
> during checkpoints accordingly.
>
> cheers
> Paris
>
>
> On 30 Apr 2015, at 20:47, Stephan Ewen <se...@apache.org> wrote:
> >
> > I was looking into the handling of state in streaming operators, and it
> > is a bit hidden from the system.
> >
> > Right now, functions can (if they want) put some state into their
> > context. At runtime, state may be present or not. Before runtime, the
> > system cannot tell which operators are going to be stateful and which
> > are going to be stateless.
> >
> > I think it is a good idea to expose that. We could use it for
> > optimizations, and we would know which operators need to checkpoint
> > state and acknowledge the asynchronous checkpoint. At this point, we
> > have to assume that all operators need to send a confirmation message,
> > which is unnecessary.
> >
> > Also, I think we should expose which operators want a "commit"
> > notification after the checkpoint has completed. Good examples are:
> >
> > - the KafkaConsumer source, which can then commit the offset that is
> > safe to ZooKeeper
> >
> > - a transactional KafkaProducer sink, which can commit a batch of
> > messages to the Kafka partition once the checkpoint is done (to get
> > exactly-once guarantees that include the sink)
> >
> > Comments welcome!
> >
> > Greetings,
> > Stephan
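---

Appendix: a few sketches to make the above concrete. All class, method,
and field names below are made up for illustration; none of this is
existing Flink API.

First, the coordinator-side bookkeeping Paris describes: a checkpoint
counts as complete once every task that is required to acknowledge it
(the sinks, plus operators that write their state asynchronously) has
done so. A minimal sketch:

// Coordinator-side bookkeeping (hypothetical names, not Flink API):
// a checkpoint is complete once all tasks that must acknowledge it
// (sinks, plus operators with asynchronous state writes) have done so.

import java.util.HashSet;
import java.util.Set;

class PendingCheckpoint {

    private final long checkpointId;
    private final Set<String> missingAcks;  // ids of tasks that still must ack

    PendingCheckpoint(long checkpointId, Set<String> tasksThatMustAck) {
        this.checkpointId = checkpointId;
        this.missingAcks = new HashSet<>(tasksThatMustAck);
    }

    /** Returns true if this acknowledgement completed the checkpoint. */
    synchronized boolean acknowledge(String taskId) {
        missingAcks.remove(taskId);
        return missingAcks.isEmpty();
    }

    long getCheckpointId() {
        return checkpointId;
    }
}

Once acknowledge(...) returns true, the JobManager can send the commit
notifications as actor messages to exactly the vertices that registered
interest, rather than routing them through the data flow.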
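Second, the five-step IterationHead protocol recalled above: forward the
barrier, snapshot state, back up everything arriving on the backchannel
until the barrier returns, persist the snapshot plus the backed-up
records, and only then acknowledge. Again a sketch under assumed names:

// Rough sketch of the IterationHead checkpoint protocol (illustrative
// names only). With this, the JobManager treats the IterationHead like
// any other stateful operator: it just waits for the acknowledgement.

import java.util.ArrayList;
import java.util.List;

class IterationHeadCheckpoint<T> {

    private final List<T> backchannelLog = new ArrayList<>();
    private boolean logging = false;
    private long checkpointId;

    /** Steps 1+2: a barrier arrives on the regular (non-loop) inputs. */
    void onBarrier(long checkpointId) {
        this.checkpointId = checkpointId;
        emitBarrierDownstream(checkpointId);  // the barrier also enters the loop
        snapshotOperatorState();
        logging = true;                       // start backing up in-flight loop records
    }

    /** Records arriving on the backchannel while the barrier circulates. */
    void onBackchannelRecord(T record) {
        if (logging) {
            backchannelLog.add(record);
        }
        processRecord(record);
    }

    /** Steps 3-5: the barrier came back through the backchannel. */
    void onBackchannelBarrier() {
        logging = false;
        persist(checkpointId, backchannelLog);  // snapshot + in-flight records
        backchannelLog.clear();
        acknowledgeCheckpoint(checkpointId);    // only now ack to the JobManager
    }

    // -- placeholders for the surrounding runtime ------------------------
    void emitBarrierDownstream(long id) { }
    void snapshotOperatorState() { }
    void processRecord(T record) { }
    void persist(long id, List<T> records) { }
    void acknowledgeCheckpoint(long id) { }
}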
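Third, the "commit" notification from the quoted mail. An interface like
the following (hypothetical; the real design may differ) would let the
system know statically which operators want the callback, and the
KafkaConsumer example then only commits offsets that are covered by a
completed checkpoint:

// Hypothetical commit-notification hook (illustrative names only).
// Operators declare the interface up front, so the system knows
// statically who needs the callback after a checkpoint completes.

import java.util.SortedMap;
import java.util.TreeMap;

interface CheckpointCommitter {
    /** Called (e.g. via an actor message from the JobManager) once the
     *  checkpoint with the given id is complete. */
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

/** Example: a Kafka-style source that only commits offsets covered by a
 *  completed checkpoint. */
class OffsetCommittingSource implements CheckpointCommitter {

    // offset as of the moment each checkpoint barrier was emitted
    private final SortedMap<Long, Long> pendingOffsets = new TreeMap<>();

    private long currentOffset;

    /** Called by the fetch loop for every consumed record. */
    synchronized void advanceOffset(long offset) {
        currentOffset = offset;
    }

    /** Called when this source takes its snapshot for a checkpoint. */
    synchronized void snapshotState(long checkpointId) {
        pendingOffsets.put(checkpointId, currentOffset);
    }

    @Override
    public synchronized void notifyCheckpointComplete(long checkpointId) {
        Long offset = pendingOffsets.get(checkpointId);
        if (offset != null) {
            commitOffset(offset);                              // e.g. write to ZooKeeper
            pendingOffsets.headMap(checkpointId + 1).clear();  // drop this and older entries
        }
    }

    private void commitOffset(long offset) {
        // placeholder for the actual ZooKeeper / Kafka offset commit
        System.out.println("committing offset " + offset);
    }
}

Paris's alternative, letting a source ask for the latest completed
checkpoint id via a future on demand, would replace this push-style
callback with a pull on the same coordinator bookkeeping.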
The sinks need to definitely acknowledge it. If we asynchronously write the state of operator (and emit downstream barriers before that is complete), then I think that we also need those operators to acknowledge the checkpoint. For the commit messages: My first thought was to send commit messages simply as actor messages from the JobManager to the vertices that require these messages. That way, they are not stuck in the data flow with its possible latency. Also, in the data flow, messages get duplicated (at all to all connections). For iterative flows: Does the JobManager need to be aware of this, or can the IterationHead handle that transparently for the JobManager. >From our last conversation, I recall: - Receive barriers, push out barriers - snapshot its state - wait for the barriers to come back through the backchannel - write the state snapshot plus the backchannel buffers - then only acknowledge the checkpoint My first impression is that this way the JobManager would not handle the IterationHead any different from all other stateful operators. Greetings, Stephan On Thu, Apr 30, 2015 at 9:27 PM, Paris Carbone <par...@kth.se> wrote: > I agree with all suggestions, thanks for summing it up Stephan. > > A few more points I have in mind at the moment: > > - Regarding the acknowledgements, indeed we don’t need to make all > operators commit back, we just have to make sure that all sinks have > acknowledged a checkpoint to consider it complete back at the coordinator. > > - Do you think we should broadcast commit responses to sources that need > it after every successful checkpoint? The checkpoint interval does not > always match with the frequency we want to initiate a compaction for > example on Kafka. One alternative would be to make sources request a > successful checkpoint id via a future on demand. > > - We have to update the current checkpointing approach to cover iterative > streams. We need to make sure we don’t send checkpoint requests to > iteration heads and handle downstream backup for records in transit during > checkpoints accordingly. > > cheers > Paris > > > On 30 Apr 2015, at 20:47, Stephan Ewen <se...@apache.org> wrote: > > > > I was looking into the handling of state in streaming operators, and it > is > > a bit hidden from the system > > > > Right now, functions can (of they want) put some state into their > context. > > At runtime, state may occur or not. Before runtime, the system cannot > tell > > which operators are going to be stateful, and which are going to be > > stateless. > > > > I think it is a good idea to expose that. We can use that for > optimizations > > and we know which operators need to checkpoint state and acknowledge the > > asynchronous checkpoint. > > > > At this point, we need to assume that all operators need to send a > > confirmation message, which is unnecessary. > > > > Also, I think we should expose which operations want a "commit" > > notification after the checkpoint completed. Good examples are > > > > - the KafkaConsumer source, which can then commit the offset that is > safe > > to zookeeper > > > > - a transactional KafkaProduce sink, which can commit a batch of > messages > > to the kafka partition once the checkpoint is done (to get exactly once > > guarantees that include the sink) > > > > Comments welcome! > > > > Greetings, > > Stephan > >