Yep. That's correct.

Cool that you are writing a book! :)


-Matthias

On 6/9/17 7:07 PM, john cheng wrote:
> Thanks Matthias, I added some offset logging to ProcessorStateManager and
> RocksDBStore.
> The behaviour is just as you explained.
> 
> This is the log from restarting instance1. The two active tasks really do
> replay from position 2, the offset in the checkpoint file.
> 
> [06:55,061] createStreamTask 0_0, partitions: [streams-wc-input1-0]
> [06:55,108] checkpointedOffsets: {streams-wc-Counts-changelog-0=2}
> [06:55,149] Creating restoration consumer client for stream task #0_0
> [06:55,577] restoreActiveState topic-partition:streams-wc-Counts-changelog-0
> [06:55,639] streams-wc-Counts-changelog-0,restoreConsumer endOffset:3
> [06:55,639] streams-wc-Counts-changelog-0,restoreActiveState seek to 2
> [06:55,639] streams-wc-Counts-changelog-0,restoreActiveState position now: 2
> [06:55,823] streams-wc-Counts-changelog-0,restore state, k:msg5, v:00000001
> [06:55,823] RocksDB restored k:msg5, v:00000001
> [06:55,826] streams-wc-Counts-changelog-0,restore current offset:3
> [06:55,842] createStreamTask 0_1, partitions: [streams-wc-input1-1]
> [06:55,846] checkpointedOffsets: {streams-wc-Counts-changelog-1=2}
> [06:55,846] Creating restoration consumer client for stream task #0_1
> [06:55,920] restoreActiveState topic-partition:streams-wc-Counts-changelog-1
> [06:56,338] streams-wc-Counts-changelog-1,restoreConsumer endOffset:3
> [06:56,338] streams-wc-Counts-changelog-1,restoreActiveState seek to 2
> [06:56,339] streams-wc-Counts-changelog-1,restoreActiveState position now: 2
> [06:56,345] streams-wc-Counts-changelog-1,restore state, k:msg6, v:00000001
> [06:56,345] RocksDB restored k:msg6, v:00000001
> [06:56,345] streams-wc-Counts-changelog-1,restore current offset:3
> [06:56,345] restoreConsumer assign: []
> 
> And this is the log from restarting instance2. It didn't do any
> restore/replay work, because it's already at the latest position.
> 
> [17:01,143] createStreamTask 0_0, partitions: [streams-wc-input1-0]
> [17:01,180] checkpointedOffsets: {streams-wc-Counts-changelog-0=3}
> [17:01,215] Creating restoration consumer client for stream task #0_0
> [17:01,601] restoreActiveState topic-partition:streams-wc-Counts-changelog-0
> [17:01,638] streams-wc-Counts-changelog-0,restoreConsumer endOffset:3
> [17:01,638] streams-wc-Counts-changelog-0,restoreActiveState seek to 3
> [17:01,638] streams-wc-Counts-changelog-0,restoreActiveState position now: 3
> [17:01,741] streams-wc-Counts-changelog-0,restore current offset:3
> [17:01,751] createStreamTask 0_1, partitions: [streams-wc-input1-1]
> [17:01,754] checkpointedOffsets: {streams-wc-Counts-changelog-1=3}
> [17:01,755] Creating restoration consumer client for stream task #0_1
> [17:01,821] restoreActiveState topic-partition:streams-wc-Counts-changelog-1
> [17:02,191] streams-wc-Counts-changelog-1,restoreConsumer endOffset:3
> [17:02,192] streams-wc-Counts-changelog-1,restoreActiveState seek to 3
> [17:02,192] streams-wc-Counts-changelog-1,restoreActiveState position now: 3
> [17:02,293] streams-wc-Counts-changelog-1,restore current offset:3
> [17:02,294] restoreConsumer assign: []
> 
> Thanks again. Earlier I was confused about checkpoints, state stores,
> standby tasks, restore/replay from the changelog and so on.
> Now I have some conclusions:
> 1. A StreamTask executes the topology and writes updated results both to
> the local state store and to the changelog topic.
> 2. A StandbyTask doesn't execute the topology; it only needs to keep in
> sync with the changelog topic and update restoredOffsets.
> 3. On a rebalance or application shutdown, closing a StreamTask or
> StandbyTask generates a checkpoint file from ackedOffsets or
> restoredOffsets.
> 4. The checkpoint file of a StreamTask or StandbyTask is used to restore
> data from the changelog topic.
>    The restore for a StreamTask happens in the register phase of
>    ProcessorStateManager;
>    the restore for a StandbyTask happens in the running phase of
>    StreamThread.
>    In both cases the checkpoint file is read when the StreamTask or
>    StandbyTask is created.
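
Conclusions 1 to 3 can be sketched as a toy model in Python. This is only an illustration under assumptions: ToyTask, process, sync and close are made-up names, not Kafka Streams classes or methods, a list plays the changelog partition and a dict plays the local RocksDB store. The checkpoint is modelled as "offset after the last record the store reflects", which matches the offsets seen in the experiment.

```python
class ToyTask:
    """Toy stand-in for a task owning one changelog partition (not the Kafka API)."""

    def __init__(self, changelog, active):
        self.changelog = changelog  # shared list playing the changelog partition
        self.active = active        # True: active task, False: standby task
        self.store = {}             # plays the local state store
        self.next_offset = 0        # offset after the last record held in the store

    def process(self, key, value):
        # Active task: update the local store and append the update to the changelog.
        assert self.active, "standby tasks do not execute the topology"
        self.store[key] = value
        self.changelog.append((key, value))
        self.next_offset = len(self.changelog)

    def sync(self):
        # Standby task: copy changelog records it has not seen yet into its store.
        for key, value in self.changelog[self.next_offset:]:
            self.store[key] = value
        self.next_offset = len(self.changelog)

    def close(self):
        # On close, the checkpoint says how much of the changelog the store holds.
        return self.next_offset


changelog = []
active = ToyTask(changelog, active=True)
standby = ToyTask(changelog, active=False)

active.process("msg1", 1)
active.process("msg3", 1)
standby.sync()
standby_checkpoint = standby.close()  # standby stops after two records

active.process("msg5", 1)
active_checkpoint = active.close()    # active has seen all three records
```

The two checkpoints differ only because the standby was closed before the third record was written, which is the same kind of gap the restore has to close later.
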
> 
> PS: I'm writing a Chinese book about Kafka internals, so I have to be sure
> all my observations are backed by theory.
> 
> 
> 2017-06-10 1:23 GMT+08:00 Matthias J. Sax <matth...@confluent.io>:
> 
>> Your observation is completely correct and this is also correct behavior.
>>
>> Note, that instance1 and instance2 both also do have a local RocksDB
>> instance that holds the state. The checkpoint file basically tells
>> streams, what prefix of the changelog topic is already in RocksDB.
>>
>> As Streams loads (non-empty) RocksDB on startup, it only must replay the
>> "tail" of the changelog topic (ie, everything after the checkpointed
>> offsets) to get its local RocksDB up to date.
>>
>> In your example, instance1 checkpoints offsets 2/2 but the changelog
>> contains 3 messages per partition. Thus, if you restart instance1 it
>> must replay the two remaining records (one per partition) as RocksDB
>> does contain the first 4 messages already.
>>
>> For instance2, RocksDB contains all 6 messages and thus there is nothing
>> to restore.
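
The "replay only the tail" idea can be written down as a minimal Python sketch. It is an illustration under assumptions, not the Streams implementation: restore_tail is a hypothetical helper, each changelog partition is a plain list of (key, value) pairs, each local store is a dict, and the data mirrors the six messages from the experiment.

```python
def restore_tail(store, changelog, checkpoint):
    """Apply changelog records from offset `checkpoint` onward; return how many were applied."""
    restored = 0
    for key, value in changelog[checkpoint:]:
        store[key] = value
        restored += 1
    return restored


# Changelog contents per partition, as in the experiment (each word counted once).
changelog_p0 = [("msg1", 1), ("msg3", 1), ("msg5", 1)]
changelog_p1 = [("msg2", 1), ("msg4", 1), ("msg6", 1)]

# instance1: local stores hold the first two records per partition, checkpoint is 2.
store1_p0 = {"msg1": 1, "msg3": 1}
store1_p1 = {"msg2": 1, "msg4": 1}
restored_instance1 = (restore_tail(store1_p0, changelog_p0, 2)
                      + restore_tail(store1_p1, changelog_p1, 2))

# instance2: local stores already hold everything, checkpoint is 3.
store2_p0 = dict(changelog_p0)
store2_p1 = dict(changelog_p1)
restored_instance2 = (restore_tail(store2_p0, changelog_p0, 3)
                      + restore_tail(store2_p1, changelog_p1, 3))
```

In this sketch instance1 applies one record per partition and instance2 applies none, and both end up with the same store contents.
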
>>
>> Does this make sense?
>>
>> -Matthias
>>
>> On 6/7/17 7:02 PM, john cheng wrote:
>>> I have two app instances; the input topic has 2 partitions, and each
>>> instance is configured with one thread and one replica.
>>> Also, instance1's state store is in /tmp/kafka-streams and instance2's
>>> state store is in /tmp/kafka-streams2.
>>> I did the following experiment to study checkpointing in Kafka Streams
>>> (0.10.0.0).
>>>
>>> 1. start instance1, send two messages (msg1 and msg2) to the input topic;
>>> no checkpoint file
>>> 2. stop instance1; checkpoint file in /tmp/kafka-streams, both
>>> partitions' offsets equal 1
>>> 3. restart instance1 (although a new UUID means a new instance, here I
>>> consider it the same instance as before); again no checkpoint file
>>> 4. start instance2, send two messages (msg3 and msg4) to the input topic;
>>> also no checkpoint file
>>> 5. stop instance1; checkpoint file in /tmp/kafka-streams, both
>>> partitions' offsets equal 2
>>> 6. send two messages (msg5 and msg6) to the input topic; both messages now
>>> go to instance2
>>> 7. stop instance2; checkpoint file in /tmp/kafka-streams2, both
>>> partitions' offsets equal 3
>>>
>>> After both instances were stopped, the checkpoint files contain:
>>>
>>> $ strings kafka-streams/*/*/.checkpoint   --> instance1
>>> streams-wc-Counts-changelog 0 2
>>> streams-wc-Counts-changelog 1 2
>>> $ strings kafka-streams2/*/*/.checkpoint  --> instance2
>>> streams-wc-Counts-changelog 0 3
>>> streams-wc-Counts-changelog 1 3
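
For reading these values programmatically, here is a small Python sketch. It assumes only the three-field "topic partition offset" lines shown in the strings output above; parse_checkpoint_lines is a hypothetical helper, and any other lines the real file may contain are simply skipped.

```python
def parse_checkpoint_lines(text):
    """Parse 'topic partition offset' lines into {(topic, partition): offset}."""
    offsets = {}
    for line in text.splitlines():
        fields = line.split()
        # Skip anything that is not a three-field entry with numeric partition and offset.
        if len(fields) != 3 or not (fields[1].isdigit() and fields[2].isdigit()):
            continue
        topic, partition, offset = fields
        offsets[(topic, int(partition))] = int(offset)
    return offsets


instance1 = parse_checkpoint_lines(
    "streams-wc-Counts-changelog 0 2\n"
    "streams-wc-Counts-changelog 1 2\n"
)
instance2 = parse_checkpoint_lines(
    "streams-wc-Counts-changelog 0 3\n"
    "streams-wc-Counts-changelog 1 3\n"
)
```
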
>>>
>>> I drew a simple table of the partition and offset of each message, along
>>> with the events that happened.
>>>
>>>       | Partition,Offset | Partition,Offset | What Happened
>>> msg1  | P0,0             |                  |
>>> msg2  |                  | P1,0             | restart instance1
>>> msg3  | P0,1             |                  | after start instance2
>>> msg4  |                  | P1,1             |
>>> msg5  | P0,2             |                  | after stop instance1
>>> msg6  |                  | P1,2             |
>>>
>>> Next, I used kafka-consumer-offset-checker.sh to check the input topic;
>>> all six messages (three per partition) were consumed.
>>>
>>> $ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic streams-wc-input1 --group streams-wc
>>> Group           Topic                          Pid Offset          logSize         Lag             Owner
>>> streams-wc      streams-wc-input1              0   3               3               0               none
>>> streams-wc      streams-wc-input1              1   3               3               0               none
>>>
>>> Now if we restart instance1 again, only one instance exists, so the
>>> standby task will not take effect.
>>> That means instance1 will create two active StreamTasks, and both tasks
>>> will restoreActiveState from the changelog topic.
>>>
>>> When restoring an active task, we have to seek to some position. Since
>>> the checkpoint file in instance1's state store (/tmp/kafka-streams)
>>> has both changelog partitions, restoreConsumer will seek to position 2.
>>>
>>> Now the other situation: what if we restart instance2 instead of
>>> instance1?
>>> restoreConsumer will seek to position 3, because instance2's active
>>> tasks read the checkpoint in /tmp/kafka-streams2.
>>>
>>> The difference between these two situations is the position from which
>>> the StateStore in the StreamTask starts restoring.
>>>
>>> Situation one, only restart instance1: starting at position 2 means P0
>>> seeks to after msg3 and P1 seeks to after msg4.
>>> Situation two, only restart instance2: starting at position 3 means P0
>>> seeks to after msg5 and P1 seeks to after msg6.
>>>
>>> From the partition view, msg1, msg3 and msg5 go to partition 0.
>>>
>>> msg     | P0's offset | Instance1 restart | Instance2 restart
>>> msg1,1  | 0           |                   |
>>> msg3,1  | 1           |                   |
>>> msg5,1  | 2           | <- seek at pos 2  |
>>>         |             |                   | <- seek at pos 3
>>>
>>> msg     | P1's offset | Instance1 restart | Instance2 restart
>>> msg2,1  | 0           |                   |
>>> msg4,1  | 1           |                   |
>>> msg6,1  | 2           | <- seek at pos 2  |
>>>         |             |                   | <- seek at pos 3
>>>
>>> The restore process polls records from the changelog topic starting at a
>>> specific position.
>>> In situation one, msg5 and msg6 are restored to the StreamTasks' state
>>> stores: msg5 to task 0_0, msg6 to task 0_1.
>>> In situation two, nothing is restored to the StreamTasks' state stores???
>>>
>>> I have some questions about the restore process and checkpoints:
>>> 1. Should we seek to the beginning, since the restored state store must
>>> be a complete view of the previous state?
>>> 2. In the two situations described above, what will happen?
>>>
>>> I hope someone can explain this to me, or correct me if I understand it
>>> wrong. Thanks in advance.
>>>
>>
>>
> 
