Great to hear! Always a pleasure. Guozhang
On Tue, Mar 30, 2021 at 8:04 PM Upesh Desai <ude...@itrsgroup.com> wrote:

Hi Guozhang,

We can confirm the behavior with the 2.7.1 release. Appreciate all the help!

Cheers,
Upesh

From: Guozhang Wang <wangg...@gmail.com>
Date: Tuesday, March 30, 2021 at 2:10 PM
To: Users <users@kafka.apache.org>
Cc: Bart Lilje <bli...@itrsgroup.com>
Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics

Great, I think https://issues.apache.org/jira/browse/KAFKA-12323 is indeed the root cause then. Note that this is only an issue with punctuation-triggered events, where `context.timestamp()` would return 0 (it is fixed in the yet-to-be-released 2.7.1/2.8.0).

You can consider applying the patch on top of 2.7.0 if you can, or wait for the new release; or, if your production code does not actually use punctuation to write records to Kafka, then this issue would not impact you.

Guozhang

On Tue, Mar 30, 2021 at 11:56 AM Upesh Desai <ude...@itrsgroup.com> wrote:

Hi Guozhang,

Great to hear we might have found the issue!

To answer your question, the changelog record is generated by us calling `store.put(key, value)` from the punctuate callback, which makes sense then, because the timestamp would be 0, like you saw in your test as well.

Best,
Upesh
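For illustration, here is a minimal sketch of the punctuation-driven write path described above. The processor class, the store name ("test-store"), and the 30-second interval are hypothetical; the point is that on 2.7.0 a put() issued from a punctuation produces a changelog record whose timestamp is left unpopulated (0), per KAFKA-12323, while a put() issued from process() carries the input record's timestamp.

    import java.time.Duration;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class PunctuationWriteProcessor implements Processor<String, byte[]> {
        private KeyValueStore<String, byte[]> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            store = (KeyValueStore<String, byte[]>) context.getStateStore("test-store");
            context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                // Punctuation-triggered write: on 2.7.0 the resulting changelog
                // record carries an unpopulated (0) timestamp (KAFKA-12323).
                store.put("marker", new byte[0]);
            });
        }

        @Override
        public void process(final String key, final byte[] value) {
            // Input-triggered write: stamped with the input record's timestamp.
            store.put(key, value);
        }

        @Override
        public void close() { }
    }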
From: Guozhang Wang <wangg...@gmail.com>
Date: Tuesday, March 30, 2021 at 1:00 PM
To: Users <users@kafka.apache.org>
Cc: Bart Lilje <bli...@itrsgroup.com>
Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics

Hello Upesh,

These are super helpful logs, and I think I'm very close to the root cause. You see, the written changelog record's timestamp is set to 0 (i.e. January 1st 1970 at midnight GMT), and hence given a reasonable Kafka server start time (presumably in the 21st century), the retention time would always be breached, causing the log deletion mechanism to trigger.

The timestamp is set with `context.timestamp()`, which uses the processing record's timestamp; but I have myself seen and fixed a bug (https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was not populated, and hence set to 0, if the record was generated as part of a punctuation. So my next key question is: is this changelog record generated, i.e. its put call triggered, from processing an input record, or from a punctuation call?

Guozhang

On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai <ude...@itrsgroup.com> wrote:

Hi Guozhang,

When testing with a 2.6.1 broker and a 2.7 streams application, I see the same behavior as described before with the 2.7 broker: just after a record is written to the changelog topic, the log segment is rolled and deleted, citing that the retention time has passed (the record was written to the state store at ~15:49):

    [2021-03-29 15:49:13,757] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable segments with base offsets [0] due to retention time 259200000ms breach (kafka.log.Log)
    [2021-03-29 15:49:13,761] INFO [ProducerStateManager partition=test-stream-store-changelog-4] Writing producer snapshot at offset 1 (kafka.log.ProducerStateManager)
    [2021-03-29 15:49:13,763] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log segment at offset 1 in 5 ms. (kafka.log.Log)
    [2021-03-29 15:49:13,764] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments for deletion LogSegment(baseOffset=0, size=156, lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
    [2021-03-29 15:49:13,765] INFO [Log partition=test-stream-store-changelog-4, dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start offset to 1 due to segment deletion (kafka.log.Log)

Does this have anything to do with the largestTime=0 mentioned in the log? This was the first and only record written to the store/changelog. Is there anything else we can try to resolve this issue, or that would give us more insight into where it could originate from?

Thanks,
Upesh

From: Upesh Desai <ude...@itrsgroup.com>
Date: Thursday, March 25, 2021 at 6:46 PM
To: users@kafka.apache.org
Cc: Bart Lilje <bli...@itrsgroup.com>
Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics

We have not tried running 2.6 brokers with a 2.7 client; I will try and get back to you.

We are not enabling EOS on the streams; we have it set to AT_LEAST_ONCE. The shutdowns and restarts of the stream app are clean each time.

I see certain lines in the broker logs indicating that the log segment is being rolled and deleted, but I don't see how or why this should be happening when the records were just written. See the log line snippets included in the attached file. Initially 8 records are added (offsets 0-8), followed by a single record (offset 9). They are rolled and deleted almost instantly.

Best,
Upesh
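The broker log lines above show the retention check being applied against the segment's largestTime of 0. One way to double-check the broker-side settings involved is to read the changelog topic's cleanup.policy and retention.ms with the Admin client; the sketch below assumes a placeholder bootstrap address and the changelog topic name from the logs above.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class ChangelogConfigCheck {
        public static void main(final String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (Admin admin = Admin.create(props)) {
                final ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "test-stream-store-changelog");
                final Map<ConfigResource, Config> configs =
                    admin.describeConfigs(Collections.singletonList(topic)).all().get();

                // The two settings the broker's deletion decision depends on.
                final Config config = configs.get(topic);
                System.out.println("cleanup.policy = " + config.get("cleanup.policy").value());
                System.out.println("retention.ms   = " + config.get("retention.ms").value());
            }
        }
    }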
From: Guozhang Wang <wangg...@gmail.com>
Date: Thursday, March 25, 2021 at 6:31 PM
To: Users <users@kafka.apache.org>
Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics

BTW, yes, that indicates the record in the changelog was already truncated (logically). But since we only physically truncate logs by segments, which is 1GB by default, it should still physically be on the log. Are you enabling EOS on Streams, and when you shut down the streams app, is that a clean shutdown?

On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang <wangg...@gmail.com> wrote:

That's indeed weird.

Have you tried to run Kafka brokers with 2.6 while the Kafka Streams client runs 2.7?

On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai <ude...@itrsgroup.com> wrote:

Hello Guozhang,

I have tried your suggestions (with an in-memory store, FYI) and seen the following:

1. I added the record to the state store, stopped the application, and checked the earliest and latest offsets via the command line tools. This shows that the earliest offset is 1, and the latest offset is also 1. Does this mean that the record has already been marked for deletion? My retention.ms config is set to 3 days (259200000 ms), so it should not be marked for deletion if it was added only a couple of minutes prior?

2. Following the above, this makes sense as well. When logging the starting offset, it is not 0, but rather 1:

    topic: streamapp-teststore-changelog, partition: 4, start offset: 1, end offset: 1

I also confirmed different behavior when we change the changelog topic cleanup policy from "compact,delete" to just "compact". We DO NOT see this issue when the changelog is set to just compact. We also confirmed that this does not happen when we run everything on Kafka version 2.6.

Thanks,
Upesh
From: Guozhang Wang <wangg...@gmail.com>
Date: Thursday, March 25, 2021 at 4:01 PM
To: Users <users@kafka.apache.org>
Cc: Bart Lilje <bli...@itrsgroup.com>
Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics

Hello Upesh,

Could you confirm a few more things for me:

1. After you stopped the application and wiped out the state dir, check whether the corresponding changelog topic indeed has one record at offset 0 --- this can be done via admin#listOffsets (get the earliest and latest offsets, which should be 0 and 1 respectively).

2. After you resumed the application, check from which starting position we are restoring the changelog --- this can be done by implementing `stateRestoreListener.onRestoreStart(partition, storeName, startOffset, restoreEndOffset);` the starting offset should be 0.

If both of them check out fine as expected, then from the code I think bufferedLimitIndex should be updated to 1.

Guozhang
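Both checks above can be wired up with public APIs. Below is a sketch under assumed names (topic, partition number, and bootstrap address are placeholders): check 1 reads the changelog partition's earliest and latest offsets via Admin#listOffsets, and check 2 registers a global StateRestoreListener, which must be set before KafkaStreams#start, to log the restore start and end offsets.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.processor.StateRestoreListener;

    public class RestoreChecks {

        // Check 1: earliest/latest offsets of the changelog partition.
        static void printChangelogOffsets() throws Exception {
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                final TopicPartition tp = new TopicPartition("test-stream-store-changelog", 4);
                final long earliest = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.earliest()))
                    .partitionResult(tp).get().offset();
                final long latest = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
                    .partitionResult(tp).get().offset();
                System.out.println("earliest = " + earliest + ", latest = " + latest);
            }
        }

        // Check 2: log restore progress; must be called before streams.start().
        static void registerRestoreListener(final KafkaStreams streams) {
            streams.setGlobalStateRestoreListener(new StateRestoreListener() {
                @Override
                public void onRestoreStart(final TopicPartition topicPartition, final String storeName,
                                           final long startingOffset, final long endingOffset) {
                    System.out.printf("restore start: %s store=%s start=%d end=%d%n",
                        topicPartition, storeName, startingOffset, endingOffset);
                }

                @Override
                public void onBatchRestored(final TopicPartition topicPartition, final String storeName,
                                            final long batchEndOffset, final long numRestored) {
                    System.out.printf("restored batch: %s store=%s endOffset=%d num=%d%n",
                        topicPartition, storeName, batchEndOffset, numRestored);
                }

                @Override
                public void onRestoreEnd(final TopicPartition topicPartition, final String storeName,
                                         final long totalRestored) {
                    System.out.printf("restore end: %s store=%s total=%d%n",
                        topicPartition, storeName, totalRestored);
                }
            });
        }
    }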
On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ude...@itrsgroup.com> wrote:

Hi Guozhang,

Here are some of the answers to your questions from my testing:

1. ChangelogMetadata#restoreEndOffset == 1; this is expected, as in my test one record had been added to the store. However, the numRecords variable is still set to 0.

2. For that particular test, `hasRestoredToEnd()` indeed returns true as well. But it is confusing, since the store is actually empty / the record I added does not exist in the store when trying to check for it.

3. N/A

A little more information: the records we add to this store/changelog are of type <CustomKey, byte[]>, where the value is always set to an empty byte array `new byte[0]`. A couple of other variations I have tried are setting it to a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.

Hope this gives a little more clarity, and hope to hear from you soon.

Thanks,
Upesh

From: Guozhang Wang <wangg...@gmail.com>
Date: Wednesday, March 24, 2021 at 1:37 PM
To: Users <users@kafka.apache.org>
Cc: Bart Lilje <bli...@itrsgroup.com>
Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics

Hello Upesh,

Thanks for the detailed report. I looked through the code and tried to reproduce the issue, but so far have not succeeded. I think I may need some further information from you to help my investigation.

1) `bufferedLimitIndex == 0` itself does not necessarily mean there's an issue, as long as it could still be bumped later (i.e. it is possible that the restore consumer has not fetched data yet). What's key, though, is to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would be created with a null value and then initialized once. The ChangelogReader would stop restoring once the current offset has reached beyond this value, or if this value itself is 0.

2) If `restoreEndOffset` is initialized to a non-zero value, then check whether the restoration indeed completed without applying any records; this is determined by `hasRestoredToEnd()` returning true.

3) If `restoreEndOffset` is initialized to 0, then we need to check why: off the top of my head I can only think of the consumer's end offset request getting a response of 0, indicating the changelog is now empty.

Guozhang

On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ude...@itrsgroup.com> wrote:

Hi all,

Our team thinks we discovered a bug over the weekend within the Kafka Streams / Processor API. We are running 2.7.0.

We configure a state store backed by a changelog topic with the cleanup policy configuration set to "compact,delete":

    final StoreBuilder<KeyValueStore<K, V>> store = Stores
        .keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STORE_ID),
            kSerde,
            vSerde)
        .withLoggingEnabled(Map.of(
            RETENTION_MS_CONFIG, "90000000",
            CLEANUP_POLICY_CONFIG, "compact,delete"))
        .withCachingEnabled();

Here is how we reproduced the problem:

1. Records are written to the state store, and subsequently produced to the changelog topic.
2. Stop the streams application.
3. Delete the state.dir directory.
4. Restart the streams application.
5. Confirm the state store is initialized empty, with no records restored from the changelog (see the sketch at the end of this message).

We see this problem with both in-memory and RocksDB backed state stores. For a persistent state store, if the streams application is restarted without the state dir being deleted, the application still does not "restore" from the changelog, but records are still seen in the state store.

When rolling back to 2.6, we do not see this issue.
Doing some debugging in the source code, in the StoreChangelogReader class I found that the number of records to restore is always 0, based on the snippet below:

    private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
        final ProcessorStateManager stateManager = changelogMetadata.stateManager;
        final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
        final TopicPartition partition = storeMetadata.changelogPartition();
        final String storeName = storeMetadata.store().name();
        final int numRecords = changelogMetadata.bufferedLimitIndex;
        ...

where `changelogMetadata.bufferedLimitIndex` is always == 0.

My questions to you all are: 1) Is this expected behavior? 2) If not, is it a bug?

Hope to get some clarity, and thanks in advance!

Best,
Upesh
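For step 5 of the reproduction above, one way to confirm what was (or was not) restored is to query the store through interactive queries once the restarted application has reached the RUNNING state. The class and method names below are illustrative; pass in the store name used in the topology.

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class StoreEmptyCheck {
        // Call after the restarted application has reached RUNNING.
        public static void logStoreSize(final KafkaStreams streams, final String storeName) {
            final ReadOnlyKeyValueStore<Object, byte[]> store = streams.store(
                StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
            // Expected after a healthy changelog restoration: the number of records
            // previously written; the report above observes an empty store instead.
            System.out.println("approximate entries after restart: " + store.approximateNumEntries());
        }
    }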