[ https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881603#comment-16881603 ]
Sophie Blee-Goldman commented on KAFKA-8630: -------------------------------------------- Yep, that's the KIP. There was some ongoing discussion but I think it got dropped on a floor a little while back as people became busy. You might be able to kick start discussion again with this as motivation – someone on the mailing list had a similar problem, but wanted to use key-value stores so for the time being they can get by with just MockProcessorContext. Of course, even though some stores don't currently need the InternalProcessorContext yet, we are actively adding more metrics and this may not be true for long. It's worth solving this holistically. That's probably a lot more work than we need to do here. You're right about the StreamsMetrics vs StreamsMetricsImpl – the window store, for example, needs to access the storeLevelSensor method, but we don't want to expose that. I agree it's pretty annoying that all this boils down to is needing to access some internal metrics/sensors, but I do think the right way to solve it is on the test side. From the source code side, everything is working as it should – it's the MockProcessorContext that fails to adequately 'mock' the real thing. I think some light refactoring to have the contexts implement some "ProcessorContextMetrics" interface could be accomplished pretty easily > Unit testing a streams processor with a WindowStore throws a > ClassCastException > ------------------------------------------------------------------------------- > > Key: KAFKA-8630 > URL: https://issues.apache.org/jira/browse/KAFKA-8630 > Project: Kafka > Issue Type: Bug > Components: streams-test-utils > Affects Versions: 2.3.0 > Reporter: Justin Fetherolf > Priority: Major > > I was attempting to write a unit test for a class implementing the > {{Processor}} interface that contained a {{WindowStore}}, but running the > test fails with a {{ClassCastException}} coming out of > {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to > {{InternalProcessorContext}}. > Minimal code to reproduce: > {code:java} > package com.cantgetthistowork; > import org.apache.kafka.streams.processor.Processor; > import org.apache.kafka.streams.processor.ProcessorContext; > import org.apache.kafka.streams.state.WindowStore; > public class InMemWindowProcessor implements Processor<String, String> { > private ProcessorContext context; > private WindowStore<String, String> windowStore; > @Override > public void init(ProcessorContext context) { > this.context = context; > windowStore = (WindowStore<String, String>) > context.getStateStore("my-win-store"); > } > @Override > public void process(String key, String value) { > } > @Override > public void close() { > } > } > {code} > {code:java} > package com.cantgetthistowork; > import java.time.Duration; > import java.time.Instant; > import org.apache.kafka.common.serialization.Serdes; > import org.apache.kafka.streams.processor.MockProcessorContext; > import org.apache.kafka.streams.state.Stores; > import org.apache.kafka.streams.state.WindowStore; > import org.junit.Before; > import org.junit.Test; > public class InMemWindowProcessorTest { > InMemWindowProcessor processor = null; > MockProcessorContext context = null; > @Before > public void setup() { > processor = new InMemWindowProcessor(); > context = new MockProcessorContext(); > WindowStore<String, String> store = > Stores.windowStoreBuilder( > Stores.inMemoryWindowStore( > "my-win-store", > Duration.ofMinutes(10), > Duration.ofSeconds(10), > false > ), > Serdes.String(), > Serdes.String() > ) > .withLoggingDisabled() > .build(); > store.init(context, store); > context.register(store, null); > processor.init(context); > } > @Test > public void testThings() { > Instant baseTime = Instant.now(); > context.setTimestamp(baseTime.toEpochMilli()); > context.setTopic("topic-name"); > processor.process("key1", "value1"); > } > } > {code} > > I was trying this with maven, with mvn --version outputting: > {noformat} > Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; > 2017-04-03T13:39:06-06:00) > Maven home: ~/opt/apache-maven-3.5.0 > Java version: 1.8.0_212, vendor: Oracle Corporation > Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre > Default locale: en_US, platform encoding: UTF-8 > OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: > "unix"{noformat} > And finally the stack trace: > {noformat} > ------------------------------------------------------- > T E S T S > ------------------------------------------------------- > Running com.cantgetthistowork.InMemWindowProcessorTest > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details. > Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< > FAILURE! > testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed: > 0.05 sec <<< ERROR! > java.lang.ClassCastException: > org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to > org.apache.kafka.streams.processor.internals.InternalProcessorContext > at > org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90) > at > com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189) > at > org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165) > at > org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75) > Results : > Tests in error: > testThings(com.cantgetthistowork.InMemWindowProcessorTest): > org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to > org.apache.kafka.streams.processor.internals.InternalProcessorContext > Tests run: 1, Failures: 0, Errors: 1, Skipped: 0{noformat} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)