That would be one way of doing it, yes...
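To make that concrete, here is a rough sketch of how a source could combine such a commit hook with checkpoint ids, so that it always knows which offset belongs to which checkpoint. None of these names (CheckpointCommitter, OffsetTrackingSource, snapshotState) are existing API; they are only illustrative:

import java.util.HashMap;
import java.util.Map;

// Hypothetical callback for the "commit" notification discussed below.
// Keying it on the checkpoint id (rather than passing a state object)
// lets the source look up exactly what was part of that checkpoint.
public interface CheckpointCommitter {

    // Called (e.g. via an actor message from the JobManager) once the
    // checkpoint with the given id has been acknowledged by all sinks.
    void commitCheckpoint(long checkpointId) throws Exception;
}

// Sketch of a Kafka-style source: it draws a snapshot of its offset at
// the moment the barrier is taken, keeps reading, and commits that exact
// offset once the checkpoint later completes.
class OffsetTrackingSource implements CheckpointCommitter {

    private long currentOffset;  // mutated continuously while reading
    private final Map<Long, Long> pendingOffsets = new HashMap<>();

    // Barrier arrives: remember the offset under this checkpoint id,
    // then continue processing immediately.
    public long snapshotState(long checkpointId) {
        pendingOffsets.put(checkpointId, currentOffset);
        return currentOffset;
    }

    // Checkpoint completed: the source knows exactly which offset
    // belongs to it, even though it has read further in the meantime.
    @Override
    public void commitCheckpoint(long checkpointId) {
        Long offset = pendingOffsets.remove(checkpointId);
        if (offset != null) {
            commitOffsetToZooKeeper(offset);
        }
    }

    private void commitOffsetToZooKeeper(long offset) {
        // placeholder for the actual zookeeper/kafka offset commit
    }
}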
On Thu, Apr 30, 2015 at 10:23 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Okay, so the commit would be something like:
>
> commitState(OperatorState state)
>
> On Thu, Apr 30, 2015 at 10:17 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > I think your assumption (and the current kafka source implementation)
> > is that there is one state object that you update/mutate all the time.
> >
> > If you draw a snapshot of the state object at the time of the
> > checkpoint, the source can continue, and that particular offset is
> > remembered as the state of this checkpoint and can be committed to
> > kafka/zookeeper later.
> >
> > On Thu, Apr 30, 2015 at 10:09 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > > Regarding the commits (for instance the kafka offset):
> > >
> > > I don't exactly get how you mean to do this: if the source continues
> > > processing after the checkpoint and before the commit, it will not
> > > know what state has been committed exactly, so it would need to know
> > > the time of the checkpoint and store a local copy.
> > >
> > > Gyula
> > >
> > > On Thu, Apr 30, 2015 at 10:04 PM, Stephan Ewen <se...@apache.org> wrote:
> > >
> > > > Thanks for the comments!
> > > >
> > > > Concerning acknowledging the checkpoint:
> > > >
> > > > The sinks definitely need to acknowledge it.
> > > > If we asynchronously write the state of an operator (and emit
> > > > downstream barriers before that is complete), then I think that we
> > > > also need those operators to acknowledge the checkpoint.
> > > >
> > > > For the commit messages:
> > > >
> > > > My first thought was to send commit messages simply as actor
> > > > messages from the JobManager to the vertices that require these
> > > > messages. That way, they are not stuck in the data flow with its
> > > > possible latency. Also, in the data flow, messages get duplicated
> > > > (at all-to-all connections).
> > > >
> > > > For iterative flows:
> > > >
> > > > Does the JobManager need to be aware of this, or can the
> > > > IterationHead handle that transparently for the JobManager?
> > > > From our last conversation, I recall:
> > > > - receive barriers, push out barriers
> > > > - snapshot its state
> > > > - wait for the barriers to come back through the backchannel
> > > > - write the state snapshot plus the backchannel buffers
> > > > - only then acknowledge the checkpoint
> > > >
> > > > My first impression is that this way the JobManager would not
> > > > handle the IterationHead any differently from all other stateful
> > > > operators.
> > > >
> > > > Greetings,
> > > > Stephan
> > > >
> > > > On Thu, Apr 30, 2015 at 9:27 PM, Paris Carbone <par...@kth.se> wrote:
> > > >
> > > > > I agree with all suggestions, thanks for summing it up Stephan.
> > > > >
> > > > > A few more points I have in mind at the moment:
> > > > >
> > > > > - Regarding the acknowledgements, indeed we don't need to make
> > > > > all operators commit back; we just have to make sure that all
> > > > > sinks have acknowledged a checkpoint to consider it complete back
> > > > > at the coordinator.
> > > > >
> > > > > - Do you think we should broadcast commit responses to sources
> > > > > that need them after every successful checkpoint? The checkpoint
> > > > > interval does not always match the frequency at which we want to
> > > > > initiate a compaction, for example on Kafka. One alternative
> > > > > would be to make sources request a successful checkpoint id via a
> > > > > future on demand.
> > > > >
> > > > > - We have to update the current checkpointing approach to cover
> > > > > iterative streams. We need to make sure we don't send checkpoint
> > > > > requests to iteration heads, and handle downstream backup for
> > > > > records in transit during checkpoints accordingly.
> > > > >
> > > > > cheers
> > > > > Paris
> > > > >
> > > > > > On 30 Apr 2015, at 20:47, Stephan Ewen <se...@apache.org> wrote:
> > > > > >
> > > > > > I was looking into the handling of state in streaming
> > > > > > operators, and it is a bit hidden from the system.
> > > > > >
> > > > > > Right now, functions can (if they want) put some state into
> > > > > > their context. At runtime, state may or may not occur. Before
> > > > > > runtime, the system cannot tell which operators are going to be
> > > > > > stateful and which are going to be stateless.
> > > > > >
> > > > > > I think it is a good idea to expose that. We can use that for
> > > > > > optimizations, and we know which operators need to checkpoint
> > > > > > state and acknowledge the asynchronous checkpoint.
> > > > > >
> > > > > > At this point, we need to assume that all operators need to
> > > > > > send a confirmation message, which is unnecessary.
> > > > > >
> > > > > > Also, I think we should expose which operations want a "commit"
> > > > > > notification after the checkpoint completed. Good examples are:
> > > > > >
> > > > > > - the KafkaConsumer source, which can then commit the offset
> > > > > > that is safe to zookeeper
> > > > > >
> > > > > > - a transactional KafkaProducer sink, which can commit a batch
> > > > > > of messages to the kafka partition once the checkpoint is done
> > > > > > (to get exactly-once guarantees that include the sink)
> > > > > >
> > > > > > Comments welcome!
> > > > > >
> > > > > > Greetings,
> > > > > > Stephan
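For the iterative case, the steps listed above could look roughly like this inside an iteration head; all types and helpers are illustrative stubs, not existing code:

import java.util.ArrayList;
import java.util.List;

// Rough sketch of the iteration-head protocol from the list above. From
// the JobManager's perspective the head is just another stateful operator
// that acknowledges late.
class IterationHead {

    void onCheckpointBarrier(long checkpointId) throws Exception {
        emitBarrierDownstream(checkpointId);   // 1. push out barriers
        byte[] snapshot = snapshotOwnState();  // 2. snapshot its state

        // 3. keep draining the backchannel, logging records in transit,
        //    until the barrier comes back around the loop
        List<Object> inTransit = new ArrayList<>();
        Object record = readFromBackchannel();
        while (!isBarrier(record, checkpointId)) {
            inTransit.add(record);
            record = readFromBackchannel();
        }

        // 4. persist the snapshot plus the buffered backchannel records,
        // 5. and only then acknowledge the checkpoint
        persistCheckpoint(checkpointId, snapshot, inTransit);
        acknowledgeCheckpoint(checkpointId);
    }

    // --- illustrative stubs ---
    private void emitBarrierDownstream(long checkpointId) { }
    private byte[] snapshotOwnState() { return new byte[0]; }
    private Object readFromBackchannel() { return new Object(); }
    private boolean isBarrier(Object record, long checkpointId) { return true; }
    private void persistCheckpoint(long id, byte[] state, List<Object> buffered) { }
    private void acknowledgeCheckpoint(long checkpointId) { }
}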
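The transactional sink from the second example could follow the same commit pattern, buffering a batch per in-flight checkpoint and publishing it only on the completion notification. Again a sketch with made-up names, reusing the hypothetical CheckpointCommitter from above:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Output is buffered per in-flight checkpoint and published only once
// that checkpoint completes, extending the exactly-once guarantee to the
// sink.
class TransactionalKafkaSink implements CheckpointCommitter {

    private final Map<Long, List<String>> pendingBatches = new HashMap<>();
    private List<String> currentBatch = new ArrayList<>();

    // Regular record processing: buffer instead of writing immediately.
    public void invoke(String record) {
        currentBatch.add(record);
    }

    // Barrier arrives: seal the current batch under this checkpoint id.
    public void snapshotState(long checkpointId) {
        pendingBatches.put(checkpointId, currentBatch);
        currentBatch = new ArrayList<>();
    }

    // Checkpoint completed: the sealed batch is now safe to publish.
    @Override
    public void commitCheckpoint(long checkpointId) {
        List<String> batch = pendingBatches.remove(checkpointId);
        if (batch != null) {
            publishBatchToKafka(batch);
        }
    }

    private void publishBatchToKafka(List<String> batch) {
        // placeholder for the actual (transactional) producer call
    }
}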