Hi Jarrad,

Yes, you have found your answer. Looking forward to your implementation of SystemProducer. Just curious: what is the target output system that you are writing to?
-Yi

On Tue, Sep 6, 2016 at 9:01 AM, Jarrad, Ken <ken.jar...@citi.com.invalid> wrote:

> I think I have discovered the answer to my question. Hopefully someone can
> confirm my understanding.
>
> The 'flush' method of my SystemProducer will be invoked prior to the
> checkpoint. Thus the checkpoint cannot get ahead of the SystemProducer.
> Thus the guarantee is 'at least once'.
>
> I will do my data transmission in method 'flush' of SystemProducer. Method
> 'send' will accumulate the messages, pending flush.
>
> Specifically, method 'commit' of TaskInstance invokes:
> 1. collector.flush
> 2. offsetManager.checkpoint(taskName)
>
> The TaskInstance commit is invoked by the RunLoop either:
> 1. due to time elapsed, or
> 2. by co-ordination
>
> The property task.commit.ms controls the periodic commits.
> The method 'commit' of TaskCoordinator requests a commit (from within
> method 'process' of StreamTask).
>
> -----Original Message-----
> From: Jarrad, Ken [ICG-IT]
> Sent: 06 September 2016 14:55
> To: 'dev@samza.apache.org'
> Subject: checkpoint on flush of system producer
>
> I am writing a SystemProducer and I would be grateful for any comments or
> documentation.
>
> I want my SystemProducer to collect messages until flush is invoked.
> That is, I don't want to transmit each message 'one by one'.
> I want to wait until flush is invoked and then transmit 'in bulk'.
>
> I don't know how to do this without violating the 'at least once'
> guarantee.
>
> I am concerned that any message that is 'sent' from my StreamTask will be
> included in a checkpoint.
> The message might not have been transmitted, however.
> The message may reside in my SystemProducer pending flush.
>
> If the StreamTask is re-started from the checkpoint then the message will
> not be replayed.
> [The task input stream is a Kafka topic]
>
> How do I co-ordinate the checkpoint mechanism with SystemProducer so that
> the checkpoint is delayed until the message is flushed?
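
For anyone reading this thread later, the flush-before-checkpoint ordering discussed above can be sketched roughly as follows. This is a minimal, Samza-free illustration, not the actual Samza code: RecordingSink, BufferingProducer and CommitSketch are hypothetical names invented for this example, messages are plain strings, and the "checkpoint" is just a long field. A real SystemProducer takes a source name and an OutgoingMessageEnvelope in send, and a source name in flush; those details are left out here to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferingProducerSketch {

    /** Hypothetical stand-in for the target system; records each bulk transmission. */
    public static class RecordingSink {
        public final List<List<String>> batches = new ArrayList<>();
        public boolean failNext = false; // set to simulate one failed transmission

        public void transmit(List<String> batch) {
            if (failNext) {
                failNext = false;
                throw new IllegalStateException("simulated transmission failure");
            }
            batches.add(new ArrayList<>(batch)); // copy, since the caller clears its buffer
        }
    }

    /** Mirrors the send/flush shape of a producer: send only buffers, flush transmits in bulk. */
    public static class BufferingProducer {
        private final List<String> pending = new ArrayList<>();
        private final RecordingSink sink;

        public BufferingProducer(RecordingSink sink) {
            this.sink = sink;
        }

        public void send(String message) {
            pending.add(message); // nothing leaves the process yet
        }

        public void flush() {
            if (pending.isEmpty()) {
                return;
            }
            sink.transmit(pending); // if this throws, the buffer is left untouched
            pending.clear();
        }

        public int pendingCount() {
            return pending.size();
        }
    }

    /** Simplified commit: flush first, and only then record the checkpoint. */
    public static class CommitSketch {
        public long checkpointedOffset = -1L;

        public void commit(BufferingProducer producer, long lastProcessedOffset) {
            producer.flush();                         // step 1: push buffered output
            checkpointedOffset = lastProcessedOffset; // step 2: reached only if flush succeeded
        }
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        BufferingProducer producer = new BufferingProducer(sink);
        CommitSketch task = new CommitSketch();

        producer.send("m1");
        producer.send("m2");
        task.commit(producer, 2L);

        System.out.println("batches sent: " + sink.batches);
        System.out.println("checkpointed offset: " + task.checkpointedOffset);
    }
}
```

The point of the sketch is the ordering inside commit: if the bulk transmission throws, the checkpoint is not advanced, so the input would be replayed after a restart. That gives 'at least once' rather than 'exactly once', since a batch that was partly delivered before a failure may be delivered again.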