Does the KeySerde that you provided to your store encrypt the keys? I've
never done so myself, but I've seen others report similar behavior (the
store iterator shows the correct values but store.get('foo') returns null)
in the Confluent Community slack. Here's a relevant message:

> "From the behaviour in your code snippet above, I would say that the key
is stored in encrypted form. The deserializer decrypts it correctly (thus
you see the key and value while iterating). But when you requests the key
individually you are passing it in plain text (the serializer might not be
encrypting) and it’s not found in the keystore."

I can't help too much beyond that, but you may want to look into that
issue. One quick way to test the theory is sketched below.
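
A minimal sketch of such a check (assuming `keySerde` is the key Serde you
hand to the store builder; the topic name argument is arbitrary for this
test). If serializing the same key twice yields different bytes, the
serializer is non-deterministic (e.g. encrypting with a random IV), and
point lookups via get() can't match the stored bytes:

```java
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serde;

// Hypothetical diagnostic: get() re-serializes the key you pass in, so a
// non-deterministic key serializer makes lookups miss, even though the
// iterator (which deserializes the stored bytes) shows the key just fine.
static boolean keySerdeIsDeterministic(final Serde<String> keySerde) {
    final byte[] first = keySerde.serializer().serialize("foobar", "foo");
    final byte[] second = keySerde.serializer().serialize("foobar", "foo");
    return Arrays.equals(first, second);
}
```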

Colt McNealy
*Founder, LittleHorse.io*


On Thu, Dec 8, 2022 at 11:51 PM Patrick D’Addona
<patrick.dadd...@maibornwolff.de.invalid> wrote:

> > In your case you also delete if the value is not null but equals
> > "deleteme", right? I.e., you use non-tombstone records as deletes, which
> > is just not allowed/supported.
>
> The "deleteme" String was only for testing, the issue also happens without
> it, i.e. if there is a "real" tombstone with `value == null` on the input
> topic.
> I do use the input topic as a changelog for my global table. tombstones
> are sent directly to that topic from a kafka streams operation before the
> actual store.
>
> > I cannot explain why all() and get(key) actually give you different
> > results with respect to `key`. If a key is resurrected during a restore,
> > both methods should return it. Not sure why `get(key)` returns `null`
> > even if `all()` contains the key... I would rather expect that both
> > return the resurrected key.
>
> That's why I think this is different from KAFKA-7663.
> The **foo.bar.globaltopic** topic currently looks like this:
> |timestamp|key|value|
> |---|---|---|
> |2022-08-10T14:23:51.768|foo|foo|
> |2022-08-10T14:23:51.836|foo|foo|
> |2022-08-10T14:23:52.126|bar|bar|
> |2022-08-10T14:23:52.398|foo|foo|
> |2022-08-10T14:23:53.353|bar|bar|
> |2022-08-10T14:23:53.098|foo|<null>|
> |2022-08-10T14:23:54.367|bar|bar|
>
> After I delete the kafka-streams.state.dir and restart the application, I
> get
> store.get("foo") -> null
> store.get("bar") -> "bar"
> store.all() -> "foo" and "bar"
>
> Hope that explains it better.
>
> - Patrick
>
>
>
>
> Patrick D’Addona
> Senior Lead IT Architect
>
>
> Mobile: +49 151 544 22 161
> patrick.dadd...@maibornwolff.de
> Theresienhöhe 13, 80339 München
>
> MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
> www.maibornwolff.de, Phone +49 89 544 253 000
> USt-ID DE 129 299 525, Munich District Court HRB 98058
> Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
> Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
> ____________________________________________________________
>
> ________________________________
> From: Matthias J. Sax <mj...@apache.org>
> Sent: Friday, December 9, 2022 01:11
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally
> restores previously deleted records
>
> > The way I see it, KAFKA-7663 says, "a global store will be exactly the
> > input topic after restore, regardless of the processor"
>
> Not sure what you mean by this? The issue the ticket describes is that,
> if you don't do a plain `put(key,value)` in your processor, stuff breaks
> right now. (Note that `delete(key)` and `put(key,null)` are the same.)
>
>
> It's a known issue, bad API, and also bad documentation on our side, and
> I guess you can call it a bug if you wish. However, you can only use
> tombstones as deletes right now. Thus, what you do "wrong" is
>
> > if (record.value() == null || record.value().equals("deleteme")) {
> >             store.delete(record.key());
> >         }
>
> In your case you also delete if the value is not null but equals
> "deleteme", right? I.e., you use non-tombstone records as deletes, which
> is just not allowed/supported.
>
> The issue is that during restore only `null` values, i.e., actual
> tombstones, are handled as deletes. Thus, if you delete a key using a
> non-tombstone record in your processor, this key can be resurrected
> during restore.
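>
> For reference, a minimal sketch of the only processor shape that is safe
> under the current restore semantics (store and record types as in your
> example; carrying the record timestamp over is an assumption):
>
> ```java
> // Pass records through unmodified: a null value is the one and only
> // delete signal, both during normal processing and during restore.
> @Override
> public void process(final Record<String, String> record) {
>     if (record.value() == null) {
>         store.delete(record.key());
>     } else {
>         store.put(record.key(),
>                 ValueAndTimestamp.make(record.value(), record.timestamp()));
>     }
> }
> ```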
>
>
> I cannot explain why all() and get(key) actually give you different
> results with respect to `key`. If a key is resurrected during a restore,
> both methods should return it. Not sure why `get(key)` returns `null`
> even if `all()` contains the key... I would rather expect that both
> return the resurrected key.
>
> Hope this helps.
>
>
> -Matthias
>
>
> On 12/8/22 12:00 PM, Patrick D’Addona wrote:
> > Hi,
> >
> > I don't think this issue is exactly the same as KAFKA-7663.
> >
> > The way I see it, KAFKA-7663 says, "a global store will be exactly the
> > input topic after restore, regardless of the processor".
> > My issue here is that, after a restore, the global store is inconsistent
> > with the input topic and with itself,
> > because it finds records with key "foo" using **store.all()** that it
> > cannot find via **store.get("foo")**.
> > **store.get()** is consistent with my input topic, where the
> > tombstone is the latest entry for the key "foo", reflecting the
> > **delete("foo")** operation on the store.
> > But still, looping over the store returns a record with "foo" as a key
> > and a non-null value.
> >
> > If the store acts like a Map, where you can call **get(k)** and **put(k,
> > v)**, then looping over it should only find entries that actually exist
> > and have a value when using **get(k)**.
> > Restoring something that breaks this connection seems wrong, even if
> > the restore ignores the processor and writes directly to the store.
> > It should remove keys for which the last entry is a tombstone from the
> > **all()** iterator, regardless of whether the restore process uses a
> > custom processor as KAFKA-7663 wants, or simply reads the topic as it
> > currently does. A sketch of the invariant I have in mind follows below.
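> >
> > To make that concrete, a hedged sketch of the invariant I would expect
> > to hold after a restore (using the store and logger from my example
> > further down the thread):
> >
> > ```java
> > // Every key visible via all() should also be visible via get();
> > // after a restore, this check currently fails for "foo".
> > try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
> >     while (all.hasNext()) {
> >         final String key = all.next().key;
> >         if (store.get(key) == null) {
> >             logger.warn("key {} is returned by all() but not by get()", key);
> >         }
> >     }
> > }
> > ```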
> >
> > Kind Regards,
> > Patrick
> >
> > ________________________________
> > From: Colt McNealy <c...@littlehorse.io>
> > Sent: Thursday, December 8, 2022 17:54
> > To: patrick.dadd...@maibornwolff.de.invalid
> <patrick.dadd...@maibornwolff.de.invalid>
> > Cc: dev@kafka.apache.org <dev@kafka.apache.org>
> > Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally
> restores previously deleted records
> >
> > Hi Patrick,
> >
> > Your issue is in fact identical to KAFKA-7663. As per that
> > issue/bug/discussion, if your processor does anything other than simply
> > pass records through, the results of initial processing vs. restoration
> > are different.
> >
> > Global State Stores don't have a changelog topic (for example, in the
> > processor API, Global State Stores are only valid if the builder has
> > .withLoggingDisabled()). That's because the processor for the global
> > store runs on each of your N streams instances, and if the processor on
> > each instance published to the changelog, then each put/delete would be
> > written N times, which is wasteful.
> >
> > The implication is that your input topic should be "like" a
> > changelog:
> > - Your input topic should NOT have limited retention, otherwise you'll
> > lose old data.
> > - Your input topic should ideally be compacted, if possible.
> >
> > I agree that the API as it stands is highly confusing: why allow users
> > to provide a processor if it offers a way to "shoot oneself in one's
> > foot"?
> >
> > Changing that API would probably require a KIP. I don't quite have the
> > bandwidth to propose + implement such a KIP right now, but if you would
> > like to, feel free! (perhaps in the spring I may have time)
> >
> > Your workaround (the init() method) is a good one. Another way to do it
> > might be to simply have a regular processing step that converts the
> > input topic into the true "changelog" format before you push it to a
> > global store; a rough sketch of that idea follows below.
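> >
> > A hedged sketch of that second approach (the input topic name and the
> > "deleteme" sentinel are assumptions based on your example):
> >
> > ```java
> > // A normal KStream step turns application-level deletes into real
> > // tombstones, so the topic feeding the global store is a true changelog.
> > builder.stream("foo.bar.input", Consumed.with(Serdes.String(), Serdes.String()))
> >        .mapValues(value -> "deleteme".equals(value) ? null : value)
> >        .to("foo.bar.globaltopic", Produced.with(Serdes.String(), Serdes.String()));
> > ```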
> >
> > Cheers,
> > Colt McNealy
> > *Founder, LittleHorse.io*
> >
> >
> > On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona
> > <patrick.dadd...@maibornwolff.de.invalid> wrote:
> >
> >> Hello,
> >>
> >> I have a quarkus application using
> >> **org.apache.kafka:kafka-streams:3.1.0** and found that
> >> * when creating a global table using a compacted topic as input,
> >> * entries that have been deleted at some point
> >> * are then no longer returned when iterating over the store with
> >> **store.all()** - as expected,
> >> * but after the pod restarts and its kafka streams state directory is
> >> deleted, so that the state is restored from the topic using
> >> **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**,
> >> * those formerly deleted records are once again returned by that store
> >> when using **store.all()** - not expected,
> >> * however they return null using **store.get("foo")** - as expected.
> >>
> >> This is somewhat similar to
> >> https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like
> >> to be able to modify this restore behaviour.
> >> However, it is also different, because I think it is not documented
> >> anywhere and it is unintuitive (to me) - since it changes how the
> >> application behaves after restarting it, even if the kafka cluster
> >> itself was not changed - so I think it's more of a bug than missing
> >> documentation.
> >>
> >> Some more information: the topic is configured like this
> >> ```properties
> >> cleanup.policy: compact
> >> compression.type: producer
> >> delete.retention.ms: 86400000
> >> max.compaction.lag.ms: 9223372036854776000
> >> min.compaction.lag.ms: 0
> >> retention.bytes: -1
> >> retention.ms: 86400000
> >> ```
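> >>
> >> For completeness, a hedged sketch of creating the topic with those
> >> settings via the AdminClient (the bootstrap address and the partition
> >> and replication counts are assumptions):
> >> ```java
> >> import java.util.List;
> >> import java.util.Map;
> >> import org.apache.kafka.clients.admin.Admin;
> >> import org.apache.kafka.clients.admin.AdminClientConfig;
> >> import org.apache.kafka.clients.admin.NewTopic;
> >> import org.apache.kafka.common.config.TopicConfig;
> >>
> >> // Creates the compacted input topic programmatically; the config keys
> >> // mirror the settings listed above.
> >> try (Admin admin = Admin.create(
> >>         Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
> >>     admin.createTopics(List.of(
> >>             new NewTopic("foo.bar.globaltopic", 1, (short) 1).configs(Map.of(
> >>                     TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
> >>                     TopicConfig.DELETE_RETENTION_MS_CONFIG, "86400000",
> >>                     TopicConfig.RETENTION_MS_CONFIG, "86400000"))))
> >>         .all().get();
> >> }
> >> ```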
> >>
> >> I am adding the global store like so
> >> ```java
> >> streamsBuilder.addGlobalStore(
> >>          Stores.timestampedKeyValueStoreBuilder(
> >>                  Stores.persistentTimestampedKeyValueStore("foobar"),
> >>                  Serdes.String(),
> >>                  Serdes.String()),
> >>          "foo.bar.globaltopic",
> >>          Consumed.with(Serdes.String(), Serdes.String()),
> >>          () -> new FooBarUpdateHandler(timeService)
> >> );
> >> ```
> >>
> >> and here is the definition of 'FooBarUpdateHandler'
> >> ```java
> >> import java.time.Instant;
> >> import java.util.ArrayList;
> >> import java.util.List;
> >> import org.apache.kafka.streams.processor.api.Processor;
> >> import org.apache.kafka.streams.processor.api.Record;
> >> import org.apache.kafka.streams.state.KeyValueIterator;
> >> import org.apache.kafka.streams.state.TimestampedKeyValueStore;
> >> import org.apache.kafka.streams.state.ValueAndTimestamp;
> >> import org.slf4j.Logger;
> >> import org.slf4j.LoggerFactory;
> >>
> >> /**
> >>  * Internal class handling foobar updates.
> >>  */
> >> public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {
> >>
> >>     private static final Logger logger = LoggerFactory.getLogger(FooBarUpdateHandler.class);
> >>     private TimestampedKeyValueStore<String, String> store;
> >>
> >>     @Override
> >>     public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
> >>         store = context.getStateStore("foobar");
> >>     }
> >>
> >>     @Override
> >>     public void process(final Record<String, String> record) {
> >>
> >>         // handle tombstones and the "deleteme" test marker from the input topic
> >>         if (record.value() == null || record.value().equals("deleteme")) {
> >>             store.delete(record.key());
> >>         } else {
> >>             store.put(
> >>                     record.key(),
> >>                     ValueAndTimestamp.make(
> >>                             record.value(),
> >>                             Instant.now().toEpochMilli()
> >>                     )
> >>             );
> >>         }
> >>
> >>         // not relevant to the logic; this only logs the store contents
> >>         // to show the issue when restarting and restoring the store
> >>         final List<String> existingKeys = new ArrayList<>();
> >>         try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
> >>             all.forEachRemaining(r -> existingKeys.add(r.key));
> >>         }
> >>         logger.info("Got {} records in the store, with keys {}", existingKeys.size(), String.join(",", existingKeys));
> >>     }
> >> }
> >> ```
> >>
> >> My workaround is to add this to the 'init' method of the
> >> 'FooBarUpdateHandler':
> >> ```java
> >> try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
> >>     if (all == null) {
> >>         return;
> >>     }
> >>     logger.info("Removing already deleted records from rocksdb representing the global store {}", storeName);
> >>     all.forEachRemaining(r -> {
> >>         if (r != null && r.key != null && store.get(r.key) == null) {
> >>             store.delete(r.key);
> >>         }
> >>     });
> >> }
> >> ```
> >> Now it is again consistent across restarts.
> >>
> >> Kind Regards,
> >> Patrick
> >>
> >>
> >> Patrick D’Addona
> >> Senior Lead IT Architect
> >>
> >>
> >> Mobile: +49 151 544 22 161
> >> patrick.dadd...@maibornwolff.de
> >> Theresienhöhe 13, 80339 München
> >>
> >> MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
> >> www.maibornwolff.de, Phone +49 89 544 253 000
> >> USt-ID DE 129 299 525, Munich District Court HRB 98058
> >> Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
> >> Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
> >> ____________________________________________________________
> >>
