Thanks, Jagadish, for the reply. A few comments.

I was under the impression that using the KCL would mean recording the offsets 
in Kinesis/DynamoDB, and therefore not in Samza. I am avoiding the KCL so that I can 
keep state in Samza.


I assumed that the checkpointed values were based on the offsets of the envelopes passed to the put() 
method, but I did not see any explicit documentation to that effect (I have 
been reading a ton, so I could have missed something). What I do not see, 
however, is how to retrieve those values on start-up. Is this the role of the 
SystemAdmin? Is there any documentation about how to use SystemAdmin?

________________________________
From: Jagadish Venkatraman <jagadish1...@gmail.com>
Sent: Tuesday, February 7, 2017 1:48:00 AM
To: dev@samza.apache.org
Subject: Re: Questions about checkpointing and Kinesis

Great to hear about the progress on the Kinesis consumer!

Let me answer some of your questions here.

*1. "Kinesis does not have a listener/push framework (unless I missed
something)"*

Let me point out that Kinesis has both a push-based and a pull-based model. You
can choose to implement either for your use case.

   - *Pull:* The pull-based model lets you obtain a *ShardIterator* for
   a shard and iterate on it. Please refer to the docs for
   *GetShardIteratorRequest* here:
   <http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html>.
   Be aware of re-sharding: GetRecordsResult.getNextShardIterator() returns
   null once a shard has been closed by a split or a merge and all of its
   records have been read. (You can handle re-shards simply by restarting.)
   - *Push:* The push-based model uses the KCL (Kinesis Client Library) to
   subscribe to events. The KCL handles sharding, re-balancing, and
   checkpointing internally.
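A minimal sketch of the pull loop described above, assuming a single shard. ShardRecord, RecordPage and ShardReader are hypothetical stand-ins for a Kinesis record, GetRecordsResult and the GetRecords call (they are not the AWS SDK types), and the sink callback stands in for whatever hands records to Samza, such as BlockingEnvelopeMap.put(). The loop only returns once the iterator chain ends (a null next iterator, i.e. a re-shard), and it backs off on empty pages instead of spinning.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a Kinesis record: only the fields this sketch needs.
final class ShardRecord {
    final String sequenceNumber;
    final String data;

    ShardRecord(String sequenceNumber, String data) {
        this.sequenceNumber = sequenceNumber;
        this.data = data;
    }
}

// Hypothetical stand-in for GetRecordsResult: one page of records plus the next iterator.
final class RecordPage {
    final List<ShardRecord> records;
    final String nextShardIterator; // null once the shard is closed (split/merge) and drained

    RecordPage(List<ShardRecord> records, String nextShardIterator) {
        this.records = records;
        this.nextShardIterator = nextShardIterator;
    }
}

// Hypothetical abstraction over the GetRecords call; not the AWS SDK interface.
interface ShardReader {
    RecordPage getRecords(String shardIterator);
}

final class ShardPoller {
    private final ShardReader reader;
    private final long emptyPollBackoffMillis;

    ShardPoller(ShardReader reader, long emptyPollBackoffMillis) {
        this.reader = reader;
        this.emptyPollBackoffMillis = emptyPollBackoffMillis;
    }

    // Follows the iterator chain, handing every record to sink. For an open shard this
    // keeps polling; it returns (with the number of records delivered) only when the
    // next iterator is null, or if the thread is interrupted during a back-off.
    int drain(String shardIterator, Consumer<ShardRecord> sink) {
        int delivered = 0;
        String iterator = shardIterator;
        while (iterator != null) {
            RecordPage page = reader.getRecords(iterator);
            for (ShardRecord record : page.records) {
                sink.accept(record);
                delivered++;
            }
            if (page.records.isEmpty() && emptyPollBackoffMillis > 0) {
                try {
                    Thread.sleep(emptyPollBackoffMillis); // idle shard: wait instead of spinning
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and stop
                    return delivered;
                }
            }
            iterator = page.nextShardIterator; // null => shard closed by a re-shard
        }
        return delivered;
    }
}
```

In a real consumer this would run once per shard, with the back-off tuned to stay within Kinesis' per-shard read limits; when drain() returns, the consumer would re-discover shards (or simply restart, as suggested above).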

If you go with the KCL, your consumer can implement the *CheckpointListener
<https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointListener.java>*
interface to get notified when Samza performs a checkpoint. You can then save
the offsets of the shards to Kinesis by invoking the
IRecordProcessorCheckpointer.checkpoint
<https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java>
APIs.
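A rough sketch of that wiring, assuming Samza's CheckpointListener delivers a map of checkpointed offsets keyed by partition. We mirror that shape locally as OffsetCheckpointListener, with plain strings instead of SystemStreamPartition so the example is self-contained. ShardCheckpointer is a hypothetical stand-in for the KCL's IRecordProcessorCheckpointer, modelling only checkpoint(sequenceNumber). The forwarder relays each checkpointed sequence number to the handle for the shard behind that partition.

```java
import java.util.HashMap;
import java.util.Map;

// Local mirror of the shape of Samza's CheckpointListener (assumed: one callback carrying
// the offsets Samza just checkpointed). Keys are partition names as plain strings.
interface OffsetCheckpointListener {
    void onCheckpoint(Map<String, String> offsets);
}

// Hypothetical per-shard handle, standing in for the KCL's IRecordProcessorCheckpointer.
interface ShardCheckpointer {
    void checkpoint(String sequenceNumber);
}

final class KinesisCheckpointForwarder implements OffsetCheckpointListener {
    // partition name -> checkpointer for the Kinesis shard backing that partition
    private final Map<String, ShardCheckpointer> checkpointers = new HashMap<>();

    void register(String ssp, ShardCheckpointer checkpointer) {
        checkpointers.put(ssp, checkpointer);
    }

    @Override
    public void onCheckpoint(Map<String, String> offsets) {
        for (Map.Entry<String, String> entry : offsets.entrySet()) {
            ShardCheckpointer checkpointer = checkpointers.get(entry.getKey());
            if (checkpointer != null) {
                // Samza reported this sequence number as checkpointed; pass it on to Kinesis.
                checkpointer.checkpoint(entry.getValue());
            }
            // Partitions with no registered shard handle are ignored.
        }
    }
}
```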


*2. "My plan was to use the default KafkaCheckpointManagerFactory on a
timed interval basis"*

The checkpoint manager merely provides persistence for the checkpoints (in
that sense, it's really a checkpoint writer). You probably don't need to
implement a custom checkpoint manager.


*3. "What exactly is being checkpointed? What value can I retrieve to use
as an offset for my Kinesis stream? Or is this something I need to keep
track of in a store? If so, what is the point of checkpointing? Can I use
RocksDB to save the Kinesis offset at every document (efficiently, that
is)?"*

- Samza checkpoints [ssp, offset] pairs (ssp = SystemStreamPartition) for
your tasks.
- Kinesis assigns a sequence number to every record in a shard. You can use
these sequence numbers as offsets.
- You don't need to record offsets in a separate store. If you want Samza
to manage offsets, Samza will store its checkpoints in Kafka (with the
KafkaCheckpointManagerFactory). If you want Kinesis to manage offsets (via
the KCL), the KCL will store its offsets in DynamoDB.
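To make the [ssp, offset] idea concrete, here is a minimal in-memory sketch, assuming the checkpointed offset is the sequence number of the last record processed for that shard. TaskCheckpoint, StartPosition and the partition names are hypothetical; IteratorStart borrows two real ShardIteratorType names (TRIM_HORIZON and AFTER_SEQUENCE_NUMBER). Recording an offset per message is only a map write; in this model, persistence happens when the checkpoint manager writes a snapshot at commit time, so there is no need to write every offset to RocksDB.

```java
import java.util.HashMap;
import java.util.Map;

// Two of the real Kinesis ShardIteratorType names; the enum itself is a local stand-in.
enum IteratorStart { TRIM_HORIZON, AFTER_SEQUENCE_NUMBER }

// Hypothetical: where a shard's reader should start after a restart.
final class StartPosition {
    final IteratorStart type;
    final String sequenceNumber; // null when starting from TRIM_HORIZON

    StartPosition(IteratorStart type, String sequenceNumber) {
        this.type = type;
        this.sequenceNumber = sequenceNumber;
    }
}

// Hypothetical in-memory model of one task's checkpoint: partition -> last processed sequence number.
final class TaskCheckpoint {
    private final Map<String, String> offsets = new HashMap<>();

    // Called per message: overwrites the latest offset in memory; nothing is persisted here.
    void record(String ssp, String sequenceNumber) {
        offsets.put(ssp, sequenceNumber);
    }

    // Copy of the current offsets, i.e. what a checkpoint manager would persist at commit time.
    Map<String, String> snapshot() {
        return new HashMap<>(offsets);
    }

    // Decides where to resume a partition, given the last persisted checkpoint.
    static StartPosition resumeFrom(Map<String, String> checkpoint, String ssp) {
        String last = checkpoint.get(ssp);
        if (last == null) {
            // Never checkpointed: start from the oldest record Kinesis still retains.
            return new StartPosition(IteratorStart.TRIM_HORIZON, null);
        }
        // Already processed up to 'last': continue with the record right after it.
        return new StartPosition(IteratorStart.AFTER_SEQUENCE_NUMBER, last);
    }
}
```

In Samza, as far as we understand, the starting offset for each partition is what the consumer receives through SystemConsumer.register(ssp, offset) at start-up, so a decision like resumeFrom() would naturally live there.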




On Mon, Feb 6, 2017 at 2:57 PM, Chad Greenberg <evil_goodn...@hotmail.com>
wrote:

> I'm starting on an integration project between a Kinesis stream and Samza.
> I have no background in either, but I am familiar with most other
> messaging/queuing systems.
>
>
> I decided to keep all state management within Samza instead of using the
> Kinesis Client Library. My plan was to use the default
> KafkaCheckpointManagerFactory on a timed interval basis, but I have a few
> questions.
>
>
> What exactly is being checkpointed? What value can I retrieve to use as an
> offset for my Kinesis stream? Or is this something I need to keep track of
> in a store? If so, what is the point of checkpointing? Can I use RocksDB to
> save the Kinesis offset at every document (efficiently, that is)?
>
>
> This is related to Kinesis rather than Samza: Kinesis does not have a
> listener/push framework and instead requires constant polling (unless I
> missed something). First of all, I was going to have one partition for
> each Kinesis shard. But the next question is: should I simply have a
> while(true) polling loop inside my consumer (BlockingEnvelopeMap)? That
> seems inefficient, even with a timeout. How can I get new data to
> instantiate a new consumer? My consumer will put each new document to my
> task.
>
>
> Cheers.
>


--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
