That's indeed weird. Have you tried running the Kafka brokers on 2.6 while keeping the Kafka Streams client on 2.7?
On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai <ude...@itrsgroup.com> wrote:

> Hello Guozhang,
>
> I have tried your suggestions with an in-memory store, FYI, and seen the following:
>
> 1. I added the record to the state store, stopped the application, and checked the earliest and latest offsets via the command-line tools. This shows that the earliest offset is 1, and the latest offset is also 1. Does this mean that the record has already been marked for deletion? My retention.ms config is set to 3 days (259200000 ms), so a record added a couple of minutes earlier should not be marked for deletion, should it?
> 2. Following from the above, this makes sense as well. When logging the starting offset, it is not 0 but 1:
>
>    topic: streamapp-teststore-changelog, partition: 4, start offset: 1, end offset: 1
>
> I also observed different behavior when we change the changelog topic cleanup policy from "compact,delete" to just "compact": we DO NOT see this issue when the changelog is set to "compact" only. We also confirmed that this does not happen when we run everything on Kafka version 2.6.
>
> Thanks,
> Upesh
>
> Upesh Desai | Senior Software Developer | ude...@itrsgroup.com | www.itrsgroup.com
>
> From: Guozhang Wang <wangg...@gmail.com>
> Date: Thursday, March 25, 2021 at 4:01 PM
> To: Users <users@kafka.apache.org>
> Cc: Bart Lilje <bli...@itrsgroup.com>
> Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics
>
> Hello Upesh,
>
> Could you confirm a few more things for me:
>
> 1. After you stop the application and wipe out the state dir, check whether the corresponding changelog topic indeed has one record at offset 0. This can be done via Admin#listOffsets (get the earliest and latest offsets, which should be 0 and 1 respectively).
> 2. After you resume the application, check from which starting position we are restoring the changelog. This can be done by implementing `stateRestoreListener.onRestoreStart(partition, storeName, startOffset, restoreEndOffset);` the start offset should be 0.
>
> If both of them check out as expected, then from the code I think bufferedLimitIndex should be updated to 1.
>
> Guozhang
>
> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ude...@itrsgroup.com> wrote:
>
> > Hi Guozhang,
> >
> > Here are the answers to your questions from my testing:
> >
> > 1. ChangelogMetadata#restoreEndOffset == 1. This is expected, as in my test 1 record had been added to the store. However, the numRecords variable is still 0.
> > 2. For that particular test, `hasRestoredToEnd()` indeed returns true as well. This is confusing, since the store is actually empty: the record I added does not exist in the store when I check for it.
> > 3. N/A
> >
> > A little more information: the records we add to this store/changelog are of type <CustomKey, byte[]>, where the value is always an empty byte array, `new byte[0]`. A couple of other variations I have tried are setting it to a non-empty static byte array, such as `new byte[1]` or `new byte[]{1}`.
> >
> > Hope this gives a little more clarity, and I hope to hear from you soon.
> >
> > Thanks,
> > Upesh
> >
> > From: Guozhang Wang <wangg...@gmail.com>
> > Date: Wednesday, March 24, 2021 at 1:37 PM
> > To: Users <users@kafka.apache.org>
> > Cc: Bart Lilje <bli...@itrsgroup.com>
> > Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics
> >
> > Hello Upesh,
> >
> > Thanks for the detailed report.
> > I looked through the code and tried to reproduce the issue, but so far have not succeeded. I may need some more information from you to investigate further.
> >
> > 1) `bufferedLimitIndex == 0` by itself does not necessarily mean there is an issue, as long as it can still be bumped later (i.e. it is possible that the restore consumer has not fetched data yet). What is key, though, is to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it is created with a null value and then initialized once. The ChangelogReader stops restoring once the current offset has reached or passed this value, or if this value itself is 0.
> >
> > 2) If `restoreEndOffset` is initialized to a non-zero value, then check whether the restoration indeed completed without applying any records; completion is determined by `hasRestoredToEnd()` returning true.
> >
> > 3) If `restoreEndOffset` is initialized to 0, then we need to check why. Off the top of my head, the only reason I can think of is that the consumer's end-offset request got a response of 0, indicating that the changelog is now empty.
> >
> > Guozhang
> >
> > On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ude...@itrsgroup.com> wrote:
> >
> > > Hi all,
> > >
> > > Our team thinks we discovered a bug over the weekend within the Kafka Streams / Processor API. We are running 2.7.0.
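Guozhang's description of when restoration stops (points 1 to 3 in his message above) can be captured in a small, self-contained Java model. This is a hypothetical simplification for illustration only: `RestoreModel` and `hasRestoredToEnd(Long, long)` are made-up names, not Kafka's `StoreChangelogReader` code, and reading "reached or passed this value" as `position >= restoreEndOffset` is an assumption.

```java
// Hypothetical model of the restore-completion rule described in this thread.
// Not Kafka's StoreChangelogReader; names and semantics are illustrative assumptions.
final class RestoreModel {

    private RestoreModel() {}

    /**
     * @param restoreEndOffset end offset captured once when restoration starts;
     *                         null means it has not been initialized yet
     * @param position         next changelog offset the restore consumer would read
     * @return true if restoration is considered complete in this model
     */
    static boolean hasRestoredToEnd(Long restoreEndOffset, long position) {
        if (restoreEndOffset == null) {
            return false; // end offset not known yet: keep restoring
        }
        // Done if the changelog was empty when restoration started,
        // or if the consumer position has reached the captured end offset.
        return restoreEndOffset == 0L || position >= restoreEndOffset;
    }

    public static void main(String[] args) {
        // Expected case in the thread: one record at offset 0, so the end offset is 1.
        System.out.println(hasRestoredToEnd(1L, 0L)); // false: offset 0 not applied yet
        System.out.println(hasRestoredToEnd(1L, 1L)); // true
        // Empty changelog: end offset 0 means there is nothing to restore.
        System.out.println(hasRestoredToEnd(0L, 0L)); // true
    }
}
```

In this model, a restore whose first readable position is already 1 with restoreEndOffset 1 is complete immediately with zero records applied, which is the same combination reported in this thread (restoreEndOffset == 1, numRecords == 0, `hasRestoredToEnd()` true, start offset 1). The model says nothing about why offset 0 was no longer readable.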
> > >
> > > When configuring a state store backed by a changelog topic with the cleanup policy configuration set to "compact,delete":
> > >
> > > import java.util.Map;
> > > import org.apache.kafka.streams.state.KeyValueStore;
> > > import org.apache.kafka.streams.state.StoreBuilder;
> > > import org.apache.kafka.streams.state.Stores;
> > > import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
> > > import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;
> > >
> > > final StoreBuilder<KeyValueStore<K, V>> store = Stores
> > >     .keyValueStoreBuilder(
> > >         Stores.persistentKeyValueStore(STORE_ID),
> > >         kSerde,
> > >         vSerde)
> > >     // changelog topic configs: retention.ms = 90,000,000 ms (25 hours)
> > >     .withLoggingEnabled(Map.of(
> > >         RETENTION_MS_CONFIG, "90000000",
> > >         CLEANUP_POLICY_CONFIG, "compact,delete"))
> > >     .withCachingEnabled();
> > >
> > > Here is how we reproduced the problem:
> > >
> > > 1. Records are written to the state store and subsequently produced to the changelog topic.
> > > 2. Stop the Streams application.
> > > 3. Delete the state.dir directory.
> > > 4. Restart the Streams application.
> > > 5. Confirm that the state store is initialized empty, with no records restored from the changelog.
> > >
> > > We see this problem with both in-memory and RocksDB-backed state stores. For the persistent state store, if the Streams application is restarted without the state dir being deleted, the application still does not "restore" from the changelog, but records are still seen in the state store.
> > >
> > > When rolling back to 2.6, we do not see this issue.
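Step 5 can also be cross-checked against the changelog's offset range. Once the state directory is deleted there is no local checkpoint, so restoration reads the changelog from its earliest available offset up to its end offset; the difference is an upper bound on how many records can come back (compaction can leave gaps in the offsets). A minimal Java sketch of that arithmetic, using a hypothetical `ChangelogRange` helper and the offsets discussed in this thread:

```java
// Hypothetical helper: upper bound on the changelog records a from-scratch restore can replay.
final class ChangelogRange {

    private ChangelogRange() {}

    /** Offsets in [earliestOffset, endOffset); compaction may leave gaps, so this is an upper bound. */
    static long maxRestorableRecords(long earliestOffset, long endOffset) {
        return Math.max(0L, endOffset - earliestOffset);
    }

    public static void main(String[] args) {
        // Expected: one record at offset 0, end offset 1.
        System.out.println(maxRestorableRecords(0L, 1L)); // 1
        // Reported with cleanup.policy "compact,delete": earliest 1, end 1.
        System.out.println(maxRestorableRecords(1L, 1L)); // 0
    }
}
```

With an earliest offset of 1 and an end offset of 1, there is nothing left to replay, so an empty store after restart follows directly from the offsets, independent of how the restore code handles them.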
> > >
> > > Doing some debugging in the source code, in the StoreChangelogReader class, I found that the number of records to restore is always 0, based on the snippet below:
> > >
> > > private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
> > >     final ProcessorStateManager stateManager = changelogMetadata.stateManager;
> > >     final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
> > >     final TopicPartition partition = storeMetadata.changelogPartition();
> > >     final String storeName = storeMetadata.store().name();
> > >     final int numRecords = changelogMetadata.bufferedLimitIndex;
> > >     // ... rest of the method omitted
> > >
> > > Here, 'changelogMetadata.bufferedLimitIndex' is always 0.
> > >
> > > My questions to you all are: 1) Is this expected behavior? 2) If not, is it a bug?
> > >
> > > Hope to get some clarity, and thanks in advance!
> > >
> > > Best,
> > > Upesh
> > >
> > > Internet communications are not secure and therefore the ITRS Group does not accept legal responsibility for the contents of this message. Any view or opinions presented are solely those of the author and do not necessarily represent those of the ITRS Group unless otherwise specifically stated.
> > >
> > > Disclaimer
> > >
> > > The information contained in this communication from the sender is confidential. It is intended solely for use by the recipient and others authorized to receive it. If you are not the recipient, you are hereby notified that any disclosure, copying, distribution or taking action in relation of the contents of this information is strictly prohibited and may be unlawful.
> > >
> > > This email has been scanned for viruses and malware, and may have been automatically archived by Mimecast Ltd, an innovator in Software as a Service (SaaS) for business. Providing a safer and more useful place for your human generated data. Specializing in; Security, archiving and compliance.
> >
> > --
> > -- Guozhang
>
> --
> -- Guozhang

--
-- Guozhang
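Upesh's observation in this thread that the problem disappears when the changelog's cleanup.policy is just "compact" lines up with how that setting works: time- and size-based retention (retention.ms, retention.bytes) only deletes log segments when the comma-separated policy list includes "delete". The helper below is a hypothetical illustration of that rule (`CleanupPolicyCheck` is a made-up name, not a Kafka API); it does not establish that retention is what removed the record here.

```java
import java.util.Arrays;

// Hypothetical helper (not a Kafka API): does a cleanup.policy value
// enable retention-based segment deletion?
final class CleanupPolicyCheck {

    private CleanupPolicyCheck() {}

    static boolean retentionDeletesSegments(String cleanupPolicy) {
        // cleanup.policy is a comma-separated list, e.g. "compact" or "compact,delete"
        return Arrays.stream(cleanupPolicy.split(","))
                .map(String::trim)
                .anyMatch("delete"::equals);
    }

    public static void main(String[] args) {
        System.out.println(retentionDeletesSegments("compact,delete")); // true
        System.out.println(retentionDeletesSegments("compact"));        // false
    }
}
```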