Hello,

I have a Quarkus application using **org.apache.kafka:kafka-streams:3.1.0** and found that

* when creating a global table using a compacted topic as input,
* entries that have been deleted at some point
* are then no longer returned when iterating over the store with **store.all()** (as expected),
* but after the pod restarts, its Kafka Streams state directory is deleted, and the store is restored from the topic using **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**,
* those formerly deleted records are once again returned by that store when using **store.all()** (not expected),
* however, they return null when using **store.get("foo")** (as expected).
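For context on the KAFKA-7663 comparison: that ticket tracks that the processor supplied to `addGlobalStore` is not used when the global store is restored from its topic. Purely as a rough illustration, here is a hypothetical, Kafka-free model (the class `GlobalRestoreModel` and its methods are made up for this sketch) in which a topic is a list of key/value records and a store is a map. `viaProcessor` applies the same delete rule as the `FooBarUpdateHandler` further down (a `null` value or the string `"deleteme"` deletes the key), while `viaRawRestore` assumes the restore copies records verbatim, so only real `null` tombstones remove keys.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model only: no Kafka classes, just lists and maps.
public class GlobalRestoreModel {

    /** One record of the input topic; a null value is a Kafka tombstone. */
    public static final class TopicRecord {
        public final String key;
        public final String value;

        public TopicRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Live processing: mirrors the handler's rule (null or "deleteme" deletes the key). */
    public static Map<String, String> viaProcessor(List<TopicRecord> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (TopicRecord r : topic) {
            if (r.value == null || r.value.equals("deleteme")) {
                store.remove(r.key);
            } else {
                store.put(r.key, r.value);
            }
        }
        return store;
    }

    /** Assumed verbatim restore: only real tombstones delete, everything else is copied. */
    public static Map<String, String> viaRawRestore(List<TopicRecord> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (TopicRecord r : topic) {
            if (r.value == null) {
                store.remove(r.key);
            } else {
                store.put(r.key, r.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<TopicRecord> topic = new ArrayList<>();
        topic.add(new TopicRecord("a", "1"));
        topic.add(new TopicRecord("b", "2"));
        topic.add(new TopicRecord("c", "3"));
        topic.add(new TopicRecord("a", null));       // real tombstone
        topic.add(new TopicRecord("b", "deleteme")); // application-level delete marker

        System.out.println(viaProcessor(topic).keySet());  // [c]
        System.out.println(viaRawRestore(topic).keySet()); // [b, c]
    }
}
```

With this input the processor path keeps only `c`, while the raw replay also keeps `b`, because `"deleteme"` is an ordinary value to the broker and to a verbatim restore. The model does not explain why `store.get()` returns null for the resurrected keys, so at best it covers part of the behaviour described above.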
This is somewhat similar to https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like to be able to modify this restore behaviour. However, it is also different: as far as I can tell this behaviour is not documented anywhere, and it is unintuitive (to me), since the application behaves differently after a restart even though the Kafka cluster itself has not changed. So I think it's more of a bug than missing documentation.

Some more information: the topic is configured like this

```
cleanup.policy: compact
compression.type: producer
delete.retention.ms: 86400000
max.compaction.lag.ms: 9223372036854776000
min.compaction.lag.ms: 0
retention.bytes: -1
retention.ms: 86400000
```

I am adding the global store like so

```java
streamsBuilder.addGlobalStore(
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore("foobar"),
        Serdes.String(),
        Serdes.String()),
    "foo.bar.globaltopic",
    Consumed.with(Serdes.String(), Serdes.String()),
    () -> new FooBarUpdateHandler(timeService)
);
```

and here is the definition of `FooBarUpdateHandler`

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal class handling partFamily updates.
 */
public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {

    private static final Logger logger = LoggerFactory.getLogger(FooBarUpdateHandler.class);

    private TimestampedKeyValueStore<String, String> store;

    @Override
    public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
        store = context.getStateStore("foobar");
    }

    @Override
    public void process(final Record<String, String> record) {
        // handle tombstones (null values) and the "deleteme" marker from the input topic
        if (record.value() == null || record.value().equals("deleteme")) {
            store.delete(record.key());
        } else {
            store.put(
                record.key(),
                ValueAndTimestamp.make(
                    record.value(),
                    Instant.now().toEpochMilli()
                )
            );
        }

        // this is not relevant,
        // it's only to show the issue when restarting and restoring the store
        final List<String> existingKeys = new ArrayList<>();
        try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
            all.forEachRemaining(r -> existingKeys.add(r.key));
        }
        logger.info("Got {} records in the store, with keys {}", existingKeys.size(), String.join(",", existingKeys));
    }
}
```

My workaround is to add this to the `init` method of `FooBarUpdateHandler`, after the store has been obtained:

```java
try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
    logger.info("Removing already deleted records from rocksdb representing the global store {}", store.name());
    all.forEachRemaining(r -> {
        // a key returned by all() but not by get() is a stale, already deleted entry
        if (r != null && r.key != null && store.get(r.key) == null) {
            store.delete(r.key);
        }
    });
}
```

Now it is consistent across restarts again.

Kind Regards,
Patrick

Patrick D’Addona
Senior Lead IT Architect
MaibornWolff GmbH