Hi Georg,

>From your description, I do not see why you need to use a global state
instead of a local one. Are there any specific reasons for that? With
a local state store you would have the previous record immediately
available.

Best,
Bruno

On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17)
<georg.schmidt-dum...@de.bosch.com.invalid> wrote:
>
> Good morning,
>
> I have setup a Kafka Streams application with the following logic. The 
> incoming messages are validated and transformed. The transformed messages are 
> then published to a global state store via topic A as well as to an 
> additional topic A for consumption by other applications further down the 
> processing pipeline.
>
> As part of the transformation I access the global state store in order to get 
> the values from the previous message and use them in the transformation of 
> the current message. The messages only contain changed values and these 
> changes are merged with the complete data set before being sent on, hence I 
> always hold the latest state in the global store in order to merge it with 
> the incoming changed values.
>
> Unfortunately, when I access the store in the transformation I do not get the 
> latest state. The update of the store takes too long so when I access it in 
> the transformation I either get no values or values which do not represent 
> the latest state.
>
> The following shows the build-up of my streams app:
>
> //setup global state store
> final KeyValueBytesStoreSupplier storeSupplier = 
> Stores.persistentKeyValueStore( “global-store” );
> final StoreBuilder<KeyValueStore<String, JSONObject>> storeBuilder = 
> Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new 
> JSONObjectSerde() );
> builder.addGlobalStore( storeBuilder, “global-store-topic”,  Consumed.with( 
> Serdes.String(), new JSONObjectSerde() ), StoreProcessor::new );
>
> //store processor
>
> private KeyValueStore<String, JSONObject> stateStore;
>
> @Override
> public void init( final ProcessorContext context ) {
>    stateStore = (KeyValueStore<String, JSONObject>) context.getStateStore( 
> “global-store” );
> }
>
>
>
> @Override
> public void process( final String key, final JSONObject state ) {
>    log.info( "Update state store for {}: {}.", key, state );
>    lastRecentStateStore.put( key, state );
> }
>
>
> //streams setup
>
> final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();
>
> final KStream<String, JSONObject> stream = builder.stream( “input-topic”, 
> Consumed.with( Serdes.String(), jsonObjectSerde ) )
>
>                    .transformValues( ValueTransformer::new )
>
>
>
> stream.to( “global-store-topic”, Produced.valueSerde( jsonObjectSerde ) );
>
> stream.to( “output-topic”, Produced.valueSerde( jsonObjectSerde ) );
>
> //global state store access in ValueTransformer
>
> JSONObject previousState = Optional.ofNullable( stateStore.get( key ) )
>                                    .orElse( new JSONObject() );
>
>
> I have set the acknowledge property for the producers to “all”.
>
> I have tried to disable the caching by setting “cache.max.bytes.buffering” to 
> 0 and by disabling the cache on the store using “.withCachingDisabled()”. I 
> also tried setting the commit interval to 0. All without success.
>
> How can I setup a global state which meets the requirements as describe in 
> the scenario above?
>
> Thank you!
>
> Best regards / Mit freundlichen Grüßen / Üdvözlettel / 致以诚挚的问候
>
> Mr. Georg Schmidt-Dumont
> Bosch Connected Industry – BCI/ESW17
> Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | 
> www.bosch.com<http://www.bosch.com/>
> Phone +49 711 811-49893  | 
> georg.schmidt-dum...@bosch.com<mailto:georg.schmidt-dum...@bosch.com>
>
> Sitz: Stuttgart, Registergericht: Amtsgericht Stuttgart, HRB 14000;
> Aufsichtsratsvorsitzender: Franz Fehrenbach; Geschäftsführung: Dr. Volkmar 
> Denner,
> Prof. Dr. Stefan Asenkerschbaumer, Dr. Rolf Bulander, Dr. Stefan Hartung, Dr. 
> Markus Heyn, Dr. Dirk Hoheisel,
> Christoph Kübel, Uwe Raschke, Peter Tyroller
>

Reply via email to