That sounds great, Chad! Please let me know how you progress. I will try to update the references to the latest Samza, try out the code I have, and then report back.

Best,

Renato M.

2017-02-11 1:35 GMT+01:00 Chad Greenberg <evil_goodn...@hotmail.com>:

> Thanks for the link Renato. I was able to incorporate some changes thanks
> to Jagadish and Boris, but I will test out your code. I will update the
> dependencies and take it from there.
>
>
> ________________________________
> From: Renato Marroquín Mogrovejo <renatoj.marroq...@gmail.com>
> Sent: Wednesday, February 8, 2017 5:10:12 PM
> To: dev@samza.apache.org
> Subject: Re: Questions about checkpointing and Kinesis
>
> Hi all,
>
> First of all I'd like to say sorry I have gone missing for such a long time
> but student life hasn't been easy last year and I have neglected the work
> done on this topic.
> Anyways, on [1] is the work done during GSoC (I did share it with people
> but not publicly)
> @Chad I'd be more than happy (and thankful!) to work with you to bring it
> up to date and finally get it into Samza trunk, would you be up for that?
> :)
>
>
> Best,
>
> Renato M.
>
> [1] https://github.com/renato2099/SamzaKinesis
>
> 2017-02-08 2:26 GMT+01:00 Boris S <bor...@gmail.com>:
>
> > If you are implementing your own consumer, on start Samza will call the
> > register method of the consumer and will pass the offsets.
> > If you are using CheckpointListener (like Jagadish mentioned) you will get
> > a callback on each checkpoint (so you can ignore the register call),
> > otherwise you can use the checkpoint passed to you in the register call.
> >
> > On Tue, Feb 7, 2017 at 9:11 AM, Chad Greenberg <evil_goodn...@hotmail.com>
> > wrote:
> >
> > > Thanks Jagadish for the reply. A few comments.
> > >
> > >
> > > I was under the impression that using the KCL would mean recording the
> > > offsets in Kinesis/DynamoDB and therefore not Samza. Avoiding the KCL so
> > > that I can save state in Samza.
> > >
> > >
> > > I assumed that the checkpoint values were based on the values of the put()
> > > method, but I did not see any explicit documentation to that effect (I have
> > > been reading a ton, so I could have missed something). What I do not see
> > > however, is how to retrieve those values upon start-up. Is this the role of
> > > the SystemAdmin? Is there any documentation about the use of SystemAdmin?
> > >
> > > ________________________________
> > > From: Jagadish Venkatraman <jagadish1...@gmail.com>
> > > Sent: Tuesday, February 7, 2017 1:48:00 AM
> > > To: dev@samza.apache.org
> > > Subject: Re: Questions about checkpointing and Kinesis
> > >
> > > Great to hear this development on the kinesis consumer!
> > >
> > > Let me answer some of your questions here.
> > >
> > > *1. "Kinesis does not have a listener/push framework (unless I missed
> > > something)"*
> > >
> > > Let me point out that Kinesis has both a push and a pull based model. You
> > > can choose to implement either for your use-case.
> > >
> > >    - *Pull:* The pull based model supports obtaining a *ShardIterator* for
> > >    a shard and iterating on it. Please refer to the docs for the
> > >    *ShardIteratorRequest* here
> > >    <http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html>.
> > >    It's important that you are aware of re-sharding - The
> > >    record.getNextShardIterator() can return null if there was a merge / a
> > >    split for the shard. (You can trivially handle re-shards by re-starting)
> > >    - *Push:* The push based model directly uses KCL (The Kinesis Client
> > >    Library) to subscribe to events. KCL will handle sharding, re-balancing,
> > >    checkpointing internally.
> > >
> > > Your consumer can implement the *CheckpointListener
> > > <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointListener.java>*
> > > interface to get notified when Samza performs a checkpoint. You can save
> > > the offsets of the shards to Kinesis by invoking the
> > > IRecordProcessorCheckpointer.checkpoint
> > > <https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java>
> > > APIs.
> > >
> > >
> > > *2. "My plan was to use the default KafkaCheckpointManagerFactory on a
> > > timed interval basis"*
> > >
> > > The checkpoint manager merely provides persistence for the checkpoints. (In
> > > that sense, it's actually a checkpoint writer). You probably don't want to
> > > implement a custom checkpoint manager.
> > >
> > >
> > > *3. "What exactly is being checkpointed? What value can I retrieve to use
> > > as an offset for my Kinesis stream? Or is this something I need to keep
> > > track of in a store? If so, what is the point of checkpointing? Can I use
> > > RocksDB to save the Kinesis offset at every document (efficiently that
> > > is)?"*
> > >
> > > - Samza checkpoints [ssp, offset] pairs for your tasks.
> > > - Kinesis has an implicit notion of sequence numbers for every shard in a
> > > stream. You can use that as offsets.
> > > - You don't want to record offsets in a separate store. If you want Samza
> > > to manage offsets, Samza will use Kafka internally. If you want Kinesis to
> > > manage offsets (KCL), Kinesis will use DynamoDB to store its offsets.
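
The [ssp, offset] idea above can be sketched with plain Java, without any Samza or AWS dependency. This is only an illustrative model, not Samza's implementation: `ShardOffsetTracker`, its methods, and the "system.stream.partition" key format are hypothetical names. It assumes Kinesis sequence numbers are decimal strings that should be compared numerically, hence the `BigInteger`.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory model of [ssp, offset] checkpoints; not a Samza class. */
final class ShardOffsetTracker {
    // Key: an SSP label such as "kinesis.events.0" (assumed format).
    // Value: a Kinesis sequence number kept as a decimal string.
    private final Map<String, String> lastProcessed = new HashMap<>();
    private final Map<String, String> checkpointed = new HashMap<>();

    /** Remember the highest sequence number processed so far for this SSP. */
    void recordProcessed(String ssp, String sequenceNumber) {
        lastProcessed.merge(ssp, sequenceNumber, (current, candidate) ->
            new BigInteger(candidate).compareTo(new BigInteger(current)) > 0
                ? candidate : current);
    }

    /** Snapshot the processed offsets, as a periodic checkpoint would. */
    Map<String, String> checkpoint() {
        checkpointed.putAll(lastProcessed);
        return new HashMap<>(checkpointed);
    }

    /** Offset to resume from on start-up; empty if nothing was checkpointed. */
    Optional<String> startingOffset(String ssp) {
        return Optional.ofNullable(checkpointed.get(ssp));
    }
}
```

On start-up, a consumer built along these lines would presumably request a shard iterator of type AFTER_SEQUENCE_NUMBER when `startingOffset` returns a value, and fall back to TRIM_HORIZON or LATEST when it is empty.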
> > >
> > >
> > > On Mon, Feb 6, 2017 at 2:57 PM, Chad Greenberg <evil_goodn...@hotmail.com>
> > > wrote:
> > >
> > > > Starting on an integration project between a Kinesis stream and Samza,
> > > > despite having no background in either, but I am familiar with most other
> > > > messaging/queuing systems.
> > > >
> > > >
> > > > Decided to keep all state management within Samza instead of using
> > > > Kinesis' client library. My plan was to use the default
> > > > KafkaCheckpointManagerFactory on a timed interval basis, but I have a few
> > > > questions.
> > > >
> > > >
> > > > What exactly is being checkpointed? What value can I retrieve to use as an
> > > > offset for my Kinesis stream? Or is this something I need to keep track of
> > > > in a store? If so, what is the point of checkpointing? Can I use RocksDB to
> > > > save the Kinesis offset at every document (efficiently that is)?
> > > >
> > > >
> > > > Related to Kinesis and not quite Samza, it does not have a listener/push
> > > > framework, but it requires constant polling (unless I missed something).
> > > > First of all, I was going to have a partition for each Kinesis shard
> > > > partition. But the next question is, should I simply have a while(true)
> > > > polling method inside my consumer (BlockingEnvelopeMap)? Seems inefficient,
> > > > even with a timeout. How can I get new data to instantiate a new consumer?
> > > > My consumer will put a new document to my task.
> > > >
> > > >
> > > > Cheers.
> > > >
> > >
> > >
> > > --
> > > Jagadish V,
> > > Graduate Student,
> > > Department of Computer Science,
> > > Stanford University
> > >
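
On the while(true) polling question from the original message, one common shape is a background poller per shard that feeds a bounded queue and backs off when the shard is idle. The sketch below uses only the JDK. `ShardPoller` is a hypothetical name, and the `Supplier` is a stand-in for whatever call actually fetches records from Kinesis. In a real consumer, the queue's role would presumably be played by the BlockingEnvelopeMap queues that put() feeds.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

/** Hypothetical per-shard poller; the Supplier stands in for a Kinesis fetch. */
final class ShardPoller implements Runnable {
    private final Supplier<List<String>> fetch;
    private final BlockingQueue<String> buffer;
    private final long idleSleepMs;
    private volatile boolean running = true;

    ShardPoller(Supplier<List<String>> fetch, int capacity, long idleSleepMs) {
        this.fetch = fetch;
        this.buffer = new LinkedBlockingQueue<>(capacity); // bounded: gives back-pressure
        this.idleSleepMs = idleSleepMs;
    }

    /** One polling step; blocks while the buffer is full. Returns records buffered. */
    int pollOnce() {
        int buffered = 0;
        try {
            for (String record : fetch.get()) {
                buffer.put(record);
                buffered++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running = false;
        }
        return buffered;
    }

    @Override
    public void run() {
        while (running) {
            // Sleep only when the shard returned nothing, so a busy shard is drained quickly.
            if (pollOnce() == 0 && running) {
                try {
                    Thread.sleep(idleSleepMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    running = false;
                }
            }
        }
    }

    void stop() { running = false; }

    BlockingQueue<String> buffer() { return buffer; }
}
```

The loop is still a `while`, but the bounded queue keeps memory in check when the task falls behind, and the idle sleep keeps an empty shard from being polled in a tight loop.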