Yes. That's correct.

-Matthias

On 3/31/17 9:32 PM, Sachin Mittal wrote:
> Hi,
> OK, so basically what I understand is that there is no global offset
> maintained for the changelog topic at the broker level.
> Every local state store maintains its offset in a local checkpoint file.
>
> And in order to make sure a state store builds or rebuilds its state
> faster when reading from the changelog topic, we need to ensure that
> changelog topics are compacted efficiently.
>
> I hope these assumptions are correct.
>
> Thanks
> Sachin
>
>
> On Sat, Apr 1, 2017 at 4:51 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> 1. The whole log will be read.
>>
>> 2. It will read all the key-value pairs. However, the store will contain
>> only the latest record for each key after state recovery has finished.
>>
>> For both (1) and (2): note that changelog topics are compacted; thus,
>> it will not read everything since you started your app, as log
>> compaction will remove old records that got "overwritten" by newer
>> records.
>>
>> 3. The instance will hit the timeout and you will get an exception.
>>
>> 4. and 5. It depends. On a clean shutdown, Streams writes a checkpoint
>> file and thus knows the state of the store. Therefore, it does not need
>> to read the changelog to recover the store. On an unclean shutdown (like
>> kill -9) the checkpoint file will be missing, and thus Streams will wipe
>> out the state and recreate it from scratch (thus, you don't need to call
>> .cleanUp() manually).
>>
>> 6.
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L99
>>
>> called here:
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1294
>>
>>
>> -Matthias
>>
>> On 3/31/17 11:46 AM, Sachin Mittal wrote:
>>> Hi,
>>> There are two ways to restart a streams application:
>>> 1. Executing streams.cleanUp() before streams.start()
>>> This cleans up the local state store.
>>>
>>> 2. Just calling streams.start()
>>>
>>> What are the differences between the two?
>>>
>>> As I understand, in the first case it will try to create the local state
>>> store by replaying the changelog topic.
>>>
>>> So the questions here are:
>>> 1. Will it try to replay the whole log from earliest or from the last
>>> committed offset?
>>> 2. Will it read and fetch all the values from the topic for a given key,
>>> or only the last value for a key, when creating a state store for that
>>> changelog topic?
>>> 3. What happens if the time to create the state store is greater than
>>> max.poll.interval.ms?
>>>
>>> 4. If we don't delete the state store, then what happens? Does it again
>>> try to recreate the same store by reading the entire changelog topic? Or
>>> does it determine somehow from the state store what the latest offset is
>>> and update the state store with values from that offset onwards?
>>>
>>> 5. In case of an unclean shutdown, say by kill -9, is it advisable to
>>> clean up the local state store before restart, or can we just start it
>>> normally?
>>>
>>> 6. Finally, can anyone point me to the code where it creates the state
>>> store by reading from the changelog topic?
>>>
>>> Thanks
>>> Sachin
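
The restore behaviour discussed in this thread can be illustrated with a toy model. The sketch below is plain Java and does not use the Kafka Streams API: ChangelogRestoreSketch, Entry, sampleLog, compact and restore are hypothetical names, and the checkpoint is modeled simply as "the next offset to read", which is an assumption made for illustration. It is meant to show two things: replaying a compacted changelog reads fewer records yet yields the same store contents, and a checkpointed offset lets a restart skip what the local store already holds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are Kafka Streams classes.
class ChangelogRestoreSketch {

    // One changelog record: its offset in the partition, a key and a value.
    static final class Entry {
        final long offset;
        final String key;
        final String value;

        Entry(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // A small changelog in which key "a" is overwritten twice.
    static List<Entry> sampleLog() {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry(0, "a", "1"));
        log.add(new Entry(1, "b", "1"));
        log.add(new Entry(2, "a", "2"));
        log.add(new Entry(3, "c", "1"));
        log.add(new Entry(4, "a", "3"));
        return log;
    }

    // Models log compaction: keep only the latest record per key.
    // Surviving records keep their original offsets.
    static List<Entry> compact(List<Entry> log) {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            latest.remove(e.key); // re-insert so iteration follows offset order
            latest.put(e.key, e);
        }
        return new ArrayList<>(latest.values());
    }

    // Replays every record with offset >= startOffset into the store.
    // Returns how many records had to be read.
    static int restore(List<Entry> log, long startOffset, Map<String, String> store) {
        int read = 0;
        for (Entry e : log) {
            if (e.offset < startOffset) {
                continue; // already reflected in the local store
            }
            store.put(e.key, e.value);
            read++;
        }
        return read;
    }

    public static void main(String[] args) {
        List<Entry> log = sampleLog();
        List<Entry> compacted = compact(log);

        // No checkpoint (wiped state): replay from the beginning.
        Map<String, String> fromFullLog = new HashMap<>();
        int readFull = restore(log, 0L, fromFullLog);

        Map<String, String> fromCompacted = new HashMap<>();
        int readCompacted = restore(compacted, 0L, fromCompacted);

        // Checkpoint present: the local store already covers offsets 0..3.
        Map<String, String> local = new HashMap<>();
        local.put("a", "2");
        local.put("b", "1");
        local.put("c", "1");
        int readFromCheckpoint = restore(compacted, 4L, local);

        System.out.println("uncompacted replay read " + readFull + " records");
        System.out.println("compacted replay read " + readCompacted + " records");
        System.out.println("checkpointed restart read " + readFromCheckpoint + " records");
        System.out.println("stores equal: "
                + (fromFullLog.equals(fromCompacted) && fromFullLog.equals(local)));
    }
}
```

In a real application the checkpoint file and the changelog reads are handled internally by Streams; the model only mirrors the reasoning in the answers to questions 1, 2, 4 and 5.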