Patrick—Glad you got it cleared up, and good find re: empty byte[] vs null.
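For anyone else who hits this: the fix on the serializer side is simply to return null rather than an empty array, so the broker (and the restore path) sees a real tombstone. A minimal sketch, with the class name and Jackson usage assumed from your snippet below:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(final String topic, final T data) {
        if (data == null) {
            // null (not new byte[0]) is what Kafka treats as a tombstone
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (final IOException e) {
            throw new SerializationException(e);
        }
    }
}
```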
KAFKA-7663 is very interesting. Just yesterday I thought of a use-case where supplying the processor for the Global State Store would be useful: what if you have an in-memory object that computes and caches aggregations of what's in the state store? You can derive that view by querying the state store, but such queries are expensive; it's far better to have them cached in an in-memory POJO. But to keep that POJO up to date, you need to be alerted every time an event comes into the state store. I think the best way to implement that (if I were to submit a KIP) would be to:
- Deprecate the ability to add a processor
- Add an optional "onChange" callback that is called every time a new record is processed.

There are lots of details to be ironed out, and furthermore this is a big API change, so it would be slow to implement.

Colt McNealy
*Founder, LittleHorse.io*

On Mon, Dec 12, 2022 at 3:30 AM Patrick D’Addona <patrick.dadd...@maibornwolff.de.invalid> wrote:

> No, it does not encrypt the keys. And it works fine for a key like "bar" where the latest record on the topic is not a tombstone.
>
> But that got me thinking about how the values are actually written and read from Kafka, and I found the issue in my case: it was related to the serializer not writing actual null values onto the topic, but empty byte[] arrays instead.
> The serializer I used looks like this
> ```java
> public byte[] serialize(final String s, final T data) {
>     if (data == null) {
>         return new byte[0];
>     }
>     try {
>         return objectMapper.writeValueAsBytes(data);
>     } catch (final IOException e) {
>         throw new SerializationException(e);
>     }
> }
> ```
>
> And then during restore in `org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState`
> ```java
> for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
>     if (record.key() != null) {
>         restoreRecords.add(recordConverter.convert(record));
>     }
> }
> ```
> using `org.apache.kafka.streams.state.internals.RecordConverters#RAW_TO_TIMESTAMPED_INSTANCE`, that empty array is turned into a non-null 8-byte value holding only the timestamp, because the converter checks the value for `null` but not for an empty array:
> ```java
> final byte[] recordValue = rawValue == null ? null :
>     ByteBuffer.allocate(8 + rawValue.length)
>         .putLong(timestamp)
>         .put(rawValue)
>         .array();
> ```
>
> That is then passed to `org.apache.kafka.streams.state.internals.RocksDBStore.SingleColumnFamilyAccessor#addToBatch` and gets "put" instead of "delete" because it's not null
> ```java
> public void addToBatch(final byte[] key,
>                        final byte[] value,
>                        final WriteBatch batch) throws RocksDBException {
>     if (value == null) {
>         batch.delete(columnFamily, key);
>     } else {
>         batch.put(columnFamily, key, value);
>     }
> }
> ```
>
> That's why `store.get("foo")` gives null: it actually finds the empty byte[] record and my deserializer turns it into null.
> So Colt McNealy is right, this is exactly the issue from KAFKA-7663. I just did not see it until I found that the values on the topic, which akhq shows as "null" in the visualization and which are treated like null everywhere in my application, are not actually real tombstones to RocksDB and the restore process.
>
> So this thread can be closed, since there is nothing wrong in the way Kafka Streams behaves here, just another implication of KAFKA-7663 that might confuse users.
> But it also has the chance to lead them onto very interesting deep dives into Kafka Streams ;-)
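> To make the failure mode concrete, here it is boiled down to a few lines (a standalone sketch; the timestamp value is made up):
> ```java
> import java.nio.ByteBuffer;
>
> public class EmptyValueIllustration {
>     public static void main(final String[] args) {
>         final long timestamp = 1670851431768L; // any record timestamp
>         final byte[] rawValue = new byte[0];   // what my serializer wrote
>         // same conversion as in RecordConverters during restore
>         final byte[] recordValue = ByteBuffer.allocate(8 + rawValue.length)
>                 .putLong(timestamp)
>                 .put(rawValue)
>                 .array();
>         // prints 8: the "tombstone" became a non-null 8-byte value,
>         // so addToBatch() does a put() instead of a delete()
>         System.out.println(recordValue.length);
>     }
> }
> ```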
> Thanks for all the responses!
>
> Kind regards,
> Patrick
>
> ________________________________
> From: Colt McNealy <c...@littlehorse.io>
> Sent: Saturday, December 10, 2022 00:33
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
>
> Does the KeySerde that you provided to your store encrypt the keys? I've never done so myself, but I've seen others report similar behavior (the store iterator shows the correct values but store.get('foo') returns null) in the Confluent Community slack. Here's a relevant message:
>
> "From the behaviour in your code snippet above, I would say that the key is stored in encrypted form. The deserializer decrypts it correctly (thus you see the key and value while iterating). But when you request the key individually you are passing it in plain text (the serializer might not be encrypting) and it’s not found in the keystore."
>
> I can't help too much beyond that; but you may want to look into that issue.
>
> Colt McNealy
> *Founder, LittleHorse.io*
>
>
> On Thu, Dec 8, 2022 at 11:51 PM Patrick D’Addona <patrick.dadd...@maibornwolff.de.invalid> wrote:
>
> > > In your case you also delete if the value is not null and if the value not-equals "deleteme", right? I.e., you use non-tombstone records as deletes, which is just not allowed/supported.
> >
> > The "deleteme" String was only for testing; the issue also happens without it, i.e. if there is a "real" tombstone with `value == null` on the input topic.
> > I do use the input topic as a changelog for my global table. Tombstones are sent directly to that topic from a Kafka Streams operation before the actual store.
> >
> > > I cannot explain why all() and get(key) actually give you different results with respect to `key`. If a key is resurrected during a restore, both methods should return it. Not sure why `get(key)` returns `null` even if `all()` contains the key... I would rather expect that both return the resurrected key.
> >
> > That's why I think this is different from KAFKA-7663.
> > The **foo.bar.globaltopic** topic currently looks like this
> > |timestamp|key|value|
> > |2022-08-10T14:23:51.768|foo|foo|
> > |2022-08-10T14:23:51.836|foo|foo|
> > |2022-08-10T14:23:52.126|bar|bar|
> > |2022-08-10T14:23:52.398|foo|foo|
> > |2022-08-10T14:23:53.353|bar|bar|
> > |2022-08-10T14:23:53.098|foo|<null>|
> > |2022-08-10T14:23:54.367|bar|bar|
> >
> > After I delete the kafka-streams.state.dir and restart the application, I get
> > store.get("foo") -> null
> > store.get("bar") -> "bar"
> > store.all() -> "foo" and "bar"
> >
> > Hope that explains it better.
> >
> > - Patrick
> >
> >
> > Patrick D’Addona
> > Senior Lead IT Architect
> >
> >
> > Mobile: +49 151 544 22 161
> > patrick.dadd...@maibornwolff.de
> > Theresienhöhe 13, 80339 München
> >
> > MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
> > www.maibornwolff.de<http://www.maibornwolff.de>, Phone +49 89 544 253 000
> > USt-ID DE 129 299 525, Munich District Court HRB 98058
> > Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann, Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
> > ____________________________________________________________
> >
> > ________________________________
> > From: Matthias J. Sax <mj...@apache.org>
> > Sent: Friday, December 9, 2022 01:11
> > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
> >
> > > The way I see it, KAFKA-7663 says, "a global store will be exactly the input topic after restore, regardless of the processor"
> >
> > Not sure what you mean by this? The issue the tickets describe is that if you don't do a plain `put(key, value)` in your processor, stuff breaks right now. (Note that `delete(key)` and `put(key, null)` are the same.)
> >
> > It's a known issue, bad API, and also bad documentation on our side, and I guess you can call it a bug if you wish. However, you can only use tombstones as deletes right now. Thus, what you do "wrong" is
> >
> > > if (record.value() == null == record.value().equals("deleteme")) {
> > >     store.delete(record.key());
> > > }
> >
> > In your case you also delete if the value is not null and if the value not-equals "deleteme", right? I.e., you use non-tombstone records as deletes, which is just not allowed/supported.
> >
> > The issue is that during restore only `null` values, i.e. actual tombstones, are handled as deletes, and thus, if you delete a key using a non-tombstone record in your processor, this key can be resurrected during restore.
> >
> > I cannot explain why all() and get(key) actually give you different results with respect to `key`. If a key is resurrected during a restore, both methods should return it. Not sure why `get(key)` returns `null` even if `all()` contains the key... I would rather expect that both return the resurrected key.
> >
> > Hope this helps.
> >
> >
> > -Matthias
> >
> >
> > On 12/8/22 12:00 PM, Patrick D’Addona wrote:
> > > Hi,
> > >
> > > I don't think this issue is exactly the same as KAFKA-7663.
> > >
> > > The way I see it, KAFKA-7663 says, "a global store will be exactly the input topic after restore, regardless of the processor".
> > > My issue here is that the global store after restore is inconsistent with the input topic and the store itself,
> > > because it finds records with key "foo" using **store.all()** that it cannot find via **store.get("foo")**.
> > > The **store.get()** is consistent with my input topic, where the tombstone is the latest entry for the key "foo", reflecting the **delete("foo")** operation on the store.
> > > But still, looping over the store returns a record with "foo" as a key and a non-null value.
> > >
> > > If the store acts like a Map, where you can call **get(k)** and **put(k, v)**, then looping over it should only find entries that actually exist and have a value when using **get(k)**.
> > > Restoring something that breaks this connection seems wrong, even if that restoring ignores the processor and directly writes to the store.
> > > It should remove keys for which the last entry is a tombstone from the **all()** iterator, regardless of whether the restore process uses a custom processor as KAFKA-7663 wants, or simply reads the topic as it currently does.
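> > > To spell it out as code, this is what I observe after deleting the state dir and restarting (a sketch; the helper name is mine, the store is defined as in my first mail):
> > > ```java
> > > import org.apache.kafka.streams.state.KeyValueIterator;
> > > import org.apache.kafka.streams.state.TimestampedKeyValueStore;
> > > import org.apache.kafka.streams.state.ValueAndTimestamp;
> > >
> > > static void showInconsistency(final TimestampedKeyValueStore<String, String> store) {
> > >     // consistent with the tombstone being the latest record for "foo"
> > >     System.out.println(store.get("foo")); // null
> > >     // but the restored store still yields "foo" when iterating
> > >     try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
> > >         all.forEachRemaining(r -> System.out.println(r.key)); // prints foo and bar
> > >     }
> > > }
> > > ```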
> > >
> > > Kind Regards,
> > > Patrick
> > >
> > > ________________________________
> > > From: Colt McNealy <c...@littlehorse.io>
> > > Sent: Thursday, December 8, 2022 17:54
> > > To: patrick.dadd...@maibornwolff.de.invalid <patrick.dadd...@maibornwolff.de.invalid>
> > > Cc: dev@kafka.apache.org <dev@kafka.apache.org>
> > > Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
> > >
> > > Hi Patrick,
> > >
> > > Your issue is in fact identical to KAFKA-7663. As per that issue/bug/discussion, if your processor does anything other than simply pass records through, the results of initial processing vs. restoration are different.
> > >
> > > Global State Stores don't have a changelog topic (for example, in the processor API, Global State Stores are only valid if the builder has .withLoggingDisabled()). That's because the processor for the global store runs on each of your N streams instances, and if the processor on each instance published to the changelog, then each put/delete would be written N times, which is wasteful.
> > >
> > > The implication is that your input topic should be "like" a changelog:
> > > - Your input topic should NOT have limited retention, otherwise you'll lose old data.
> > > - Your input topic should ideally be compacted if possible.
> > >
> > > I agree that the API as it stands is highly confusing: why allow users to provide a processor if it offers a way to "shoot oneself in one's foot"?
> > >
> > > Changing that API would probably require a KIP. I don't quite have the bandwidth to propose + implement such a KIP right now, but if you would like to, feel free! (Perhaps in the spring I may have time.)
> > >
> > > Your workaround (the init() method) is a good one. Another way to do it might be to simply have a regular processing step which converts the input topic into the true "changelog" format before you push it to a global store.
> > >
> > > Cheers,
> > > Colt McNealy
> > > *Founder, LittleHorse.io*
> > >
> > >
> > > On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona <patrick.dadd...@maibornwolff.de.invalid> wrote:
> > >
> > >> Hello,
> > >>
> > >> I have a Quarkus application using **org.apache.kafka:kafka-streams:3.1.0** and found that
> > >> * when creating a global table using a compacted topic as input
> > >> * entries that have been deleted at some point
> > >> * are then no longer returned when iterating over the store with **store.all()** - as expected
> > >> * but after the pod restarts and its Kafka Streams state directory is deleted, after restoring from the topic using **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**
> > >> * those formerly deleted records are once again returned by that store when using **store.all()** - not expected
> > >> * however they return null, using **store.get("foo")** - as expected
> > >>
> > >> This is somewhat similar to https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like to be able to modify this restore behaviour.
> > >> However, it is also different, because I think it is not documented anywhere and it is unintuitive (to me) - since it changes how the application behaves after restarting it, even if the Kafka cluster itself was not changed - so I think it's more of a bug than missing documentation.
> > >>
> > >> Some more information, the topic is configured like this
> > >> ```java
> > >> cleanup.policy: compact
> > >> compression.type: producer
> > >> delete.retention.ms: 86400000
> > >> max.compaction.lag.ms: 9223372036854776000
> > >> min.compaction.lag.ms: 0
> > >> retention.bytes: -1
> > >> retention.ms: 86400000
> > >> ```
> > >>
> > >> I am adding the global store like so
> > >> ```java
> > >> streamsBuilder.addGlobalStore(
> > >>     Stores.timestampedKeyValueStoreBuilder(
> > >>         Stores.persistentTimestampedKeyValueStore("foobar"),
> > >>         Serdes.String(),
> > >>         Serdes.String()),
> > >>     "foo.bar.globaltopic",
> > >>     Consumed.with(Serdes.String(), Serdes.String()),
> > >>     () -> new FooBarUpdateHandler(timeService)
> > >> );
> > >> ```
> > >>
> > >> and here is the definition of 'FooBarUpdateHandler'
> > >> ```java
> > >> import java.time.Instant;
> > >> import java.util.ArrayList;
> > >> import java.util.List;
> > >> import org.apache.kafka.streams.processor.api.Processor;
> > >> import org.apache.kafka.streams.processor.api.Record;
> > >> import org.apache.kafka.streams.state.KeyValueIterator;
> > >> import org.apache.kafka.streams.state.TimestampedKeyValueStore;
> > >> import org.apache.kafka.streams.state.ValueAndTimestamp;
> > >> import org.slf4j.Logger;
> > >> import org.slf4j.LoggerFactory;
> > >>
> > >> /**
> > >>  * Internal class handling partFamily updates.
> > >>  */
> > >> public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {
> > >>
> > >>     private static final Logger logger = LoggerFactory.getLogger(FooBarUpdateHandler.class);
> > >>     private TimestampedKeyValueStore<String, String> store;
> > >>
> > >>     @Override
> > >>     public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
> > >>         store = context.getStateStore("foobar");
> > >>     }
> > >>
> > >>     @Override
> > >>     public void process(final Record<String, String> record) {
> > >>
> > >>         // handle tombstones from input topic
> > >>         if (record.value() == null == record.value().equals("deleteme")) {
> > >>             store.delete(record.key());
> > >>         } else {
> > >>             store.put(
> > >>                 record.key(),
> > >>                 ValueAndTimestamp.make(
> > >>                     record.key(),
> > >>                     Instant.now().toEpochMilli()
> > >>                 )
> > >>             );
> > >>         }
> > >>
> > >>         // this is not relevant; it's only to show the issue
> > >>         // when restarting and restoring
> > >>         final List<String> existingKeys = new ArrayList<>();
> > >>         try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
> > >>             all.forEachRemaining((r) -> {
> > >>                 existingKeys.add(r.key);
> > >>             });
> > >>         }
> > >>         logger.info("Got {} records in the store, with keys {}", existingKeys.size(), String.join(",", existingKeys));
> > >>     }
> > >> }
> > >> ```
> > >>
> > >> My workaround is to add this to the 'init' method of the 'FooBarUpdateHandler'
> > >> ```java
> > >> try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
> > >>     if (all == null) {
> > >>         return;
> > >>     }
> > >>     logger.info("Removing already deleted records from rocksdb representing the global store {}", storeName);
> > >>     all.forEachRemaining(r -> {
> > >>         if (r != null && r.key != null && store.get(r.key) == null) {
> > >>             store.delete(r.key);
> > >>         }
> > >>     });
> > >> }
> > >> ```
> > >> Now it is again consistent across restarts.
> > >>
> > >> Kind Regards,
> > >> Patrick
> > >>
> > >>
> > >> Patrick D’Addona
> > >> Senior Lead IT Architect
> > >>
> > >>
> > >> Mobile: +49 151 544 22 161
> > >> patrick.dadd...@maibornwolff.de
> > >> Theresienhöhe 13, 80339 München
> > >>
> > >> MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
> > >> www.maibornwolff.de<http://www.maibornwolff.de>, Phone +49 89 544 253 000
> > >> USt-ID DE 129 299 525, Munich District Court HRB 98058
> > >> Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann, Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
> > >> ____________________________________________________________