Hi,
this problem has already cost my team and me a lot of resources, and after one
year of struggling we still have not been able to solve it.

In one of our Spring-based applications we use Kafka Streams state stores with
the following initialization.


@Bean
public KafkaStreams myStuffMessagesStream(
        @Qualifier("mystuffEvents") final StreamsBuilderFactoryBean streamsBuilderFactoryBean) throws Exception {
    final StreamsBuilder myicpQueryStreamBuilder = Objects.requireNonNull(streamsBuilderFactoryBean.getObject());

    final StoreBuilder<KeyValueStore<String, List<Command<MyStuffPayload>>>> keyValueStoreBuilder =
            keyValueStoreBuilder(inMemoryKeyValueStore(storeName), Serdes.String(), new CommandListSerde<>());
    myicpQueryStreamBuilder.addStateStore(keyValueStoreBuilder);

    //@formatter:off
    myicpQueryStreamBuilder
            .stream(kafkaTopicNames.getMyStuffMessageTopic(), Consumed.with(Serdes.String(), new CommandSerde<>()))
            .mapValues(this::mapPayloadToMyIcpPayload)
            .transformValues(() -> new CommandTransformer(storeName, maxStoreSize), storeName);
    //@formatter:on

    final KafkaStreams kafkaStreams = new KafkaStreams(
            myicpQueryStreamBuilder.build(),
            Objects.requireNonNull(streamsBuilderFactoryBean.getStreamsConfiguration()));
    kafkaStreams.start();

    return kafkaStreams;
}
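
As far as we understand, KafkaStreams.start() returns before the instance has
reached the RUNNING state, so a query issued right after startup can hit a store
that is not yet available. The following is only a minimal sketch of a readiness
gate that callers could wait on. The class name ReadinessGate and its methods
are hypothetical and not part of our application or of Kafka; the sketch assumes
a state listener registered via KafkaStreams.setStateListener(...) before
start() would call setReady(...).

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: lets query threads wait until a listener reports readiness. */
public final class ReadinessGate {
    private boolean ready;

    /**
     * Assumed to be called from a state listener, e.g.
     * kafkaStreams.setStateListener((newState, oldState) ->
     *         gate.setReady(newState == KafkaStreams.State.RUNNING));
     */
    public synchronized void setReady(final boolean value) {
        ready = value;
        notifyAll(); // wake up all threads blocked in awaitReady
    }

    /** Returns true if the gate opened within the timeout, false otherwise. */
    public synchronized boolean awaitReady(final long timeoutMillis) throws InterruptedException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!ready) {
            final long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // timed out while the gate was still closed
            }
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
        return true;
    }
}
```

Because the flag can be set back to false, the gate would also close again
during a rebalance, which a one-shot latch could not express.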


When we now query the state store, whether we can actually obtain it is
unpredictable.



@Override
@Cacheable(value = MYICP_NOTIFICATIONS, key = "#emailAddress",
        unless = "#result == null || #result.cachedObject == null || #result.cachedObject.isEmpty()")
public GenericCacheable<List<MyStuffNotification>> getMyStuffNotificationsForUser(
        final String uuid, final String emailAddress) throws InterruptedException {
    if (!hasText(emailAddress)) {
        LOGGER.error("[{}]: getMyStuffNotificationsForUser was called with an invalid email address.", uuid);
        return new GenericCacheable<>(Collections.emptyList(), null);
    }

    if (keyValueStore == null) {
        initializeStore(uuid);
    }

    if (keyValueStore == null) {
        LOGGER.error("[{}]: Key value store is not initialized.", uuid);
        int numberOfTries = kafkaRestartingProperties.increaseStateStoreNotInitialized();
        restartSystemIfNeeded(numberOfTries);
        return new GenericCacheable<>(Collections.emptyList(), null);
    }

    final List<Command<MyIcpPayload>> commandList = keyValueStore.get(emailAddress.toLowerCase());
    if (commandList == null) {
        return new GenericCacheable<>(Collections.emptyList(), null);
    }

    //@formatter:off
    final List<MyStuffNotification> list = commandList
            .stream()
            .map(this::mapToNotification)
            .collect(Collectors.toList());
    //@formatter:on

    return new GenericCacheable<>(list, LocalDateTime.now());
}

private void initializeStore(final String uuid) throws InterruptedException {
    int counter = 0;
    while (counter < 5) {
        try {
            final StoreQueryParameters<ReadOnlyKeyValueStore<String, List<Command<MyIcpPayload>>>> storeQueryParameters =
                    StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());
            keyValueStore = myIcpMessagesStream.store(storeQueryParameters.enableStaleStores());
            return;
        } catch (final Exception e) {
            LOGGER.warn("[{}]: Error while loading the state store [{}]", uuid, e.getMessage());
            Thread.sleep(1000);
            counter++;
        }
    }
}

private void restartSystemIfNeeded(final int numberOfTries) {
    if (kafkaRestartingProperties.hasApplicationReachedExceptionLimit()) {
        LOGGER.error("After [{}] State Store retries. System is shut down", numberOfTries);
        System.exit(kafkaStateStoreCannotBeInitializedThusRestartExitCode);
    }
}

In about 50% of the cases, reading the state store throws an
InvalidStateStoreException with the message that the state store may have
migrated to another instance. Once we receive this message, the application
never recovers on its own; a restart is always needed. We implemented several
waiting mechanisms around the requests to the state store, but none of them
helped.
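
To make the kind of waiting mechanism concrete, here is a minimal sketch of one
variant: the store handle is cached, but dropped and fetched again whenever a
read fails, instead of being kept after it has become invalid. The class
RefreshingHandle and its parameters are hypothetical and use only the JDK. In
the application, the lookup would presumably wrap the call to
KafkaStreams.store(...), and the catch clause would presumably be narrowed to
InvalidStateStoreException; both are assumptions, not tested code.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical helper: caches a handle and re-fetches it whenever a read fails. */
public final class RefreshingHandle<S> {
    private final Supplier<S> lookup;   // assumed to fetch a fresh store handle
    private final int maxAttempts;
    private final long backoffMillis;
    private S cached;

    public RefreshingHandle(final Supplier<S> lookup, final int maxAttempts, final long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.lookup = lookup;
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    /** Runs the query, discarding the cached handle and retrying when it fails. */
    public synchronized <R> R read(final Function<S, R> query) throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (cached == null) {
                    cached = lookup.get();
                }
                return query.apply(cached);
            } catch (final RuntimeException e) {
                cached = null; // never reuse a handle that has just failed
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        throw last; // all attempts failed; surface the last error to the caller
    }
}
```

A call could then look like handle.read(store -> store.get(key)), so that the
lookup and the read are retried together rather than only the lookup.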


We are running in a Kubernetes environment with a Recreate strategy (first
shut down, then start).
We have one topic with one partition and only one consumer.

We are not using the RocksDB state store implementation, as we don't see the
need to have it persisted on the filesystem. It would also mean that we would
need to create a Kubernetes StatefulSet instead of a Kubernetes Deployment.

It would be really great if someone could help us here.

Because of this, we are starting to think about getting rid of Kafka again, as
it has not brought us the value we expected.

Regards
Thomas


________________________________

InMediasP GmbH
Neuendorfstraße 18a
16761 Hennigsdorf

Fon: +49 3302 559-420
Fax: +49 3302 559-124
www.inmediasp.de

Geschäftsführer: Dr. Volker Kleinhans, Dr. Jörg Lüddemann, Dr. Armin Ulbrich

Amtsgericht Neuruppin HRB 4654 | USt.-ID: DE194541601
