Hello,

I have a Quarkus application using **org.apache.kafka:kafka-streams:3.1.0** and found that
* when creating a global table using a compacted topic as input,
* entries that have been deleted at some point
* are no longer returned when iterating over the store with 
**store.all()** - as expected
* but after the pod restarts and its Kafka Streams state directory is deleted, 
so that the store is restored from the topic via 
**org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**,
* those formerly deleted records are once again returned by that store when 
using **store.all()** - not expected
* however, **store.get("foo")** still returns null for them - as expected
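
To make the symptom concrete, here is a plain-Java model of what I observe - a `LinkedHashMap` stands in for the RocksDB store, and the `restored()` path is my assumption about what the restore effectively does (copying records verbatim, bypassing the update handler), not the actual `GlobalStateManagerImpl` code:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RestoreModel {

    // the compacted topic after the latest writes: "b" was marked deleted
    static final List<Map.Entry<String, String>> TOPIC = List.of(
            Map.entry("a", "value-a"),
            Map.entry("b", "deleteme"),
            Map.entry("c", "value-c"));

    // normal processing: the update handler interprets "deleteme" as a delete
    static Map<String, String> processed() {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : TOPIC) {
            if ("deleteme".equals(r.getValue())) {
                store.remove(r.getKey());
            } else {
                store.put(r.getKey(), r.getValue());
            }
        }
        return store;
    }

    // restore: records are copied into the store verbatim, bypassing the handler
    static Map<String, String> restored() {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : TOPIC) {
            store.put(r.getKey(), r.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println(processed().keySet()); // [a, c]
        System.out.println(restored().keySet());  // [a, b, c] - "b" is back
    }
}
```

If that assumption holds, it would explain why records whose value marks them as deleted reappear after a restart.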

This is somewhat similar to https://issues.apache.org/jira/browse/KAFKA-7663, 
in that I would like to be able to modify this restore behaviour. 
However, it is also different: as far as I can tell this behaviour is not 
documented anywhere, and it is unintuitive (to me), since it changes how the 
application behaves after a restart even though the Kafka cluster itself has 
not changed - so I think it is more of a bug than missing documentation.

Some more information: the topic is configured like this
```properties
cleanup.policy: compact
compression.type: producer
delete.retention.ms: 86400000
max.compaction.lag.ms: 9223372036854776000
min.compaction.lag.ms: 0
retention.bytes: -1
retention.ms: 86400000
```

I am adding the global store like so
```java
streamsBuilder.addGlobalStore(
        Stores.timestampedKeyValueStoreBuilder(
                Stores.persistentTimestampedKeyValueStore("foobar"),
                Serdes.String(),
                Serdes.String()),
        "foo.bar.globaltopic",
        Consumed.with(Serdes.String(), Serdes.String()),
        () -> new FooBarUpdateHandler(timeService)
);
```

and here is the definition of 'FooBarUpdateHandler'
```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal class handling partFamily updates.
 */
public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {

    private static final Logger logger = LoggerFactory.getLogger(FooBarUpdateHandler.class);
    private TimestampedKeyValueStore<String, String> store;

    @Override
    public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
        store = context.getStateStore("foobar");
    }

    @Override
    public void process(final Record<String, String> record) {

        // handle tombstones from input topic
        if (record.value() == null || "deleteme".equals(record.value())) {
            store.delete(record.key());
        } else {
            store.put(
                    record.key(),
                    ValueAndTimestamp.make(
                            record.value(),
                            Instant.now().toEpochMilli()
                    )
            );
        }

        // the rest is not relevant to the logic itself - it is only here to
        // log the store contents and show the issue after restarting and restoring the store
        final List<String> existingKeys = new ArrayList<>();
        try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
            all.forEachRemaining((r) -> {
                existingKeys.add(r.key);
            });
        }
        logger.info("Got {} records in the store, with keys {}", existingKeys.size(), String.join(",", existingKeys));
    }
}
```
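
The tombstone handling in `process()` boils down to this null-safe predicate (a standalone sketch, the names are mine):

```java
public class TombstoneCheck {

    // a record is treated as a tombstone if its value is null
    // or the application-level marker "deleteme"
    static boolean isTombstone(String value) {
        return value == null || "deleteme".equals(value);
    }

    public static void main(String[] args) {
        System.out.println(isTombstone(null));       // true
        System.out.println(isTombstone("deleteme")); // true
        System.out.println(isTombstone("keep"));     // false
    }
}
```

Writing `"deleteme".equals(value)` rather than `value.equals("deleteme")` keeps the check from throwing on real tombstones, whose value is null.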

My workaround is to add this to the 'init' method of the 'FooBarUpdateHandler'
```java
try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
    if (all == null) {
        return;
    }
    logger.info("Removing already deleted records from the rocksdb instance backing the global store foobar");
    all.forEachRemaining(r -> {
        if (r != null && r.key != null && store.get(r.key) == null) {
            store.delete(r.key);
        }
    });
}
```
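
The shape of that workaround, modeled with plain collections - a `TreeMap` stands in for what `store.all()` iterates over, and `resolvable` for the keys that `store.get()` can still find; both names are mine:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class CleanupModel {

    // drop every key that iteration yields but a point lookup cannot find
    static Map<String, String> cleanup(Map<String, String> raw, Set<String> resolvable) {
        raw.keySet().removeIf(key -> !resolvable.contains(key));
        return raw;
    }

    public static void main(String[] args) {
        // after restore, iterating the store yields three keys...
        Map<String, String> raw = new TreeMap<>(Map.of(
                "a", "value-a",
                "b", "deleteme",
                "c", "value-c"));
        // ...but point lookups only resolve two of them (the observed behaviour)
        Set<String> resolvable = Set.of("a", "c");

        System.out.println(cleanup(raw, resolvable).keySet()); // [a, c]
    }
}
```
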
Now it is again consistent across restarts.

Kind Regards,
Patrick


Patrick D’Addona
Senior Lead IT Architect


Mobile: +49 151 544 22 161
patrick.dadd...@maibornwolff.de
Theresienhöhe 13, 80339 München

MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
www.maibornwolff.de, Phone +49 89 544 253 000
USt-ID DE 129 299 525, Munich District Court HRB 98058
Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.