Yep. That's correct. Cool that you are writing a book! :)
-Matthias

On 6/9/17 7:07 PM, john cheng wrote:
> Thanks Matthias, I added some offset logging in ProcessorStateManager and
> RocksDBStore.
> The behaviour is just like your explanation.
>
> This is the log from restarting instance1. The two active tasks really do replay
> from position 2 of the checkpoint file:
>
> [06:55,061] createStreamTask 0_0, partitions: [streams-wc-input1-0]
> [06:55,108] checkpointedOffsets: {streams-wc-Counts-changelog-0=2}
> [06:55,149] Creating restoration consumer client for stream task #0_0
> [06:55,577] restoreActiveState topic-partition:streams-wc-Counts-changelog-0
> [06:55,639] streams-wc-Counts-changelog-0,restoreConsumer endOffset:3
> [06:55,639] streams-wc-Counts-changelog-0,restoreActiveState seek to 2
> [06:55,639] streams-wc-Counts-changelog-0,restoreActiveState position now: 2
> [06:55,823] streams-wc-Counts-changelog-0,restore state, k:msg5, v:00000001
> [06:55,823] RocksDB restored k:msg5, v:00000001
> [06:55,826] streams-wc-Counts-changelog-0,restore current offset:3
> [06:55,842] createStreamTask 0_1, partitions: [streams-wc-input1-1]
> [06:55,846] checkpointedOffsets: {streams-wc-Counts-changelog-1=2}
> [06:55,846] Creating restoration consumer client for stream task #0_1
> [06:55,920] restoreActiveState topic-partition:streams-wc-Counts-changelog-1
> [06:56,338] streams-wc-Counts-changelog-1,restoreConsumer endOffset:3
> [06:56,338] streams-wc-Counts-changelog-1,restoreActiveState seek to 2
> [06:56,339] streams-wc-Counts-changelog-1,restoreActiveState position now: 2
> [06:56,345] streams-wc-Counts-changelog-1,restore state, k:msg6, v:00000001
> [06:56,345] RocksDB restored k:msg6, v:00000001
> [06:56,345] streams-wc-Counts-changelog-1,restore current offset:3
> [06:56,345] restoreConsumer assign: []
>
> And this is the log from restarting instance2; it did not do any restore/replay
> work, because it was already at the latest position:
>
> [17:01,143] createStreamTask 0_0, partitions: [streams-wc-input1-0]
> [17:01,180] checkpointedOffsets: {streams-wc-Counts-changelog-0=3}
> [17:01,215] Creating restoration consumer client for stream task #0_0
> [17:01,601] restoreActiveState topic-partition:streams-wc-Counts-changelog-0
> [17:01,638] streams-wc-Counts-changelog-0,restoreConsumer endOffset:3
> [17:01,638] streams-wc-Counts-changelog-0,restoreActiveState seek to 3
> [17:01,638] streams-wc-Counts-changelog-0,restoreActiveState position now: 3
> [17:01,741] streams-wc-Counts-changelog-0,restore current offset:3
> [17:01,751] createStreamTask 0_1, partitions: [streams-wc-input1-1]
> [17:01,754] checkpointedOffsets: {streams-wc-Counts-changelog-1=3}
> [17:01,755] Creating restoration consumer client for stream task #0_1
> [17:01,821] restoreActiveState topic-partition:streams-wc-Counts-changelog-1
> [17:02,191] streams-wc-Counts-changelog-1,restoreConsumer endOffset:3
> [17:02,192] streams-wc-Counts-changelog-1,restoreActiveState seek to 3
> [17:02,192] streams-wc-Counts-changelog-1,restoreActiveState position now: 3
> [17:02,293] streams-wc-Counts-changelog-1,restore current offset:3
> [17:02,294] restoreConsumer assign: []
>
> Thanks again. Earlier I was confused about checkpoints, state stores, standby
> tasks, restore/replay from the changelog, and so on.
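The seek-and-replay pattern visible in these logs can be reproduced with a plain
consumer, roughly as sketched below. This is only an illustration of the idea, not
the actual ProcessorStateManager code; it uses a current kafka-clients API
(endOffsets, poll(Duration)) rather than the 0.10.0.0 one, and the topic, partition,
and offsets are taken from the instance1 log above.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class RestoreSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            TopicPartition changelog = new TopicPartition("streams-wc-Counts-changelog", 0);
            long checkpointedOffset = 2L;  // offset read from the .checkpoint file

            try (KafkaConsumer<byte[], byte[]> restoreConsumer = new KafkaConsumer<>(props)) {
                restoreConsumer.assign(Collections.singletonList(changelog));
                // end offset of the changelog partition (3 in the log above)
                long endOffset = restoreConsumer.endOffsets(Collections.singletonList(changelog)).get(changelog);
                // seek to the checkpointed offset and replay only the "tail"
                restoreConsumer.seek(changelog, checkpointedOffset);
                long position = checkpointedOffset;
                while (position < endOffset) {
                    for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(Duration.ofMillis(100))) {
                        // a real restore would write record.key()/record.value() into RocksDB here
                        position = record.offset() + 1;
                    }
                }
            }
        }
    }

With the instance1 checkpoint (offset 2) this replays exactly one record per changelog
partition (msg5 on partition 0, msg6 on partition 1); with the instance2 checkpoint
(offset 3) the loop exits immediately, matching the second log.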
> Now I have some conclusions:
> 1. A StreamTask executes the topology and writes each updated result both to the
>    local state store and to the changelog topic.
> 2. A StandbyTask doesn't execute the topology; it only needs to keep in sync with
>    the changelog topic and update its restoredOffsets.
> 3. On rebalance or application shutdown, closing a StreamTask or StandbyTask
>    writes a checkpoint file from ackedOffsets or restoredOffsets, respectively.
> 4. The checkpoint file is used by both StreamTask and StandbyTask to restore data
>    from the changelog topic. The restore for a StreamTask happens in the register
>    phase of ProcessorStateManager, while the restore for a StandbyTask happens in
>    the running phase of the StreamThread. In both cases the checkpoint file is
>    read when the StreamTask or StandbyTask is created.
>
> PS: I'm writing a Chinese book about Kafka internals, so I have to be sure all
> my observations have theoretical support.
>
> 2017-06-10 1:23 GMT+08:00 Matthias J. Sax <matth...@confluent.io>:
>
>> Your observation is completely correct, and this is also correct behavior.
>>
>> Note that instance1 and instance2 both also have a local RocksDB
>> instance that holds the state. The checkpoint file basically tells
>> Streams what prefix of the changelog topic is already in RocksDB.
>>
>> As Streams loads the (non-empty) RocksDB on startup, it only has to replay the
>> "tail" of the changelog topic (i.e., everything after the checkpointed
>> offsets) to get its local RocksDB up to date.
>>
>> In your example, instance1 checkpoints offsets 2/2, but the changelog
>> contains 3 messages per partition. Thus, if you restart instance1, it
>> must replay the two remaining records, as RocksDB already contains the
>> first 4 messages.
>>
>> For instance2, RocksDB contains all 6 messages and thus there is nothing
>> to restore.
>>
>> Does this make sense?
>>
>> -Matthias
>>
>> On 6/7/17 7:02 PM, john cheng wrote:
>>> I have two app instances; the input topic has 2 partitions, and each instance
>>> is configured with one thread and one standby replica.
>>> Also, instance1's state store is /tmp/kafka-streams and instance2's state
>>> store is /tmp/kafka-streams2.
>>> Now I do this experiment to study checkpointing in Kafka Streams (0.10.0.0):
>>>
>>> 1. Start instance1, send two msgs (msg1 and msg2) to the input topic; no
>>>    checkpoint file.
>>> 2. Stop instance1; checkpoint file in /tmp/kafka-streams, both partitions'
>>>    offsets equal 1.
>>> 3. Restart instance1 (although a new UUID means a new instance, consider it
>>>    here the same instance as before); no checkpoint file again.
>>> 4. Start instance2, send two msgs (msg3 and msg4) to the input topic; also no
>>>    checkpoint file.
>>> 5. Stop instance1; checkpoint in /tmp/kafka-streams, both partitions'
>>>    offsets equal 2.
>>> 6. Send two msgs (msg5 and msg6) to the input topic; these two msgs both go to
>>>    instance2.
>>> 7. Stop instance2; checkpoint in /tmp/kafka-streams2, both partitions'
>>>    offsets equal 3.
>>>
>>> After both instances are stopped, below is the checkpoint file content:
>>>
>>> $ strings kafka-streams/*/*/.checkpoint --> instance1
>>> streams-wc-Counts-changelog 0 2
>>> streams-wc-Counts-changelog 1 2
>>> $ strings kafka-streams2/*/*/.checkpoint --> instance2
>>> streams-wc-Counts-changelog 0 3
>>> streams-wc-Counts-changelog 1 3
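As an aside, the .checkpoint files listed by strings above are plain text. Assuming
the OffsetCheckpoint layout (a version line, an entry-count line, then one
"topic partition offset" line per changelog partition; verify against the
OffsetCheckpoint class of your Kafka version), they can be read with a sketch like
the one below. The task-directory path is only an example of the layout implied by
the wildcards in the strings command.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CheckpointFileSketch {
        public static void main(String[] args) throws IOException {
            List<String> lines =
                Files.readAllLines(Paths.get("/tmp/kafka-streams/streams-wc/0_0/.checkpoint"));
            int version = Integer.parseInt(lines.get(0).trim());   // expected to be 0
            int entries = Integer.parseInt(lines.get(1).trim());   // number of entries that follow
            Map<String, Long> checkpointedOffsets = new HashMap<>();
            for (String line : lines.subList(2, lines.size())) {
                if (line.trim().isEmpty()) {
                    continue;
                }
                String[] parts = line.trim().split("\\s+");        // topic, partition, offset
                checkpointedOffsets.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
            }
            // e.g. {streams-wc-Counts-changelog-0=2} for task 0_0 of instance1
            System.out.println("version=" + version + ", entries=" + entries + ", " + checkpointedOffsets);
        }
    }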
>>> I drew a simple table of the partition and offset of each msg, plus what
>>> happened at that point:
>>>
>>>        Partition,Offset (P0) | Partition,Offset (P1) | What happened
>>> msg1   P0,0                  |                       |
>>> msg2                         | P1,0                  | restart instance1
>>> msg3   P0,1                  |                       | after starting instance2
>>> msg4                         | P1,1                  |
>>> msg5   P0,2                  |                       | after stopping instance1
>>> msg6                         | P1,2                  |
>>>
>>> Next, use kafka-consumer-offset-checker.sh to check the input topic; all six
>>> msgs (three per partition) were consumed:
>>>
>>> $ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic streams-wc-input1 --group streams-wc
>>> Group       Topic              Pid  Offset  logSize  Lag  Owner
>>> streams-wc  streams-wc-input1  0    3       3        0    none
>>> streams-wc  streams-wc-input1  1    3       3        0    none
>>>
>>> Now, if we restart instance1 again: as only one instance exists, standby tasks
>>> will not take effect.
>>> That means instance1 will create two active StreamTasks, and both tasks will
>>> restoreActiveState from the changelog topic.
>>>
>>> When restoring an active task, we have to seek to some position. Here, since
>>> instance1's checkpoint file in its state store /tmp/kafka-streams has both
>>> changelog partitions at offset 2, the restoreConsumer will seek to position 2.
>>>
>>> Changing to the other situation: what about restarting instance2 instead of
>>> instance1?
>>> The restoreConsumer will seek to position 3, because instance2's active tasks
>>> read the checkpoint in /tmp/kafka-streams2.
>>>
>>> The difference between these two situations is the position from which each
>>> StreamTask begins to restore its StateStore.
>>>
>>> Situation one, only restart instance1: beginning position 2 means P0 seeks to
>>> after msg3 and P1 seeks to after msg4.
>>> Situation two, only restart instance2: beginning position 3 means P0 seeks to
>>> after msg5 and P1 seeks to after msg6.
>>>
>>> From the partition view, msg1, msg3, msg5 go to partition 0:
>>>
>>> msg      P0's offset | Instance1 restart | Instance2 restart
>>> msg1,1   0           |                   |
>>> msg3,1   1           |                   |
>>> msg5,1   2           | <- seek at pos 2  |
>>>                      |                   | <- seek at pos 3
>>>
>>> msg      P1's offset | Instance1 restart | Instance2 restart
>>> msg2,1   0           |                   |
>>> msg4,1   1           |                   |
>>> msg6,1   2           | <- seek at pos 2  |
>>>                      |                   | <- seek at pos 3
>>>
>>> The restore process polls records from the changelog topic starting at that
>>> specific position:
>>> in situation one, it restores msg5 and msg6 into the StreamTasks' state stores
>>> (msg5 to task 0_0, msg6 to task 0_1);
>>> in situation two, it restores nothing to the StreamTasks' state stores???
>>>
>>> I have some questions about the restore process and checkpoints:
>>> 1. Should we seek to the beginning, because the restored state store must be a
>>>    complete view of the previous state?
>>> 2. In the two situations described above, what will happen?
>>>
>>> Hope someone can explain this to me, or correct me if my understanding is
>>> wrong. Thanks in advance.
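For reference, the setup described at the top of the experiment (application id
streams-wc, one stream thread, one standby replica, a per-instance state directory)
corresponds roughly to the configuration sketched below; the original application
code is not part of the thread, so the class and method names here are only
illustrative. With such a config, a store named "Counts" gets the changelog topic
streams-wc-Counts-changelog, i.e. <application.id>-<store name>-changelog.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ConfigSketch {
        // stateDir would be "/tmp/kafka-streams" for instance1 and "/tmp/kafka-streams2" for instance2
        public static Properties instanceConfig(String stateDir) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wc");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
            props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
            return props;
        }
    }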