Patrick, glad you got it cleared up, and good find re: empty byte[] vs. null.

KAFKA-7663 is very interesting. Just yesterday I thought of a use case
where supplying the processor for the Global State Store would be
useful: what if you have an in-memory object that computes and caches
aggregations of what's in the state store? You can derive that view by
querying the state store, but such queries are expensive; it's far better
to have them cached in an in-memory POJO. But to keep that POJO up to date,
you need to be alerted every time an event comes into the state store.

I think the best way to implement that (if I were to submit a KIP) would be
to:

   - Deprecate the ability to add a processor
   - Add an optional "onChange" callback that is called every time a new
     record is processed.
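For illustration only, a minimal, dependency-free sketch of how such an "onChange" hook could keep a cached aggregation current. `StoreChangeListener`, `ValueCountView` and `ObservedStore` are hypothetical names (not Kafka Streams APIs), a plain `HashMap` stands in for the state store, and the "aggregation" is just a count of keys per value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical callback shape; NOT part of the Kafka Streams API.
interface StoreChangeListener<K, V> {
    // oldValue is null for an insert, newValue is null for a delete (tombstone)
    void onChange(K key, V oldValue, V newValue);
}

// Hypothetical cached view: how many keys currently map to each value.
class ValueCountView implements StoreChangeListener<String, String> {
    private final Map<String, Long> countsByValue = new HashMap<>();

    @Override
    public void onChange(final String key, final String oldValue, final String newValue) {
        if (oldValue != null) {
            countsByValue.merge(oldValue, -1L, Long::sum);
            countsByValue.remove(oldValue, 0L); // drop values no key maps to anymore
        }
        if (newValue != null) {
            countsByValue.merge(newValue, 1L, Long::sum);
        }
    }

    long count(final String value) {
        return countsByValue.getOrDefault(value, 0L);
    }
}

// Hypothetical store stand-in that fires the callback on every put/delete.
class ObservedStore<K, V> {
    private final Map<K, V> data = new HashMap<>();
    private final StoreChangeListener<K, V> listener;

    ObservedStore(final StoreChangeListener<K, V> listener) {
        this.listener = listener;
    }

    void put(final K key, final V value) {
        // a null value behaves like a tombstone and removes the key
        final V old = value == null ? data.remove(key) : data.put(key, value);
        listener.onChange(key, old, value);
    }

    void delete(final K key) {
        put(key, null);
    }
}
```

The point of this design is that the view is updated incrementally from (key, oldValue, newValue) events instead of re-querying the store.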

There are lots of details to be ironed out, and furthermore this is a big
API change, so it would be slow to implement.

Colt McNealy
*Founder, LittleHorse.io*


On Mon, Dec 12, 2022 at 3:30 AM Patrick D’Addona
<patrick.dadd...@maibornwolff.de.invalid> wrote:

> No, it does not encrypt the keys. And it works fine for keys like "bar",
> where the latest record on the topic is not a tombstone.
>
> But that got me thinking about how the values are actually written to and
> read from Kafka, and I found the issue in my case: the serializer was not
> writing actual "null" values onto the topic, but empty byte[] arrays
> instead.
> The serializer I used looks like this:
> ```java
> public byte[] serialize(final String s, final T data) {
>     if (data == null) {
>         return new byte[0];
>     }
>     try {
>         return objectMapper.writeValueAsBytes(data);
>     } catch (final IOException e) {
>         throw new SerializationException(e);
>     }
> }
> ```
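For contrast, a minimal sketch of a serializer/deserializer pair that writes a real tombstone for null data. It is dependency-free on purpose: UTF-8 string encoding replaces the Jackson `objectMapper` call so it compiles on its own, and `NullSafeStringCodec` is a hypothetical name. In an actual Kafka serde these methods would live in classes implementing `org.apache.kafka.common.serialization.Serializer` and `Deserializer`.

```java
import java.nio.charset.StandardCharsets;

// Dependency-free stand-in for a Kafka serializer/deserializer pair.
// Assumption: UTF-8 encoding replaces the JSON ObjectMapper used in the thread.
final class NullSafeStringCodec {
    private NullSafeStringCodec() {}

    // Return null (not new byte[0]) so a null value reaches the topic as a real tombstone.
    static byte[] serialize(final String topic, final String data) {
        if (data == null) {
            return null;
        }
        return data.getBytes(StandardCharsets.UTF_8);
    }

    // Treat null and legacy empty payloads (written by the old serializer) as "no value".
    static String deserialize(final String topic, final byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Records already written with an empty payload are not tombstones, so compaction keeps them until a newer record for the same key arrives. With this UTF-8 stand-in an empty string would also encode to an empty array; a JSON encoding (`""`) avoids that ambiguity.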
>
> And then during restore in
> `org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState`:
> ```java
> for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
>     if (record.key() != null) {
>         restoreRecords.add(recordConverter.convert(record));
>     }
> }
> ```
> using
> `org.apache.kafka.streams.state.internals.RecordConverters#RAW_TO_TIMESTAMED_INSTANCE`,
> that empty array is turned into an 8-byte array holding only the timestamp.
> It actually checks the value for `null`, but not for an empty array:
>
> ```java
> final byte[] recordValue = rawValue == null ? null :
>     ByteBuffer.allocate(8 + rawValue.length)
>         .putLong(timestamp)
>         .put(rawValue)
>         .array();
> ```
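A runnable, dependency-free copy of just the conversion quoted above (a sketch, not the Kafka class itself; `RawToTimestampedModel` is a hypothetical name) makes the effect visible: a `null` value stays `null`, while an empty payload becomes an 8-byte array holding only the timestamp.

```java
import java.nio.ByteBuffer;

// Copy of the quoted value conversion: prepend the record timestamp as 8 bytes.
// Only a null value stays null; an empty array still produces an 8-byte result.
final class RawToTimestampedModel {
    private RawToTimestampedModel() {}

    static byte[] convert(final byte[] rawValue, final long timestamp) {
        return rawValue == null ? null :
            ByteBuffer.allocate(8 + rawValue.length)
                .putLong(timestamp)
                .put(rawValue)
                .array();
    }
}
```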
>
> That is then passed to
> `org.apache.kafka.streams.state.internals.RocksDBStore.SingleColumnFamilyAccessor#addToBatch`,
> where it gets "put" instead of "delete" because it's not null:
> ```java
> public void addToBatch(final byte[] key,
>                        final byte[] value,
>                        final WriteBatch batch) throws RocksDBException {
>     if (value == null) {
>         batch.delete(columnFamily, key);
>     } else {
>         batch.put(columnFamily, key, value);
>     }
> }
> ```
>
> That's why `store.get("foo")` returns null: it actually finds the empty
> byte[] record, and my deserializer turns it into null.
> So Colt McNealy is right, this is exactly the issue from KAFKA-7663. I
> just did not see it until I found that the values on the topic, which AKHQ
> shows as "null" and which are treated like null everywhere in my
> application, are not actually real tombstones to RocksDB and the restore
> process.
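To tie the steps together, a simplified model of the restore path described above, under stated assumptions: a `TreeMap` stands in for RocksDB, keys are `String`s instead of `byte[]`, and `get()` mimics a deserializer that strips the timestamp and maps an empty payload to null. `RestoreModel` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, dependency-free model of the restore path (not Kafka code).
final class RestoreModel {
    // Stand-in for the RocksDB column family: key -> timestamped value bytes.
    final Map<String, byte[]> rocks = new TreeMap<>();

    // Mirrors the quoted steps: skip null keys, convert the value, put unless it is null.
    void restore(final String key, final byte[] rawValue, final long timestamp) {
        if (key == null) {
            return;
        }
        final byte[] stored = rawValue == null ? null :
            ByteBuffer.allocate(8 + rawValue.length).putLong(timestamp).put(rawValue).array();
        if (stored == null) {
            rocks.remove(key);      // real tombstone -> delete
        } else {
            rocks.put(key, stored); // empty payload -> 8-byte put, the key survives
        }
    }

    // Like the application's deserializer: strip the timestamp, map an empty payload to null.
    String get(final String key) {
        final byte[] stored = rocks.get(key);
        if (stored == null || stored.length == 8) {
            return null;
        }
        return new String(stored, 8, stored.length - 8, StandardCharsets.UTF_8);
    }

    // Like store.all(): every key physically present, regardless of what get() returns.
    List<String> allKeys() {
        return new ArrayList<>(rocks.keySet());
    }
}
```

Replaying "foo" with an empty payload as its last record leaves `get("foo")` returning null while `allKeys()` still lists "foo"; only a real null value removes the key.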
>
> So this thread can be closed, since there is nothing wrong with the way
> Kafka Streams behaves here; it's just another implication of KAFKA-7663
> that might confuse users, but that also has the chance to lead them into
> very interesting deep dives into Kafka Streams ;-)
>
> Thanks for all the responses!
> Kind regards,
> Patrick
>
> ________________________________
> From: Colt McNealy <c...@littlehorse.io>
> Sent: Saturday, December 10, 2022 00:33
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally
> restores previously deleted records
>
> Does the key Serde that you provided to your store encrypt the keys? I've
> never done so myself, but I've seen others report similar behavior (the
> store iterator shows the correct values but store.get('foo') returns null)
> in the Confluent Community Slack. Here's a relevant message:
>
> > "From the behaviour in your code snippet above, I would say that the key
> > is stored in encrypted form. The deserializer decrypts it correctly (thus
> > you see the key and value while iterating). But when you request the key
> > individually you are passing it in plain text (the serializer might not be
> > encrypting) and it’s not found in the keystore."
>
> I can't help too much beyond that; but you may want to look into that
> issue.
>
> Colt McNealy
> *Founder, LittleHorse.io*
>
>
> On Thu, Dec 8, 2022 at 11:51 PM Patrick D’Addona
> <patrick.dadd...@maibornwolff.de.invalid> wrote:
>
> > > In your case you also delete if the value is not null and not equal to
> > > "deleteme", right? I.e., you use non-tombstone records as deletes,
> > > which is just not allowed/supported.
> >
> > The "deleteme" String was only for testing; the issue also happens
> > without it, i.e. if there is a "real" tombstone with `value == null` on
> > the input topic.
> > I do use the input topic as a changelog for my global table. Tombstones
> > are sent directly to that topic from a Kafka Streams operation before the
> > actual store.
> >
> > > I cannot explain why all() and get(key) actually give you different
> > > results with respect to `key`. If a key is resurrected during a restore,
> > > both methods should return it. Not sure why `get(key)` returns `null`
> > > even if `all()` contains the key... I would rather expect that both
> > > return the resurrected key.
> >
> > That's why I think this is different from KAFKA-7663.
> > The **foo.bar.globaltopic** topic currently looks like this:
> >
> > |timestamp|key|value|
> > |---|---|---|
> > |2022-08-10T14:23:51.768|foo|foo|
> > |2022-08-10T14:23:51.836|foo|foo|
> > |2022-08-10T14:23:52.126|bar|bar|
> > |2022-08-10T14:23:52.398|foo|foo|
> > |2022-08-10T14:23:53.353|bar|bar|
> > |2022-08-10T14:23:53.098|foo|<null>|
> > |2022-08-10T14:23:54.367|bar|bar|
> >
> > After I delete the `kafka-streams.state.dir` and restart the application,
> > I get:
> > * store.get("foo") -> null
> > * store.get("bar") -> "bar"
> > * store.all() -> "foo" and "bar"
> >
> > Hope that explains it better.
> >
> > - Patrick
> >
> >
> >
> >
> > Patrick D’Addona
> > Senior Lead IT Architect
> >
> >
> > Mobile: +49 151 544 22 161
> > patrick.dadd...@maibornwolff.de
> > Theresienhöhe 13, 80339 München
> >
> > MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
> > www.maibornwolff.de<http://www.maibornwolff.de>, Phone +49 89 544 253 000
> > USt-ID DE 129 299 525, Munich District Court HRB 98058
> > Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
> > Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
> > ____________________________________________________________
> >
> > ________________________________
> > From: Matthias J. Sax <mj...@apache.org>
> > Sent: Friday, December 9, 2022 01:11
> > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally
> > restores previously deleted records
> >
> > > The way I see it, KAFKA-7663 says, "a global store will be exactly the
> > > input topic after restore, regardless of the processor"
> >
> > Not sure what you mean by this. The issue the tickets describe is that
> > if you don't do a plain `put(key,value)` in your processor, stuff breaks
> > right now. (Note that `delete(key)` and `put(key,null)` are the same.)
> >
> >
> > It's a known issue: a bad API, and also bad documentation on our side,
> > and I guess you can call it a bug if you wish. However, you can only use
> > tombstones as deletes right now. Thus, what you do "wrong" is:
> >
> > > if (record.value() == null == record.value().equals("deleteme")) {
> > >             store.delete(record.key());
> > >         }
> >
> > In your case you also delete if the value is not null and not equal to
> > "deleteme", right? I.e., you use non-tombstone records as deletes,
> > which is just not allowed/supported.
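A small, dependency-free sketch of how Java evaluates the quoted condition: `==` is left-associative, so it parses as `(value == null) == value.equals("deleteme")`. That is true for an ordinary value such as "bar", false for "deleteme", and it throws a `NullPointerException` for an actual tombstone. The class name `DeleteConditions` is made up for illustration; `tombstoneOnly` is the only kind of delete that restore replays.

```java
// Compares the condition quoted above with a tombstone-only check.
final class DeleteConditions {
    private DeleteConditions() {}

    // The quoted condition verbatim; parsed as (value == null) == value.equals("deleteme").
    static boolean quotedCondition(final String value) {
        return value == null == value.equals("deleteme");
    }

    // Restore only treats an actual tombstone (null value) as a delete.
    static boolean tombstoneOnly(final String value) {
        return value == null;
    }
}
```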
> >
> > The issue is that during restore only `null` values, i.e., actual
> > tombstones, are handled as deletes; thus, if you delete a key using a
> > non-tombstone record in your processor, this key can be resurrected
> > during restore.
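A dependency-free model of that resurrection, assuming `TreeMap`s as the store and "deleteme" as the non-tombstone delete marker from this thread (`ResurrectionModel` is a hypothetical name): normal processing honors the marker, while restore bypasses the processor and only treats `null` as a delete.

```java
import java.util.Map;
import java.util.TreeMap;

// Models processing vs. restore over the same input topic (records are {key, value}).
final class ResurrectionModel {
    private ResurrectionModel() {}

    // Normal processing: the custom processor treats the marker as a delete.
    static Map<String, String> process(final String[][] topic) {
        final Map<String, String> store = new TreeMap<>();
        for (final String[] record : topic) {
            if (record[1] == null || record[1].equals("deleteme")) {
                store.remove(record[0]);
            } else {
                store.put(record[0], record[1]);
            }
        }
        return store;
    }

    // Restore: the processor is bypassed, so only null values are deletes.
    static Map<String, String> restore(final String[][] topic) {
        final Map<String, String> store = new TreeMap<>();
        for (final String[] record : topic) {
            if (record[1] == null) {
                store.remove(record[0]);
            } else {
                store.put(record[0], record[1]);
            }
        }
        return store;
    }
}
```

With the records foo=foo, bar=bar, foo=deleteme, bar=null, processing ends with an empty store, while restore brings "foo" back with the marker as its value.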
> >
> >
> > I cannot explain why all() and get(key) actually give you different
> > results with respect to `key`. If a key is resurrected during a restore,
> > both methods should return it. Not sure why `get(key)` returns `null`
> > even if `all()` contains the key... I would rather expect that both
> > return the resurrected key.
> >
> > Hope this helps.
> >
> >
> > -Matthias
> >
> >
> > On 12/8/22 12:00 PM, Patrick D’Addona wrote:
> > > Hi,
> > >
> > > I don't think this issue is exactly the same as KAFKA-7663.
> > >
> > > The way I see it, KAFKA-7663 says, "a global store will be exactly the
> > > input topic after restore, regardless of the processor".
> > > My issue here is that the global store after restore is inconsistent
> > > with the input topic and with the store itself,
> > > because it finds records with key "foo" using **store.all()** that it
> > > cannot find via **store.get("foo")**.
> > > The **store.get()** is consistent with my input topic, where the
> > > tombstone is the latest entry for the key "foo", reflecting the
> > > **delete("foo")** operation on the store.
> > > But still, looping over the store returns a record with "foo" as a key
> > > and a non-null value.
> > >
> > > If the store acts like a Map, where you can call **get(k)** and
> > > **put(k, v)**, then looping over it should only find entries that
> > > actually exist and have a value when using **get(k)**.
> > > Restoring something that breaks this connection seems wrong, even if
> > > that restoring ignores the processor and directly writes to the store.
> > > It should remove keys for which the last entry is a tombstone from the
> > > **all()** iterator, regardless of whether the restore process uses a
> > > custom processor as KAFKA-7663 wants, or simply reads the topic as it
> > > currently does.
> > >
> > > Kind Regards,
> > > Patrick
> > >
> > > ________________________________
> > > From: Colt McNealy <c...@littlehorse.io>
> > > Sent: Thursday, December 8, 2022 17:54
> > > To: patrick.dadd...@maibornwolff.de.invalid
> > <patrick.dadd...@maibornwolff.de.invalid>
> > > Cc: dev@kafka.apache.org <dev@kafka.apache.org>
> > > Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally
> > restores previously deleted records
> > >
> > > Hi Patrick,
> > >
> > > Your issue is in fact identical to KAFKA-7663. As per that
> > > issue/bug/discussion, if your processor does anything other than simply
> > > passing records through, the results of initial processing vs.
> > > restoration are different.
> > >
> > > Global State Stores don't have a changelog topic (for example, in the
> > > Processor API, Global State Stores are only valid if the builder has
> > > .withLoggingDisabled()). That's because the processor for the global
> > > store runs on each of your N streams instances, and if the processor on
> > > each instance published to the changelog, then each put/delete would be
> > > written N times, which is wasteful.
> > >
> > > The implication of this is that your input topic should be "like" a
> > > changelog:
> > > - Your input topic should NOT have limited retention, otherwise you'll
> > >   lose old data.
> > > - Your input topic should ideally be compacted.
> > >
> > > I agree that the API as it stands is highly confusing: why allow users to
> > > provide a processor if it offers a way to "shoot oneself in the foot"?
> > >
> > > Changing that API would probably require a KIP. I don't quite have the
> > > bandwidth to propose + implement such a KIP right now, but if you would
> > > like to, feel free! (perhaps in the spring I may have time)
> > >
> > > Your workaround (the init() method) is a good one. Another way to do it
> > > might be to simply have a regular processing step which converts the
> > > input topic into the true "changelog" format before you push it to a
> > > global store.
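A minimal sketch of the value mapping such an upstream step could apply, assuming "deleteme" is the delete marker used in this thread; `ChangelogNormalizer` is a hypothetical name. It turns markers into real tombstones and passes everything else through unchanged.

```java
// Hypothetical value mapping for an upstream "to changelog format" step.
final class ChangelogNormalizer {
    private ChangelogNormalizer() {}

    // Assumption: the delete marker from this thread.
    static final String DELETE_MARKER = "deleteme";

    // Returns null (a tombstone) for delete markers, otherwise the value unchanged.
    static String toChangelogValue(final String value) {
        return DELETE_MARKER.equals(value) ? null : value;
    }
}
```

In a topology this could be wired with something like `builder.stream("input-topic").mapValues(ChangelogNormalizer::toChangelogValue).to("foo.bar.globaltopic")`, where "input-topic" is a placeholder. The value serializer must then write null as null (Kafka's `StringSerializer` does), not as an empty array.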
> > >
> > > Cheers,
> > > Colt McNealy
> > > *Founder, LittleHorse.io*
> > >
> > >
> > > On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona
> > > <patrick.dadd...@maibornwolff.de.invalid> wrote:
> > >
> > >> Hello,
> > >>
> > >> I have a Quarkus application using
> > >> **org.apache.kafka:kafka-streams:3.1.0** and found that
> > >> * when creating a global table using a compacted topic as input,
> > >> * entries that have been deleted at some point
> > >> * are then no longer returned when iterating over the store with
> > >> **store.all()** - as expected
> > >> * but after the pod restarts and its Kafka Streams state directory is
> > >> deleted, after restoring from the topic using
> > >> **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**
> > >> * those formerly deleted records are once again returned by that store
> > >> when using **store.all()** - not expected
> > >> * however, they return null using **store.get("foo")** - as expected
> > >>
> > >> This is somewhat similar to
> > >> https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like to
> > >> be able to modify this restore behaviour.
> > >> However, it is also different, because I think it is not documented
> > >> anywhere and it is unintuitive (to me), since it changes how the
> > >> application behaves after restarting it even if the Kafka cluster itself
> > >> was not changed. So I think it's more of a bug than missing documentation.
> > >>
> > >> Some more information: the topic is configured like this
> > >> ```
> > >> cleanup.policy: compact
> > >> compression.type: producer
> > >> delete.retention.ms: 86400000
> > >> max.compaction.lag.ms: 9223372036854775807
> > >> min.compaction.lag.ms: 0
> > >> retention.bytes: -1
> > >> retention.ms: 86400000
> > >> ```
> > >>
> > >> I am adding the global store like so
> > >> ```java
> > >> streamsBuilder.addGlobalStore(
> > >>          Stores.timestampedKeyValueStoreBuilder(
> > >>                  Stores.persistentTimestampedKeyValueStore("foobar"),
> > >>                  Serdes.String(),
> > >>                  Serdes.String()),
> > >>          "foo.bar.globaltopic",
> > >>          Consumed.with(Serdes.String(), Serdes.String()),
> > >>          () -> new FooBarUpdateHandler(timeService)
> > >> );
> > >> ```
> > >>
> > >> and here is the definition of `FooBarUpdateHandler`:
> > >> ```java
> > >> import java.time.Instant;
> > >> import java.util.ArrayList;
> > >> import java.util.List;
> > >> import org.apache.kafka.streams.processor.api.Processor;
> > >> import org.apache.kafka.streams.processor.api.ProcessorContext;
> > >> import org.apache.kafka.streams.processor.api.Record;
> > >> import org.apache.kafka.streams.state.KeyValueIterator;
> > >> import org.apache.kafka.streams.state.TimestampedKeyValueStore;
> > >> import org.apache.kafka.streams.state.ValueAndTimestamp;
> > >> import org.slf4j.Logger;
> > >> import org.slf4j.LoggerFactory;
> > >>
> > >> /**
> > >>  * Internal class handling partFamily updates.
> > >>  */
> > >> public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {
> > >>
> > >>     private static final Logger logger = LoggerFactory.getLogger(FooBarUpdateHandler.class);
> > >>     private TimestampedKeyValueStore<String, String> store;
> > >>
> > >>     @Override
> > >>     public void init(final ProcessorContext<Void, Void> context) {
> > >>         store = context.getStateStore("foobar");
> > >>     }
> > >>
> > >>     @Override
> > >>     public void process(final Record<String, String> record) {
> > >>
> > >>         // handle tombstones from input topic
> > >>         if (record.value() == null == record.value().equals("deleteme")) {
> > >>             store.delete(record.key());
> > >>         } else {
> > >>             store.put(
> > >>                     record.key(),
> > >>                     ValueAndTimestamp.make(
> > >>                             record.value(),
> > >>                             Instant.now().toEpochMilli()
> > >>                     )
> > >>             );
> > >>         }
> > >>
> > >>         // this is not relevant
> > >>         // it's only to show the issue when restarting and restoring the
> > >>         final List<String> existingKeys = new ArrayList<>();
> > >>         try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
> > >>             all.forEachRemaining((r) -> {
> > >>                 existingKeys.add(r.key);
> > >>             });
> > >>         }
> > >>         logger.info("Got {} records in the store, with keys {}",
> > >>                 existingKeys.size(), String.join(",", existingKeys));
> > >>     }
> > >> }
> > >> ```
> > >>
> > >> My workaround is to add this to the `init` method of the
> > >> `FooBarUpdateHandler`:
> > >> ```java
> > >> try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
> > >>     if (all == null) {
> > >>         return;
> > >>     }
> > >>     logger.info("Removing already deleted records from rocksdb representing the global store {}", "foobar");
> > >>     all.forEachRemaining(r -> {
> > >>         // a key that iteration finds but get() reads as null is treated as deleted
> > >>         if (r != null && r.key != null && store.get(r.key) == null) {
> > >>             store.delete(r.key);
> > >>         }
> > >>     });
> > >> }
> > >> ```
> > >> Now it is again consistent across restarts.
> > >>
> > >> Kind Regards,
> > >> Patrick
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> >
> >
> >
>
