Thanks Jagadish for the reply. A few comments.
I was under the impression that using the KCL would mean recording the offsets in Kinesis/DynamoDB and therefore not in Samza. I am avoiding the KCL so that I can save state in Samza. I assumed that the checkpoint values were based on the values passed to the put() method, but I did not see any explicit documentation to that effect (I have been reading a ton, so I could have missed something). What I do not see, however, is how to retrieve those values upon start-up. Is this the role of the SystemAdmin? Is there any documentation about the use of SystemAdmin?

________________________________
From: Jagadish Venkatraman <jagadish1...@gmail.com>
Sent: Tuesday, February 7, 2017 1:48:00 AM
To: dev@samza.apache.org
Subject: Re: Questions about checkpointing and Kinesis

Great to hear this development on the kinesis consumer! Let me answer some of your questions here.

*1. "Kinesis does not have a listener/push framework (unless I missed something)"*

Let me point out that Kinesis has both a push and a pull based model. You can choose to implement either for your use-case.

- *Pull:* The pull based model supports obtaining a *ShardIterator* for a shard and iterating on it. Please refer to the docs for the *GetShardIteratorRequest* here <http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html>. It's important that you are aware of re-sharding: getNextShardIterator() on the GetRecords result can return null if there was a merge / a split for the shard. (You can trivially handle re-shards by re-starting.)

- *Push:* The push based model directly uses KCL (the Kinesis Client Library) to subscribe to events. KCL will handle sharding, re-balancing, and checkpointing internally. Your consumer can implement the *CheckpointListener* <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointListener.java> interface to get notified when Samza performs a checkpoint.
You can save the offsets of the shards to Kinesis by invoking the IRecordProcessorCheckpointer.checkpoint <https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java> APIs.

*2. "My plan was to use the default KafkaCheckpointManagerFactory on a timed interval basis"*

The checkpoint manager merely provides persistence for the checkpoints. (In that sense, it's actually a checkpoint writer.) You probably don't want to implement a custom checkpoint manager.

*3. "What exactly is being checkpointed? What value can I retrieve to use as an offset for my Kinesis stream? Or is this something I need to keep track of in a store? If so, what is the point of checkpointing? Can I use RocksDb to save the Kinesis offset at every document (efficiently that is)?"*

- Samza checkpoints [ssp, offset] pairs for your tasks.
- Kinesis has an implicit notion of sequence numbers for every shard in a stream. You can use those as offsets.
- You don't want to record offsets in a separate store. If you want Samza to manage offsets, Samza will use Kafka internally. If you want Kinesis to manage offsets (KCL), Kinesis will use DynamoDB to store its offsets.

On Mon, Feb 6, 2017 at 2:57 PM, Chad Greenberg <evil_goodn...@hotmail.com> wrote:

> Starting on an integration project between a Kinesis stream and Samza,
> despite having no background in either, but I am familiar with most other
> messaging/queuing systems.
>
> Decided to keep all state management within Samza instead of using
> Kinesis' client library. My plan was to use the default
> KafkaCheckpointManagerFactory on a timed interval basis, but I have a few
> questions.
>
> What exactly is being checkpointed? What value can I retrieve to use as an
> offset for my Kinesis stream? Or is this something I need to keep track of
> in a store? If so, what is the point of checkpointing?
> Can I use RocksDb to save the Kinesis offset at every document
> (efficiently that is)?
>
> Related to Kinesis and not quite Samza, it does not have a listener/push
> framework, but it requires constant polling (unless I missed something).
> First of all, I was going to have a partition for each Kinesis shard
> partition. But the next question is, should I simply have a while(true)
> polling method inside my consumer(BlockingEnvelopeMap)? Seems inefficient,
> even with a timeout. How can I get new data to instantiate a new consumer?
> My consumer will put a new document to my task.
>
> Cheers.
>

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
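
For reference, the pull model discussed in this thread (follow the next shard iterator until it comes back null, and treat the last sequence number seen as the offset to checkpoint) could be sketched roughly as below. This is only an illustration under stated assumptions: Batch, ShardReader and drain are hypothetical in-memory stand-ins, not the AWS SDK or Samza APIs, and a real consumer would also sleep between empty batches and hand each record to the task.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: in-memory stand-ins, not the AWS SDK or the Samza API. */
public class ShardPollSketch {

    /** Hypothetical stand-in for one GetRecords response. */
    static final class Batch {
        final List<String> sequenceNumbers; // one per record, in shard order
        final String nextIterator;          // null once the shard is closed (merge/split)

        Batch(List<String> sequenceNumbers, String nextIterator) {
            this.sequenceNumbers = sequenceNumbers;
            this.nextIterator = nextIterator;
        }
    }

    /** Hypothetical stand-in for the Kinesis GetRecords call. */
    interface ShardReader {
        Batch getRecords(String iterator);
    }

    /**
     * Follows the iterator chain until it becomes null and returns the last
     * sequence number seen, i.e. the value one would checkpoint as the offset.
     * Returns startOffset unchanged if no records were read.
     */
    static String drain(ShardReader reader, String firstIterator, String startOffset) {
        String offset = startOffset;
        String iterator = firstIterator;
        while (iterator != null) {
            Batch batch = reader.getRecords(iterator);
            for (String seq : batch.sequenceNumbers) {
                // A real consumer would pass the record on to the task here.
                offset = seq;
            }
            // A null next iterator means the shard was closed by a re-shard.
            iterator = batch.nextIterator;
        }
        return offset;
    }

    public static void main(String[] args) {
        // Fake shard: three iterator positions, the middle one returns no records.
        Map<String, Batch> shard = new HashMap<>();
        shard.put("it-0", new Batch(Arrays.asList("100", "101"), "it-1"));
        shard.put("it-1", new Batch(Collections.<String>emptyList(), "it-2"));
        shard.put("it-2", new Batch(Arrays.asList("102"), null));

        String last = drain(shard::get, "it-0", null);
        System.out.println("offset to checkpoint: " + last);
    }
}
```

On restart, the checkpointed sequence number would be the place to resume from; how that value gets handed back to the consumer at start-up is exactly the open SystemAdmin question above, so the sketch deliberately leaves it out.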