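Hi Georg,

From your description, I do not see why you need to use a global state
store instead of a local one. Are there any specific reasons for that?
A global store is populated asynchronously from its topic by a separate
thread, so a record you have just produced is not guaranteed to be in
the store when the next record is processed. With a local state store
you would have the previous record immediately available.
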
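To illustrate the read-merge-write step I have in mind, here is a minimal
plain-Java sketch. It does not use the Kafka Streams API: the HashMap is
only a stand-in for a local key-value store, the class and method names
are hypothetical, and a Map<String, Object> stands in for your JSONObject.
In a real topology the store would presumably be registered with
StreamsBuilder#addStateStore and passed by name to transformValues.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the transformer's read-merge-write step.
// No Kafka Streams classes are used; all names here are hypothetical.
final class LocalStoreMergeSketch {

    // Stand-in for a local KeyValueStore<String, JSONObject> owned by the transformer.
    private final Map<String, Map<String, Object>> store = new HashMap<>();

    // Merges the changed values into the previous full state for this key,
    // writes the merged state back, and returns a copy to forward downstream.
    Map<String, Object> transform(final String key, final Map<String, Object> changes) {
        // Read the previous state; an unseen key starts from an empty state.
        final Map<String, Object> merged =
            new HashMap<>(store.getOrDefault(key, Collections.emptyMap()));
        // Changed values overwrite the previous ones; untouched fields are kept.
        merged.putAll(changes);
        // The write happens in the same step, so the next record sees it.
        store.put(key, merged);
        return new HashMap<>(merged);
    }
}
```

Because the read and the write happen in the same processing step, the
next record with the same key always sees the merged state, with no round
trip through a topic.
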
Best,
Bruno

On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17)
<georg.schmidt-dum...@de.bosch.com.invalid> wrote:
>
> Good morning,
>
> I have set up a Kafka Streams application with the following logic. The
> incoming messages are validated and transformed. The transformed messages are
> then published to a global state store via topic A as well as to an
> additional topic A for consumption by other applications further down the
> processing pipeline.
>
> As part of the transformation I access the global state store in order to get
> the values from the previous message and use them in the transformation of
> the current message. The messages only contain changed values and these
> changes are merged with the complete data set before being sent on, hence I
> always hold the latest state in the global store in order to merge it with
> the incoming changed values.
>
> Unfortunately, when I access the store in the transformation I do not get the
> latest state. The update of the store takes too long, so when I access it in
> the transformation I either get no values or values which do not represent
> the latest state.
>
> The following shows the build-up of my streams app:
>
> //setup global state store
> final KeyValueBytesStoreSupplier storeSupplier =
>     Stores.persistentKeyValueStore( "global-store" );
> final StoreBuilder<KeyValueStore<String, JSONObject>> storeBuilder =
>     Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new JSONObjectSerde() );
> builder.addGlobalStore( storeBuilder, "global-store-topic",
>     Consumed.with( Serdes.String(), new JSONObjectSerde() ), StoreProcessor::new );
>
> //store processor
>
> private KeyValueStore<String, JSONObject> stateStore;
>
> @Override
> public void init( final ProcessorContext context ) {
>     stateStore = (KeyValueStore<String, JSONObject>) context.getStateStore( "global-store" );
> }
>
> @Override
> public void process( final String key, final JSONObject state ) {
>     log.info( "Update state store for {}: {}.", key, state );
>     stateStore.put( key, state );
> }
>
> //streams setup
>
> final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();
>
> final KStream<String, JSONObject> stream = builder.stream( "input-topic",
>         Consumed.with( Serdes.String(), jsonObjectSerde ) )
>     .transformValues( ValueTransformer::new );
>
> stream.to( "global-store-topic", Produced.valueSerde( jsonObjectSerde ) );
> stream.to( "output-topic", Produced.valueSerde( jsonObjectSerde ) );
>
> //global state store access in ValueTransformer
>
> JSONObject previousState = Optional.ofNullable( stateStore.get( key ) )
>     .orElse( new JSONObject() );
>
> I have set the acknowledge property for the producers to "all".
>
> I have tried to disable the caching by setting "cache.max.bytes.buffering" to
> 0 and by disabling the cache on the store using ".withCachingDisabled()". I
> also tried setting the commit interval to 0. All without success.
>
> How can I set up a global state which meets the requirements as described in
> the scenario above?
>
> Thank you!
>
> Best regards,
>
> Mr. Georg Schmidt-Dumont