[ 
https://issues.apache.org/jira/browse/KAFKA-19602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florens Pauwels updated KAFKA-19602:
------------------------------------
    Description: 
I believe all of the following conditions are needed for this to occur:
 # transformValues on a KTable, followed by a KTable join or leftJoin
 # The transformValues is not materialized (no store name given)
 # The transformValues accesses at least one extra store

Tested on 3.6.1 and 3.9.1

Example code:
{code:java}
@Component
class TestCase {
    private static final StoreBuilder<TimestampedKeyValueStore<String, String>> 
TRANSFORMER_STORE =
          Stores.timestampedKeyValueStoreBuilder(
                Stores.persistentTimestampedKeyValueStore("transformer-store"),
                Serdes.String(),
                Serdes.String()
          );

    private final StreamsBuilder streamsBuilder;

    TestCase(StreamsBuilder streamsBuilder) {
       this.streamsBuilder = streamsBuilder;
    }

    @PostConstruct
    void configure() {
       streamsBuilder.addStateStore(TRANSFORMER_STORE);

       var aggregateTable = streamsBuilder
             .stream("input", Consumed.with(Serdes.String(), 
Serdes.String()).withName("input-to-stream"))
             .toTable(Named.as("to-table"), 
MaterializedAs.keyValue("aggregate-store",
                   Serdes.String(), Serdes.String()))
             .transformValues(MyTransformer::new,
                   Materialized.with(Serdes.String(), Serdes.String()),
                   Named.as("my-transformer"), TRANSFORMER_STORE.name());

       aggregateTable
             .join(aggregateTable,
                   (value, _) -> value,
                   Named.as("after-transformer"),
                   Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("after-transformer-store")
                         .withKeySerde(Serdes.String())
                         .withValueSerde(Serdes.String()))
             .toStream(Named.as("aggregate-to-stream"))
             .to("output", Produced.with(Serdes.String(), 
Serdes.String()).withName("output-to-topic"));

       System.out.println(streamsBuilder.build().describe().toString());
    }

    private static class MyTransformer implements 
ValueTransformerWithKey<String, String , String> {
       @Override
       public void init(ProcessorContext context) {
          context.getStateStore(TRANSFORMER_STORE.name());
       }

       @Override
       public String transform(String readOnlyKey, String value) {
          return value;
       }

       @Override
       public void close() {
       }
    }
}
 {code}
Result of the above code:

 
{noformat}
org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor after-transformer-join-this
    at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:131)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:140)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:1089)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:295)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:980)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:1055)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:920)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1191)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:999)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:713)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:672)
 ~[kafka-streams-3.9.1.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
after-transformer-join-this has no access to StateStore transformer-store as 
the store is not connected to the processor. If you add stores manually via 
'.addStateStore()' make sure to connect the added store to the processor by 
providing the processor name to '.addStateStore()' or connect them via 
'.connectProcessorAndStateStores()'. DSL users need to provide the store name 
to '.process()', '.transform()', or '.transformValues()' to connect the store 
to the corresponding operator, or they can provide a StoreBuilder by 
implementing the stores() method on the Supplier itself. If you do not add 
stores manually, please file a bug report at 
https://issues.apache.org/jira/projects/KAFKA.
    at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:174)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext.getStateStore(ForwardingDisabledProcessorContext.java:90)
 ~[kafka-streams-3.9.1.jar:na]
    at 
be.florens.kafkaspringtest.selfjoin.TestCase$MyTransformer.init(TestCase.java:63)
 ~[main/:na]
    at 
org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesGetter.init(KTableTransformValues.java:156)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.init(KTableKTableInnerJoin.java:83)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:123)
 ~[kafka-streams-3.9.1.jar:na]
    ... 10 common frames omitted{noformat}
 

 

  was:
I believe for this to occur you need
 # transformValues on a KTable, followed by a KTable join or leftJoin
 # The transformValues is not materialized (no store name given)
 # The transformValues accesses at least extra store

Tested on 3.6.1 and 3.9.1

Example code:
{code:java}
@Component
class TestCase {
    private static final StoreBuilder<TimestampedKeyValueStore<String, String>> 
TRANSFORMER_STORE =
          Stores.timestampedKeyValueStoreBuilder(
                Stores.persistentTimestampedKeyValueStore("transformer-store"),
                Serdes.String(),
                Serdes.String()
          );

    private final StreamsBuilder streamsBuilder;

    TestCase(StreamsBuilder streamsBuilder) {
       this.streamsBuilder = streamsBuilder;
    }

    @PostConstruct
    void configure() {
       streamsBuilder.addStateStore(TRANSFORMER_STORE);

       var aggregateTable = streamsBuilder
             .stream("input", Consumed.with(Serdes.String(), 
Serdes.String()).withName("input-to-stream"))
             .toTable(Named.as("to-table"), 
MaterializedAs.keyValue("aggregate-store",
                   Serdes.String(), Serdes.String()))
             .transformValues(MyTransformer::new,
                   Materialized.with(Serdes.String(), Serdes.String()),
                   Named.as("my-transformer"), TRANSFORMER_STORE.name());

       aggregateTable
             .join(aggregateTable,
                   (value, _) -> value,
                   Named.as("after-transformer"),
                   Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("after-transformer-store")
                         .withKeySerde(Serdes.String())
                         .withValueSerde(Serdes.String()))
             .toStream(Named.as("aggregate-to-stream"))
             .to("output", Produced.with(Serdes.String(), 
Serdes.String()).withName("output-to-topic"));

       System.out.println(streamsBuilder.build().describe().toString());
    }

