> In your case you also delete if the value is not null and if the value
> not-equals "deleteme", right? Ie, you use non-tombstone records as
> deletes, which is just not allowed/supported.
The "deleteme" String was only for testing, the issue also happens without
it, i.e. if there is a "real" tombstone with `value == null` on the input
topic. I do use the input topic as a changelog for my global table.
Tombstones are sent directly to that topic from a Kafka Streams operation
before the actual store.

> I cannot explain why all() and get(key) actually give you different
> results with respect to `key`. If a key is resurrected during a restore,
> both methods should return it. Not sure why `get(key)` returns `null`
> even if `all()` contains the key... I would rather expect that both
> return the resurrected key.

That's why I think this is different from KAFKA-7663.

The **foo.bar.globaltopic** topic currently looks like this

| timestamp               | key | value  |
|-------------------------|-----|--------|
| 2022-08-10T14:23:51.768 | foo | foo    |
| 2022-08-10T14:23:51.836 | foo | foo    |
| 2022-08-10T14:23:52.126 | bar | bar    |
| 2022-08-10T14:23:52.398 | foo | foo    |
| 2022-08-10T14:23:53.353 | bar | bar    |
| 2022-08-10T14:23:53.098 | foo | <null> |
| 2022-08-10T14:23:54.367 | bar | bar    |

After I delete the kafka-streams.state.dir and restart the application, I get

store.get("foo") -> null
store.get("bar") -> "bar"
store.all() -> "foo" and "bar"

Hope that explains it better.
- Patrick

Patrick D’Addona
Senior Lead IT Architect

Mobile: +49 151 544 22 161
patrick.dadd...@maibornwolff.de
Theresienhöhe 13, 80339 München

MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
www.maibornwolff.de, Phone +49 89 544 253 000
USt-ID DE 129 299 525, Munich District Court HRB 98058
Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
____________________________________________________________

________________________________
From: Matthias J.
Sax <mj...@apache.org>
Sent: Friday, December 9, 2022 01:11
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

> The way I see it, KAFKA-7663 says, "a global store will be exactly the
> input topic after restore, regardless of the processor"

Not sure what you mean by this?

The issue the tickets describe is that if you don't do a plain
`put(key,value)` in your processor, stuff breaks right now. (Note that
`delete(key)` and `put(key,null)` are the same). It's a known issue, bad
API, and also bad documentation on our side, and I guess you can call it
a bug if you wish. However, you can only use tombstones as deletes right
now. Thus, what you do "wrong" is

> if (record.value() == null || record.value().equals("deleteme")) {
>     store.delete(record.key());
> }

In your case you also delete if the value is not null and if the value
not-equals "deleteme", right? Ie, you use non-tombstone records as
deletes, which is just not allowed/supported.

The issue is that during restore only `null` values, ie, actual
tombstones, are handled as deletes and thus, if you delete a key using a
non-tombstone record in your processor, this key can be resurrected
during restore.

I cannot explain why all() and get(key) actually give you different
results with respect to `key`. If a key is resurrected during a restore,
both methods should return it. Not sure why `get(key)` returns `null`
even if `all()` contains the key... I would rather expect that both
return the resurrected key.

Hope this helps.

-Matthias

On 12/8/22 12:00 PM, Patrick D’Addona wrote:
> Hi,
>
> I don't think this issue is exactly the same as KAFKA-7663.
>
> The way I see it, KAFKA-7663 says, "a global store will be exactly the
> input topic after restore, regardless of the processor"
> My issue here is that the global store after restore is inconsistent
> with the input topic and the store itself.
> Because it finds records with key "foo" using **store.all()** that it
> cannot find via **store.get("foo")**.
> The **store.get()** is consistent with my input topic, where the
> tombstone is the latest entry for the key "foo", reflecting the
> **delete("foo")** operation on the store.
> But still, looping over the store returns a record with "foo" as a key
> and a non-null value.
>
> If the store acts like a Map, where you can call **get(k)** and
> **put(k, v)**, then looping over it should only find entries that
> actually exist and have a value when using **get(k)**.
> Restoring something that breaks this connection seems wrong, even if
> that restoring ignores the processor and directly writes to the store.
> It should remove keys for which the last entry is a tombstone from the
> **all()** iterator, regardless of whether the restore process uses a
> custom processor as KAFKA-7663 wants, or simply reads the topic as it
> currently does.
>
> Kind Regards,
> Patrick
>
> ________________________________
> From: Colt McNealy <c...@littlehorse.io>
> Sent: Thursday, December 8, 2022 17:54
> To: patrick.dadd...@maibornwolff.de.invalid
> <patrick.dadd...@maibornwolff.de.invalid>
> Cc: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally
> restores previously deleted records
>
> Hi Patrick,
>
> Your issue is in fact identical to KAFKA-7663. As per that
> issue/bug/discussion, if your processor does anything other than simply
> pass-through records, the results of initial processing vs restoration
> are different.
>
> Global State Stores don't have a changelog topic (for example, in the
> processor API, Global State Stores are only valid if the builder has
> .withLoggingDisabled()). That's because the processor for the global
> store runs on each of your N streams instances, and if the processor on
> each instance published to the changelog, then each put/delete would be
> written N times, which is wasteful.
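Colt's point that initial processing and restoration can produce different store contents can be illustrated with a plain-Java toy model. Everything here is hypothetical (the class name, the `Map` standing in for the state store, the `String[]` record encoding); it only mirrors the two semantics described in the thread, with the "deleteme" sentinel from Patrick's test code:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RestoreDivergence {

    // Processor semantics from the thread: treat both real tombstones
    // (null values) and the sentinel value "deleteme" as deletes.
    static Map<String, String> applyProcessor(List<String[]> topic) {
        final Map<String, String> store = new LinkedHashMap<>();
        for (String[] rec : topic) { // rec[0] = key, rec[1] = value
            if (rec[1] == null || rec[1].equals("deleteme")) {
                store.remove(rec[0]);
            } else {
                store.put(rec[0], rec[1]);
            }
        }
        return store;
    }

    // Restore semantics: only real tombstones count as deletes;
    // everything else, including "deleteme", is a plain put.
    static Map<String, String> applyRestore(List<String[]> topic) {
        final Map<String, String> store = new LinkedHashMap<>();
        for (String[] rec : topic) {
            if (rec[1] == null) {
                store.remove(rec[0]);
            } else {
                store.put(rec[0], rec[1]);
            }
        }
        return store;
    }
}
```

Feeding both methods the same record sequence that ends in a sentinel "delete" leaves the key absent after processing but present after restore, which is exactly the resurrection Matthias describes.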
>
> The implications of this are that your input topic should be "like" a
> changelog:
> - Your input topic should NOT have limited retention, otherwise you'll
>   lose old data.
> - Your input topic should ideally be compacted if possible.
>
> I agree that the API as it stands is highly confusing—why allow users
> to provide a processor if it offers a way to "shoot oneself in one's
> foot"?
>
> Changing that API would probably require a KIP. I don't quite have the
> bandwidth to propose + implement such a KIP right now, but if you would
> like to, feel free! (perhaps in the spring I may have time)
>
> Your workaround (the init() method) is a good one. Another way to do it
> might be to simply have a regular processing step which converts the
> input topic into the true "changelog" format before you push it to a
> global store.
>
> Cheers,
> Colt McNealy
> *Founder, LittleHorse.io*
>
>
> On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona
> <patrick.dadd...@maibornwolff.de.invalid> wrote:
>
>> Hello,
>>
>> I have a quarkus application using
>> **org.apache.kafka:kafka-streams:3.1.0** and found that
>> * when creating a global table using a compacted topic as input
>> * entries that have been deleted at some point
>> * are then no longer returned when iterating over the store with
>>   **store.all()** - as expected
>> * but after the pod restarts and its kafka streams state directory is
>>   deleted, after restoring from the topic using
>>   **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**
>> * those formerly deleted records are once again returned by that store
>>   when using **store.all()** - not expected
>> * however they return null, using **store.get("foo")** - as expected
>>
>> This is somewhat similar to
>> https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like
>> to be able to modify this restore behaviour.
>> However it is also different, because I think it is not documented
>> anywhere and it is unintuitive (to me) - since it changes how the
>> application behaves after restarting it even if the kafka cluster
>> itself was not changed - so I think it's more of a bug than missing
>> documentation.
>>
>> Some more information, the topic is configured like this
>> ```properties
>> cleanup.policy: compact
>> compression.type: producer
>> delete.retention.ms: 86400000
>> max.compaction.lag.ms: 9223372036854776000
>> min.compaction.lag.ms: 0
>> retention.bytes: -1
>> retention.ms: 86400000
>> ```
>>
>> I am adding the global store like so
>> ```java
>> streamsBuilder.addGlobalStore(
>>         Stores.timestampedKeyValueStoreBuilder(
>>                 Stores.persistentTimestampedKeyValueStore("foobar"),
>>                 Serdes.String(),
>>                 Serdes.String()),
>>         "foo.bar.globaltopic",
>>         Consumed.with(Serdes.String(), Serdes.String()),
>>         () -> new FooBarUpdateHandler(timeService)
>> );
>> ```
>>
>> and here is the definition of 'FooBarUpdateHandler'
>> ```java
>> import java.time.Instant;
>> import java.util.ArrayList;
>> import java.util.List;
>> import org.apache.kafka.streams.processor.api.Processor;
>> import org.apache.kafka.streams.processor.api.Record;
>> import org.apache.kafka.streams.state.KeyValueIterator;
>> import org.apache.kafka.streams.state.TimestampedKeyValueStore;
>> import org.apache.kafka.streams.state.ValueAndTimestamp;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> /**
>>  * Internal class handling partFamily updates.
>>  */
>> public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {
>>
>>     private static final Logger logger =
>>             LoggerFactory.getLogger(FooBarUpdateHandler.class);
>>     private TimestampedKeyValueStore<String, String> store;
>>
>>     @Override
>>     public void init(
>>             final org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
>>         store = context.getStateStore("foobar");
>>     }
>>
>>     @Override
>>     public void process(final Record<String, String> record) {
>>
>>         // handle tombstones from input topic
>>         if (record.value() == null || record.value().equals("deleteme")) {
>>             store.delete(record.key());
>>         } else {
>>             store.put(
>>                     record.key(),
>>                     ValueAndTimestamp.make(
>>                             record.key(),
>>                             Instant.now().toEpochMilli()
>>                     )
>>             );
>>         }
>>
>>         // this is not relevant
>>         // it's only to show the issue when restarting and restoring
>>         final List<String> existingKeys = new ArrayList<>();
>>         try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
>>             all.forEachRemaining((r) -> existingKeys.add(r.key));
>>         }
>>         logger.info("Got {} records in the store, with keys {}",
>>                 existingKeys.size(), String.join(",", existingKeys));
>>     }
>> }
>> ```
>>
>> My workaround is to add this to the 'init' method of the
>> 'FooBarUpdateHandler'
>> ```java
>> try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
>>     if (all == null) {
>>         return;
>>     }
>>     logger.info("Removing already deleted records from rocksdb "
>>             + "representing the global store {}", storeName);
>>     all.forEachRemaining(r -> {
>>         if (r != null && r.key != null && store.get(r.key) == null) {
>>             store.delete(r.key);
>>         }
>>     });
>> }
>> ```
>> Now it is again consistent across restarts.
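The reconciliation logic of that workaround can be modeled in plain Java. This is a deliberate toy simplification: the two collections model only the observed symptom that all() and get() disagree after restore, not RocksDB's actual behavior, and every name is hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class RestorePurge {

    // Toy model of the inconsistent state observed after restore:
    // the iterator (all()) still yields resurrected keys that the
    // point lookup (get()) no longer sees.
    final Map<String, String> visibleToGet = new HashMap<>();
    final Set<String> visibleToIterator = new LinkedHashSet<>();

    void delete(String key) {
        visibleToGet.remove(key);
        visibleToIterator.remove(key);
    }

    // The workaround's reconciliation: drop every key the iterator
    // yields but get() answers with null, as in Patrick's init() code.
    void purge() {
        for (String key : new ArrayList<>(visibleToIterator)) {
            if (visibleToGet.get(key) == null) {
                delete(key);
            }
        }
    }
}
```

After purge(), iteration and point lookups agree again, which is the "consistent across restarts" state the workaround restores.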
>>
>> Kind Regards,
>> Patrick
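Colt's suggestion of converting the input topic into the true "changelog" format upstream can be sketched as follows. This is a minimal sketch, not code from the discussion; it assumes the "deleteme" sentinel from Patrick's test code, and any other application-level delete marker would work the same way:

```java
public class TombstoneConverter {

    // Map the application-level delete marker onto a real tombstone;
    // pass every other value through unchanged. Producing a null value
    // to the input topic yields an actual Kafka tombstone, which the
    // restore path handles correctly as a delete.
    static String toChangelogValue(String value) {
        return "deleteme".equals(value) ? null : value;
    }
}
```

In a topology this could sit in a regular processing step before the global store's input topic, for example `stream.mapValues(TombstoneConverter::toChangelogValue).to("foo.bar.globaltopic")` (names assumed from the thread), so that only real tombstones ever reach the global store and restore cannot resurrect deleted keys.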