Thanks Matthias, I added some offset logging in ProcessorStateManager and
RocksDBStore.
The behaviour is just like your explanation.

Here is the log from restarting instance1. The two active tasks really do
replay from position 2, as recorded in the checkpoint file:

[06:55,061] createStreamTask 0_0, partitions: [streams-wc-input1-0]
[06:55,108] checkpointedOffsets: {streams-wc-Counts-changelog-0=2}
[06:55,149] Creating restoration consumer client for stream task #0_0
[06:55,577] restoreActiveState topic-partition:streams-wc-Counts-changelog-0
[06:55,639] streams-wc-Counts-changelog-0,restoreConsumer endOffset:3
[06:55,639] streams-wc-Counts-changelog-0,restoreActiveState seek to 2
[06:55,639] streams-wc-Counts-changelog-0,restoreActiveState position now: 2
[06:55,823] streams-wc-Counts-changelog-0,restore state, k:msg5, v:00000001
[06:55,823] RocksDB restored k:msg5, v:00000001
[06:55,826] streams-wc-Counts-changelog-0,restore current offset:3
[06:55,842] createStreamTask 0_1, partitions: [streams-wc-input1-1]
[06:55,846] checkpointedOffsets: {streams-wc-Counts-changelog-1=2}
[06:55,846] Creating restoration consumer client for stream task #0_1
[06:55,920] restoreActiveState topic-partition:streams-wc-Counts-changelog-1
[06:56,338] streams-wc-Counts-changelog-1,restoreConsumer endOffset:3
[06:56,338] streams-wc-Counts-changelog-1,restoreActiveState seek to 2
[06:56,339] streams-wc-Counts-changelog-1,restoreActiveState position now: 2
[06:56,345] streams-wc-Counts-changelog-1,restore state, k:msg6, v:00000001
[06:56,345] RocksDB restored k:msg6, v:00000001
[06:56,345] streams-wc-Counts-changelog-1,restore current offset:3
[06:56,345] restoreConsumer assign: []

And here is the log from restarting instance2. It did no restore/replay
work, because it was already at the latest position:

[17:01,143] createStreamTask 0_0, partitions: [streams-wc-input1-0]
[17:01,180] checkpointedOffsets: {streams-wc-Counts-changelog-0=3}
[17:01,215] Creating restoration consumer client for stream task #0_0
[17:01,601] restoreActiveState topic-partition:streams-wc-Counts-changelog-0
[17:01,638] streams-wc-Counts-changelog-0,restoreConsumer endOffset:3
[17:01,638] streams-wc-Counts-changelog-0,restoreActiveState seek to 3
[17:01,638] streams-wc-Counts-changelog-0,restoreActiveState position now: 3
[17:01,741] streams-wc-Counts-changelog-0,restore current offset:3
[17:01,751] createStreamTask 0_1, partitions: [streams-wc-input1-1]
[17:01,754] checkpointedOffsets: {streams-wc-Counts-changelog-1=3}
[17:01,755] Creating restoration consumer client for stream task #0_1
[17:01,821] restoreActiveState topic-partition:streams-wc-Counts-changelog-1
[17:02,191] streams-wc-Counts-changelog-1,restoreConsumer endOffset:3
[17:02,192] streams-wc-Counts-changelog-1,restoreActiveState seek to 3
[17:02,192] streams-wc-Counts-changelog-1,restoreActiveState position now: 3
[17:02,293] streams-wc-Counts-changelog-1,restore current offset:3
[17:02,294] restoreConsumer assign: []
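
To convince myself the logs match the code path, here is a minimal sketch of
the restore loop as I read it. This is my own simplification against the
public consumer API, not the real ProcessorStateManager code; the method name
restoreActiveState is borrowed from the logs, and putToStore is a hypothetical
stand-in for the RocksDB restore hook:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class RestoreSketch {
    // Replays the changelog tail [checkpoint, endOffset) into the local store.
    static void restoreActiveState(Consumer<byte[], byte[]> restoreConsumer,
                                   TopicPartition changelog,
                                   Map<TopicPartition, Long> checkpointedOffsets,
                                   BiConsumer<byte[], byte[]> putToStore) {
        List<TopicPartition> tps = Collections.singletonList(changelog);
        restoreConsumer.assign(tps);

        // Find the log-end offset ("restoreConsumer endOffset:3" above).
        restoreConsumer.seekToEnd(tps);
        long endOffset = restoreConsumer.position(changelog);

        // Seek to the checkpointed offset, or to the beginning if there is
        // no checkpoint ("restoreActiveState seek to 2" / "position now: 2").
        Long checkpoint = checkpointedOffsets.get(changelog);
        if (checkpoint != null) {
            restoreConsumer.seek(changelog, checkpoint);
        } else {
            restoreConsumer.seekToBeginning(tps);
        }

        // Poll and apply records until the position reaches the end offset
        // ("RocksDB restored k:msg5, ...", then "restore current offset:3").
        // When checkpoint == endOffset, as on instance2, the loop never runs
        // and nothing is replayed.
        while (restoreConsumer.position(changelog) < endOffset) {
            for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100)) {
                putToStore.accept(record.key(), record.value());
            }
        }
    }
}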

Thanks again. Earlier I was confused about checkpoints, state stores,
standby tasks, restore/replay from the changelog, and so on.
Now I have some conclusions:
1. A StreamTask executes the topology and writes each updated result both to
the local state store and to the changelog topic.
2. A StandbyTask doesn't execute the topology; it only needs to keep in sync
with the changelog topic and to update its restoredOffsets.
3. On a rebalance or an application shutdown, closing a StreamTask or
StandbyTask writes a checkpoint file from ackedOffsets or restoredOffsets.
4. The checkpoint file of a StreamTask or StandbyTask is used to restore
data from the changelog topic.
    The restore process for a StreamTask happens at the register phase in
ProcessorStateManager,
    while the restore process for a StandbyTask happens at the running phase
in StreamThread.
    In both cases the checkpoint file is read when the StreamTask or
StandbyTask is created (see the sketch after this list).
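
For the book I also wanted to pin down the on-disk format. Below is a minimal
standalone reader for the .checkpoint file; as far as I can tell from the
source, the OffsetCheckpoint layout is a version line, an entry-count line,
and then one "topic partition offset" line per entry. The class and method
names here are mine, not Kafka's:

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class CheckpointReaderSketch {
    // Parse a .checkpoint file, e.g. /tmp/kafka-streams/<app-id>/0_0/.checkpoint
    static Map<TopicPartition, Long> read(Path file) throws IOException {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        try (BufferedReader reader = Files.newBufferedReader(file)) {
            int version = Integer.parseInt(reader.readLine().trim());
            if (version != 0)
                throw new IOException("unknown checkpoint version " + version);
            int expected = Integer.parseInt(reader.readLine().trim());
            String line;
            while ((line = reader.readLine()) != null && !line.trim().isEmpty()) {
                String[] parts = line.split("\\s+");   // topic, partition, offset
                offsets.put(new TopicPartition(parts[0], Integer.parseInt(parts[1])),
                            Long.parseLong(parts[2]));
            }
            if (offsets.size() != expected)
                throw new IOException("expected " + expected + " entries, got " + offsets.size());
        }
        return offsets;
    }

    public static void main(String[] args) throws IOException {
        read(Paths.get(args[0]))
            .forEach((tp, off) -> System.out.println(tp + " -> " + off));
        // e.g. prints: streams-wc-Counts-changelog-0 -> 2
    }
}

This layout would also explain the strings output from my first mail: the
single-digit version and count lines are shorter than strings' default
minimum length, so only the "topic partition offset" entry lines show up.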

PS: I'm writing a Chinese book about Kafka internals, so I have to be sure
that all my observations have theoretical support.


2017-06-10 1:23 GMT+08:00 Matthias J. Sax <matth...@confluent.io>:

> Your observation is completely correct and this is also correct behavior.
>
> Note that instance1 and instance2 both also have a local RocksDB
> instance that holds the state. The checkpoint file basically tells
> Streams what prefix of the changelog topic is already in RocksDB.
>
> As Streams loads the (non-empty) RocksDB on startup, it only must replay
> the "tail" of the changelog topic (i.e., everything after the checkpointed
> offsets) to get its local RocksDB up to date.
>
> In your example, instance1 checkpoints offsets 2/2 but the changelog
> contains 3 messages per partition. Thus, if you restart instance1 it
> must replay the two remaining records, as RocksDB does contain the first
> 4 messages already.
>
> For instance2, RocksDB contains all 6 messages and thus there is nothing
> to restore.
>
> Does this make sense?
>
> -Matthias
>
> On 6/7/17 7:02 PM, john cheng wrote:
> > I have two app instances; the input topic has 2 partitions, and each
> > instance is configured with one thread and one standby replica.
> > Also, instance1's state store is /tmp/kafka-streams, and instance2's
> > state store is /tmp/kafka-streams2.
> > Now I did this experiment to study checkpointing in Kafka Streams
> > (0.10.0.0).
> >
> > 1. start instance1, send two msgs (msg1 and msg2) to the input topic --
> > no checkpoint file
> > 2. stop instance1 -- checkpoint file in /tmp/kafka-streams, both
> > partitions' offsets equal 1
> > 3. restart instance1 (although a new UUID means a new instance, here
> > consider it the same instance as before) -- no checkpoint file again
> > 4. start instance2, send two msgs (msg3 and msg4) to the input topic --
> > also no checkpoint file
> > 5. stop instance1 -- checkpoint in /tmp/kafka-streams, both partitions'
> > offsets equal 2
> > 6. send two msgs (msg5 and msg6) to the input topic; both now go to
> > instance2
> > 7. stop instance2 -- checkpoint in /tmp/kafka-streams2, both partitions'
> > offsets equal 3
> >
> > After the two instances stopped, below is the checkpoint file content:
> >
> > $ strings kafka-streams/*/*/.checkpoint   --> instance1
> > streams-wc-Counts-changelog 0 2
> > streams-wc-Counts-changelog 1 2
> > $ strings kafka-streams2/*/*/.checkpoint  --> instance2
> > streams-wc-Counts-changelog 0 3
> > streams-wc-Counts-changelog 1 3
> >
> > I drew a simple table of the partition and offset of each msg, and the
> > event that happened around it.
> >
> >       Partition,Offset | Partition,Offset | What happened
> > msg1  P0,0             |                  |
> > msg2                   | P1,0             | restart instance1
> > msg3  P0,1             |                  | after start instance2
> > msg4                   | P1,1             |
> > msg5  P0,2             |                  | after stop instance1
> > msg6                   | P1,2             |
> >
> > Next, use kafka-consumer-offset-checker.sh to check the input topic; all
> > six msgs (three per partition) were consumed:
> > $ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 \
> >     --topic streams-wc-input1 --group streams-wc
> > Group       Topic              Pid  Offset  logSize  Lag  Owner
> > streams-wc  streams-wc-input1  0    3       3        0    none
> > streams-wc  streams-wc-input1  1    3       3        0    none
> >
> > Now, if we restart instance1 again: as only one instance exists, the
> > standby task will not take effect.
> > That means instance1 will create two active StreamTasks, and both tasks
> > will restoreActiveState from the changelog topic.
> >
> > When restoring an active task, we have to seek to some position. Here,
> > since instance1's state store /tmp/kafka-streams has a checkpoint file
> > covering both changelog partitions, the restoreConsumer will seek to
> > position 2.
> >
> > Now change to another situation: what about restarting instance2 instead
> > of instance1? The restoreConsumer will seek to position 3, because
> > instance2's active tasks read the checkpoint in /tmp/kafka-streams2.
> >
> > The difference between these two situations is the position from which
> > each StreamTask begins to restore its StateStore.
> >
> > Situation one: only restart instance1. Beginning at position 2 means P0
> > seeks to just after msg3, and P1 seeks to just after msg4.
> > Situation two: only restart instance2. Beginning at position 3 means P0
> > seeks to just after msg5, and P1 seeks to just after msg6.
> >
> > From the partition view, msg1, msg3, and msg5 go to partition 0.
> >
> > msg     P0's offset | Instance1 restart | Instance2 restart
> > msg1,1  0           |                   |
> > msg3,1  1           |                   |
> > msg5,1  2           | <- seek at pos 2  |
> >                     |                   | <- seek at pos 3
> >
> > msg     P1's offset | Instance1 restart | Instance2 restart
> > msg2,1  0           |                   |
> > msg4,1  1           |                   |
> > msg6,1  2           | <- seek at pos 2  |
> >                     |                   | <- seek at pos 3
> >
> > The restore process polls records from the changelog topic starting at
> > that position.
> > In situation one, it restores msg5 and msg6 to the StreamTasks' state
> > stores: msg5 to task1, msg6 to task0.
> > In situation two, it restores nothing to the StreamTasks' state stores???
> >
> > I have some questions about the restore process and checkpoints:
> > 1. Should we seek to the beginning, since the restored state store must
> > be a complete view of the previous state?
> > 2. In the two situations described above, what will happen?
> >
> > I hope someone can explain this to me, or correct me if I understand
> > something wrong. Thanks in advance.
> >
>
>
