Hello Upesh,

Thanks for the detailed report. I looked through the code and tried to
reproduce the issue, but so far have not succeeded. I need some more
information from you to continue the investigation.

1) `bufferedLimitIndex == 0` by itself does not necessarily mean there's
an issue, as long as it can still be bumped later (e.g. it is possible
that the restore consumer has not fetched any data yet). The key thing to
check is `ChangelogMetadata#restoreEndOffset`: for active tasks, it is
created with a null value and then initialized exactly once. ChangelogReader
stops restoring once the current offset has reached this value or gone
beyond it, or if this value itself is 0.

2) If `restoreEndOffset` is initialized to a non-zero value, then check
whether the restoration indeed completed without applying any records;
completion is indicated by `hasRestoredToEnd()` returning true.

3) If `restoreEndOffset` is initialized to 0, then we need to find out why:
off the top of my head, the only cause I can think of is that the consumer's
end offset request got a response of 0, indicating the changelog is now empty.
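
To make the three checks above concrete, here is a minimal sketch of the
decision logic in plain Java. It is not the actual StoreChangelogReader code:
the class `RestoreCheckSketch`, the `Diagnosis` enum, the `diagnose` method and
its `currentOffset` / `totalRestored` parameters are hypothetical names used
only for illustration, and it assumes that "reached" means
`currentOffset >= restoreEndOffset`.

```java
public class RestoreCheckSketch {

    // Hypothetical outcomes, one per case discussed in points 1) to 3).
    enum Diagnosis {
        END_OFFSET_NOT_INITIALIZED,  // restoreEndOffset is still null
        CHANGELOG_REPORTED_EMPTY,    // restoreEndOffset == 0, see point 3)
        RESTORED_WITHOUT_RECORDS,    // reached the end but applied nothing, see point 2)
        RESTORED_WITH_RECORDS,       // reached the end after applying records
        STILL_RESTORING              // end offset known, not reached yet
    }

    // restoreEndOffset: null until initialized (mirrors the nullable field).
    // currentOffset:    assumed position of the restore consumer.
    // totalRestored:    assumed count of records applied to the store so far.
    static Diagnosis diagnose(final Long restoreEndOffset,
                              final long currentOffset,
                              final long totalRestored) {
        if (restoreEndOffset == null) {
            return Diagnosis.END_OFFSET_NOT_INITIALIZED;
        }
        if (restoreEndOffset == 0L) {
            // Nothing to restore: the end offset request returned 0.
            return Diagnosis.CHANGELOG_REPORTED_EMPTY;
        }
        if (currentOffset >= restoreEndOffset) {
            return totalRestored == 0L
                ? Diagnosis.RESTORED_WITHOUT_RECORDS
                : Diagnosis.RESTORED_WITH_RECORDS;
        }
        return Diagnosis.STILL_RESTORING;
    }

    public static void main(final String[] args) {
        // Example values only; substitute what you observe in the debugger.
        System.out.println(diagnose(null, 0L, 0L));
        System.out.println(diagnose(0L, 0L, 0L));
        System.out.println(diagnose(100L, 100L, 0L));
        System.out.println(diagnose(100L, 40L, 40L));
    }
}
```

If the values you see in the debugger map to the "changelog reported empty" or
"restored without records" case, that tells us which of points 2) and 3) to
dig into.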


Guozhang


On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ude...@itrsgroup.com> wrote:

> Hi all,
>
>
>
> Our team thinks we discovered a bug over the weekend within the Kafka
> Streams / Processor API. We are running 2.7.0.
>
>
>
> When configuring a state store backed by a changelog topic with the
> cleanup policy configuration set to “compact,delete”:
>
>
>
> final StoreBuilder<KeyValueStore<k,v>> store = Stores
>   .keyValueStoreBuilder(
>     Stores.persistentKeyValueStore(STORE_ID),
>     kSerde,
>     vSerde)
>   .withLoggingEnabled(Map.of(
>     RETENTION_MS_CONFIG, "90000000",
>     CLEANUP_POLICY_CONFIG, "compact,delete"))
>   .withCachingEnabled();
>
>
>
> Here is how we reproduced the problem:
>
>    1. Records are written to the state store, and subsequently produced
>    to the changelog topic.
>    2. Stop streams application
>    3. Delete state.dir directory
>    4. Restart streams application
>    5. Confirm state store is initialized empty with no records restored
>    from changelog
>
>
>
> We see this problem with both in-memory and RocksDB backed state stores.
> For persistent state store, if the streams application is restarted without
> the state dir being deleted, the application still does not “restore” from
> the changelog, but records are still seen in the state store.
>
>
>
> When rolling back to 2.6, we do not see this issue.
>
>
>
> Doing some debugging in the source code, in the StoreChangelogReader class
> I found that the number of records to restore is always 0 based on the
> below snippet:
>
>
>
> private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
>     final ProcessorStateManager stateManager = changelogMetadata.stateManager;
>     final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
>     final TopicPartition partition = storeMetadata.changelogPartition();
>     final String storeName = storeMetadata.store().name();
>     final int numRecords = changelogMetadata.bufferedLimitIndex;
>
>
>
> Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.
>
>
>
> My question to you all is, 1) Is this expected behavior? 2) If not, is it
> a bug?
>
>
>
> Hope to get some clarity, and thanks in advance!
>
>
>
> Best,
> Upesh
> Upesh Desai​
> Senior Software Developer


