If you are implementing your own consumer, on start-up Samza will call the consumer's register method once per partition and pass it the checkpointed offset. If you are using CheckpointListener (as Jagadish mentioned), you will get a callback on each checkpoint, so you can ignore the register call; otherwise, use the offset passed to you in the register call.
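To make the two start-up paths concrete, here is a minimal sketch of the offset bookkeeping such a consumer might do. It avoids the Samza and AWS jars so it compiles on its own: shard ids are plain strings standing in for SystemStreamPartition, and the class `ShardOffsets` and its methods are hypothetical. In a real consumer, `register` would be your override of `SystemConsumer.register(SystemStreamPartition, String)` and `onCheckpoint` your implementation of `CheckpointListener.onCheckpoint(Map<SystemStreamPartition, String>)`. The sketch assumes the offset Samza hands back is the Kinesis sequence number of the last processed record, so reading resumes with an AFTER_SEQUENCE_NUMBER shard iterator; whether that holds depends on what your SystemAdmin returns from getOffsetsAfter.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical offset bookkeeping for a Kinesis-backed Samza consumer.
 * Shard ids stand in for SystemStreamPartition; Kinesis sequence numbers
 * are used as Samza offsets.
 */
class ShardOffsets {
  // Last sequence number known to be processed, per shard.
  private final Map<String, String> lastProcessed = new HashMap<>();

  // Mirrors SystemConsumer.register: called once per shard at start-up.
  // A null offset means nothing has been checkpointed for that shard yet.
  void register(String shardId, String offset) {
    if (offset != null) {
      lastProcessed.put(shardId, offset);
    }
  }

  // Mirrors CheckpointListener.onCheckpoint: called after each Samza checkpoint.
  void onCheckpoint(Map<String, String> offsets) {
    lastProcessed.putAll(offsets);
  }

  // Where to resume, expressed with Kinesis ShardIteratorType names.
  // ASSUMPTION: the stored offset is the last processed sequence number.
  String startingPosition(String shardId) {
    String seq = lastProcessed.get(shardId);
    return seq == null ? "TRIM_HORIZON" : "AFTER_SEQUENCE_NUMBER:" + seq;
  }
}
```

Under these assumptions, a restart after a crash resumes from the last committed sequence number rather than the last one read, so records read after that checkpoint are delivered again (at-least-once).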
On Tue, Feb 7, 2017 at 9:11 AM, Chad Greenberg <evil_goodn...@hotmail.com> wrote:

> Thanks, Jagadish, for the reply. A few comments.
>
> I was under the impression that using the KCL would mean recording the offsets in Kinesis/DynamoDB and therefore not in Samza. I am avoiding the KCL so that I can save state in Samza.
>
> I assumed that the checkpoint values were based on the values passed to the put() method, but I did not see any explicit documentation to that effect (I have been reading a ton, so I could have missed something). What I do not see, however, is how to retrieve those values on start-up. Is this the role of the SystemAdmin? Is there any documentation about the use of SystemAdmin?
>
> ________________________________
> From: Jagadish Venkatraman <jagadish1...@gmail.com>
> Sent: Tuesday, February 7, 2017 1:48:00 AM
> To: dev@samza.apache.org
> Subject: Re: Questions about checkpointing and Kinesis
>
> Great to hear about this development on the Kinesis consumer!
>
> Let me answer some of your questions here.
>
> *1. "Kinesis does not have a listener/push framework (unless I missed something)"*
>
> Let me point out that Kinesis has both a push-based and a pull-based model. You can choose to implement either for your use case.
>
> - *Pull:* The pull-based model supports obtaining a *ShardIterator* for a shard and iterating on it. Please refer to the docs for *GetShardIteratorRequest* here <http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html>. It's important to be aware of re-sharding: the next shard iterator returned by GetRecords (GetRecordsResult.getNextShardIterator()) can be null if the shard was merged or split. (You can trivially handle re-shards by restarting.)
> - *Push:* The push-based model uses the KCL (Kinesis Client Library) directly to subscribe to events. The KCL handles sharding, re-balancing, and checkpointing internally.
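The pull model described above comes down to chaining shard iterators until Kinesis stops handing out new ones. The sketch below shows that loop against a hypothetical `ShardSource` interface so it runs without the AWS SDK; in the SDK the equivalent is a GetShardIterator call followed by repeated GetRecords calls, following each result's next shard iterator. `ShardSource`, `Batch`, `ShardReader`, and `drainShard` are illustrative names, not SDK types.

```java
import java.util.List;
import java.util.function.Consumer;

/** One GetRecords-style response: sequence numbers read, plus the next iterator. */
final class Batch {
  final List<String> sequenceNumbers;
  final String nextIterator; // null once the shard has been closed

  Batch(List<String> sequenceNumbers, String nextIterator) {
    this.sequenceNumbers = sequenceNumbers;
    this.nextIterator = nextIterator;
  }
}

/** Hypothetical stand-in for the Kinesis GetRecords call. */
interface ShardSource {
  Batch getRecords(String shardIterator);
}

final class ShardReader {
  /**
   * Follows the shard-iterator chain, passing each sequence number to sink.
   * On an open shard this never returns (the normal consumer case); once the
   * shard is closed it returns the last sequence number read.
   */
  static String drainShard(ShardSource source, String iterator, Consumer<String> sink) {
    String lastSequenceNumber = null;
    while (iterator != null) {
      Batch batch = source.getRecords(iterator);
      for (String seq : batch.sequenceNumbers) {
        sink.accept(seq);
        lastSequenceNumber = seq;
      }
      // A null next iterator means a split or merge closed the shard: the caller
      // should re-list shards and move on to the child shards (or restart).
      iterator = batch.nextIterator;
    }
    return lastSequenceNumber;
  }
}
```

Empty batches are normal: GetRecords can return no records even when more data lies further along the shard, which is why the loop keeps following the iterator instead of stopping on an empty response.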
>
> Your consumer can implement the *CheckpointListener <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointListener.java>* interface to get notified when Samza performs a checkpoint. You can then save the offsets of the shards to Kinesis by invoking the IRecordProcessorCheckpointer.checkpoint <https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java> APIs.
>
> *2. "My plan was to use the default KafkaCheckpointManagerFactory on a timed interval basis"*
>
> The checkpoint manager merely provides persistence for the checkpoints (in that sense, it's actually a checkpoint writer). You probably don't want to implement a custom checkpoint manager.
>
> *3. "What exactly is being checkpointed? What value can I retrieve to use as an offset for my Kinesis stream? Or is this something I need to keep track of in a store? If so, what is the point of checkpointing? Can I use RocksDB to save the Kinesis offset at every document (efficiently, that is)?"*
>
> - Samza checkpoints [ssp, offset] pairs for your tasks.
> - Kinesis has an implicit notion of sequence numbers for every shard in a stream. You can use those as offsets.
> - You don't want to record offsets in a separate store. If you want Samza to manage offsets, Samza will use Kafka internally. If you want Kinesis to manage offsets (KCL), Kinesis will use DynamoDB to store its offsets.
>
> On Mon, Feb 6, 2017 at 2:57 PM, Chad Greenberg <evil_goodn...@hotmail.com> wrote:
>
> > I'm starting on an integration project between a Kinesis stream and Samza despite having no background in either, though I am familiar with most other messaging/queuing systems.
> >
> > I decided to keep all state management within Samza instead of using the Kinesis Client Library.
> > My plan was to use the default KafkaCheckpointManagerFactory on a timed interval basis, but I have a few questions.
> >
> > What exactly is being checkpointed? What value can I retrieve to use as an offset for my Kinesis stream? Or is this something I need to keep track of in a store? If so, what is the point of checkpointing? Can I use RocksDB to save the Kinesis offset at every document (efficiently, that is)?
> >
> > Related to Kinesis and not quite Samza: it does not have a listener/push framework, but requires constant polling (unless I missed something). First of all, I was going to have a partition for each Kinesis shard. But the next question is: should I simply have a while(true) polling method inside my consumer (BlockingEnvelopeMap)? That seems inefficient, even with a timeout. How can I get new data to instantiate a new consumer? My consumer will put a new document to my task.
> >
> > Cheers.
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
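On the while(true) question: a common way around a busy loop is to run the Kinesis reads on a background thread that sleeps briefly after an empty fetch and pushes records into a bounded queue, while the consumer's poll waits on that queue with a timeout. Samza's BlockingEnvelopeMap already provides the queue half (its put method enqueues envelopes that poll later returns), so a subclass mostly needs the fetch thread, started in start() and stopped in stop(). The sketch below models that arrangement with plain java.util.concurrent types; `ShardPoller`, the `fetch` supplier, `pollUpTo`, and the backoff value are hypothetical. Kinesis also throttles GetRecords per shard (five calls per second per shard in the published limits), so some pause between empty fetches is needed regardless.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Hypothetical background poller for one shard. The fetch supplier stands in for a
 * single GetRecords call; the queue plays the role of BlockingEnvelopeMap's put().
 */
final class ShardPoller implements Runnable {
  private final Supplier<List<String>> fetch;
  private final BlockingQueue<String> queue;
  private final long emptyBackoffMs;
  private volatile boolean running = true;

  ShardPoller(Supplier<List<String>> fetch, BlockingQueue<String> queue, long emptyBackoffMs) {
    this.fetch = fetch;
    this.queue = queue;
    this.emptyBackoffMs = emptyBackoffMs;
  }

  @Override
  public void run() {
    try {
      while (running) {
        List<String> records = fetch.get();
        if (records.isEmpty()) {
          // Nothing new: sleep instead of spinning, which also helps
          // keep the call rate under the per-shard read limit.
          Thread.sleep(emptyBackoffMs);
          continue;
        }
        for (String record : records) {
          queue.put(record); // blocks when the queue is full (back-pressure)
        }
      }
    } catch (InterruptedException e) {
      // Interrupted (for example during shutdown): restore the flag and exit.
      Thread.currentThread().interrupt();
    }
  }

  // Asks the loop to finish; also interrupt the thread if it may be blocked in put().
  void stop() {
    running = false;
  }

  // Consumer side, like poll(timeout): waits up to timeoutMs for each of up to n records.
  static List<String> pollUpTo(BlockingQueue<String> queue, int n, long timeoutMs) {
    List<String> out = new ArrayList<>();
    try {
      while (out.size() < n) {
        String record = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (record == null) {
          break; // timed out: return whatever arrived
        }
        out.add(record);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return out;
  }
}
```

The bounded queue is the design choice that matters here: if the task falls behind, put() blocks and the poller stops fetching, instead of buffering an unbounded backlog in memory.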