I think I have discovered the answer to my question. Hopefully someone can 
confirm my understanding.

The 'flush' method of my SystemProducer is invoked before the checkpoint is 
written. Therefore the checkpoint cannot get ahead of the SystemProducer, and 
the guarantee remains 'at least once'.

I will therefore do my data transmission in the 'flush' method of the 
SystemProducer. The 'send' method will only accumulate messages until flush 
is invoked.
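A minimal sketch of this buffering approach follows. It assumes a simplified producer: the method names follow SystemProducer's register/send/flush, but messages are plain Strings, and `BulkTransport` is a hypothetical interface standing in for whatever bulk API the destination offers. This is not the Samza interface itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the destination's bulk API; not a Samza type.
interface BulkTransport {
    void transmit(List<String> batch);
}

// Sketch of a producer that buffers in send() and transmits in flush().
// Method names mirror SystemProducer, but messages are simplified to String.
class BufferingProducer {
    private final BulkTransport transport;
    private final Map<String, List<String>> pending = new HashMap<>();

    BufferingProducer(BulkTransport transport) {
        this.transport = transport;
    }

    void register(String source) {
        pending.putIfAbsent(source, new ArrayList<>());
    }

    // Accumulate only: nothing is transmitted here.
    void send(String source, String message) {
        pending.computeIfAbsent(source, s -> new ArrayList<>()).add(message);
    }

    // Transmit everything buffered for this source in one bulk call.
    // The buffer is cleared only after transmit() returns; if transmit()
    // throws, the messages stay buffered and the exception propagates.
    void flush(String source) {
        List<String> batch = pending.get(source);
        if (batch == null || batch.isEmpty()) {
            return;
        }
        transport.transmit(new ArrayList<>(batch));
        batch.clear();
    }

    int pendingCount(String source) {
        List<String> batch = pending.get(source);
        return batch == null ? 0 : batch.size();
    }
}
```

Because a failed transmit leaves the batch in place and propagates the exception out of flush, a caller that flushes before checkpointing will not record progress past unsent messages.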

Specifically, the 'commit' method of TaskInstance invokes, in this order:
        1. collector.flush
        2. offsetManager.checkpoint(taskName)
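To illustrate why this ordering matters, here is a hypothetical sketch of a commit that flushes before checkpointing. It is not Samza's TaskInstance source: `CommitSketch`, `Collector` and `OffsetStore` are invented names. If the flush throws, the checkpoint line is never reached, so the stored offsets cannot get ahead of what was transmitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the flush-then-checkpoint ordering; not Samza code.
class CommitSketch {
    interface Collector { void flush(); }                                  // plays the role of collector.flush
    interface OffsetStore { void checkpoint(Map<String, Long> offsets); }  // plays the role of offsetManager.checkpoint

    private final Collector collector;
    private final OffsetStore store;
    private final Map<String, Long> processedOffsets = new HashMap<>();

    CommitSketch(Collector collector, OffsetStore store) {
        this.collector = collector;
        this.store = store;
    }

    // Record the last input offset processed for a partition.
    void markProcessed(String partition, long offset) {
        processedOffsets.put(partition, offset);
    }

    // Step 1: flush buffered output. Step 2: checkpoint input offsets.
    // An exception from flush() skips step 2 entirely.
    void commit() {
        collector.flush();
        store.checkpoint(new HashMap<>(processedOffsets));
    }
}
```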

The TaskInstance commit is invoked by the RunLoop either:
        1. when the commit interval has elapsed, or 
        2. on request from the task (co-ordination)
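A rough sketch of how the two triggers might combine in a run loop. `CommitTrigger` is a hypothetical class, not Samza's RunLoop; `commitMs` plays the role of task.commit.ms, and treating a non-positive interval as "no periodic commits" is an assumption of this sketch rather than documented Samza behaviour.

```java
// Hypothetical sketch of the two commit triggers; not Samza's RunLoop code.
class CommitTrigger {
    private final long commitMs;      // analogous to task.commit.ms
    private long lastCommitMs;        // time of the most recent commit
    private boolean commitRequested;  // set when the task asks for a commit

    CommitTrigger(long commitMs, long startMs) {
        this.commitMs = commitMs;
        this.lastCommitMs = startMs;
    }

    // Called on behalf of the task (co-ordination).
    void requestCommit() {
        commitRequested = true;
    }

    // Returns true, and resets both triggers, if a commit is due now.
    boolean maybeCommit(long nowMs) {
        boolean intervalElapsed = commitMs > 0 && nowMs - lastCommitMs >= commitMs;
        if (intervalElapsed || commitRequested) {
            lastCommitMs = nowMs;
            commitRequested = false;
            return true;
        }
        return false;
    }
}
```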

The property task.commit.ms controls the interval of the periodic commits. 
A task can request a commit by calling the 'commit' method of TaskCoordinator 
from within the 'process' method of its StreamTask.
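For example, a task that wants its buffered output flushed every N messages could request a commit itself. This sketch uses a hypothetical `Coordinator` stand-in and a hypothetical `batchSize` parameter. In the Samza API I am aware of, the real call is `coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK)`, which should be checked against the javadoc for the version in use.

```java
// Simplified, hypothetical stand-in for Samza's TaskCoordinator.
interface Coordinator {
    void commit();
}

// Sketch of a task that requests a commit after every batchSize messages,
// so the buffering producer is flushed (and then checkpointed) on that cadence.
class BatchingTask {
    private final int batchSize;  // hypothetical tuning parameter
    private int sinceCommit = 0;

    BatchingTask(int batchSize) {
        this.batchSize = batchSize;
    }

    // Plays the role of StreamTask.process(envelope, collector, coordinator).
    void process(String message, Coordinator coordinator) {
        // ... transform the message and send it via the producer here ...
        sinceCommit++;
        if (sinceCommit >= batchSize) {
            coordinator.commit();  // only a request; the run loop does flush + checkpoint
            sinceCommit = 0;
        }
    }
}
```

Requesting commits by message count bounds how many messages sit in the producer's buffer, while task.commit.ms still bounds how long they can sit there.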

-----Original Message-----
From: Jarrad, Ken [ICG-IT] 
Sent: 06 September 2016 14:55
To: 'dev@samza.apache.org'
Subject: checkpoint on flush of system producer

I am writing a SystemProducer and I would be grateful for any comments or 
documentation.

I want my SystemProducer to collect messages until flush is invoked.
That is, I don't want to transmit each message 'one by one'.
I want to wait until flush is invoked and then transmit 'in bulk'.

I don't know how to do this without violating the 'at least once' guarantee.

I am concerned that a checkpoint may include the input offset behind a message 
that my StreamTask has 'sent', even though that message has not yet been 
transmitted; it may still reside in my SystemProducer, pending flush.

If the StreamTask is restarted from that checkpoint, the input will not be 
replayed, so the message will be lost.
[The task input stream is a Kafka topic]
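The scenario can be made concrete with a toy model (plain Java, not Samza code; `ReplaySimulation` is an invented name). It buffers three input messages, performs only the first step of a commit, crashes, and then restarts from whatever checkpoint was stored.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the concern above; not Samza code. Offsets are list indices,
// "buffer" is the producer's in-memory batch and "downstream" is what was
// actually transmitted.
class ReplaySimulation {
    static List<String> run(List<String> input, boolean flushFirst) {
        List<String> downstream = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        int checkpoint = 0; // offset to resume from after a restart

        // Process every input message: send() only buffers.
        for (int i = checkpoint; i < input.size(); i++) {
            buffer.add(input.get(i));
        }

        if (flushFirst) {
            downstream.addAll(buffer);  // step 1: flush (transmit the batch)
        } else {
            checkpoint = input.size();  // step 1: checkpoint past the buffered input
        }
        // Crash before step 2: whatever is still in memory is lost.
        buffer.clear();

        // Restart from the stored checkpoint, reprocess and flush.
        for (int i = checkpoint; i < input.size(); i++) {
            buffer.add(input.get(i));
        }
        downstream.addAll(buffer);
        return downstream;
    }
}
```

With flush first, the restart re-sends a, b and c (duplicates, i.e. at least once); with checkpoint first, nothing is ever transmitted (the messages are lost).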

How do I co-ordinate the checkpoint mechanism with SystemProducer so that the 
checkpoint is delayed until the message is flushed?