    private static class MyTransformer implements 
ValueTransformerWithKey<String, String , String> {
       @Override
       public void init(ProcessorContext context) {
          context.getStateStore(TRANSFORMER_STORE.name());
       }

       @Override
       public String transform(String readOnlyKey, String value) {
          return value;
       }

       @Override
       public void close() {
       }
    }
}
 {code}
Result of the above code:

 
{noformat}
org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor after-transformer-join-this
    at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:131)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:140)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:1089)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:295)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:980)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:1055)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:920)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1191)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:999)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:713)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:672)
 ~[kafka-streams-3.9.1.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
after-transformer-join-this has no access to StateStore transformer-store as 
the store is not connected to the processor. If you add stores manually via 
'.addStateStore()' make sure to connect the added store to the processor by 
providing the processor name to '.addStateStore()' or connect them via 
'.connectProcessorAndStateStores()'. DSL users need to provide the store name 
to '.process()', '.transform()', or '.transformValues()' to connect the store 
to the corresponding operator, or they can provide a StoreBuilder by 
implementing the stores() method on the Supplier itself. If you do not add 
stores manually, please file a bug report at 
https://issues.apache.org/jira/projects/KAFKA.
    at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:174)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext.getStateStore(ForwardingDisabledProcessorContext.java:90)
 ~[kafka-streams-3.9.1.jar:na]
    at 
be.florens.kafkaspringtest.selfjoin.TestCase$MyTransformer.init(TestCase.java:63)
 ~[main/:na]
    at 
org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesGetter.init(KTableTransformValues.java:156)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.init(KTableKTableInnerJoin.java:83)
 ~[kafka-streams-3.9.1.jar:na]
    at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:123)
 ~[kafka-streams-3.9.1.jar:na]
    ... 10 common frames omitted{noformat}
 

 


> Kafka Streams join after unmaterialized transformValues on KTable with extra 
> store fails
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19602
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19602
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.6.1, 3.9.1
>            Reporter: Florens Pauwels
>            Priority: Minor
>
> I believe all of the following conditions are needed for this to occur:
>  # transformValues on a KTable, followed by a KTable join or leftJoin
>  # The transformValues is not materialized (no store name given)
>  # The transformValues accesses at least one extra store
> Tested on 3.6.1 and 3.9.1
> Example code:
> {code:java}
> @Component
> class TestCase {
>     private static final StoreBuilder<TimestampedKeyValueStore<String, 
> String>> TRANSFORMER_STORE =
>           Stores.timestampedKeyValueStoreBuilder(
>                 
> Stores.persistentTimestampedKeyValueStore("transformer-store"),
>                 Serdes.String(),
>                 Serdes.String()
>           );
>     private final StreamsBuilder streamsBuilder;
>     TestCase(StreamsBuilder streamsBuilder) {
>        this.streamsBuilder = streamsBuilder;
>     }
>     @PostConstruct
>     void configure() {
>        streamsBuilder.addStateStore(TRANSFORMER_STORE);
>        var aggregateTable = streamsBuilder
>              .stream("input", Consumed.with(Serdes.String(), 
> Serdes.String()).withName("input-to-stream"))
>              .toTable(Named.as("to-table"), 
> MaterializedAs.keyValue("aggregate-store",
>                    Serdes.String(), Serdes.String()))
>              .transformValues(MyTransformer::new,
>                    Materialized.with(Serdes.String(), Serdes.String()),
>                    Named.as("my-transformer"), TRANSFORMER_STORE.name());
>        aggregateTable
>              .join(aggregateTable,
>                    (value, _) -> value,
>                    Named.as("after-transformer"),
>                    Materialized.<String, String, KeyValueStore<Bytes, 
> byte[]>>as("after-transformer-store")
>                          .withKeySerde(Serdes.String())
>                          .withValueSerde(Serdes.String()))
>              .toStream(Named.as("aggregate-to-stream"))
>              .to("output", Produced.with(Serdes.String(), 
> Serdes.String()).withName("output-to-topic"));
>        System.out.println(streamsBuilder.build().describe().toString());
>     }
>     private static class MyTransformer implements 
> ValueTransformerWithKey<String, String , String> {
>        @Override
>        public void init(ProcessorContext context) {
>           context.getStateStore(TRANSFORMER_STORE.name());
>        }
>        @Override
>        public String transform(String readOnlyKey, String value) {
>           return value;
>        }
>        @Override
>        public void close() {
>        }
>     }
> }
>  {code}
> Result of the above code:
>  
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize 
> processor after-transformer-join-this
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:131)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:140)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:1089)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:295)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:980)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:1055)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:920)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1191)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:999)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:713)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:672)
>  ~[kafka-streams-3.9.1.jar:na]
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
> after-transformer-join-this has no access to StateStore transformer-store as 
> the store is not connected to the processor. If you add stores manually via 
> '.addStateStore()' make sure to connect the added store to the processor by 
> providing the processor name to '.addStateStore()' or connect them via 
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name 
> to '.process()', '.transform()', or '.transformValues()' to connect the store 
> to the corresponding operator, or they can provide a StoreBuilder by 
> implementing the stores() method on the Supplier itself. If you do not add 
> stores manually, please file a bug report at 
> https://issues.apache.org/jira/projects/KAFKA.
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:174)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext.getStateStore(ForwardingDisabledProcessorContext.java:90)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> be.florens.kafkaspringtest.selfjoin.TestCase$MyTransformer.init(TestCase.java:63)
>  ~[main/:na]
>     at 
> org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesGetter.init(KTableTransformValues.java:156)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.init(KTableKTableInnerJoin.java:83)
>  ~[kafka-streams-3.9.1.jar:na]
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:123)
>  ~[kafka-streams-3.9.1.jar:na]
>     ... 10 common frames omitted{noformat}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to