[
https://issues.apache.org/jira/browse/KAFKA-13373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aleksandr Sorokoumov resolved KAFKA-13373.
------------------------------------------
Resolution: Cannot Reproduce
> ValueTransformerWithKeySupplier doesn't work with store()
> ---------------------------------------------------------
>
> Key: KAFKA-13373
> URL: https://issues.apache.org/jira/browse/KAFKA-13373
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0
> Reporter: Anatoly Tsyganenko
> Assignee: Aleksandr Sorokoumov
> Priority: Minor
> Labels: newbie
>
> I'm trying to utilize stores() method in ValueTransformerWithKeySupplier like
> this:
>
> {code:java}
> public final class CustomSupplier implements
> ValueTransformerWithKeySupplier<Windowed<String>, JsonNode, JsonNode> {
> private final String storeName = "my-store";
> public Set<StoreBuilder<?>> stores() {
> final Deserializer<JsonNode> jsonDeserializer = new
> JsonDeserializer();
> final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
> final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer,
> jsonDeserializer);
> final Serde<String> stringSerde = Serdes.String();
> final StoreBuilder<TimestampedKeyValueStore<String, JsonNode>> store
> =
> Stores.timestampedKeyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
> stringSerde, jsonSerde).withLoggingDisabled();
> return Collections.singleton(store);
> }
> @Override
> public ValueTransformerWithKey<Windowed<String>, JsonNode, JsonNode>
> get() {
> return new ValueTransformerWithKey<Windowed<String>, JsonNode,
> JsonNode>() {
> private ProcessorContext context;
> private TimestampedKeyValueStore<String, JsonNode> store;
> @Override
> public void init(final ProcessorContext context) {
> this.store = context.getStateStore(storeName);
> this.context = context;
> }
> //....
> }{code}
>
> But got next error for line "this.store = context.getStateStore(storeName);"
> in init():
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor
> KTABLE-TRANSFORMVALUES-0000000008 has no access to StateStore my-store as the
> store is not connected to the processor. If you add stores manually via
> '.addStateStore()' make sure to connect the added store to the processor by
> providing the processor name to '.addStateStore()' or connect them via
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name
> to '.process()', '.transform()', or '.transformValues()' to connect the store
> to the corresponding operator, or they can provide a StoreBuilder by
> implementing the stores() method on the Supplier itself. If you do not add
> stores manually, please file a bug report at
> https://issues.apache.org/jira/projects/KAFKA.{code}
>
> The same code works perfect with Transform or when I adding store to builder.
> Looks like something wrong when ConnectedStoreProvider and
> ValueTransformerWithKeySupplier used together.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)