> In your case you also delete if the value is not null but equals
> "deleteme", right? I.e., you use non-tombstone records as deletes, which
> is just not allowed/supported.

The "deleteme" String was only for testing, the issue also happens without it, 
i.e. if there is a "real" tombstone with `value == null` on the input topic.
I do use the input topic as a changelog for my global table. tombstones are 
sent directly to that topic from a kafka streams operation before the actual 
store.

> I cannot explain why all() and get(key) actually give you different
> results with respect to `key`. If a key is resurrected during a restore,
> both methods should return it. Not sure why `get(key)` returns `null`
> even if `all()` contains the key... I would rather expect that both
> return the resurrected key.

That's why I think this is different from KAFKA-7663.
The **foo.bar.globaltopic** topic currently looks like this:

|timestamp|key|value|
|---|---|---|
|2022-08-10T14:23:51.768|foo|foo|
|2022-08-10T14:23:51.836|foo|foo|
|2022-08-10T14:23:52.126|bar|bar|
|2022-08-10T14:23:52.398|foo|foo|
|2022-08-10T14:23:53.353|bar|bar|
|2022-08-10T14:23:53.098|foo|<null>|
|2022-08-10T14:23:54.367|bar|bar|

After I delete the kafka-streams.state.dir and restart the application, I get
```
store.get("foo") -> null
store.get("bar") -> "bar"
store.all()      -> "foo" and "bar"
```

Hope that explains it better.

- Patrick




Patrick D’Addona
Senior Lead IT Architect


Mobile: +49 151 544 22 161
patrick.dadd...@maibornwolff.de
Theresienhöhe 13, 80339 München

MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
www.maibornwolff.de, Phone +49 89 544 253 000
USt-ID DE 129 299 525, Munich District Court HRB 98058
Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
____________________________________________________________

________________________________
From: Matthias J. Sax <mj...@apache.org>
Sent: Friday, December 9, 2022 01:11
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores 
previously deleted records

> The way I see it, KAFKA-7663 says, "a global store will be exactly the input 
> topic after restore, regardless of the processor"

Not sure what you mean by this? The issue the ticket describes is that
if you don't do a plain `put(key,value)` in your processor, stuff breaks
right now. (Note that `delete(key)` and `put(key,null)` are the same.)
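
For reference, the only pattern that is guaranteed to survive a restore is a
plain pass-through processor. A minimal sketch (the class name is made up; it
assumes a non-timestamped String store named "foobar"):

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class PassThroughHandler implements Processor<String, String, Void, Void> {

    private KeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("foobar");
    }

    @Override
    public void process(final Record<String, String> record) {
        if (record.value() == null) {
            // `delete(key)` and `put(key, null)` are equivalent
            store.delete(record.key());
        } else {
            store.put(record.key(), record.value());
        }
    }
}
```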


It's a known issue, bad API, and also bad documentation on our side, and
I guess you can call it a bug if you wish. However, you can only use
tombstones as deletes right now. Thus, what you do "wrong" is

> if (record.value() == null || record.value().equals("deleteme")) {
>             store.delete(record.key());
>         }

In your case you also delete if the value is not null but equals
"deleteme", right? I.e., you use non-tombstone records as deletes, which
is just not allowed/supported.

The issue is that during restore only `null` values, i.e., actual
tombstones, are handled as deletes. Thus, if you delete a key using a
non-tombstone record in your processor, this key can be resurrected
during restore.
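
Conceptually, the restore path applies the topic's records to the store
directly and never calls your processor. A simplified sketch of that logic
(this is not the actual GlobalStateManagerImpl code; names are illustrative):

```java
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;

final class RestoreSketch {
    // Apply one batch of restore records; only `null` values act as deletes.
    static void applyBatch(
            final List<ConsumerRecord<byte[], byte[]>> batch,
            final KeyValueStore<Bytes, byte[]> store) {
        for (final ConsumerRecord<byte[], byte[]> record : batch) {
            if (record.value() == null) {
                store.delete(Bytes.wrap(record.key()));
            } else {
                // any non-null value is re-inserted, resurrecting keys
                // that a custom processor had deleted
                store.put(Bytes.wrap(record.key()), record.value());
            }
        }
    }
}
```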


I cannot explain why all() and get(key) actually give you different
results with respect to `key`. If a key is resurrected during a restore,
both methods should return it. Not sure why `get(key)` returns `null`
even if `all()` contains the key... I would rather expect that both
return the resurrected key.

Hope this helps.


-Matthias


On 12/8/22 12:00 PM, Patrick D’Addona wrote:
> Hi,
>
> I don't think this issue is exactly the same as KAFKA-7663.
>
> The way I see it, KAFKA-7663 says, "a global store will be exactly the input 
> topic after restore, regardless of the processor"
> My issue here is that the global store after restore is inconsistent with 
> the input topic and with the store itself,
> because it finds records with key "foo" using **store.all()** that it cannot 
> find via **store.get("foo")**.
> **store.get()** is consistent with my input topic, where the tombstone is 
> the latest entry for the key "foo", reflecting the **delete("foo")** 
> operation on the store.
> But still, looping over the store returns a record with "foo" as a key and a 
> non-null value.
>
> If the store acts like a Map, where you can call **get(k)** and **put(k, 
> v)**, then looping over it should only find entries that actually exist and 
> have a value when using **get(k)**.
> Restoring something that breaks this connection seems wrong, even if that 
> restore ignores the processor and writes directly to the store.
> The restore should remove keys whose latest entry is a tombstone from the 
> **all()** iterator, regardless of whether it uses a custom processor as 
> KAFKA-7663 wants, or simply reads the topic as it currently does.
>
> Kind Regards,
> Patrick
>
> ________________________________
> From: Colt McNealy <c...@littlehorse.io>
> Sent: Thursday, December 8, 2022 17:54
> To: patrick.dadd...@maibornwolff.de.invalid 
> <patrick.dadd...@maibornwolff.de.invalid>
> Cc: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores 
> previously deleted records
>
> Hi Patrick,
>
> Your issue is in fact identical to KAFKA-7663. As per that
> issue/bug/discussion, if your processor does anything other than simply
> pass records through, the results of initial processing vs restoration are
> different.
>
> Global State Stores don't have a changelog topic (for example, in the
> processor API, Global State Stores are only valid if the builder has
> .withLoggingDisabled()). That's because the processor for the global store
> runs on each of your N streams instances, and if the processor on each
> instance published to the changelog, then each put/delete would be written
> N times, which is wasteful.
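>
> For illustration, this is roughly what such a store builder looks like
> (a sketch; the store name is made up):
>
> ```java
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.state.KeyValueStore;
> import org.apache.kafka.streams.state.StoreBuilder;
> import org.apache.kafka.streams.state.Stores;
>
> StoreBuilder<KeyValueStore<String, String>> storeBuilder =
>         Stores.keyValueStoreBuilder(
>                 Stores.persistentKeyValueStore("global-foo"),
>                 Serdes.String(),
>                 Serdes.String())
>             .withLoggingDisabled(); // global stores must not have a changelog
> ```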
>
> The implication of this is that your input topic should be "like" a
> changelog:
> - Your input topic should NOT have limited retention, otherwise you'll lose
> old data.
> - Your input topic should ideally be compacted if possible.
>
> I agree that the API as it stands is highly confusing—why allow users to
> provide a processor if it offers a way to "shoot oneself in one's foot?"
>
> Changing that API would probably require a KIP. I don't quite have the
> bandwidth to propose + implement such a KIP right now, but if you would
> like to, feel free! (perhaps in the spring I may have time)
>
> Your workaround (the init() method) is a good one. Another way to do it
> might be to simply have a regular processing step which converts the input
> topic into the true "changelog" format before you push it to a global store.
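>
> For example, something like this upstream of the global store topic
> (a sketch; "raw-input" is a made-up topic name, and I'm reusing the
> "deleteme" convention from your code):
>
> ```java
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.Produced;
>
> final StreamsBuilder builder = new StreamsBuilder();
> builder.stream("raw-input", Consumed.with(Serdes.String(), Serdes.String()))
>        // rewrite application-level deletes as real tombstones
>        .mapValues(v -> "deleteme".equals(v) ? null : v)
>        .to("foo.bar.globaltopic", Produced.with(Serdes.String(), Serdes.String()));
> ```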
>
> Cheers,
> Colt McNealy
> *Founder, LittleHorse.io*
>
>
> On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona
> <patrick.dadd...@maibornwolff.de.invalid> wrote:
>
>> Hello,
>>
>> I have a Quarkus application using
>> **org.apache.kafka:kafka-streams:3.1.0** and found that
>> * when creating a global table using a compacted topic as input,
>> * entries that have been deleted at some point
>> * are then no longer returned when iterating over the store with
>> **store.all()** - as expected;
>> * but after the pod restarts and its Kafka Streams state directory is
>> deleted, so that the store is restored from the topic via
>> **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**,
>> * those formerly deleted records are once again returned by that store
>> when using **store.all()** - not expected;
>> * however, **store.get("foo")** still returns null for them - as expected.
>>
>> This is somewhat similar to
>> https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like to
>> be able to modify this restore behaviour.
>> However, it is also different, because I think it is not documented
>> anywhere and it is unintuitive (to me) - since it changes how the
>> application behaves after restarting it, even though the Kafka cluster
>> itself was not changed - so I think it's more of a bug than missing
>> documentation.
>>
>> Some more information: the topic is configured like this
>> ```
>> cleanup.policy: compact
>> compression.type: producer
>> delete.retention.ms: 86400000
>> max.compaction.lag.ms: 9223372036854776000
>> min.compaction.lag.ms: 0
>> retention.bytes: -1
>> retention.ms: 86400000
>> ```
>>
>> I am adding the global store like so
>> ```java
>> streamsBuilder.addGlobalStore(
>>          Stores.timestampedKeyValueStoreBuilder(
>>                  Stores.persistentTimestampedKeyValueStore("foobar"),
>>                  Serdes.String(),
>>                  Serdes.String()),
>>          "foo.bar.globaltopic",
>>          Consumed.with(Serdes.String(), Serdes.String()),
>>          () -> new FooBarUpdateHandler(timeService)
>> );
>> ```
>>
>> and here is the definition of 'FooBarUpdateHandler'
>> ```java
>> import java.time.Instant;
>> import java.util.ArrayList;
>> import java.util.List;
>> import org.apache.kafka.streams.processor.api.Processor;
>> import org.apache.kafka.streams.processor.api.ProcessorContext;
>> import org.apache.kafka.streams.processor.api.Record;
>> import org.apache.kafka.streams.state.KeyValueIterator;
>> import org.apache.kafka.streams.state.TimestampedKeyValueStore;
>> import org.apache.kafka.streams.state.ValueAndTimestamp;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> /**
>>  * Internal class handling foobar updates.
>>  */
>> public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {
>>
>>     private static final Logger logger = LoggerFactory.getLogger(FooBarUpdateHandler.class);
>>     private TimestampedKeyValueStore<String, String> store;
>>
>>     @Override
>>     public void init(final ProcessorContext<Void, Void> context) {
>>         store = context.getStateStore("foobar");
>>     }
>>
>>     @Override
>>     public void process(final Record<String, String> record) {
>>
>>         // handle tombstones from the input topic
>>         // (the "deleteme" value is only used for testing)
>>         if (record.value() == null || record.value().equals("deleteme")) {
>>             store.delete(record.key());
>>         } else {
>>             store.put(
>>                     record.key(),
>>                     ValueAndTimestamp.make(
>>                             record.key(),
>>                             Instant.now().toEpochMilli()));
>>         }
>>
>>         // this part is not relevant to the processing itself,
>>         // it's only here to show the issue when restarting and restoring the store
>>         final List<String> existingKeys = new ArrayList<>();
>>         try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
>>             all.forEachRemaining(r -> existingKeys.add(r.key));
>>         }
>>         logger.info("Got {} records in the store, with keys {}",
>>                 existingKeys.size(), String.join(",", existingKeys));
>>     }
>> }
>> ```
>>
>> My workaround is to add this to the 'init' method of the
>> 'FooBarUpdateHandler'
>> ```java
>> try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
>>     if (all == null) {
>>         return;
>>     }
>>     logger.info("Removing already deleted records from rocksdb representing the global store {}", storeName);
>>     all.forEachRemaining(r -> {
>>         if (r != null && r.key != null && store.get(r.key) == null) {
>>             store.delete(r.key);
>>         }
>>     });
>> }
>> ```
>> Now it is again consistent across restarts.
>>
>> Kind Regards,
>> Patrick
>>
>>
>> Patrick D’Addona
>> Senior Lead IT Architect
>>
>>
>> Mobile: +49 151 544 22 161
>> patrick.dadd...@maibornwolff.de
>> Theresienhöhe 13, 80339 München
>>
>> MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
>> www.maibornwolff.de, Phone +49 89 544 253 000
>> USt-ID DE 129 299 525, Munich District Court HRB 98058
>> Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
>> Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
>> ____________________________________________________________
>>
>>
>>
>

