Hi all,

Our team think we discovered a bug over the weekend withing the Kafka Streams / 
Processor API. We are running 2.7.0.

When configuring a state store backed by a changelog topic with the cleanup 
policy configuration set to “compact,delete”:

final StoreBuilder<KeyValueStore<k,v>> store = Stores
  .keyValueStoreBuilder(
    Stores.persistentKeyValueStore(STORE_ID),
    kSerde,
    vSerde)
  .withLoggingEnabled(Map.of(
    RETENTION_MS_CONFIG, "90000000"),
    CLEANUP_POLICY_CONFIG, "compact,delete"))
  .withCachingEnabled();

Here is how we reproduced the problem:

  1.  Records are written to the state store, and subsequently produced to the 
changelog topic.
  2.  Store streams application
  3.  Delete state.dir directory
  4.  Restart streams application
  5.  Confirm state store is initialized empty with no records restored from 
changelog

We see this problem with both in-memory and RocksDB backed state stores. For 
persistent state store, if the streams application is restarted without the 
state dir being deleted, the application still does not “restore” from the 
changelog, but records are still seen in the state store.

When rolling back to 2.6, we do not see this issue.

Doing some debugging in the source code, in the StoreChangelogReader class I 
found that the number of records to restore is always 0 based on the below 
snippet:


private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
    final ProcessorStateManager stateManager = changelogMetadata.stateManager;
    final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
    final TopicPartition partition = storeMetadata.changelogPartition();
    final String storeName = storeMetadata.store().name();
    final int numRecords = changelogMetadata.bufferedLimitIndex;

Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.

My question to you all is, 1) Is this expected behavior? 2) If not, is it a bug?

Hope to get some clarity, and thanks in advance!

Best,
Upesh

Upesh Desai
Senior Software Developer
ude...@itrsgroup.com
www.itrsgroup.com
Internet communications are not secure and therefore the ITRS Group does not 
accept legal responsibility for the contents of this message. Any view or 
opinions presented are solely those of the author and do not necessarily 
represent those of the ITRS Group unless otherwise specifically stated.
[itrs.email.signature]

Disclaimer

The information contained in this communication from the sender is 
confidential. It is intended solely for use by the recipient and others 
authorized to receive it. If you are not the recipient, you are hereby notified 
that any disclosure, copying, distribution or taking action in relation of the 
contents of this information is strictly prohibited and may be unlawful.

This email has been scanned for viruses and malware, and may have been 
automatically archived by Mimecast Ltd, an innovator in Software as a Service 
(SaaS) for business. Providing a safer and more useful place for your human 
generated data. Specializing in; Security, archiving and compliance. To find 
out more visit the Mimecast website.

Reply via email to