Good morning,

I have set up a Kafka Streams application with the following logic. Incoming messages are validated and transformed. The transformed messages are then published to the topic that backs a global state store ("global-store-topic") and to an additional topic ("output-topic"), which is consumed by other applications further down the processing pipeline.
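The transformation merges each incoming message, which carries only the changed values, into the last complete state for its key. That merge can be sketched in plain Java. This is a hypothetical illustration, not the application's code. A `HashMap` stands in for the key-value store. A `Map<String, Object>` stands in for `JSONObject` so the sketch runs without dependencies. `mergeDelta` is an invented helper name. Here the merged result is written back in the same step, so the next message for the key always sees it. That is the behavior the setup relies on.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DeltaMergeSketch {

    // Hypothetical stand-in for the key-value store: key -> last complete state.
    // Map<String, Object> is used instead of JSONObject (assumption, no dependencies).
    private final Map<String, Map<String, Object>> latestState = new HashMap<>();

    // Hypothetical helper: merges the changed values of one message into the
    // last complete state for its key and stores the result immediately.
    public Map<String, Object> mergeDelta(String key, Map<String, Object> delta) {
        // Same fallback as in the original transformer: no previous state -> empty state.
        Map<String, Object> previous =
                Optional.ofNullable(latestState.get(key)).orElse(new HashMap<>());
        Map<String, Object> merged = new HashMap<>(previous);
        merged.putAll(delta); // changed values overwrite the previous ones
        latestState.put(key, merged); // synchronous write: the next message sees it
        return merged;
    }

    public static void main(String[] args) {
        DeltaMergeSketch sketch = new DeltaMergeSketch();

        Map<String, Object> first = new HashMap<>();
        first.put("temperature", 20);
        first.put("status", "OK");
        sketch.mergeDelta("machine-1", first);

        // The second message only carries the changed temperature.
        Map<String, Object> second = new HashMap<>();
        second.put("temperature", 25);
        Map<String, Object> result = sketch.mergeDelta("machine-1", second);

        System.out.println(result.get("temperature") + " " + result.get("status")); // prints "25 OK"
    }
}
```

The second message only changes `temperature`, yet the merged state still contains the unchanged `status` from the first message.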
As part of the transformation, I read from the global state store to get the values of the previous message and use them to transform the current message. The messages contain only the values that have changed, and these changes are merged with the complete data set before being sent on. I therefore always keep the latest state in the global store so that it can be merged with the incoming changed values.

Unfortunately, when I read the store during the transformation, I do not get the latest state. The store update takes too long, so the transformation either gets no value at all or a value that does not reflect the latest state.

This is how my Streams application is built:

```java
// Set up the global state store
final KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("global-store");
final StoreBuilder<KeyValueStore<String, JSONObject>> storeBuilder =
        Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), new JSONObjectSerde());

builder.addGlobalStore(
        storeBuilder,
        "global-store-topic",
        Consumed.with(Serdes.String(), new JSONObjectSerde()),
        StoreProcessor::new);
```

```java
// StoreProcessor: writes every record from "global-store-topic" into the global store
private KeyValueStore<String, JSONObject> stateStore;

@Override
public void init(final ProcessorContext context) {
    stateStore = (KeyValueStore<String, JSONObject>) context.getStateStore("global-store");
}

@Override
public void process(final String key, final JSONObject state) {
    log.info("Update state store for {}: {}.", key, state);
    stateStore.put(key, state);
}
```

```java
// Streams setup
final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();
final KStream<String, JSONObject> stream = builder
        .stream("input-topic", Consumed.with(Serdes.String(), jsonObjectSerde))
        .transformValues(ValueTransformer::new); // my own transformer class

// Publish the transformed messages to the store topic and to the output topic
stream.to("global-store-topic", Produced.valueSerde(jsonObjectSerde));
stream.to("output-topic", Produced.valueSerde(jsonObjectSerde));
```

```java
// Global state store access in the ValueTransformer
final JSONObject previousState =
        Optional.ofNullable(stateStore.get(key)).orElse(new JSONObject());
```

I have set the producer acknowledgement property ("acks") to "all". I have tried to disable caching, both by setting "cache.max.bytes.buffering" to 0 and by disabling the cache on the store with ".withCachingDisabled()". I also tried setting the commit interval ("commit.interval.ms") to 0. None of this helped.

How can I set up a global state store that meets the requirements described above?

Thank you!

Best regards,
Mr. Georg Schmidt-Dumont
Bosch Connected Industry – BCI/ESW17
Robert Bosch GmbH