Good morning,

I have set up a Kafka Streams application with the following logic. The incoming 
messages are validated and transformed. The transformed messages are then 
published to a global state store via one topic (global-store-topic) as well as 
to an additional topic (output-topic) for consumption by other applications 
further down the processing pipeline.

As part of the transformation I access the global state store in order to get 
the values from the previous message and use them in the transformation of the 
current message. The messages only contain changed values and these changes are 
merged with the complete data set before being sent on, hence I always hold the 
latest state in the global store in order to merge it with the incoming changed 
values.
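
To make the merge step concrete, here is a minimal sketch with plain maps standing in for the JSONObject values. The class name DeltaMerge and the field names are purely illustrative, and the sketch assumes a flat (non-nested) data set:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DeltaMerge {

    // Returns a new map: the previous complete state overlaid with the changed values.
    static Map<String, Object> merge(Map<String, Object> previousState,
                                     Map<String, Object> changes) {
        Map<String, Object> merged = new LinkedHashMap<>();
        if (previousState != null) {
            merged.putAll(previousState);
        }
        merged.putAll(changes);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> previous = new LinkedHashMap<>();
        previous.put("temperature", 20);
        previous.put("pressure", 5);

        // The incoming message only carries the value that changed.
        Map<String, Object> changes = new LinkedHashMap<>();
        changes.put("temperature", 22);

        System.out.println(merge(previous, changes)); // prints {temperature=22, pressure=5}
    }
}
```

The merged result is only correct if previousState really is the latest state, which is why the freshness of the store matters here.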

Unfortunately, when I access the store in the transformation I do not get the 
latest state. The update of the store takes too long so when I access it in the 
transformation I either get no values or values which do not represent the 
latest state.
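
A simplified, single-threaded model of the behaviour I am seeing is sketched below. It is not how Kafka Streams works internally; the class StaleReadModel, the queue standing in for the topic and the map standing in for the store are all assumptions made for illustration. The point is only that the store is written by a separate step from the one that reads it:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class StaleReadModel {
    // Stand-in for the global store; it only changes when the topic is drained.
    final Map<String, String> store = new HashMap<>();
    // Stand-in for the topic that feeds the store.
    final Queue<String[]> topic = new ArrayDeque<>();

    // Transformation step: read the previous state, then publish the new state.
    String transform(String key, String newState) {
        String previous = store.get(key); // may be missing or stale
        topic.add(new String[] {key, newState});
        return previous;
    }

    // Store update step: runs independently of transform() in the real application.
    void drainTopicIntoStore() {
        String[] record;
        while ((record = topic.poll()) != null) {
            store.put(record[0], record[1]);
        }
    }

    public static void main(String[] args) {
        StaleReadModel model = new StaleReadModel();
        model.transform("k", "v1");
        System.out.println(model.transform("k", "v2")); // prints null
        model.drainTopicIntoStore();
        System.out.println(model.transform("k", "v3")); // prints v2
    }
}
```

The second message for the same key sees no previous state at all, because the first one has not yet made its way from the topic into the store.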

The following shows the build-up of my streams app:

// set up the global state store
final KeyValueBytesStoreSupplier storeSupplier =
    Stores.persistentKeyValueStore( "global-store" );
final StoreBuilder<KeyValueStore<String, JSONObject>> storeBuilder =
    Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new JSONObjectSerde() );
builder.addGlobalStore( storeBuilder, "global-store-topic",
    Consumed.with( Serdes.String(), new JSONObjectSerde() ), StoreProcessor::new );

//store processor

private KeyValueStore<String, JSONObject> stateStore;

@Override
public void init( final ProcessorContext context ) {
   // look up the global store registered under "global-store"
   stateStore = (KeyValueStore<String, JSONObject>) context.getStateStore( "global-store" );
}

@Override
public void process( final String key, final JSONObject state ) {
   log.info( "Update state store for {}: {}.", key, state );
   // write the latest complete state for this key into the store
   stateStore.put( key, state );
}


//streams setup

final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();

final KStream<String, JSONObject> stream =
    builder.stream( "input-topic", Consumed.with( Serdes.String(), jsonObjectSerde ) )
           .transformValues( ValueTransformer::new );

// feed the transformed messages back into the global store
stream.to( "global-store-topic", Produced.valueSerde( jsonObjectSerde ) );

// publish the transformed messages for downstream applications
stream.to( "output-topic", Produced.valueSerde( jsonObjectSerde ) );

//global state store access in ValueTransformer

JSONObject previousState = Optional.ofNullable( stateStore.get( key ) )
                                   .orElse( new JSONObject() );


I have set the acknowledgement property (acks) of the producers to “all”.

I have tried to disable the caching by setting “cache.max.bytes.buffering” to 0 
and by disabling the cache on the store using “.withCachingDisabled()”. I also 
tried setting the commit interval to 0. All without success.
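
For reference, the settings described above would look roughly like the following sketch. Only “cache.max.bytes.buffering” is quoted from my setup; the keys "producer.acks" and "commit.interval.ms" are written out on the assumption that the standard Kafka Streams configuration names apply, and the class name TriedSettings is hypothetical:

```java
import java.util.Properties;

final class TriedSettings {

    // Builds only the settings mentioned above; a real application would
    // also need application.id, bootstrap.servers and so on.
    static Properties build() {
        Properties props = new Properties();
        props.put("producer.acks", "all");           // producer acknowledgements
        props.put("cache.max.bytes.buffering", "0"); // disable record caching
        props.put("commit.interval.ms", "0");        // commit as often as possible
        return props;
    }
}
```

None of these settings changes which component writes to the store, so they may simply not address the delay between publishing to the topic and the store being updated.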

How can I set up a global state store which meets the requirements described in 
the scenario above?

Thank you!

Best regards

Mr. Georg Schmidt-Dumont
Bosch Connected Industry – BCI/ESW17
Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | 
www.bosch.com<http://www.bosch.com/>
Phone +49 711 811-49893  | 
georg.schmidt-dum...@bosch.com<mailto:georg.schmidt-dum...@bosch.com>
