Hello Guozhang,

I tried your suggestions with an in-memory store, and here is what I saw:


  1.  I added the record to the state store, stopped the application, and 
checked the earliest and latest offsets via the command line tools. Both the 
earliest and the latest offset are 1. Does this mean that the record has been 
marked for deletion already? My retention.ms config is set to 3 days 
(259200000 ms), so I would not expect a record added a couple of minutes 
earlier to be marked for deletion.
  2.  This is consistent with the above: the starting offset logged when the 
restore begins is 1 rather than 0:

topic: streamapp-teststore-changelog, partition: 4, start offset: 1, end offset: 1
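
To make the implication of these numbers concrete, here is a minimal sketch of how I read them. It is not the actual StoreChangelogReader code: the class and method names are hypothetical, and the completion rule is only my paraphrase of what you described earlier in this thread (restoring stops once the position reaches restoreEndOffset, or if restoreEndOffset is 0).

```java
/**
 * Hypothetical helper (not part of Kafka) for reasoning about a changelog
 * partition from its earliest and latest offsets.
 */
final class ChangelogOffsetCheck {

    /** Upper bound on the records still readable; compaction can leave gaps. */
    static long maxReadableRecords(long earliestOffset, long latestOffset) {
        if (earliestOffset < 0 || latestOffset < earliestOffset) {
            throw new IllegalArgumentException(
                "invalid offsets: earliest=" + earliestOffset + ", latest=" + latestOffset);
        }
        return latestOffset - earliestOffset;
    }

    /** True if the log no longer starts at offset 0, i.e. its head is not readable. */
    static boolean headRemoved(long earliestOffset) {
        return earliestOffset > 0;
    }

    /**
     * Paraphrased completion rule (an assumption, see above): done once the
     * position has reached the end offset, or if the end offset is 0.
     */
    static boolean restoredToEnd(long position, long restoreEndOffset) {
        return restoreEndOffset == 0 || position >= restoreEndOffset;
    }

    public static void main(String[] args) {
        long earliest = 1L; // values observed for partition 4 above
        long latest = 1L;
        System.out.println("max readable records: " + maxReadableRecords(earliest, latest));
        System.out.println("head removed: " + headRemoved(earliest));
        // A restore that starts at the earliest offset is already "at the end".
        System.out.println("restored before reading anything: " + restoredToEnd(earliest, latest));
    }
}
```

If this reading is right, a restore starting at offset 1 with an end offset of 1 has nothing to fetch and is complete immediately, which would match `hasRestoredToEnd()` returning true while numRecords stays 0.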

I also confirmed different behavior when we change the changelog topic cleanup 
policy from “compact,delete” to just “compact”. We DO NOT see this issue when 
the changelog is just set to compact. We also confirmed that this does not 
happen when we run everything on Kafka version 2.6.
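
Since the problem only shows up when "delete" is part of the cleanup policy, time-based retention seems to be involved. Below is a minimal sketch of the retention check as I understand it, assuming the broker compares record timestamps (the largest timestamp in a log segment, with the default CreateTime timestamp type) against retention.ms rather than the wall-clock time at which the record was appended. The class and method names are hypothetical.

```java
import java.time.Duration;

/** Hypothetical model of time-based retention; not the broker's actual code. */
final class RetentionCheck {

    /**
     * Returns true if data whose largest timestamp is largestTimestampMs would
     * be older than retentionMs at time nowMs. A negative retentionMs (e.g. -1)
     * is treated as "no time-based retention".
     */
    static boolean breachesRetention(long largestTimestampMs, long nowMs, long retentionMs) {
        if (retentionMs < 0) {
            return false;
        }
        return nowMs - largestTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = Duration.ofDays(3).toMillis(); // 259200000 ms, as configured
        long now = System.currentTimeMillis();

        // A record stamped two minutes ago is well within retention.
        long recent = now - Duration.ofMinutes(2).toMillis();
        System.out.println("recent timestamp breaches: " + breachesRetention(recent, now, retentionMs));

        // A record appended just now but carrying a timestamp of 0 would not be.
        System.out.println("zero timestamp breaches: " + breachesRetention(0L, now, retentionMs));
    }
}
```

If that assumption holds, a record written minutes ago could still be eligible for deletion when the timestamp it carries is far in the past, so the timestamps of the records in the changelog may be worth checking.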

Thanks,
Upesh


Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com

From: Guozhang Wang <wangg...@gmail.com>
Date: Thursday, March 25, 2021 at 4:01 PM
To: Users <users@kafka.apache.org>
Cc: Bart Lilje <bli...@itrsgroup.com>
Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics

Hello Upesh,

Could you confirm a few more things for me:

1. After you have stopped the application and wiped out the state dir, check
whether the corresponding changelog topic indeed has one record at offset 0.
This can be done via admin#listOffsets (get the earliest and latest
offsets, which should be 0 and 1 respectively).
2. After you have resumed the application, check from which starting position
we are restoring the changelog. This can be done by implementing
`stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
restoreEndOffset)`; startOffset should be 0.

If both of them check out fine as expected, then from the code I think
bufferedLimitIndex should be updated to 1.


Guozhang

On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ude...@itrsgroup.com> wrote:

> Hi Guozhang,
>
>
>
> Here are some of the answers to your questions I see during my testing:
>
>
>
>    1. ChangelogMetadata#restoreEndOffset == 1. This is expected, as in my
>    test 1 record had been added to the store. However, the numRecords variable
>    is still set to 0.
>    2. For that particular test, `hasRestoredToEnd()` indeed returns true
>    as well. But it is confusing since the store is actually empty / that
>    record I added does not exist in the store when trying to check for it.
>    3. N/A
>
>
>
> A little more information, the records we add to this store/changelog are
> of type <CustomKey,byte[]> where the value is always set to an empty byte
> array `new byte[0]`. A couple other variations I have tried are setting to
> a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
>
>
>
> Hope this gives a little more clarity and hope to hear from you soon.
>
>
>
> Thanks,
>
> Upesh
>
>
> Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
> www.itrsgroup.com
>
> From: Guozhang Wang <wangg...@gmail.com>
> Date: Wednesday, March 24, 2021 at 1:37 PM
> To: Users <users@kafka.apache.org>
> Cc: Bart Lilje <bli...@itrsgroup.com>
> Subject: Re: Kafka Streams Processor API state stores not restored via changelog topics
>
> Hello Upesh,
>
> Thanks for the detailed report. I looked through the code and tried to
> reproduce the issue, but so far have not succeeded. I think I may need some
> further information from you to help my further investigation.
>
> 1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
> an issue, as long as it could still be bumped later (i.e. it is possible
> that the restore consumer has not fetched data yet). What's key though, is
> to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it is
> created with a null value and then initialized once. ChangelogReader
> stops restoring once the current offset has reached or gone beyond this
> value, or if this value itself is 0.
>
> 2) If `restoreEndOffset` is initialized to a non-zero value, then check
> whether the restoration indeed completed without applying any records; this
> is indicated by `hasRestoredToEnd()` returning true.
>
> 3) If `restoreEndOffset` is initialized to 0, then we need to check why. Off
> the top of my head, the only cause I can think of is that the consumer's end
> offset request gets a response of 0, indicating the changelog is now empty.
>
>
> Guozhang
>
>
> On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ude...@itrsgroup.com> wrote:
>
> > Hi all,
> >
> >
> >
> > Our team thinks we discovered a bug over the weekend within the Kafka
> > Streams / Processor API. We are running 2.7.0.
> >
> >
> >
> > When configuring a state store backed by a changelog topic with the
> > cleanup policy configuration set to “compact,delete”:
> >
> >
> >
> > // changelog topic overrides: 25 h retention, compaction plus deletion
> > final StoreBuilder<KeyValueStore<k,v>> store = Stores
> >   .keyValueStoreBuilder(
> >     Stores.persistentKeyValueStore(STORE_ID),
> >     kSerde,
> >     vSerde)
> >   .withLoggingEnabled(Map.of(
> >     RETENTION_MS_CONFIG, "90000000",
> >     CLEANUP_POLICY_CONFIG, "compact,delete"))
> >   .withCachingEnabled();
> >
> >
> >
> > Here is how we reproduced the problem:
> >
> >    1. Records are written to the state store, and subsequently produced
> >    to the changelog topic.
> >    2. Stop streams application
> >    3. Delete state.dir directory
> >    4. Restart streams application
> >    5. Confirm state store is initialized empty with no records restored
> >    from changelog
> >
> >
> >
> > We see this problem with both in-memory and RocksDB backed state stores.
> > For a persistent state store, if the streams application is restarted
> > without the state dir being deleted, the application still does not
> > “restore” from the changelog, but records are still seen in the state store.
> >
> >
> >
> > When rolling back to 2.6, we do not see this issue.
> >
> >
> >
> > Doing some debugging in the source code, in the StoreChangelogReader class
> > I found that the number of records to restore is always 0 based on the
> > below snippet:
> >
> >
> >
> > private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
> >     final ProcessorStateManager stateManager = changelogMetadata.stateManager;
> >     final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
> >     final TopicPartition partition = storeMetadata.changelogPartition();
> >     final String storeName = storeMetadata.store().name();
> >     final int numRecords = changelogMetadata.bufferedLimitIndex;
> >
> >
> >
> > Here, `changelogMetadata.bufferedLimitIndex` is always 0.
> >
> >
> >
> > My question to you all is, 1) Is this expected behavior? 2) If not, is it
> > a bug?
> >
> >
> >
> > Hope to get some clarity, and thanks in advance!
> >
> >
> >
> > Best,
> > Upesh
> > Upesh Desai
> > Senior Software Developer
> > ude...@itrsgroup.com
> > www.itrsgroup.com
> > Internet communications are not secure and therefore the ITRS Group does
> > not accept legal responsibility for the contents of this message. Any view
> > or opinions presented are solely those of the author and do not necessarily
> > represent those of the ITRS Group unless otherwise specifically stated.
> >
> >
> > *Disclaimer*
> >
> > The information contained in this communication from the sender is
> > confidential. It is intended solely for use by the recipient and others
> > authorized to receive it. If you are not the recipient, you are hereby
> > notified that any disclosure, copying, distribution or taking action in
> > relation of the contents of this information is strictly prohibited and
> > may be unlawful.
> >
> > This email has been scanned for viruses and malware, and may have been
> > automatically archived by *Mimecast Ltd*, an innovator in Software as a
> > Service (SaaS) for business. Providing a *safer* and *more useful* place
> > for your human generated data. Specializing in; Security, archiving and
> > compliance.
> >
>
>

