[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486257#comment-17486257
 ] 

Ulrik commented on KAFKA-13638:
-------------------------------

I tried the following combination of versions:


 * streams 2.8.1 & rocksdb 6.0.1 : fast
 * streams 2.8.1 & rocksdb 6.1.1 : fast
 * streams 2.8.1 & rocksdb 6.2.2 : fast
 * streams 2.8.1 & rocksdb 6.3.6 : fast
 * streams 2.8.1 & rocksdb 6.4.6 : fast
 * streams 2.8.1 & rocksdb 6.5.2 : fast
 * streams 2.8.1 & rocksdb 6.6.4 : fast
 * streams 2.8.1 & rocksdb 6.7.3 : fast
 * streams 2.8.1 & rocksdb 6.11.6 : fast
 * streams 2.8.1 & rocksdb 6.12.7 : fast
 * streams 2.8.1 & rocksdb >= 6.13.3 : Runtime error:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store 
table at location 
/var/folders/_4/ks5l_9vj6zbfbpf5w6180t3j8kb4pd/T/kafka-streams/dummy-a36d66ed-ba41-4e6d-884f-3c2a652d245d/0_0/rocksdb/table
    at 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:87)
    at 
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:186)
    at 
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:254)
    at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:55)
    at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:75)
    at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:122)
    at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
    at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:122)
    at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
    at 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
    at 
org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:537)
    at 
org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:385)
    at 
org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:313)
    at 
org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:272)
    at kafkatest.KafkaTest.validateTopologyCanProcessData(KafkaTest.java:79)
    at kafkatest.KafkaTest.multipleForwardsFromTransformer(KafkaTest.java:69)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
    at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
    at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
    at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
    at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
    at 
org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at 
org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
    at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
    at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at 
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at 
org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
    at 
com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
    at 
com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at 
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: org.rocksdb.RocksDBException: Column family not found: 
keyValueWithTimestamp
    at org.rocksdb.RocksDB.open(Native Method)
    at org.rocksdb.RocksDB.open(RocksDB.java:304)
    at 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:75)
    ... 88 more {code}
 * streams 3.0.0 & rocksdb 6.13.3 : slow
 * streams 3.0.0 & rocksdb <= 6.13.3 : Same error as above

 

Not sure this reveals any more info on where the problem is. Too bad I cant run 
streams 2.8 with rocksdb above 6.12 or streams 3.0 with rocksdb below 6.13

> Slow KTable update when forwarding multiple values from transformer
> -------------------------------------------------------------------
>
>                 Key: KAFKA-13638
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13638
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.0, 3.0.0
>            Reporter: Ulrik
>            Priority: Major
>         Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer time to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input message and 30 output message per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to