I think your assumption (and the current Kafka source implementation) is that there is one state object that you update/mutate all the time. If you instead draw a snapshot of the state object at the time of the checkpoint, the source can continue, and that particular offset is remembered as the state of this checkpoint and can be committed to Kafka/ZooKeeper later.
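To make the snapshot-then-commit idea concrete, here is a minimal Java sketch (the class and the notification hook are hypothetical, invented for illustration, and are not the actual Kafka source code): the source copies the offset aside when the barrier arrives, keeps running, and commits that copy only once the checkpoint is reported complete. The map of pending offsets is exactly the "local copy" Gyula asks about below.

    import java.util.HashMap;
    import java.util.Map;

    class OffsetTrackingSource {
        private long currentOffset;                        // mutated on every record read
        private final Map<Long, Long> pendingOffsets = new HashMap<>();

        // called when the checkpoint barrier reaches this source
        Long snapshotState(long checkpointId) {
            pendingOffsets.put(checkpointId, currentOffset);  // immutable copy, kept aside
            return currentOffset;                             // goes into the checkpoint
        }

        // called when the JobManager signals that the checkpoint is complete
        void notifyCheckpointComplete(long checkpointId) {
            Long offset = pendingOffsets.remove(checkpointId);
            if (offset != null) {
                commitToZooKeeper(offset);  // safe: this offset is part of a durable checkpoint
            }
        }

        private void commitToZooKeeper(long offset) { /* e.g. write to the consumer group node */ }

        void emitRecord() { currentOffset++; /* read and emit the next record */ }
    }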
On Thu, Apr 30, 2015 at 10:09 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Regarding the commits (for instance the Kafka offset):
>
> I don't exactly get how you mean to do this. If the source continues
> processing after the checkpoint and before the commit, it will not know
> what state has been committed exactly, so it would need to know the time
> of the checkpoint and store a local copy.
>
> Gyula
>
>
> On Thu, Apr 30, 2015 at 10:04 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > Thanks for the comments!
> >
> > Concerning acknowledging the checkpoint:
> >
> > The sinks definitely need to acknowledge it. If we asynchronously write
> > the state of an operator (and emit downstream barriers before that is
> > complete), then I think those operators also need to acknowledge the
> > checkpoint.
> >
> > For the commit messages:
> >
> > My first thought was to send commit messages simply as actor messages
> > from the JobManager to the vertices that require them. That way, they
> > are not stuck in the data flow with its possible latency. Also, in the
> > data flow, messages get duplicated (at all-to-all connections).
> >
> > For iterative flows:
> >
> > Does the JobManager need to be aware of this, or can the IterationHead
> > handle it transparently for the JobManager? From our last conversation,
> > I recall:
> >   - Receive barriers, push out barriers
> >   - Snapshot its state
> >   - Wait for the barriers to come back through the backchannel
> >   - Write the state snapshot plus the backchannel buffers
> >   - Only then acknowledge the checkpoint
> >
> > My first impression is that this way the JobManager would not need to
> > handle the IterationHead any differently from other stateful operators.
> >
> > Greetings,
> > Stephan
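The iteration-head protocol recalled in the list above, as a rough Java sketch (all types here are hypothetical stand-ins, not the runtime's actual classes): the head forwards the barrier, snapshots, logs everything still in transit on the backchannel until the barrier comes back around the loop, and only then acknowledges.

    import java.util.ArrayList;
    import java.util.List;

    class IterationHead {

        // marker that travels around the feedback loop behind the in-flight records
        static class Barrier {
            final long checkpointId;
            Barrier(long checkpointId) { this.checkpointId = checkpointId; }
        }

        interface Output { void broadcastBarrier(long checkpointId); }
        interface BackChannel { Object nextEvent(); }   // yields records, or a Barrier
        interface Coordinator { void acknowledge(long checkpointId); }

        void onBarrier(long checkpointId, Output out, BackChannel loop, Coordinator coordinator) {
            out.broadcastBarrier(checkpointId);         // 1. push the barrier downstream
            byte[] snapshot = snapshotState();          // 2. snapshot own state

            List<Object> inFlight = new ArrayList<>();  // 3. log records still in transit on
            Object event;                               //    the backchannel until the barrier
            while (!((event = loop.nextEvent()) instanceof Barrier)) {
                inFlight.add(event);                    //    comes back around the loop
            }

            persist(checkpointId, snapshot, inFlight);  // 4. state snapshot + backchannel buffers
            coordinator.acknowledge(checkpointId);      // 5. only then acknowledge the checkpoint
        }

        byte[] snapshotState() { return new byte[0]; }
        void persist(long id, byte[] state, List<Object> buffered) { /* checkpoint storage */ }
    }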
> >
> > On Thu, Apr 30, 2015 at 9:27 PM, Paris Carbone <par...@kth.se> wrote:
> >
> > > I agree with all suggestions, thanks for summing it up Stephan.
> > >
> > > A few more points I have in mind at the moment:
> > >
> > > - Regarding the acknowledgements, indeed we don't need to make all
> > >   operators commit back; we just have to make sure that all sinks have
> > >   acknowledged a checkpoint to consider it complete back at the
> > >   coordinator.
> > >
> > > - Do you think we should broadcast commit responses to the sources
> > >   that need them after every successful checkpoint? The checkpoint
> > >   interval does not always match the frequency at which we want to
> > >   initiate, for example, a compaction on Kafka. One alternative would
> > >   be to make sources request a successful checkpoint id via a future,
> > >   on demand.
> > >
> > > - We have to update the current checkpointing approach to cover
> > >   iterative streams. We need to make sure we don't send checkpoint
> > >   requests to iteration heads, and handle the downstream backup of
> > >   records in transit during checkpoints accordingly.
> > >
> > > cheers
> > > Paris
> > >
> > > > On 30 Apr 2015, at 20:47, Stephan Ewen <se...@apache.org> wrote:
> > > >
> > > > I was looking into the handling of state in streaming operators, and
> > > > it is a bit hidden from the system.
> > > >
> > > > Right now, functions can (if they want) put some state into their
> > > > context. At runtime, state may occur or not. Before runtime, the
> > > > system cannot tell which operators are going to be stateful and
> > > > which are going to be stateless.
> > > >
> > > > I think it is a good idea to expose that. We can use it for
> > > > optimizations, and we would know which operators need to checkpoint
> > > > state and acknowledge the asynchronous checkpoint. At this point, we
> > > > have to assume that all operators need to send a confirmation
> > > > message, which is unnecessary.
> > > >
> > > > Also, I think we should expose which operations want a "commit"
> > > > notification after the checkpoint has completed. Good examples are:
> > > >
> > > > - the KafkaConsumer source, which can then commit the offset that is
> > > >   safe to ZooKeeper
> > > >
> > > > - a transactional KafkaProducer sink, which can commit a batch of
> > > >   messages to the Kafka partition once the checkpoint is done (to
> > > >   get exactly-once guarantees that include the sink)
> > > >
> > > > Comments welcome!
> > > >
> > > > Greetings,
> > > > Stephan
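One way "exposing that" could look, as a hedged Java sketch (the interface and class names are made up for illustration; nothing here is claimed to be the actual API): operators declare statically whether they carry state and whether they want a commit notification, so the system knows before runtime who must acknowledge a checkpoint and who should receive commit messages.

    interface StatefulOperator<S> {
        S snapshotState(long checkpointId);        // system knows this operator must ack
        void restoreState(S state);
    }

    interface CheckpointCommitter {
        void commitCheckpoint(long checkpointId);  // called once the checkpoint is complete
    }

    // e.g. the Kafka source: stateful (the offset) and wants the commit callback
    class KafkaSource implements StatefulOperator<Long>, CheckpointCommitter {
        private long offset;

        public Long snapshotState(long checkpointId) { return offset; }
        public void restoreState(Long state) { this.offset = state; }
        public void commitCheckpoint(long checkpointId) {
            // now safe to commit the offset snapshotted for this checkpoint to ZooKeeper
        }
    }

    // e.g. a transactional Kafka sink: buffers a batch per checkpoint and makes it
    // visible only once the checkpoint completed (exactly-once including the sink)
    class TransactionalKafkaSink implements CheckpointCommitter {
        public void commitCheckpoint(long checkpointId) {
            // flush/commit the batch of messages written since the last checkpoint
        }
    }

With such declarations, only operators implementing StatefulOperator would need to send confirmation messages, and the JobManager would send commit messages only to operators implementing CheckpointCommitter.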