Hi,

my team and I have already spent a lot of resources on the following problem, and after a year of struggling we still have not been able to solve it.

In one of our Spring-based applications we use Kafka Streams state stores with the following initialization:

    @Bean
    public KafkaStreams myStuffMessagesStream(
            @Qualifier("mystuffEvents") final StreamsBuilderFactoryBean streamsBuilderFactoryBean) throws Exception {
        final StreamsBuilder myicpQueryStreamBuilder = Objects.requireNonNull(streamsBuilderFactoryBean.getObject());
        final StoreBuilder<KeyValueStore<String, List<Command<MyStuffPayload>>>> keyValueStoreBuilder =
                keyValueStoreBuilder(inMemoryKeyValueStore(storeName), Serdes.String(), new CommandListSerde<>());
        myicpQueryStreamBuilder.addStateStore(keyValueStoreBuilder);

        //@formatter:off
        myicpQueryStreamBuilder
            .stream(kafkaTopicNames.getMyStuffMessageTopic(), Consumed.with(Serdes.String(), new CommandSerde<>()))
            .mapValues(this::mapPayloadToMyIcpPayload)
            .transformValues(() -> new CommandTransformer(storeName, maxStoreSize), storeName);
        //@formatter:on

        final KafkaStreams kafkaStreams = new KafkaStreams(myicpQueryStreamBuilder.build(),
                Objects.requireNonNull(streamsBuilderFactoryBean.getStreamsConfiguration()));
        kafkaStreams.start();
        return kafkaStreams;
    }

When we then query the state store as follows, we cannot predict whether we will be able to obtain it:

    @Override
    @Cacheable(value = MYICP_NOTIFICATIONS, key = "#emailAddress",
            unless = "#result == null || #result.cachedObject == null || #result.cachedObject.isEmpty()")
    public GenericCacheable<List<MyStuffNotification>> getMyStuffNotificationsForUser(final String uuid,
            final String emailAddress) throws InterruptedException {
        if (!hasText(emailAddress)) {
            LOGGER.error("[{}]: getMyStuffNotificationsForUser was called with an invalid email address.", uuid);
            return new GenericCacheable<>(Collections.emptyList(), null);
        }

        if (keyValueStore == null) {
            initializeStore(uuid);
        }

        if (keyValueStore == null) {
            LOGGER.error("[{}]: Key value store is not initialized.", uuid);
            int numberOfTries = kafkaRestartingProperties.increaseStateStoreNotInitialized();
            restartSystemIfNeeded(numberOfTries);
            return new GenericCacheable<>(Collections.emptyList(), null);
        }

        final List<Command<MyIcpPayload>> commandList = keyValueStore.get(emailAddress.toLowerCase());
        if (commandList == null) {
            return new GenericCacheable<>(Collections.emptyList(), null);
        }

        //@formatter:off
        final List<MyStuffNotification> list = commandList
            .stream()
            .map(this::mapToNotification)
            .collect(Collectors.toList());
        //@formatter:on

        return new GenericCacheable<>(list, LocalDateTime.now());
    }

    private void initializeStore(final String uuid) throws InterruptedException {
        int counter = 0;
        while (counter < 5) {
            try {
                final StoreQueryParameters<ReadOnlyKeyValueStore<String, List<Command<MyIcpPayload>>>> storeQueryParameters =
                        StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());
                keyValueStore = myIcpMessagesStream.store(storeQueryParameters.enableStaleStores());
                return;
            } catch (final Exception e) {
                LOGGER.warn("[{}]: Error while loading the state store [{}]", uuid, e.getMessage());
                Thread.sleep(1000);
                counter++;
            }
        }
    }

    private void restartSystemIfNeeded(final int numberOfTries) {
        if (kafkaRestartingProperties.hasApplicationReachedExceptionLimit()) {
            LOGGER.error("After [{}] State Store retries. System is shut down", numberOfTries);
            System.exit(kafkaStateStoreCannotBeInitializedThusRestartExitCode);
        }
    }

In about 50% of the cases, reading the state store throws an InvalidStateStoreException with a message saying that the state store may have migrated to another instance. Once we have received this message, the application never recovers on its own; a restart has always been needed. We implemented several waiting mechanisms around the requests to the state store, but none of them helped.

We are running in a Kubernetes environment with a Recreate strategy (first shut down, then start). We have one topic with a single partition and only one consumer. We are not using the RocksDB state store implementation, because we do not see the need to persist the store on the file system. Using it would also mean that we would have to create a Kubernetes StatefulSet instead of a Kubernetes Deployment.

It would be really great if someone could help us here. Because of this problem, we are starting to think about getting rid of Kafka again, as it has not brought us the value we expected.

Regards,
Thomas

________________________________
InMediasP GmbH
Neuendorfstraße 18a
16761 Hennigsdorf
Fon: +49 3302 559-420
Fax: +49 3302 559-124
www.inmediasp.de
Geschäftsführer: Dr. Volker Kleinhans, Dr. Jörg Lüddemann, Dr. Armin Ulbrich
Amtsgericht Neuruppin HRB 4654 | USt.-ID: DE194541601