Hi all,

First of all, I'm sorry I went missing for such a long time. Student life wasn't easy last year, and I neglected the work done on this topic. Anyway, [1] has the work done during GSoC (I did share it with people, but not publicly).

@Chad, I'd be more than happy (and thankful!) to work with you to bring it up to date and finally get it into Samza trunk. Would you be up for that? :)

Best,
Renato M.

[1] https://github.com/renato2099/SamzaKinesis

2017-02-08 2:26 GMT+01:00 Boris S <bor...@gmail.com>:

> If you are implementing your own consumer, on start Samza will call the
> register method of the consumer and will pass the offsets.
> If you are using CheckpointListener (like Jagadish mentioned) you will get
> a callback on each checkpoint (so you can ignore the register call),
> otherwise you can use the checkpoint passed to you in the register call.
>
> On Tue, Feb 7, 2017 at 9:11 AM, Chad Greenberg <evil_goodn...@hotmail.com>
> wrote:
>
> > Thanks Jagadish for the reply. A few comments.
> >
> > I was under the impression that using the KCL would mean recording the
> > offsets in Kinesis/DynamoDB and therefore not Samza. Avoiding the KCL so
> > that I can save state in Samza.
> >
> > I assumed that the checkpoint values were based on the values of the put()
> > method, but I did not see any explicit documentation to that effect (I have
> > been reading a ton, so I could have missed something). What I do not see,
> > however, is how to retrieve those values upon start-up. Is this the role of
> > the SystemAdmin? Is there any documentation about the use of SystemAdmin?
> >
> > ________________________________
> > From: Jagadish Venkatraman <jagadish1...@gmail.com>
> > Sent: Tuesday, February 7, 2017 1:48:00 AM
> > To: dev@samza.apache.org
> > Subject: Re: Questions about checkpointing and Kinesis
> >
> > Great to hear this development on the kinesis consumer!
> >
> > Let me answer some of your questions here.
> >
> > *1. "Kinesis does not have a listener/push framework (unless I missed
> > something)"*
> >
> > Let me point out that Kinesis has both a push and a pull based model. You
> > can choose to implement either for your use-case.
> >
> > - *Pull:* The pull based model supports obtaining a *ShardIterator* for
> > a shard and iterating on it.
> > Please refer to the docs for the *GetShardIteratorRequest* here
> > <http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html>.
> > It's important that you are aware of re-sharding: the
> > record.getNextShardIterator() can return null if there was a merge / a
> > split for the shard. (You can trivially handle re-shards by re-starting.)
> > - *Push:* The push based model directly uses KCL (the Kinesis Client
> > Library) to subscribe to events. KCL will handle sharding, re-balancing,
> > and checkpointing internally.
> >
> > Your consumer can implement the *CheckpointListener
> > <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointListener.java>*
> > interface to get notified when Samza performs a checkpoint. You can save
> > the offsets of the shards to Kinesis by invoking the
> > IRecordProcessorCheckpointer.checkpoint
> > <https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java>
> > APIs.
> >
> > *2. "My plan was to use the default KafkaCheckpointManagerFactory on a
> > timed interval basis"*
> >
> > The checkpoint manager merely provides persistence for the checkpoints. (In
> > that sense, it's actually a checkpoint writer.) You probably don't want to
> > implement a custom checkpoint manager.
> >
> > *3. "What exactly is being checkpointed? What value can I retrieve to use
> > as an offset for my Kinesis stream? Or is this something I need to keep
> > track of in a store? If so, what is the point of checkpointing? Can I use
> > RocksDb to save the Kinesis offset at every document (efficiently that
> > is)?"*
> >
> > - Samza checkpoints [ssp, offset] pairs for your tasks.
> > - Kinesis has an implicit notion of sequence numbers for every shard in a
> > stream. You can use that as offsets.
> > - You don't want to record offsets in a separate store. If you want Samza
> > to manage offsets, Samza will use Kafka internally. If you want Kinesis to
> > manage offsets (KCL), Kinesis will use DynamoDB to store its offsets.
> >
> > On Mon, Feb 6, 2017 at 2:57 PM, Chad Greenberg <evil_goodn...@hotmail.com>
> > wrote:
> >
> > > Starting on an integration project between a Kinesis stream and Samza,
> > > despite having no background in either, but I am familiar with most other
> > > messaging/queuing systems.
> > >
> > > Decided to keep all state management within Samza instead of using
> > > Kinesis' client library. My plan was to use the default
> > > KafkaCheckpointManagerFactory on a timed interval basis, but I have a few
> > > questions.
> > >
> > > What exactly is being checkpointed? What value can I retrieve to use as an
> > > offset for my Kinesis stream? Or is this something I need to keep track of
> > > in a store? If so, what is the point of checkpointing? Can I use RocksDb to
> > > save the Kinesis offset at every document (efficiently that is)?
> > >
> > > Related to Kinesis and not quite Samza, it does not have a listener/push
> > > framework, but it requires constant polling (unless I missed something).
> > > First of all, I was going to have a partition for each Kinesis shard
> > > partition. But the next question is, should I simply have a while(true)
> > > polling method inside my consumer(BlockingEnvelopeMap)? Seems inefficient,
> > > even with a timeout. How can I get new data to instantiate a new consumer?
> > > My consumer will put a new document to my task.
> > >
> > > Cheers.
> > >
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University
> >
>
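
For reference, the register-then-poll flow discussed in this thread can be sketched roughly as below. This is only an illustration, written in Python so it runs on its own; it does not use the Samza or AWS APIs. FakeShard, SketchConsumer, and all of their methods are hypothetical names, and the sketch assumes that the offset handed to register is the sequence number of the last record already processed. A "next position" of None stands in for the null next shard iterator returned after a split or merge.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class FakeShard:
    """Hypothetical in-memory stand-in for one Kinesis shard."""
    records: List[Tuple[int, str]]  # (sequence_number, payload), ascending
    closed: bool = False            # True once the shard was split or merged

    def position_after(self, sequence_number: Optional[int]) -> int:
        """Index of the first record newer than the given sequence number."""
        if sequence_number is None:
            return 0  # no checkpoint yet: start from the oldest record
        return sum(1 for seq, _ in self.records if seq <= sequence_number)

    def get_records(self, position: int, limit: int = 2):
        """Return (batch, next_position). next_position is None when the
        shard is closed and fully read, like a null next shard iterator."""
        batch = self.records[position:position + limit]
        next_position = position + len(batch)
        if self.closed and next_position >= len(self.records):
            return batch, None
        return batch, next_position


class SketchConsumer:
    """Hypothetical consumer: one read position per shard."""

    def __init__(self, shards: Dict[str, FakeShard]):
        self.shards = shards
        self.positions: Dict[str, Optional[int]] = {}
        self.last_seen: Dict[str, int] = {}  # what a checkpoint would record

    def register(self, shard_id: str, offset: Optional[int]) -> None:
        """Called once per shard at start-up with the checkpointed offset."""
        self.positions[shard_id] = self.shards[shard_id].position_after(offset)

    def poll(self) -> List[Tuple[str, int, str]]:
        """One polling pass over every shard that is still open."""
        out = []
        for shard_id, position in list(self.positions.items()):
            if position is None:
                continue  # shard ended by a re-shard; nothing more to read
            batch, next_position = self.shards[shard_id].get_records(position)
            for seq, payload in batch:
                out.append((shard_id, seq, payload))
                self.last_seen[shard_id] = seq
            self.positions[shard_id] = next_position
        return out


# Resume after checkpointed sequence number 100 on a shard that has been closed.
shard = FakeShard(records=[(100, "a"), (101, "b"), (102, "c")], closed=True)
consumer = SketchConsumer({"shard-0": shard})
consumer.register("shard-0", 100)
first = consumer.poll()
```

In a real consumer the polling pass would run on a background thread with a sleep or back-off between calls rather than a tight while(true) loop, and a None position would trigger the re-shard handling (for example a restart, as suggested above).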