Your observation is correct, and this is indeed the intended behavior.

Note that instance1 and instance2 each also have a local RocksDB
instance that holds the state. The checkpoint file basically tells
Streams what prefix of the changelog topic is already in RocksDB.

Because Streams loads the (non-empty) RocksDB on startup, it only has
to replay the "tail" of the changelog topic (i.e., everything after the
checkpointed offsets) to get its local RocksDB up to date.
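
In pseudo-code, the startup restore logic is roughly the following.
(A simplified sketch only, written against the current consumer API,
not the actual ProcessorStateManager code; "putIntoRocksDB" is a
made-up stand-in for the store's restore callback.)

import java.util.Collections;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

class RestoreSketch {
    // Restore one store from its changelog partition; checkpointedOffset
    // comes from the .checkpoint file (null if the file is missing).
    static void restore(Consumer<byte[], byte[]> restoreConsumer,
                        TopicPartition changelog,
                        Long checkpointedOffset,
                        BiConsumer<byte[], byte[]> putIntoRocksDB) {
        restoreConsumer.assign(Collections.singletonList(changelog));
        if (checkpointedOffset != null) {
            // RocksDB already holds the changelog prefix up to this
            // offset, so only the "tail" must be replayed
            restoreConsumer.seek(changelog, checkpointedOffset);
        } else {
            // no checkpoint -> local state cannot be trusted: replay all
            restoreConsumer.seekToBeginning(
                Collections.singletonList(changelog));
        }
        long logEnd = restoreConsumer
            .endOffsets(Collections.singletonList(changelog))
            .get(changelog);
        while (restoreConsumer.position(changelog) < logEnd) {
            for (ConsumerRecord<byte[], byte[]> rec :
                     restoreConsumer.poll(100L)) {
                putIntoRocksDB.accept(rec.key(), rec.value());
            }
        }
    }
}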

In your example, instance1 checkpoints offset 2 for both partitions,
but the changelog contains 3 messages per partition. Thus, if you
restart instance1, it must replay the two remaining records (one per
partition), as its RocksDB already contains the first 4 messages.

For instance2, RocksDB contains all 6 messages and thus there is nothing
to restore.

Does this make sense?

-Matthias

On 6/7/17 7:02 PM, john cheng wrote:
> I have two app instances; the input topic has 2 partitions, and each
> instance is configured with one thread and one standby replica.
> Also, instance1's state store is /tmp/kafka-streams, and instance2's
> state store is /tmp/kafka-streams2.
> I ran the following experiment to study checkpointing in Kafka Streams
> (0.10.0.0); a sketch of the word-count topology is after the steps.
> 
> 1. start instance1, send two msgs (msg1 and msg2) to the input topic; no
> checkpoint file
> 2. stop instance1; checkpoint file in /tmp/kafka-streams, both partitions'
> offsets equal 1
> 3. restart instance1 (although the new UUID means a new instance, consider
> it the same instance as before); no checkpoint file again
> 4. start instance2, send two msgs (msg3 and msg4) to the input topic; also
> no checkpoint file
> 5. stop instance1; checkpoint in /tmp/kafka-streams, both partitions'
> offsets equal 2
> 6. send two msgs (msg5 and msg6) to the input topic; both now go to
> instance2
> 7. stop instance2; checkpoint in /tmp/kafka-streams2, both partitions'
> offsets equal 3
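> 
> For reference, a minimal sketch of the word-count topology (shown with
> the newer StreamsBuilder DSL rather than the 0.10.0.0 API I actually
> used; the application id, input topic, and store name "Counts" match
> the names below, while the bootstrap server and standby count are
> assumptions):
> 
> import java.util.Arrays;
> import java.util.Properties;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Materialized;
> 
> public class WordCount {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wc");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>                   Serdes.String().getClass());
>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>                   Serdes.String().getClass());
>         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
>         // instance2 uses /tmp/kafka-streams2 instead
>         props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
> 
>         StreamsBuilder builder = new StreamsBuilder();
>         builder.<String, String>stream("streams-wc-input1")
>                // split each line into words
>                .flatMapValues(line ->
>                        Arrays.asList(line.toLowerCase().split("\\W+")))
>                .groupBy((key, word) -> word)
>                // materialized as "Counts", backed by RocksDB and the
>                // streams-wc-Counts-changelog topic
>                .count(Materialized.as("Counts"));
> 
>         new KafkaStreams(builder.build(), props).start();
>     }
> }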
> 
> After both instances stopped, below is the checkpoint file content:
> 
> $ strings kafka-streams/*/*/.checkpoint   --> instance1
> streams-wc-Counts-changelog 0 2
> streams-wc-Counts-changelog 1 2
> $ strings kafka-streams2/*/*/.checkpoint  --> instance2
> streams-wc-Counts-changelog 0 3
> streams-wc-Counts-changelog 1 3
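> 
> (Side note: the checkpoint file also starts with a version line and an
> entry-count line; both are single characters, so strings, with its
> default 4-character minimum, only prints the per-partition entries.
> A minimal reader sketch, assuming the "version / count / topic
> partition offset" layout of Streams' internal OffsetCheckpoint class:)
> 
> import java.io.BufferedReader;
> import java.io.File;
> import java.io.FileReader;
> import java.io.IOException;
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.kafka.common.TopicPartition;
> 
> class CheckpointReader {
>     static Map<TopicPartition, Long> read(File checkpointFile)
>             throws IOException {
>         try (BufferedReader reader =
>                  new BufferedReader(new FileReader(checkpointFile))) {
>             int version = Integer.parseInt(reader.readLine().trim()); // 0
>             int entries = Integer.parseInt(reader.readLine().trim());
>             Map<TopicPartition, Long> offsets = new HashMap<>(entries);
>             for (int i = 0; i < entries; i++) {
>                 // e.g. "streams-wc-Counts-changelog 0 2"
>                 String[] parts = reader.readLine().split(" ");
>                 offsets.put(
>                     new TopicPartition(parts[0],
>                                        Integer.parseInt(parts[1])),
>                     Long.parseLong(parts[2]));
>             }
>             return offsets;
>         }
>     }
> }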
> 
> I drew a simple table showing the partition and offset of each msg, and
> the events that happened:
> 
>       P0 (Partition,Offset) | P1 (Partition,Offset) | What happened
> msg1  P0,0                  |                       |
> msg2                        | P1,0                  | restart instance1
> msg3  P0,1                  |                       | after starting instance2
> msg4                        | P1,1                  |
> msg5  P0,2                  |                       | after stopping instance1
> msg6                        | P1,2                  |
> 
> Next, I used kafka-consumer-offset-checker.sh to check the input topic;
> all six msgs (three per partition) were consumed:
> 
> $ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 \
>     --topic streams-wc-input1 --group streams-wc
> Group       Topic              Pid  Offset  logSize  Lag  Owner
> streams-wc  streams-wc-input1  0    3       3        0    none
> streams-wc  streams-wc-input1  1    3       3        0    none
> 
> Now, if we restart instance1 again: as only one instance exists, the
> standby tasks will not take effect.
> That means instance1 will create two active StreamTasks, and both tasks
> will restoreActiveState from the changelog topic.
> 
> When restoring an active task, we have to seek to some position. Here,
> since instance1's state store (/tmp/kafka-streams) has a checkpoint file
> covering both changelog partitions, the restoreConsumer will seek to
> position 2.
> 
> Now change to another situation: what about restarting instance2 instead
> of instance1?
> The restoreConsumer will seek to position 3, because instance2's active
> tasks read the checkpoint in /tmp/kafka-streams2.
> 
> The difference between these two situations is the position from which
> each StreamTask begins to restore its StateStore.
> 
> Situation one: only instance1 restarts; beginning at position 2 means P0
> seeks to after msg3 and P1 seeks to after msg4.
> Situation two: only instance2 restarts; beginning at position 3 means P0
> seeks to after msg5 and P1 seeks to after msg6.
> 
> From the partition view, msg1, msg3, and msg5 go to partition 0:
> 
> msg     P0's offset | Instance1 restart | Instance2 restart
> msg1,1  0           |                   |
> msg3,1  1           |                   |
> msg5,1  2           | <- seek at pos 2  |
> (log end)           |                   | <- seek at pos 3
> 
> msg     P1's offset | Instance1 restart | Instance2 restart
> msg2,1  0           |                   |
> msg4,1  1           |                   |
> msg6,1  2           | <- seek at pos 2  |
> (log end)           |                   | <- seek at pos 3
> 
> The restore process polls records from the changelog topic starting at
> that position.
> In situation one, it restores msg5 and msg6 into the StreamTasks' state
> stores: msg5 into the task for P0, msg6 into the task for P1.
> In situation two, it restores nothing into the StreamTasks' state
> stores???
> 
> I have some questions about the restore process and checkpointing:
> 1. Should we seek to the beginning, since the restored state store must
> be a complete view of the previous state?
> 2. In the two situations described above, what will happen?
> 
> Hope someone can explain this to me, or correct me if I've understood it
> wrong. Thanks in advance.
> 
