That's indeed weird.

Have you tried running the Kafka brokers on 2.6 with the Kafka Streams client
on 2.7?

On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai <ude...@itrsgroup.com> wrote:

> Hello Guozhang,
>
> I tried your suggestions (with an in-memory store, FYI) and observed the
> following:
>
>    1. I added the record to the state store, stopped the application, and
>    checked the earliest and latest offsets via the command-line tools. The
>    earliest offset is 1, and the latest offset is also 1. Does this mean
>    that the record has already been marked for deletion? My retention.ms
>    config is set to 3 days (259200000 ms), so a record added only a couple
>    of minutes earlier should not be marked for deletion yet, should it?
>    2. This is consistent with the above: when logging the starting offset
>    of the restore, it is 1 rather than 0:
>
>    *topic: streamapp-teststore-changelog, partition: 4, start offset: 1,
>    end offset: 1*
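For readers who want to run check 1 programmatically instead of with the command-line tools, here is a minimal Java sketch using `Admin#listOffsets` (kafka-clients 2.5 or later) with `OffsetSpec.earliest()` and `OffsetSpec.latest()`. The class name `ChangelogOffsetCheck` and the bootstrap address `localhost:9092` are assumptions for illustration; the topic and partition are taken from the log line above. Note that `latest - earliest` is only an upper bound on the retained records, because compaction can leave gaps in the offsets.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public final class ChangelogOffsetCheck {

    /** Offsets between log start and log end: an upper bound on retained records (compaction leaves gaps). */
    public static long offsetSpan(long earliest, long latest) {
        return Math.max(0L, latest - earliest);
    }

    /** Returns {earliest, latest} for one partition, i.e. the log start offset and the log end offset. */
    public static long[] earliestAndLatest(Admin admin, TopicPartition tp)
            throws ExecutionException, InterruptedException {
        long earliest = admin.listOffsets(Map.of(tp, OffsetSpec.earliest()))
                .partitionResult(tp).get().offset();
        long latest = admin.listOffsets(Map.of(tp, OffsetSpec.latest()))
                .partitionResult(tp).get().offset();
        return new long[] {earliest, latest};
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumption: a broker is reachable at localhost:9092.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        TopicPartition tp = new TopicPartition("streamapp-teststore-changelog", 4);
        try (Admin admin = Admin.create(props)) {
            long[] offsets = earliestAndLatest(admin, tp);
            System.out.printf("%s earliest=%d latest=%d span=%d%n",
                    tp, offsets[0], offsets[1], offsetSpan(offsets[0], offsets[1]));
        }
    }
}
```

With the offsets reported above (earliest 1, latest 1) the span is 0, meaning the broker no longer holds any changelog record for the restore to replay.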
>
> I also observed different behavior when we changed the changelog topic's
> cleanup policy from *“compact,delete”* to just *“compact”*: we DO NOT see
> this issue when the changelog is set to *“compact”* only. We also confirmed
> that this does not happen when we run everything on Kafka version 2.6.
>
> Thanks,
>
> Upesh
>
>
> Upesh Desai  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang <wangg...@gmail.com>
> *Date: *Thursday, March 25, 2021 at 4:01 PM
> *To: *Users <users@kafka.apache.org>
> *Cc: *Bart Lilje <bli...@itrsgroup.com>
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> Could you confirm a few more things for me:
>
> 1. After you have stopped the application and wiped out the state dir,
> check whether the corresponding changelog topic indeed has one record at
> offset 0. This can be done via Admin#listOffsets: get the earliest and
> latest offsets, which should be 0 and 1, respectively.
> 2. After you have resumed the application, check from which starting
> position we are restoring the changelog. This can be done by implementing
> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
> restoreEndOffset);`; the starting offset should be 0.
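Check 2 can be sketched with a `StateRestoreListener` registered via `KafkaStreams#setGlobalStateRestoreListener`, which has to be called before `start()`. This is a minimal sketch, not code from the thread: the class name `LoggingRestoreListener` and its two maps are hypothetical, while the three callbacks are the ones the `StateRestoreListener` interface declares.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

/** Records and logs where each changelog restore starts and how many records it applies. */
public class LoggingRestoreListener implements StateRestoreListener {

    // Starting offset of the most recent restore, per changelog partition.
    public final Map<TopicPartition, Long> startOffsets = new ConcurrentHashMap<>();
    // Total records applied by the most recent completed restore, per changelog partition.
    public final Map<TopicPartition, Long> totalRestored = new ConcurrentHashMap<>();

    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        startOffsets.put(partition, startingOffset);
        System.out.printf("restore start: store=%s %s start=%d end=%d%n",
                storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        System.out.printf("batch restored: store=%s %s batchEnd=%d records=%d%n",
                storeName, partition, batchEndOffset, numRestored);
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName, long restored) {
        totalRestored.put(partition, restored);
        System.out.printf("restore end: store=%s %s total=%d%n", storeName, partition, restored);
    }
}
```

Usage: call `streams.setGlobalStateRestoreListener(new LoggingRestoreListener());` before `streams.start()`. A starting offset of 0 matches the expectation here; a starting offset of 1 would indicate that the record at offset 0 is no longer available on the broker.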
>
> If both of them check out fine as expected, then from the code I think
> bufferedLimitIndex should be updated to 1.
>
>
> Guozhang
>
> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ude...@itrsgroup.com> wrote:
>
> > Hi Guozhang,
> >
> > Here are the answers to your questions from my testing:
> >
> >    1. ChangelogMetadata#restoreEndOffset == 1. This is expected, as in my
> >    test one record had been added to the store. However, the numRecords
> >    variable is still 0.
> >    2. For that particular test, `hasRestoredToEnd()` indeed returns true
> >    as well. This is confusing, since the store is actually empty: the
> >    record I added does not exist in the store when I check for it.
> >    3. N/A
> >
> > A little more information: the records we add to this store/changelog are
> > of type <CustomKey, byte[]>, where the value is always an empty byte array
> > (`new byte[0]`). I have also tried a couple of variations that set the
> > value to a non-empty static byte array, such as `new byte[1]` or
> > `new byte[]{1}`.
> >
> > I hope this gives a little more clarity, and I look forward to hearing from
> > you.
> >
> > Thanks,
> >
> > Upesh
> >
> >
> >
> > *From: *Guozhang Wang <wangg...@gmail.com>
> > *Date: *Wednesday, March 24, 2021 at 1:37 PM
> > *To: *Users <users@kafka.apache.org>
> > *Cc: *Bart Lilje <bli...@itrsgroup.com>
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > Hello Upesh,
> >
> > Thanks for the detailed report. I looked through the code and tried to
> > reproduce the issue, but have not succeeded so far. I may need some
> > further information from you to continue investigating.
> >
> > 1) `bufferedLimitIndex == 0` by itself does not necessarily indicate an
> > issue, as long as it can still be bumped later (i.e. it is possible that
> > the restore consumer has not fetched any data yet). The key thing to
> > check is `ChangelogMetadata#restoreEndOffset`: for active tasks, it is
> > created with a null value and then initialized once. The ChangelogReader
> > stops restoring once the current offset has reached (or passed) this
> > value, or if the value itself is 0.
> >
> > 2) If `restoreEndOffset` is initialized to a non-zero value, then check
> > whether the restoration indeed completed without applying any records;
> > this is determined by `hasRestoredToEnd()` returning true.
> >
> > 3) If `restoreEndOffset` is initialized to 0, then we need to check why.
> > Off the top of my head, the only cause I can think of is that the
> > consumer's end-offset request got a response of 0, indicating that the
> > changelog is now empty.
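The completion rule in points 1 to 3 can be illustrated with a small, self-contained model. This is a hypothetical simplification written for illustration, not the actual `StoreChangelogReader` code: `RestoreCompletionModel` and its methods are invented names, and the model assumes the end offset is exclusive (the next offset to be written), so restoration counts as done once the consumer position reaches it.

```java
/**
 * Hypothetical, simplified model of the restore-completion rule described above.
 * Not the actual StoreChangelogReader implementation.
 */
public final class RestoreCompletionModel {

    private RestoreCompletionModel() {}

    /**
     * @param restoreEndOffset exclusive end offset of the changelog partition,
     *                         or null if it has not been initialized yet
     * @param currentPosition  offset the restore consumer will read next
     */
    public static boolean hasRestoredToEnd(Long restoreEndOffset, long currentPosition) {
        if (restoreEndOffset == null) {
            // Not initialized yet: the end offset is unknown, so restoration cannot be complete.
            return false;
        }
        if (restoreEndOffset == 0L) {
            // An end offset of 0 means the changelog is empty: nothing to restore.
            return true;
        }
        // Otherwise restoration is complete once the position has reached the end offset.
        return currentPosition >= restoreEndOffset;
    }

    /** Upper bound on the records a restore can apply when reading from startOffset to the end offset. */
    public static long recordsToRestore(long startOffset, long restoreEndOffset) {
        return Math.max(0L, restoreEndOffset - startOffset);
    }

    public static void main(String[] args) {
        // Offsets reported in this thread: start offset 1, end offset 1.
        System.out.println(hasRestoredToEnd(1L, 1L));  // true
        System.out.println(recordsToRestore(1L, 1L));  // 0
        // Expected case: the record at offset 0 is still retained, start offset 0, end offset 1.
        System.out.println(hasRestoredToEnd(1L, 0L));  // false
        System.out.println(recordsToRestore(0L, 1L));  // 1
    }
}
```

With the offsets Upesh logged (start 1, end 1), the reader starts at the log start offset and is already at the end offset, so it applies no records. That is consistent with `bufferedLimitIndex == 0` together with `hasRestoredToEnd()` returning true.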
> >
> >
> > Guozhang
> >
> >
> > On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ude...@itrsgroup.com> wrote:
> >
> > > Hi all,
> > >
> > > Our team thinks we discovered a bug over the weekend within the Kafka
> > > Streams Processor API. We are running 2.7.0.
> > >
> > > When configuring a state store backed by a changelog topic with the
> > > cleanup policy configuration set to “compact,delete”:
> > >
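> > > import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
> > > import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;
> > > import java.util.Map;
> > > import org.apache.kafka.streams.state.KeyValueStore;
> > > import org.apache.kafka.streams.state.StoreBuilder;
> > > import org.apache.kafka.streams.state.Stores;
> > >
> > > // STORE_ID, kSerde, vSerde, K and V are defined elsewhere in the application.
> > > final StoreBuilder<KeyValueStore<K, V>> store = Stores
> > >   .keyValueStoreBuilder(
> > >     Stores.persistentKeyValueStore(STORE_ID),
> > >     kSerde,
> > >     vSerde)
> > >   .withLoggingEnabled(Map.of(
> > >     RETENTION_MS_CONFIG, "90000000",          // 25 hours
> > >     CLEANUP_POLICY_CONFIG, "compact,delete"))
> > >   .withCachingEnabled();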
> > >
> > > Here is how we reproduced the problem:
> > >
> > >    1. Records are written to the state store and subsequently produced
> > >    to the changelog topic.
> > >    2. Stop the streams application.
> > >    3. Delete the state.dir directory.
> > >    4. Restart the streams application.
> > >    5. Observe that the state store is initialized empty, with no records
> > >    restored from the changelog.
> > >
> > > We see this problem with both in-memory and RocksDB-backed state stores.
> > > For the persistent state store, if the streams application is restarted
> > > without deleting the state dir, the application still does not
> > > “restore” from the changelog, but the records are still present in the
> > > state store.
> > >
> > > When we roll back to 2.6, we do not see this issue.
> > >
> > > While debugging the source code of the StoreChangelogReader class, I
> > > found that the number of records to restore is always 0, based on the
> > > snippet below:
> > >
> > > private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
> > >     final ProcessorStateManager stateManager = changelogMetadata.stateManager;
> > >     final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
> > >     final TopicPartition partition = storeMetadata.changelogPartition();
> > >     final String storeName = storeMetadata.store().name();
> > >     final int numRecords = changelogMetadata.bufferedLimitIndex;
> > >     // ... (rest of the method omitted)
> > >
> > > Here, `changelogMetadata.bufferedLimitIndex` is always 0.
> > >
> > > My questions to you all are: 1) Is this expected behavior? 2) If not,
> > > is it a bug?
> > >
> > > Hope to get some clarity, and thanks in advance!
> > >
> > > Best,
> > > Upesh
> > > Internet communications are not secure and therefore the ITRS Group does
> > > not accept legal responsibility for the contents of this message. Any view
> > > or opinions presented are solely those of the author and do not necessarily
> > > represent those of the ITRS Group unless otherwise specifically stated.
> > >
> > >
> > > *Disclaimer*
> > >
> > > The information contained in this communication from the sender is
> > > confidential. It is intended solely for use by the recipient and others
> > > authorized to receive it. If you are not the recipient, you are hereby
> > > notified that any disclosure, copying, distribution or taking action in
> > > relation of the contents of this information is strictly prohibited and
> > > may be unlawful.
> > >
> > > This email has been scanned for viruses and malware, and may have been
> > > automatically archived by *Mimecast Ltd*, an innovator in Software as a
> > > Service (SaaS) for business. Providing a *safer* and *more useful* place
> > > for your human generated data. Specializing in; Security, archiving and
> > > compliance.
> > >
> >
> >
> > --
> > -- Guozhang
> >
> >
>
>
> --
> -- Guozhang
>
>


-- 
-- Guozhang
