[jira] [Created] (KAFKA-7493) Rewrite test_broker_type_bounce_at_start
John Roesler created KAFKA-7493: --- Summary: Rewrite test_broker_type_bounce_at_start Key: KAFKA-7493 URL: https://issues.apache.org/jira/browse/KAFKA-7493 Project: Kafka Issue Type: Improvement Components: streams, system tests Reporter: John Roesler Currently, the test test_broker_type_bounce_at_start in streams_broker_bounce_test.py is ignored. As written, there are a couple of race conditions that lead to flakiness. It should be possible to rewrite the test to wait on log messages, as the other tests do, instead of just sleeping, so that the test transitions more deterministically from one state to the next. Once the test is fixed, the fix should be back-ported to all prior branches, back to 0.10. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7539) ConsumerBounceTest.testClose transient failure
John Roesler created KAFKA-7539: --- Summary: ConsumerBounceTest.testClose transient failure Key: KAFKA-7539 URL: https://issues.apache.org/jira/browse/KAFKA-7539 Project: Kafka Issue Type: Sub-task Reporter: John Roesler Assignee: Fangmin Lv Fix For: 0.9.0.0 {code} kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures FAILED java.lang.AssertionError: expected:<1000> but was:<976> at org.junit.Assert.fail(Assert.java:92) at org.junit.Assert.failNotEquals(Assert.java:689) at org.junit.Assert.assertEquals(Assert.java:127) at org.junit.Assert.assertEquals(Assert.java:514) at org.junit.Assert.assertEquals(Assert.java:498) at kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117) at kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98) kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures FAILED java.lang.AssertionError: expected:<1000> but was:<913> at org.junit.Assert.fail(Assert.java:92) at org.junit.Assert.failNotEquals(Assert.java:689) at org.junit.Assert.assertEquals(Assert.java:127) at org.junit.Assert.assertEquals(Assert.java:514) at org.junit.Assert.assertEquals(Assert.java:498) at kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117) at kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7539) ConsumerBounceTest.testClose transient failure
[ https://issues.apache.org/jira/browse/KAFKA-7539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-7539. - Resolution: Invalid Reviewer: (was: Jason Gustafson) Cloning to create this issue was a mistake. I'm going to create a fresh one. > ConsumerBounceTest.testClose transient failure > -- > > Key: KAFKA-7539 > URL: https://issues.apache.org/jira/browse/KAFKA-7539 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Major > Labels: flaky-test > > {code:java} > java.lang.ArrayIndexOutOfBoundsException: -1 > at > kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146) > at > kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238) > at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConne
[jira] [Created] (KAFKA-7540) Transient failure: kafka.api.ConsumerBounceTest.testClose
John Roesler created KAFKA-7540: --- Summary: Transient failure: kafka.api.ConsumerBounceTest.testClose Key: KAFKA-7540 URL: https://issues.apache.org/jira/browse/KAFKA-7540 Project: Kafka Issue Type: Bug Reporter: John Roesler Observed on Java 8: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/] Stacktrace: {noformat} java.lang.ArrayIndexOutOfBoundsException: -1 at kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146) at kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238) at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117) at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137) at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404) at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPoli
[jira] [Created] (KAFKA-7541) Transient Failure: kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable
John Roesler created KAFKA-7541: --- Summary: Transient Failure: kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable Key: KAFKA-7541 URL: https://issues.apache.org/jira/browse/KAFKA-7541 Project: Kafka Issue Type: Bug Reporter: John Roesler Observed on Java 11: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/264/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/] Stacktrace: {noformat} java.lang.AssertionError: Unclean leader not elected at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:487) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnec
[jira] [Created] (KAFKA-7542) Transient Failure: kafka.server.GssapiAuthenticationTest.testServerAuthenticationFailure
John Roesler created KAFKA-7542: --- Summary: Transient Failure: kafka.server.GssapiAuthenticationTest.testServerAuthenticationFailure Key: KAFKA-7542 URL: https://issues.apache.org/jira/browse/KAFKA-7542 Project: Kafka Issue Type: Bug Reporter: John Roesler Observed on java 11: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/264/testReport/junit/kafka.server/GssapiAuthenticationTest/testServerAuthenticationFailure/] at [https://github.com/apache/kafka/pull/5795/commits/5bdcd0e023c6f406d585155399f6541bb6a9f9c2] Stacktrace: {noformat} org.scalatest.junit.JUnitTestFailedError: at org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:100) at org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:71) at org.scalatest.Assertions$class.fail(Assertions.scala:1075) at org.scalatest.junit.JUnitSuite.fail(JUnitSuite.scala:71) at kafka.server.GssapiAuthenticationTest.testServerAuthenticationFailure(GssapiAuthenticationTest.scala:128) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at 
java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.i
[jira] [Created] (KAFKA-7544) Transient Failure: org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFails
John Roesler created KAFKA-7544: --- Summary: Transient Failure: org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFails Key: KAFKA-7544 URL: https://issues.apache.org/jira/browse/KAFKA-7544 Project: Kafka Issue Type: Bug Components: streams Reporter: John Roesler Observed on Java 11: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/264/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFails/] at [https://github.com/apache/kafka/pull/5795/commits/5bdcd0e023c6f406d585155399f6541bb6a9f9c2] stacktrace: {noformat} java.lang.AssertionError: Expected: <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 2), KeyValue(0, 3), KeyValue(0, 4), KeyValue(0, 5), KeyValue(0, 6), KeyValue(0, 7), KeyValue(0, 8), KeyValue(0, 9)]> but: was <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 2), KeyValue(0, 3), KeyValue(0, 4), KeyValue(0, 5), KeyValue(0, 6), KeyValue(0, 7), KeyValue(0, 8), KeyValue(0, 9), KeyValue(0, 10), KeyValue(0, 11), KeyValue(0, 12), KeyValue(0, 13), KeyValue(0, 14)]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8) at org.apache.kafka.streams.integration.EosIntegrationTest.checkResultPerKey(EosIntegrationTest.java:218) at org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFails(EosIntegrationTest.java:360) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117) at java.base/jdk.internal.reflect.NativeMethodAccessorImp
[jira] [Created] (KAFKA-7553) Jenkins PR tests hung
John Roesler created KAFKA-7553: --- Summary: Jenkins PR tests hung Key: KAFKA-7553 URL: https://issues.apache.org/jira/browse/KAFKA-7553 Project: Kafka Issue Type: Bug Reporter: John Roesler Attachments: consoleText.txt I wouldn't worry about this unless it continues to happen, but I wanted to document it. This was a Java 11 build: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/266/] It was for this PR: [https://github.com/apache/kafka/pull/5795] And this commit: [https://github.com/apache/kafka/pull/5795/commits/5bdcd0e023c6f406d585155399f6541bb6a9f9c2] It looks like the tests just hung after 46 minutes, until the build timed out at 180 minutes. End of the output: {noformat} ... 00:46:27.275 kafka.server.ServerGenerateBrokerIdTest > testConsistentBrokerIdFromUserConfigAndMetaProps STARTED 00:46:29.775 00:46:29.775 kafka.server.ServerGenerateBrokerIdTest > testConsistentBrokerIdFromUserConfigAndMetaProps PASSED 03:00:51.124 Build timed out (after 180 minutes). Marking the build as aborted. 03:00:51.440 Build was aborted 03:00:51.492 [FINDBUGS] Skipping publisher since build result is ABORTED 03:00:51.492 Recording test results 03:00:51.495 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1 03:00:58.017 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1 03:00:59.330 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1 03:00:59.331 Adding one-line test results to commit status... 03:00:59.332 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1 03:00:59.334 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1 03:00:59.335 Setting status of 5bdcd0e023c6f406d585155399f6541bb6a9f9c2 to FAILURE with url https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/266/ and message: 'FAILURE 03:00:59.335 9053 tests run, 1 skipped, 0 failed.' 03:00:59.335 Using context: JDK 11 and Scala 2.12 03:00:59.541 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1 03:00:59.542 Finished: ABORTED{noformat} I did find one test that started but did not finish: {noformat} 00:23:29.576 kafka.api.PlaintextConsumerTest > testLowMaxFetchSizeForRequestAndPartition STARTED {noformat} But note that the tests continued to run for another 23 minutes after this one started. Just for completeness, there were 4 failures: {noformat} 00:22:06.875 kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsNotExistingGroup FAILED 00:22:06.875 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. 
00:22:06.875 at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) 00:22:06.875 at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) 00:22:06.875 at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) 00:22:06.876 at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262) 00:22:06.876 at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:307) 00:22:06.876 at kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89) 00:22:06.876 00:22:06.876 Caused by: 00:22:06.876 org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.{noformat} {noformat} 00:25:22.175 kafka.api.CustomQuotaCallbackTest > testCustomQuotaCallback FAILED 00:25:22.175 java.lang.AssertionError: Partition [group1_largeTopic,69] metadata not propagated after 15000 ms 00:25:22.176 at kafka.utils.TestUtils$.fail(TestUtils.scala:351) 00:25:22.176 at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:741) 00:25:22.176 at kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:831) 00:25:22.176 at kafka.utils.TestUtils$$anonfun$createTopic$2.apply(TestUtils.scala:330) 00:25:22.176 at kafka.utils.TestUtils$$anonfun$createTopic$2.apply(TestUtils.scala:329) 00:25:22.176 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 00:25:22.176 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 00:25:22.176 at scala.collection.Iterator$class.foreach(Iterator.scala:891) 00:25:22.176 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 00:25:22.176 at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174) 00:25:22.176 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 00:25:22.176 at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47) 00:25:22.176 at scala.collection.SetLike$class.map(SetLike.scala:92) 00:25:22.17
[jira] [Created] (KAFKA-7806) Windowed Aggregations should wrap default key serde if none is specified
John Roesler created KAFKA-7806: --- Summary: Windowed Aggregations should wrap default key serde if none is specified Key: KAFKA-7806 URL: https://issues.apache.org/jira/browse/KAFKA-7806 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler In Streams, windowing a stream by either time or session windows causes the stream's keys to be transformed from `K` to `Windowed<K>`. Since this is a well-defined transition, it's not necessary for developers to explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which already knows the key serde (`Serde<K>`), automatically wraps it in case it's needed by downstream operators. However, this automatic wrapping only takes place if the key serde has been explicitly provided in the topology. If the topology relies on the `default.key.serde` configuration, no wrapping takes place, and downstream operators will encounter a ClassCastException trying to cast a `Windowed<K>` (the windowed key) to whatever type the default serde handles (which is the key wrapped inside the windowed key). Specifically, the key serde forwarding logic is: in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`: `materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null` and in `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`: `materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null` This pattern of not "solidifying" the default key serde is common in Streams. Not all operators need a serde, and the default serde may not be applicable to all operators. So, it would be a mistake for arbitrary operators to grab the default serde and pass it downstream as if it had been explicitly set. However, in this case specifically, all windowed aggregations are stateful, so if we don't have an explicit key serde at this point, we know that we have used the default serde in the window store. If the default serde were incorrect, an exception would be thrown by the windowed aggregation itself. So it actually is safe to wrap the default serde in a windowed serde and pass it downstream, which would result in a better development experience. Unfortunately, the default serde is set via config, but the windowed serde wrapping happens during DSL building, when the config is not generally available. Therefore, we would need a special windowed serde wrapper that signals that it wraps the default serde, which would be fully resolved during operators' init call. For example, something of this nature: `materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : FullTimeWindowedSerde.wrapDefault(windows.size())` etc. Complicating the situation slightly, all the windowed serializers and deserializers will resolve a runtime inner class using `default.windowed.key.serde.inner` if given a null inner serde to wrap. However, at this point in the topology build, we do know that the windowed aggregation has specifically used the `default.key.serde`, not the `default.windowed.key.serde.inner`, to persist its state to the window store; therefore, it should be correct to wrap the default key serde specifically and not use the `default.windowed.key.serde.inner`. In addition to fixing this for TimeWindowed and SessionWindowed streams, we need to have good test coverage of the new code. There is clearly a blind spot in the tests, or we would have noticed this sooner.
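To illustrate the deferred-wrapping idea, here is a minimal sketch of what such a wrapper could look like. It is only a sketch: the class name DeferredDefaultTimeWindowedSerde is hypothetical, and the real fix would live in the Streams internals and also need to carry the window size, as FullTimeWindowedSerde does.

{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

// Hypothetical wrapper: signals "wrap whatever default.key.serde turns out to be" and only
// resolves the inner serde once configure() runs with the application's actual configuration.
public class DeferredDefaultTimeWindowedSerde<K> implements Serde<Windowed<K>> {

    private Serde<Windowed<K>> delegate; // resolved lazily in configure()

    @SuppressWarnings("unchecked")
    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        // Deliberately read default.key.serde (not default.windowed.key.serde.inner), because the
        // upstream windowed aggregation persisted its state with the default key serde.
        final Object configured = configs.get(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG);
        try {
            final Class<?> serdeClass = configured instanceof Class
                ? (Class<?>) configured
                : Class.forName((String) configured);
            final Serde<K> inner = (Serde<K>) serdeClass.getDeclaredConstructor().newInstance();
            inner.configure(configs, isKey);
            delegate = new WindowedSerdes.TimeWindowedSerde<>(inner);
        } catch (final Exception e) {
            throw new IllegalStateException("Unable to resolve default.key.serde", e);
        }
    }

    @Override
    public Serializer<Windowed<K>> serializer() {
        return delegate.serializer();
    }

    @Override
    public Deserializer<Windowed<K>> deserializer() {
        return delegate.deserializer();
    }

    @Override
    public void close() {
        if (delegate != null) {
            delegate.close();
        }
    }
}
{code}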
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (KAFKA-7741) Bad dependency via SBT
[ https://issues.apache.org/jira/browse/KAFKA-7741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reopened KAFKA-7741: - > Bad dependency via SBT > -- > > Key: KAFKA-7741 > URL: https://issues.apache.org/jira/browse/KAFKA-7741 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0, 2.0.1, 2.1.0 > Environment: Windows 10 professional, IntelliJ IDEA 2017.1 >Reporter: sacha barber >Assignee: John Roesler >Priority: Major > Fix For: 2.2.0, 2.1.1, 2.0.2 > > > I am using the Kafka-Streams-Scala 2.1.0 JAR. > And if I create a new Scala project using SBT with these dependencies > {code} > name := "ScalaKafkaStreamsDemo" > version := "1.0" > scalaVersion := "2.12.1" > libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.0" > libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0" > libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.0.0" > libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0" > //TEST > libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test > libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % > "2.0.0" % Test > {code} > I get this error > > {code} > SBT 'ScalaKafkaStreamsDemo' project refresh failed > Error:Error while importing SBT project:...[info] Resolving > jline#jline;2.14.1 ... > [warn] [FAILED ] > javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}: (0ms) > [warn] local: tried > [warn] > C:\Users\sacha\.ivy2\local\javax.ws.rs\javax.ws.rs-api\2.1.1\${packaging.type}s\javax.ws.rs-api.${packaging.type} > [warn] public: tried > [warn] > https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.${packaging.type} > [info] downloading > https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-test-utils/2.1.0/kafka-streams-test-utils-2.1.0.jar > ... > [info] [SUCCESSFUL ] > org.apache.kafka#kafka-streams-test-utils;2.1.0!kafka-streams-test-utils.jar > (344ms) > [warn] :: > [warn] :: FAILED DOWNLOADS :: > [warn] :: ^ see resolution messages for details ^ :: > [warn] :: > [warn] :: javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type} > [warn] :: > [trace] Stack trace suppressed: run 'last *:ssExtractDependencies' for the > full output. > [trace] Stack trace suppressed: run 'last *:update' for the full output. > [error] (*:ssExtractDependencies) sbt.ResolveException: download failed: > javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type} > [error] (*:update) sbt.ResolveException: download failed: > javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type} > [error] Total time: 8 s, completed 16-Dec-2018 19:27:21 > Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=384M; > support was removed in 8.0See complete log in href="file:/C:/Users/sacha/.IdeaIC2017.1/system/log/sbt.last.log">file:/C:/Users/sacha/.IdeaIC2017.1/system/log/sbt.last.log > {code} > This seems to be a common issue with bad dependency from Kafka to > javax.ws.rs-api. > if I drop the Kafka version down to 2.0.0 and add this line to my SBT file > this error goes away > {code} > libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" > artifacts(Artifact("javax.ws.rs-api", "jar", "jar"))` > {code} > > However I would like to work with 2.1.0 version. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-4325) Improve processing of late records for window operations
[ https://issues.apache.org/jira/browse/KAFKA-4325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-4325. - Resolution: Fixed Fix Version/s: 2.1.0 As [~mjsax] noted, this need was satisfied by KAFKA-7222. > Improve processing of late records for window operations > > > Key: KAFKA-4325 > URL: https://issues.apache.org/jira/browse/KAFKA-4325 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Fix For: 2.1.0 > > > Windows are kept until their retention time has passed. If a late arriving record > is processed that is older than any window kept, a new window is created > containing this single late arriving record, the aggregation is computed and > the window is immediately discarded afterward (as it is older than retention > time). > This behavior might cause problems for downstream applications as the original > window aggregate might be overwritten with the late single-record aggregate > value. Thus, we should rather not process the late arriving record in this > case. > However, data loss might not be acceptable for all use cases. In order to > enable the user to not lose any data, window operators should allow users to > register a handler function that is called instead of just dropping the late > arriving record. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (KAFKA-5697) StreamThread.shutdown() needs to interrupt the stream threads to break the loop
[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reopened KAFKA-5697: - Assignee: John Roesler > StreamThread.shutdown() needs to interrupt the stream threads to break the loop > -- > > Key: KAFKA-5697 > URL: https://issues.apache.org/jira/browse/KAFKA-5697 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: John Roesler >Priority: Major > Labels: newbie > Fix For: 2.0.0 > > > In {{StreamThread.shutdown()}} we currently do nothing but set the state, > hoping the stream thread may eventually check it and shut itself down. > However, under certain scenarios the thread may get blocked within a single > loop and hence will never check on this state enum. For example, its > {{consumer.poll}} call triggers {{ensureCoordinatorReady()}}, which will block > until the coordinator can be found. If the coordinator broker is never up and > running, then the Streams instance will be blocked forever. > A simple way to reproduce this issue is to start the word count demo without > starting the ZK / Kafka broker; it will then get stuck in a single loop, > and even `ctrl-C` will not stop it since its set state will never be read by > the thread: > {code:java} > [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6783) consumer poll(timeout) blocked infinitely when no available bootstrap server
[ https://issues.apache.org/jira/browse/KAFKA-6783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-6783. - Resolution: Duplicate Hey [~koqizhao], We've merged [https://github.com/apache/kafka/commit/c470ff70d3e829c8b12f6eb6cc812c4162071a1f] under KAFKA-5697, which should fix this issue. In retrospect, your ticket would have been a more appropriate scope for the work, but it's too late to change the commit title now. I'm marking this ticket as a duplicate just to provide a pointer to the ticket under which the issue is fixed. If you feel the issue still persists in some form, please feel free to re-open the ticket or start a new one. Also note, there is a similar issue with several other consumer methods, which we plan to fix under KIP-266. Poll is likely to be the only one to make it into 2.0, though. Thanks, -John > consumer poll(timeout) blocked infinitely when no available bootstrap server > > > Key: KAFKA-6783 > URL: https://issues.apache.org/jira/browse/KAFKA-6783 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.0 >Reporter: Qiang Zhao >Priority: Major > Labels: features > Fix For: 2.0.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > {code:java} > @Test > public void testPollWithAllBootstrapServersDown() throws Exception { > ExecutorService executor = Executors.newSingleThreadExecutor(); > try { > final long pollTimeout = 1000; > final AtomicBoolean pollComplete = new AtomicBoolean(); > executor.submit(new Runnable() { > @Override > public void run() { > Properties props = new Properties(); > props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:29092"); > try (KafkaConsumer consumer = > newConsumer(props)) { > consumer.subscribe(Arrays.asList(topic)); > try { > consumer.poll(pollTimeout); > } catch (Exception ex) { > ex.printStackTrace(); > } finally { > pollComplete.set(true); > } > } > } > }); > Thread.sleep(pollTimeout * 2); > Assert.assertTrue("poll timeout not work when all servers down", > pollComplete.get()); > } finally { > executor.shutdown(); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6978) Make Streams Window retention time strict
John Roesler created KAFKA-6978: --- Summary: Make Streams Window retention time strict Key: KAFKA-6978 URL: https://issues.apache.org/jira/browse/KAFKA-6978 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, the configured retention time for windows is a lower bound. We actually keep the window around until it's time to roll a new segment. At that time, we drop all windows in the oldest segment. As long as a window is still in a segment, we will continue to add late-arriving records to it and also serve IQ queries from it. This is sort of nice, because it makes optimistic use of the fact that the windows live for some time after their retention expires. However, it is also a source of (apparent) non-determinism, and it's arguably better for programmability if we adhere strictly to the configured constraints. Therefore, the new behavior will be: * once the retention time for a window passes, Streams will drop any later-arriving records (with a warning log and a metric) * likewise, IQ will first check whether the window is younger than its retention time before answering queries. No changes need to be made to the underlying segment management; this is purely to make the behavior more strict wrt the configuration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
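The gist of the intended check, as a rough sketch (illustrative only; the names and structure are not the actual Streams internals):

{code:java}
// Sketch: a record whose window has fallen out of the retention period, measured against the
// highest timestamp seen so far, is rejected instead of being added to the store or served by IQ.
final class StrictRetentionCheck {

    private final long retentionMs;
    private long observedStreamTimeMs = Long.MIN_VALUE;

    StrictRetentionCheck(final long retentionMs) {
        this.retentionMs = retentionMs;
    }

    boolean accept(final long windowEndMs, final long recordTimestampMs) {
        observedStreamTimeMs = Math.max(observedStreamTimeMs, recordTimestampMs);
        if (windowEndMs < observedStreamTimeMs - retentionMs) {
            // the real implementation would log a warning and bump a dropped-records metric here
            return false;
        }
        return true; // the same predicate would guard IQ lookups, so expired windows are never returned
    }
}
{code}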
[jira] [Created] (KAFKA-7000) KafkaConsumer.position should wait for assignment metadata
John Roesler created KAFKA-7000: --- Summary: KafkaConsumer.position should wait for assignment metadata Key: KAFKA-7000 URL: https://issues.apache.org/jira/browse/KAFKA-7000 Project: Kafka Issue Type: Improvement Reporter: John Roesler Assignee: John Roesler While updating Kafka Streams to stop using the deprecated Consumer.poll(long), I found that this code unexpectedly throws an exception: {code:java} consumer.subscribe(topics); // consumer.poll(0); <- I've removed this line, which shouldn't be necessary here. final Set<TopicPartition> partitions = new HashSet<>(); for (final String topic : topics) { for (final PartitionInfo partition : consumer.partitionsFor(topic)) { partitions.add(new TopicPartition(partition.topic(), partition.partition())); } } for (final TopicPartition tp : partitions) { final long offset = consumer.position(tp); committedOffsets.put(tp, offset); }{code} Here is the exception: {code:java} Exception in thread "main" java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer. at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620) at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586) at org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275) at org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148) at org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69){code} As you can see in the commented code in my snippet, we used to block for assignment with a poll(0), which is now deprecated. It seems reasonable to me for position() to do the same thing that poll() does, which is call `coordinator.poll(timeout.toMillis())` early in processing to ensure an up-to-date assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
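Until position() itself waits for assignment metadata, a workaround along these lines is possible, shown here as a sketch that uses only public Consumer APIs (the helper class and method names are illustrative). Note that the polling loop may fetch and discard records, which is acceptable for this kind of verification code:

{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class PositionAfterAssignment {

    // Block until the group assignment is known, then read positions for the assigned partitions.
    public static Map<TopicPartition, Long> currentPositions(final Consumer<?, ?> consumer,
                                                             final Set<String> topics) {
        consumer.subscribe(topics);
        while (consumer.assignment().isEmpty()) {
            // The non-deprecated poll(Duration) drives the group join and metadata update.
            consumer.poll(Duration.ofMillis(100));
        }
        final Map<TopicPartition, Long> offsets = new HashMap<>();
        for (final TopicPartition tp : consumer.assignment()) {
            offsets.put(tp, consumer.position(tp)); // safe now: tp is assigned to this consumer
        }
        return offsets;
    }
}
{code}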
[jira] [Created] (KAFKA-7072) Kafka Streams may drop RocksDB window segments before they expire
John Roesler created KAFKA-7072: --- Summary: Kafka Streams may drop RocksDB window segments before they expire Key: KAFKA-7072 URL: https://issues.apache.org/jira/browse/KAFKA-7072 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.1, 1.1.0, 0.11.0.2, 1.0.0, 0.11.0.1, 0.11.0.0, 2.0.0 Reporter: John Roesler Assignee: John Roesler Fix For: 2.1.0 The current implementation of Segments used by the RocksDB Session and Time window stores is in conflict with our current timestamp management model. The current segmentation approach allows configuration of a fixed number of segments (let's say *4*) and a fixed retention time. We essentially divide up the retention time into the available number of segments:
{noformat}
<----------|-------------------------------|
      expiration date                right now
            \-------retention time--------/
        | seg 0 | seg 1 | seg 2 | seg 3 |
{noformat}
Note that we keep one extra segment so that we can record new events while some events in seg 0 are actually expired (but we only drop whole segments, so they just get to hang around):
{noformat}
<----------|-------------------------------|
      expiration date                right now
            \-------retention time--------/
        | seg 0 | seg 1 | seg 2 | seg 3 |
{noformat}
When it's time to provision segment 4, we know that segment 0 is completely expired, so we drop it:
{noformat}
<------------------|-------------------------------|
              expiration date                right now
                    \-------retention time--------/
                | seg 1 | seg 2 | seg 3 | seg 4 |
{noformat}
However, the current timestamp management model allows for records from the future. Namely, because we define stream time as the minimum buffered timestamp (nondecreasing), we can have a buffer like this: [ 5, 2, 6 ], and our stream time will be 2, but we'll handle a record with timestamp 5 next. Referring to the example, this means we could wind up having to provision segment 4 before segment 0 expires! Let's say "f" is our future event:
{noformat}
<------------------|-------------------------------|----f
              expiration date                right now
                    \-------retention time--------/
                | seg 1 | seg 2 | seg 3 | seg 4 |
{noformat}
Should we drop segment 0 prematurely? Or should we crash and refuse to process "f"? Today, we do the former, and this is probably the better choice. If we refuse to process "f", then we cannot make progress ever again. Dropping segment 0 prematurely is a bummer, but users could also set the retention time high enough that they don't think they'll actually get any events late enough to need segment 0. Worst case, we can have many future events without advancing stream time, sparse enough that each requires its own segment, which would eat deeply into the retention time and drop many segments that should still be live. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
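The arithmetic behind the premature drop, as an illustrative sketch (the numbers and the simplified cleanup rule are made up for illustration and are not the exact Segments implementation):

{code:java}
public class SegmentDropSketch {
    public static void main(final String[] args) {
        final long retentionMs = 4 * 60_000L;  // say, 4 minutes of retention
        final int numSegments = 4;
        final long segmentIntervalMs = retentionMs / numSegments; // 60,000 ms per segment

        final long streamTimeMs = 4 * 60_000L;    // stream time = minimum buffered timestamp
        final long futureRecordMs = 5 * 60_000L;  // a buffered record "from the future"

        // Segment cleanup keys off the maximum timestamp observed, not stream time:
        final long minLiveSegment = (futureRecordMs - retentionMs) / segmentIntervalMs; // = 1

        // Stream time says segment 0 (covering [0, 60,000)) is still within retention,
        // yet provisioning the segment for the future record evicts it.
        System.out.printf("stream time: %d ms, oldest segment kept after the future record: %d%n",
                streamTimeMs, minLiveSegment);
    }
}
{code}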
[jira] [Created] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore
John Roesler created KAFKA-7080: --- Summary: WindowStoreBuilder incorrectly initializes CachingWindowStore Key: KAFKA-7080 URL: https://issues.apache.org/jira/browse/KAFKA-7080 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.1, 1.1.0, 1.0.0, 2.0.0 Reporter: John Roesler Assignee: John Roesler Fix For: 2.1.0 When caching is enabled on the WindowStoreBuilder, it creates a CachingWindowStore. However, it incorrectly passes storeSupplier.segments() (the number of segments) to the segmentInterval argument. The impact is low, since any valid number of segments is also a valid segment size, but it likely results in much smaller segments than intended. For example, the segments may be sized 3ms instead of 60,000ms. Ideally the WindowBytesStoreSupplier interface would allow suppliers to advertise their segment size instead of segment count. I plan to create a KIP to propose this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
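To make the mix-up concrete, here is a tiny illustrative calculation (the values are invented, and the interval derivation is only an approximation of how a segmented store divides its retention period):

{code:java}
public class SegmentIntervalMixUp {
    public static void main(final String[] args) {
        final long retentionMs = 120_000L; // e.g. 2 minutes of retention
        final int numSegments = 3;

        // A segmented store would derive something like retention / (segments - 1) as the interval...
        final long intendedIntervalMs = retentionMs / (numSegments - 1); // 60,000 ms per segment
        // ...but the builder passes the segment *count* where the cache expects an *interval*:
        final long actuallyPassedMs = numSegments;                       // 3 ms

        System.out.printf("intended segment interval: %d ms, actually passed: %d ms%n",
                intendedIntervalMs, actuallyPassedMs);
    }
}
{code}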
[jira] [Resolved] (KAFKA-6978) Make Streams Window retention time strict
[ https://issues.apache.org/jira/browse/KAFKA-6978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-6978. - Resolution: Fixed Fix Version/s: 2.1.0 This feature was merged in https://github.com/apache/kafka/commit/954be11bf2d3dc9fa11a69830d2ef5ff580ff533 > Make Streams Window retention time strict > - > > Key: KAFKA-6978 > URL: https://issues.apache.org/jira/browse/KAFKA-6978 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.1.0 > > > Currently, the configured retention time for windows is a lower bound. We > actually keep the window around until it's time to roll a new segment. At > that time, we drop all windows in the oldest segment. > As long as a window is still in a segment, we will continue to add > late-arriving records to it and also serve IQ queries from it. This is sort > of nice, because it makes optimistic use of the fact that the windows live > for some time after their retention expires. However, it is also a source of > (apparent) non-determinism, and it's arguably better for programmability if we > adhere strictly to the configured constraints. > Therefore, the new behavior will be: > * once the retention time for a window passes, Streams will drop any > later-arriving records (with a warning log and a metric) > * likewise, IQ will first check whether the window is younger than its > retention time before answering queries. > No changes need to be made to the underlying segment management; this is > purely to make the behavior more strict wrt the configuration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7106) Remove segment/segmentInterval from Window definition
John Roesler created KAFKA-7106: --- Summary: Remove segment/segmentInterval from Window definition Key: KAFKA-7106 URL: https://issues.apache.org/jira/browse/KAFKA-7106 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, Window configures segment and segmentInterval properties, but these aren't truly properties of a window in general. Rather, they are properties of the particular implementation that we currently have: a segmented store. Therefore, these properties should be moved to configure only that implementation. This may be related to KAFKA-4730, since an in-memory window store wouldn't necessarily need to be segmented. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7107) Ability to configure state store for JoinWindows in KStream-KStream join
John Roesler created KAFKA-7107: --- Summary: Ability to configure state store for JoinWindows in KStream-KStream join Key: KAFKA-7107 URL: https://issues.apache.org/jira/browse/KAFKA-7107 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, the KStream-KStream join operation internally provisions window stores to support the JoinWindow configuration. However, unlike all the other stateful processors, it does not allow configuration of the stores. We should consider adding DSL methods taking Materialized configs for these stores. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore
[ https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-7080. - Resolution: Fixed Fixed in https://github.com/apache/kafka/pull/5257 > WindowStoreBuilder incorrectly initializes CachingWindowStore > - > > Key: KAFKA-7080 > URL: https://issues.apache.org/jira/browse/KAFKA-7080 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0, 1.0.1, 1.1.0, 2.0.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.1.0 > > > When caching is enabled on the WindowStoreBuilder, it creates a > CachingWindowStore. However, it incorrectly passes storeSupplier.segments() > (the number of segments) to the segmentInterval argument. > > The impact is low, since any valid number of segments is also a valid segment > size, but it likely results in much smaller segments than intended. For > example, the segments may be sized 3ms instead of 60,000ms. > > Ideally the WindowBytesStoreSupplier interface would allow suppliers to > advertise their segment size instead of segment count. I plan to create a KIP > to propose this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7161) KTable Reduce should check for invalid conditions
John Roesler created KAFKA-7161: --- Summary: KTable Reduce should check for invalid conditions Key: KAFKA-7161 URL: https://issues.apache.org/jira/browse/KAFKA-7161 Project: Kafka Issue Type: Improvement Reporter: John Roesler Assignee: John Roesler KTableReduce has the opportunity to explicitly check whether the state is inconsistent with the oldValues arriving from the stream. If it did so, it could help detect topology changes that require an app reset, and fail fast before any data corruption occurs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
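A rough sketch of the kind of check this suggests (illustrative only; the real KTableReduce operates on a KeyValueStore with typed generics, for which a plain Map stands in here):

{code:java}
import java.util.Map;

public class ReduceConsistencyCheckSketch {

    // If the upstream KTable sends an oldValue, a prior aggregate must already exist for that key;
    // a missing prior aggregate suggests the topology changed without an application reset.
    public static void checkConsistency(final String key, final Long oldValue,
                                        final Map<String, Long> store) {
        final Long stored = store.get(key);
        if (oldValue != null && stored == null) {
            throw new IllegalStateException(
                "Received an oldValue for key " + key + " but found no prior aggregate in the store; "
                    + "this likely indicates a topology change that requires an application reset.");
        }
        // ...the normal add/subtract reduce logic would follow here
    }
}
{code}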
[jira] [Created] (KAFKA-7222) KIP-328: Add Window Grace Period (and deprecate Window Retention)
John Roesler created KAFKA-7222: --- Summary: KIP-328: Add Window Grace Period (and deprecate Window Retention) Key: KAFKA-7222 URL: https://issues.apache.org/jira/browse/KAFKA-7222 Project: Kafka Issue Type: Improvement Reporter: John Roesler Assignee: John Roesler As described in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables] This ticket only covers the grace period portion of the work. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7223) KIP-328: Add in-memory Suppression
John Roesler created KAFKA-7223: --- Summary: KIP-328: Add in-memory Suppression Key: KAFKA-7223 URL: https://issues.apache.org/jira/browse/KAFKA-7223 Project: Kafka Issue Type: Improvement Reporter: John Roesler As described in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables]. This ticket is to implement Suppress, but only for in-memory buffers. (depends on KAFKA-7222) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7224) KIP-328: Add spill-to-disk for Suppression
John Roesler created KAFKA-7224: --- Summary: KIP-328: Add spill-to-disk for Suppression Key: KAFKA-7224 URL: https://issues.apache.org/jira/browse/KAFKA-7224 Project: Kafka Issue Type: Improvement Reporter: John Roesler Assignee: John Roesler As described in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables] Following on KAFKA-7223, implement the spill-to-disk buffering strategy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7267) KafkaStreams Scala DSL process method should accept a ProcessorSupplier
John Roesler created KAFKA-7267: --- Summary: KafkaStreams Scala DSL process method should accept a ProcessorSupplier Key: KAFKA-7267 URL: https://issues.apache.org/jira/browse/KAFKA-7267 Project: Kafka Issue Type: Improvement Reporter: John Roesler scala.KafkaStreams#process currently expects a ()=>Processor, which is semantically equivalent to a ProcessorSupplier, but it's the only such method that does. All the similar methods in the Scala DSL take a Supplier like their Java counterparts. Note that on Scala 2.12+, SAM conversion allows callers to pass either a function or a supplier when the parameter is a ProcessorSupplier. (But if the parameter is a function, you must pass a function.) But on Scala 2.11-, callers will have to pass a function if the parameter is a function and a supplier if the parameter is a supplier. This means that currently, 2.11 users are confronted with an API that demands they construct suppliers for all the methods *except* process, which demands a function. Mitigating factor: we have some implicits available to convert a Function0 to a supplier, and we could add an implicit from ProcessorSupplier to Function0 to smooth over the API. What to do about it? We could just change the existing method to take a ProcessorSupplier instead. * 2.12+ users would not notice a difference during compilation, as SAM conversion would kick in. However, if they just swap in the new jar without recompiling, I think they'd get a NoSuchMethodError. * 2.11- users would not be able to compile their existing code. They'd have to swap their function out for a ProcessorSupplier or pull the implicit conversion into scope. * Note that we can delay this action until we drop 2.11 support, and we would break no one. We could deprecate the existing method and add a new one taking a ProcessorSupplier. * All Scala users would be able to compile their existing code and also swap in the new version at runtime. * Anyone explicitly passing a function would get a deprecation warning, though, regardless of SAM conversion or implicit conversion, since neither conversion will kick in if there's actually a method overload expecting a function. This would drive everyone to explicitly create a supplier (unnecessarily). We could leave the existing method without deprecating it and add a new one taking a ProcessorSupplier. * All Scala users would be able to compile their existing code and also swap in the new version at runtime. * There would be no unfortunate deprecation warnings. * The interface would list two process methods, which is untidy. * Once we drop 2.11 support, we would just drop the function variant. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7269) KStream.merge is not documented
John Roesler created KAFKA-7269: --- Summary: KStream.merge is not documented Key: KAFKA-7269 URL: https://issues.apache.org/jira/browse/KAFKA-7269 Project: Kafka Issue Type: Improvement Reporter: John Roesler If I understand the operator correctly, it should be documented as a stateless transformation at https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations -- This message was sent by Atlassian JIRA (v7.6.3#76005)
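For reference, a minimal sketch of the operator in question (topic names are made up); it belongs with the other stateless transformations since it neither inspects nor changes keys.
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class MergeSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> clicks = builder.stream("clicks");
        final KStream<String, String> taps = builder.stream("taps");

        // merge() interleaves the two streams with no ordering guarantee
        // between them; it is stateless and does not trigger repartitioning.
        final KStream<String, String> interactions = clicks.merge(taps);
        interactions.to("all-interactions");
    }
}
{code}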
[jira] [Created] (KAFKA-7270) Add latest releases to streams_upgrade_test.py
John Roesler created KAFKA-7270: --- Summary: Add latest releases to streams_upgrade_test.py Key: KAFKA-7270 URL: https://issues.apache.org/jira/browse/KAFKA-7270 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Assignee: John Roesler Namely: 2.0.0, 1.1.1, 1.0.2, 0.11.0.3, and 0.10.2.2. Note that the relevant older branches need to add the relevant bugfix versions for themselves and their ancestor versions. The 2.0 branch actually needs to add the 2.0 version! Trunk needs to add all the versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
John Roesler created KAFKA-7271: --- Summary: Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers Key: KAFKA-7271 URL: https://issues.apache.org/jira/browse/KAFKA-7271 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Assignee: John Roesler Fix in the oldest branch that ignores the test and cherry-pick forward. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7272) Fix ignored test in streams_upgrade_test.py: test_simple_upgrade_downgrade
John Roesler created KAFKA-7272: --- Summary: Fix ignored test in streams_upgrade_test.py: test_simple_upgrade_downgrade Key: KAFKA-7272 URL: https://issues.apache.org/jira/browse/KAFKA-7272 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Assignee: John Roesler Fix starting from the oldest version that ignores the test -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7275) Prototype lock-free metrics
John Roesler created KAFKA-7275: --- Summary: Prototype lock-free metrics Key: KAFKA-7275 URL: https://issues.apache.org/jira/browse/KAFKA-7275 Project: Kafka Issue Type: Improvement Components: metrics, streams Reporter: John Roesler Assignee: John Roesler Currently, we have to be a little conservative in how granularly we measure things to avoid heavy synchronization costs in the metrics. It should be possible to refactor the thread-safe implementation to use volatile and java.util.concurrent.atomic instead and realize a pretty large performance improvement. However, before investing too much time in it, we should run some benchmarks to gauge how much improvement we can expect. I'd propose to run the benchmarks on trunk with debug turned on, and then to just remove all synchronization and run again to get an upper-bound performance improvement. If the results are promising, we can start prototyping a lock-free implementation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
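This is not the actual metrics code, just a sketch of the kind of change being proposed: replacing a synchronized recording path with volatile/atomic primitives from java.util.concurrent.atomic.
{code:java}
import java.util.concurrent.atomic.LongAdder;

public class SensorSketch {
    // Current style: every recording contends on the same lock.
    private long synchronizedCount;

    public synchronized void recordSynchronized() {
        synchronizedCount++;
    }

    // Proposed style: lock-free recording; readers call sum() when reporting.
    private final LongAdder lockFreeCount = new LongAdder();

    public void recordLockFree() {
        lockFreeCount.increment();
    }

    public long count() {
        return lockFreeCount.sum();
    }
}
{code}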
[jira] [Created] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times
John Roesler created KAFKA-7277: --- Summary: Migrate Streams API to Duration instead of longMs times Key: KAFKA-7277 URL: https://issues.apache.org/jira/browse/KAFKA-7277 Project: Kafka Issue Type: Improvement Reporter: John Roesler Right now, the Streams API universally represents time as ms-since-unix-epoch. There's nothing wrong, per se, with this, but Duration is more ergonomic for an API. What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration. Implementation note: Durations potentially worsen memory pressure and GC performance, so internally, we will still use longMs as the representation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
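A small sketch of the ergonomic difference, using window size as the example; the Duration overload is what this migration would add alongside, and eventually in place of, the ms-based one.
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

public class DurationSketch {
    public static void main(final String[] args) {
        // Raw-ms style: nothing stops a caller from passing seconds by mistake.
        final TimeWindows byMs = TimeWindows.of(60_000L);

        // Duration style: the unit is part of the type.
        final TimeWindows byDuration = TimeWindows.of(Duration.ofMinutes(1));
    }
}
{code}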
[jira] [Created] (KAFKA-7284) Producer getting fenced may cause Streams to shut down
John Roesler created KAFKA-7284: --- Summary: Producer getting fenced may cause Streams to shut down Key: KAFKA-7284 URL: https://issues.apache.org/jira/browse/KAFKA-7284 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.0.0 Reporter: John Roesler Assignee: John Roesler As part of the investigation, I will determine what other versions are affected. In StreamTask, we catch a `ProducerFencedException` and throw a `TaskMigratedException`. However, in this case, the `RecordCollectorImpl` is throwing a `StreamsException`, caused by `KafkaException` caused by `ProducerFencedException`. In response to a TaskMigratedException, we would rebalance, but when we get a StreamsException, streams shuts itself down. In other words, we intended to do a rebalance in response to a producer fence, but actually, we shut down (when the fence happens inside the record collector). Coincidentally, Guozhang noticed and fixed this in a recent PR: [https://github.com/apache/kafka/pull/5428/files#diff-4e5612eeba09dabf30d0b8430f269ff6] The scope of this ticket is to extract that fix and associated tests, and send a separate PR to trunk and 2.0, and also to determine what other versions, if any, are affected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
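A hypothetical sketch of the intended handling (not the actual fix, and the internal TaskMigratedException is stood in for by a plain RuntimeException here): unwrap the cause chain so a fenced producer triggers the rebalance path instead of being treated as a fatal StreamsException.
{code:java}
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencedProducerSketch {
    static RuntimeException classify(final KafkaException error) {
        for (Throwable cause = error; cause != null; cause = cause.getCause()) {
            if (cause instanceof ProducerFencedException) {
                // In Streams this would be a TaskMigratedException, which
                // rebalances the task instead of shutting the instance down.
                return new RuntimeException("task migrated: " + cause.getMessage(), cause);
            }
        }
        return error; // unrelated KafkaException: let it propagate as fatal
    }
}
{code}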
[jira] [Resolved] (KAFKA-7222) KIP-328: Add Window Grace Period (and deprecate Window Retention)
[ https://issues.apache.org/jira/browse/KAFKA-7222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-7222. - Resolution: Fixed Fix Version/s: 2.1.0 [https://github.com/apache/kafka/pull/5369] is merged > KIP-328: Add Window Grace Period (and deprecate Window Retention) > - > > Key: KAFKA-7222 > URL: https://issues.apache.org/jira/browse/KAFKA-7222 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.1.0 > > > As described in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables] > > This ticket only covers the grace period portion of the work. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7315) Streams serialization docs contain a broken link for Avro
John Roesler created KAFKA-7315: --- Summary: Streams serialization docs contain a broken link for Avro Key: KAFKA-7315 URL: https://issues.apache.org/jira/browse/KAFKA-7315 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler https://kafka.apache.org/documentation/streams/developer-guide/datatypes.html#avro -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6033) Kafka Streams does not work with musl-libc (alpine linux)
[ https://issues.apache.org/jira/browse/KAFKA-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-6033. - Resolution: Won't Fix This is unfortunately out of our hands. If you think I'm wrong about this, please reopen the ticket. Thanks, -John > Kafka Streams does not work with musl-libc (alpine linux) > - > > Key: KAFKA-6033 > URL: https://issues.apache.org/jira/browse/KAFKA-6033 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.11.0.0 > Environment: Alpine 3.6 >Reporter: Jeffrey Zampieron >Priority: Major > > Using the released version of kafka fails on alpine based images b/c of > rocksdb using the jni and failing to load. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-4988) JVM crash when running on Alpine Linux
[ https://issues.apache.org/jira/browse/KAFKA-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-4988. - Resolution: Won't Fix I think this issue is out of our hands. If you think there is something for us to do, please feel free to reopen the ticket and comment. Thanks, -John > JVM crash when running on Alpine Linux > -- > > Key: KAFKA-4988 > URL: https://issues.apache.org/jira/browse/KAFKA-4988 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Vincent Rischmann >Priority: Minor > > I'm developing my Kafka Streams application using Docker and I run my jars > using the official openjdk:8-jre-alpine image. > I'm just starting to use windowing and now the JVM crashes because of an > issue with RocksDB I think. > It's trivial to fix on my part, just use the debian jessie based image. > However, it would be cool if alpine was supported too since its docker images > are quite a bit less heavy > {quote} > Exception in thread "StreamThread-1" java.lang.UnsatisfiedLinkError: > /tmp/librocksdbjni3285995384052305662.so: Error loading shared library > ld-linux-x86-64.so.2: No such file or directory (needed by > /tmp/librocksdbjni3285995384052305662.so) > at java.lang.ClassLoader$NativeLibrary.load(Native Method) > at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941) > at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824) > at java.lang.Runtime.load0(Runtime.java:809) > at java.lang.System.load(System.java:1086) > at > org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78) > at > org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56) > at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64) > at org.rocksdb.RocksDB.(RocksDB.java:35) > at org.rocksdb.Options.(Options.java:22) > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115) > at > org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:148) > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62) > at > org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:141) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) > at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x7f60f34ce088, pid=1, tid=0x7f60f3705ab0 > # > # JRE version: OpenJDK Runtime Environment (8.0_121-b13) (build 1.8.0_121-b13) > # Java VM: OpenJDK 64-Bit Server VM (25.121-b13 mixed mode linux-amd64 > compressed oops) > # Derivati
[jira] [Created] (KAFKA-7367) Verify that Streams never creates RocksDB stores unless they are needed
John Roesler created KAFKA-7367: --- Summary: Verify that Streams never creates RocksDB stores unless they are needed Key: KAFKA-7367 URL: https://issues.apache.org/jira/browse/KAFKA-7367 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler We have gotten some reports of Streams creating RocksDB stores unnecessarily for stateless processes. We can and should verify this doesn't happen by creating integration tests for *every* stateless operator that verify that after processing, the state directory is still empty. These tests could potentially be backported as far as we care to so that we can identify and fix potential unnecessary stores in past versions as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7368) Support joining Windowed KTables
John Roesler created KAFKA-7368: --- Summary: Support joining Windowed KTables Key: KAFKA-7368 URL: https://issues.apache.org/jira/browse/KAFKA-7368 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, there is no good way to join two `KTable<Windowed<K>, V>`, aka windowed KTables. They are KTables, so they have a `join` operator available, but it currently will use a regular KeyValue store instead of a Windowed store, so it will grow without bound as new windows enter. One option is to convert both KTables into KStreams, and join them (which is a windowed join), and then convert them back into KTables for further processing, but this is an awkward way to accomplish an apparently straightforward task. It should instead be possible to directly support it, but the trick will be to make it impossible to accidentally use a window store for normal (aka non-windowed) KTables. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7386) Streams Scala wrapper should not cache serdes
John Roesler created KAFKA-7386: --- Summary: Streams Scala wrapper should not cache serdes Key: KAFKA-7386 URL: https://issues.apache.org/jira/browse/KAFKA-7386 Project: Kafka Issue Type: Bug Reporter: John Roesler Assignee: John Roesler For example, [https://github.com/apache/kafka/blob/trunk/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala#L28] invokes Serdes.String() once and caches the result. However, the implementation of the String serde has a non-empty configure method whose behavior depends on whether it is configured as a key or value serde. So we won't get correct execution if we create one serde and use it for both keys and values. The fix is simple: change all the `val` declarations in scala.Serdes to `def`. Thanks to the referential transparency of parameterless methods in Scala, no user-facing code will break. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7393) Consider making suppression node names independent of topologically numbered names.
John Roesler created KAFKA-7393: --- Summary: Consider making suppression node names independent of topologically numbered names. Key: KAFKA-7393 URL: https://issues.apache.org/jira/browse/KAFKA-7393 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler See [https://github.com/apache/kafka/pull/5567#discussion_r214188984] (please keep the discussion on this Jira ticket). There is an opportunity to use a slightly different naming strategy for suppression nodes so that they can be moved around the topology without upsetting other nodes' names. The main downside is that it might be confusing to have a separate naming strategy for just this one kind of node. Then again, this could create important opportunities for topology-compatible optimizations. At a higher design/implementation cost, but a lower ongoing complexity, we could consider this naming approach for all node types. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7399) FunctionConversions in Streams-Scala should be private
John Roesler created KAFKA-7399: --- Summary: FunctionConversions in Streams-Scala should be private Key: KAFKA-7399 URL: https://issues.apache.org/jira/browse/KAFKA-7399 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler FunctionConversions defines several implicit conversions for internal use, but the class was made public accidentally. We should deprecate it and replace it with an equivalent private class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7402) Kafka Streams should implement AutoCloseable where appropriate
John Roesler created KAFKA-7402: --- Summary: Kafka Streams should implement AutoCloseable where appropriate Key: KAFKA-7402 URL: https://issues.apache.org/jira/browse/KAFKA-7402 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Various components in Streams have close methods but do not implement AutoCloseable. This means that they can't be used in try-with-resources blocks. Remedying that would simplify our tests and make life easier for users as well. KafkaStreams itself is a notable example of this, but we can take the opportunity to look for other components that make sense as AutoCloseable as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
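A sketch of what this would enable (topic names are made up): today this does not compile in a try-with-resources block, because KafkaStreams has a close() method but does not declare AutoCloseable.
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

public class TryWithResourcesSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output");
        final Properties props = new Properties(); // StreamsConfig settings omitted

        // Desired usage once KafkaStreams implements AutoCloseable:
        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
            streams.start();
            // ... exercise the application, e.g. in an integration test ...
        } // close() runs automatically here, even if the body throws
    }
}
{code}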
[jira] [Created] (KAFKA-7435) Consider standardizing the config object pattern on interface/implementation.
John Roesler created KAFKA-7435: --- Summary: Consider standardizing the config object pattern on interface/implementation. Key: KAFKA-7435 URL: https://issues.apache.org/jira/browse/KAFKA-7435 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Fix For: 3.0.0 Currently, the majority of Streams's config objects are structured as a "external" builder class (with protected state) and an "internal" subclass exposing getters to the state. This is serviceable, but there is an alternative we can consider: to use an interface for the external API and the implementation class for the internal one. Advantages: * we could use private state, which improves maintainability * the setters and getters would all be defined in the same class, improving readability * users browsing the public API would be able to look at an interface that contains less extraneous internal details than the current class * there is more flexibility in implementation Alternatives * instead of external-class/internal-subclass, we could use an external *final* class with package-protected state and an internal accessor class (not a subclass, obviously). This would make it impossible for users to try and create custom subclasses of our config objects, which is generally not allowed already, but is currently a runtime class cast exception. Example implementation: [https://github.com/apache/kafka/pull/5677] This change would break binary, but not source, compatibility, so the earliest we could consider it is 3.0. To be clear, I'm *not* saying this *should* be done, just calling for a discussion. Otherwise, I'd make a KIP. Thoughts? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
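A purely illustrative comparison of the two patterns; all class names here are invented, and this is not proposing any specific classes.
{code:java}
// Current pattern: a public config class with protected state...
class ExampleConfig {
    protected String name;

    public ExampleConfig withName(final String name) {
        this.name = name;
        return this;
    }
}

// ...and an internal subclass that exposes getters to that state.
class ExampleConfigInternal extends ExampleConfig {
    ExampleConfigInternal(final ExampleConfig external) {
        this.name = external.name;
    }

    String name() {
        return name;
    }
}

// Alternative pattern: the public API is an interface...
interface ExampleConfigV2 {
    ExampleConfigV2 withName(String name);
}

// ...and a single internal implementation owns private state, setters, and getters.
class ExampleConfigV2Impl implements ExampleConfigV2 {
    private String name;

    @Override
    public ExampleConfigV2 withName(final String name) {
        this.name = name;
        return this;
    }

    String name() {
        return name;
    }
}
{code}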
[jira] [Created] (KAFKA-7477) Improve Streams close timeout semantics
John Roesler created KAFKA-7477: --- Summary: Improve Streams close timeout semantics Key: KAFKA-7477 URL: https://issues.apache.org/jira/browse/KAFKA-7477 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler See [https://github.com/apache/kafka/pull/5682#discussion_r221473451] The current timeout semantics are a little "magical": * 0 means to block forever * negative numbers cause the close to complete immediately without checking the state I think this would make more sense: * reject negative numbers * make 0 just signal and return immediately (after checking the state once) * if I want to wait "forever", I can use {{ofYears(1)}} or {{ofMillis(Long.MAX_VALUE)}} or some other intuitively "long enough to be forever" value instead of a magic value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
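A sketch of the proposed semantics against the Duration-based close(); the behaviors in the comments are the proposal, not the current contract.
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

public class CloseTimeoutSketch {
    static void shutdown(final KafkaStreams streams) {
        // Proposed: zero signals shutdown, checks the state once, and returns.
        streams.close(Duration.ZERO);

        // Proposed: "forever" is just an explicit, intuitively-long timeout
        // rather than a magic value.
        streams.close(Duration.ofMillis(Long.MAX_VALUE));

        // Proposed: negative durations are rejected instead of silently
        // returning without checking the state.
    }
}
{code}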
[jira] [Resolved] (KAFKA-7393) Consider making suppression node names independent of topologically numbered names.
[ https://issues.apache.org/jira/browse/KAFKA-7393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-7393. - Resolution: Won't Fix We realized that Suppressed needs to support a name parameter in keeping with KIP-372, which provides a way to solve this problem. If you supply `withName` to Suppressed, it will be independent of topologically numbered names, as desired. > Consider making suppression node names independent of topologically numbered > names. > --- > > Key: KAFKA-7393 > URL: https://issues.apache.org/jira/browse/KAFKA-7393 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > > See [https://github.com/apache/kafka/pull/5567#discussion_r214188984] (please > keep the discussion on this Jira ticket). > There is an opportunity to use a slightly different naming strategy for > suppression nodes so that they can be moved around the topology without > upsetting other nodes' names. > The main downside is that it might be confusing to have a separate naming > strategy for just this one kind of node. > Then again, this could create important opportunities for topology-compatible > optimizations. > > At a higher design/implementation cost, but a lower ongoing complexity, we > could consider this naming approach for all node types. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7852) Add public version of EmbeddedKafkaCluster
John Roesler created KAFKA-7852: --- Summary: Add public version of EmbeddedKafkaCluster Key: KAFKA-7852 URL: https://issues.apache.org/jira/browse/KAFKA-7852 Project: Kafka Issue Type: Improvement Reporter: John Roesler Currently, Kafka Client and Streams applications do not have good support for writing integration tests. Streams added the TopologyTestDriver, which is a much more efficient approach for testing Streams applications, specifically, but it's still more in the domain of unit testing. For integration tests, the current state is that people import test artifacts from Kafka, which is not a well controlled public API. It might be possible to offer a shim implementation of Kafka for testing, but the API is so large and complicated that this seems like a huge effort that's not likely to achieve or maintain perfect fidelity. Therefore, it seems like the best thing would just be to clean up the EmbeddedKafkaCluster and offer it in a public test-utils module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7852) Add public version of EmbeddedKafkaCluster
[ https://issues.apache.org/jira/browse/KAFKA-7852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-7852. - Resolution: Duplicate > Add public version of EmbeddedKafkaCluster > -- > > Key: KAFKA-7852 > URL: https://issues.apache.org/jira/browse/KAFKA-7852 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Priority: Major > > Currently, Kafka Client and Streams applications do not have good support for > writing integration tests. > Streams added the TopologyTestDriver, which is a much more efficient approach > for testing Streams applications, specifically, but it's still more in the > domain of unit testing. > > For integration tests, the current state is that people import test artifacts > from Kafka, which is not a well controlled public API. > > It might be possible to offer a shim implementation of Kafka for testing, but > the API is so large and complicated that this seems like a huge effort that's > not likely to achieve or maintain perfect fidelity. > Therefore, it seems like the best thing would just be to clean up the > EmbeddedKafkaCluster and offer it in a public test-utils module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7223) KIP-328: Add in-memory Suppression
[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-7223. - Resolution: Fixed > KIP-328: Add in-memory Suppression > -- > > Key: KAFKA-7223 > URL: https://issues.apache.org/jira/browse/KAFKA-7223 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.1.0 > > > As described in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.] > > This ticket is to implement Suppress, but only for in-memory buffers. > (depends on KAFKA-7222) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7916) Streams store cleanup: unify wrapping
John Roesler created KAFKA-7916: --- Summary: Streams store cleanup: unify wrapping Key: KAFKA-7916 URL: https://issues.apache.org/jira/browse/KAFKA-7916 Project: Kafka Issue Type: Improvement Reporter: John Roesler Assignee: John Roesler The internal store handling in Streams has become quite complex, with many layers of wrapping for different bookkeeping operations. The first thing we can do about this is to at least unify the wrapping strategy, such that *all* store wrappers extend WrappedStateStore. This would make the code easier to understand, since all wrappers would have the same basic shape. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7917) Streams store cleanup: collapse layers
John Roesler created KAFKA-7917: --- Summary: Streams store cleanup: collapse layers Key: KAFKA-7917 URL: https://issues.apache.org/jira/browse/KAFKA-7917 Project: Kafka Issue Type: Improvement Reporter: John Roesler Following on KAFKA-7916, we can consider collapsing the "streams management layers" into one. Right now, we have: * metering (also handles moving from pojo world to bytes world) * change-logging * caching This is good compositional style, but we also have some runtime overhead of calling through all these layers, as well as some mental overhead of understanding how many and which layers we are going through. Also, there are dependencies between the caching and change-logging layers. I _think_ it would simplify the code if we collapsed these into one layer with boolean switches to turn on or off the different aspects. (rather than wrapping the store with the different layers or not depending on the same boolean conditions) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
John Roesler created KAFKA-7918: --- Summary: Streams store cleanup: inline byte-store generic parameters Key: KAFKA-7918 URL: https://issues.apache.org/jira/browse/KAFKA-7918 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, the fundamental layer of stores in Streams is the "bytes store". The easiest way to identify this is in `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a `XXBytesStoreSupplier`. We provide several implementations of these bytes stores, typically an in-memory one and a persistent one (aka RocksDB). Inside these bytes stores, the key is always `Bytes` and the value is always `byte[]` (serialization happens at a higher level). However, the store implementations are generically typed, just `K` and `V`. This is good for flexibility, but it makes the code a little harder to understand. I think that we used to do serialization at a lower level, so the generics are a hold-over from that. It would simplify the code if we just inlined the actual k/v types and maybe even renamed the classes from (e.g.) `InMemoryKeyValueStore` to `InMemoryKeyValueBytesStore`, and so forth. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
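A small sketch of the layering described above (the store name and serdes are arbitrary): the suppliers in Stores hand out stores keyed by Bytes with byte[] values, and the translation to K and V happens in the layers above via the builder's serdes.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class BytesStoreSketch {
    public static void main(final String[] args) {
        // The fundamental layer: keys are Bytes, values are byte[].
        final KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore("counts");
        final KeyValueStore<Bytes, byte[]> bytesStore = supplier.get();

        // The typed layer above it, where the serdes are actually applied.
        final StoreBuilder<KeyValueStore<String, Long>> builder =
            Stores.keyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long());
    }
}
{code}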
[jira] [Created] (KAFKA-7919) Reorganize Stores builders
John Roesler created KAFKA-7919: --- Summary: Reorganize Stores builders Key: KAFKA-7919 URL: https://issues.apache.org/jira/browse/KAFKA-7919 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler I have heard complaints from a few people that they find the whole process of using `Materialized`, `Stores`, `StoreBuilder`s, and `StoreSupplier`s confusing. I think it would help if we separated Stores into separate StoreBuilders and BytesStoreSuppliers factory classes. Or maybe even break the suppliers factory down further into `KeyValueBytesStoreSuppliers`, etc. Then, the javadocs in `Materialized.as` can point people to the right factory class to use, and, when they get there, they'll only be confronted with options they can make use of. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
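A sketch of the current arrangement that prompts the confusion (store names are made up): the single Stores class is the factory for both the suppliers that Materialized.as expects and the builders that the Processor API expects.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class StoresFactorySketch {
    public static void main(final String[] args) {
        // DSL path: Materialized.as wants a StoreSupplier from Stores...
        final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.<String, Long>as(Stores.persistentKeyValueStore("counts"))
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long());

        // ...while the Processor API path wants a StoreBuilder, also from Stores.
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("scratch"), Serdes.String(), Serdes.Long());
    }
}
{code}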
[jira] [Resolved] (KAFKA-7917) Streams store cleanup: collapse layers
[ https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-7917. - Resolution: Won't Fix > Streams store cleanup: collapse layers > -- > > Key: KAFKA-7917 > URL: https://issues.apache.org/jira/browse/KAFKA-7917 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Following on KAFKA-7916, we can consider collapsing the "streams management > layers" into one. > Right now, we have: > * metering (also handles moving from pojo world to bytes world) > * change-logging > * caching > This is good compositional style, but we also have some runtime overhead of > calling through all these layers, as well as some mental overhead of > understanding how many and which layers we are going through. > Also, there are dependencies between the caching and change-logging layers. > I _think_ it would simplify the code if we collapsed these into one layer > with boolean switches to turn on or off the different aspects. (rather than > wrapping the store with the different layers or not depending on the same > boolean conditions) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7928) Deprecate WindowStore.put(key, value)
John Roesler created KAFKA-7928: --- Summary: Deprecate WindowStore.put(key, value) Key: KAFKA-7928 URL: https://issues.apache.org/jira/browse/KAFKA-7928 Project: Kafka Issue Type: Improvement Reporter: John Roesler Specifically, `org.apache.kafka.streams.state.WindowStore#put(K, V)` This method is strange... A window store needs to have a timestamp associated with the key, so if you do a put without a timestamp, it's up to the store to just make one up. Even the javadoc on the method recommends not to use it, due to this confusing behavior. We should just deprecate it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
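A sketch of the two overloads for contrast (the store and variable names are made up): the timestamp-less put() has to invent a window start time, while the explicit overload does not.
{code:java}
import org.apache.kafka.streams.state.WindowStore;

public class WindowStorePutSketch {
    static void update(final WindowStore<String, Long> store,
                       final String key,
                       final Long value,
                       final long windowStartTime) {
        // The overload this ticket proposes to deprecate: the store has to
        // pick a timestamp itself, which is the confusing behavior.
        store.put(key, value);

        // The explicit overload: the caller supplies the window start timestamp.
        store.put(key, value, windowStartTime);
    }
}
{code}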
[jira] [Resolved] (KAFKA-7916) Streams store cleanup: unify wrapping
[ https://issues.apache.org/jira/browse/KAFKA-7916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-7916. - Resolution: Fixed > Streams store cleanup: unify wrapping > - > > Key: KAFKA-7916 > URL: https://issues.apache.org/jira/browse/KAFKA-7916 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > The internal store handling in Streams has become quite complex, with many > layers of wrapping for different bookeeping operations. > The first thing we can do about this is to at least unify the wrapping > strategy, such that *all* store wrappers extend WrappedStateStore. This would > make the code easier to understand, since all wrappers would have the same > basic shape. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7932) Streams needs to handle new Producer exceptions
John Roesler created KAFKA-7932: --- Summary: Streams needs to handle new Producer exceptions Key: KAFKA-7932 URL: https://issues.apache.org/jira/browse/KAFKA-7932 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Following on KAFKA-7763, Streams needs to handle the new behavior. See also https://github.com/apache/kafka/pull/6066 Streams code (StreamTask.java) needs to be modified to handle the new exception. Also, from another upstream change, `initTxn` can also throw TimeoutException now: default `MAX_BLOCK_MS_CONFIG` in producer is 60 seconds, so I think just wrapping it as StreamsException should be reasonable, similar to what we do for `producer#send`'s TimeoutException ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L220-L225] ). Note we need to handle in three functions: init/commit/abortTxn. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7943) Add Suppress system test with caching disabled.
John Roesler created KAFKA-7943: --- Summary: Add Suppress system test with caching disabled. Key: KAFKA-7943 URL: https://issues.apache.org/jira/browse/KAFKA-7943 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Assignee: John Roesler -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7944) Add more natural Suppress test
John Roesler created KAFKA-7944: --- Summary: Add more natural Suppress test Key: KAFKA-7944 URL: https://issues.apache.org/jira/browse/KAFKA-7944 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Can be an integration or system test. The idea is to test with tighter time bounds (windows of, say, 30 seconds) and to use system time without artificially adding any extra time for verification. Currently, all the tests rely on artificially advancing system time, which should be reliable, but you never know. So, we want to add a test that works exactly like production code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8040) Streams needs to retry initTransactions
John Roesler created KAFKA-8040: --- Summary: Streams needs to retry initTransactions Key: KAFKA-8040 URL: https://issues.apache.org/jira/browse/KAFKA-8040 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1 Reporter: John Roesler Assignee: John Roesler Fix For: 2.3.0 Following on KAFKA-7763, Streams needs to handle the new behavior. See also [https://github.com/apache/kafka/pull/6066] Streams code (StreamTask.java) needs to be modified to handle the new exception. Also, from another upstream change, `initTxn` can also throw TimeoutException now: default `MAX_BLOCK_MS_CONFIG` in producer is 60 seconds, so I think just wrapping it as StreamsException should be reasonable, similar to what we do for `producer#send`'s TimeoutException ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L220-L225] ). Note we need to handle in three functions: init/commit/abortTxn. See also https://github.com/apache/kafka/pull/6066#issuecomment-464403448 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
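A sketch of the wrapping described above, assuming direct access to the task's producer; the method and message are illustrative, not the actual patch.
{code:java}
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.StreamsException;

public class InitTransactionsSketch {
    static void initTransactions(final Producer<byte[], byte[]> producer, final String taskId) {
        try {
            producer.initTransactions();
        } catch (final TimeoutException e) {
            // MAX_BLOCK_MS_CONFIG (default 60s) expired before the broker responded;
            // surface it the same way a timed-out producer#send is surfaced.
            throw new StreamsException("Timed out initializing transactions for task " + taskId, e);
        }
    }
}
{code}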
[jira] [Created] (KAFKA-8080) Remove streams_eos_test system test
John Roesler created KAFKA-8080: --- Summary: Remove streams_eos_test system test Key: KAFKA-8080 URL: https://issues.apache.org/jira/browse/KAFKA-8080 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler After KAFKA-7944 / [https://github.com/apache/kafka/pull/6382] , the system test streams_eos_test.py is mostly redundant. Quoting my analysis from [https://github.com/apache/kafka/pull/6382#discussion_r263536548,] {quote}Ok, so the smoke test and the eos test are similar, but not identical. The smoke test application has more features in it, though. So, we have more feature coverage under eos when we test with the smoke test. The eos test evaluates two topologies one with no repartition, and one with a repartition. The smoke test topology contains repartitions, so it only tests _with_ repartition. I think that it should be sufficient to test only _with_ repartition. The eos test verification specifically checks that "all transactions finished" ({{org.apache.kafka.streams.tests.EosTestDriver#verifyAllTransactionFinished}}). I'm not clear on exactly what we're looking for here. It looks like we create a transactional producer and send a record to each partition and then expect to get all those records back, without seeing any other records. But I'm not sure why we're doing this. If we want to drop the eos test, then we might need to add this to the smoke test verification. {quote} And [~guozhang]'s reply: {quote}{{verifyAllTransactionFinished}} aimed to avoid a situation that some dangling txn is open forever, without committing or aborting. Because the consumer needs to guarantee offset ordering when returning data, with read-committed they will also be blocked on those open txn's data forever (this usually indicates a broker-side issue, not streams though, but still). I think we should still retain this check if we want to merge in the EosTest to SmokeTest. {quote} As described above, the task is simply to add a similar check to the end of the verification logic in the SmokeTestDriver. Then, we can remove the EOS system test, as well as all the Java code that supports it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7944) Add more natural Suppress test
[ https://issues.apache.org/jira/browse/KAFKA-7944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-7944. - Resolution: Fixed > Add more natural Suppress test > -- > > Key: KAFKA-7944 > URL: https://issues.apache.org/jira/browse/KAFKA-7944 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Can be integration or system test. > The idea is to test with tighter time bounds with windows of say 30 seconds > and use system time without adding any extra time for verification. > Currently, all the tests rely on artificially advancing system time, which > should be reliable, but you never know. So, we want to add a test that works > exactly like production code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8111) KafkaProducer can't produce data
John Roesler created KAFKA-8111: --- Summary: KafkaProducer can't produce data Key: KAFKA-8111 URL: https://issues.apache.org/jira/browse/KAFKA-8111 Project: Kafka Issue Type: Bug Components: clients, core Affects Versions: 2.3.0 Reporter: John Roesler Using a Producer from the current trunk (a6691fb79), I'm unable to produce data to a 2.2 broker. tl;dr: I narrowed down the problem to [https://github.com/apache/kafka/commit/a42f16f98] . My hypothesis is that some part of that commit broke backward compatibility with older brokers. Repro steps: I'm using this Producer config: {noformat} final Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); return properties;{noformat} # create a simple Producer to produce test data to a broker # build against commit a42f16f98 # start an older broker. (I was using 2.1, and someone else reproduced it with 2.2) # run your producer and note that it doesn't produce data (seems to hang, I see it produce 2 records in 1 minute) # build against the predecessor commit 65aea1f36 # run your producer and note that it DOES produce data (I see it produce 1M records every 15 seconds) I've also confirmed that if I check out the current trunk (a6691fb79e2c55b3) and revert a42f16f98, I also observe that it produces as expected (1M every 15 seconds). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8155) Update Streams system tests for 2.2.0 and 2.1.1 releases
John Roesler created KAFKA-8155: --- Summary: Update Streams system tests for 2.2.0 and 2.1.1 releases Key: KAFKA-8155 URL: https://issues.apache.org/jira/browse/KAFKA-8155 Project: Kafka Issue Type: Task Components: streams, system tests Reporter: John Roesler Assignee: John Roesler -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8167) Document named stateful operators
John Roesler created KAFKA-8167: --- Summary: Document named stateful operators Key: KAFKA-8167 URL: https://issues.apache.org/jira/browse/KAFKA-8167 Project: Kafka Issue Type: Task Components: documentation, streams Affects Versions: 2.1.1, 2.2.0, 2.1.0, 2.3.0, 2.1.2, 2.2.1 Reporter: John Roesler In KIP-372 / KAFKA-7406, we added the ability to name all persistent resources in support of topology compatibility. We missed documenting it, though, or at least, I couldn't find the docs. Since this feature is a higher-level, cross-cutting concern, we should add a section to the docs describing the compatibility problem and the full set of practices that you can employ to achieve compatibility by naming persistent resources. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8189) Streams should have an option to require names for stateful components
John Roesler created KAFKA-8189: --- Summary: Streams should have an option to require names for stateful components Key: KAFKA-8189 URL: https://issues.apache.org/jira/browse/KAFKA-8189 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler With the introduction of [https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping] , in conjunction with naming state via Materialized and Suppressed, Streams users have the ability to alter their topologies and restart without losing track of operator state or repartition topics. It would be a robust pattern to always name stateful components, but this pattern is vulnerable to simple coding and configuration mistakes. If developers lose vigilance even once and deploy a topology with *any* state not named, the only way to correct it is with an application reset. Streams can support topology compatibility by offering a config option to require names on all stateful components. Then, if someone accidentally adds an anonymous stateful operator, Streams would throw an exception instead of generating a name, preserving the integrity of the application. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
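A sketch of the fully named pattern the proposed option would enforce (topic, store, and operator names are made up):
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

public class NamedTopologySketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("events")
               .groupByKey(Grouped.as("events-by-key"))           // names the repartition topic
               .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
               .count(Materialized.as("event-counts"))            // names the store and changelog
               .suppress(Suppressed.untilWindowCloses(unbounded())
                                   .withName("event-counts-suppress")); // names the suppression buffer
    }
}
{code}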
[jira] [Created] (KAFKA-8204) Streams may flush state stores in the incorrect order
John Roesler created KAFKA-8204: --- Summary: Streams may flush state stores in the incorrect order Key: KAFKA-8204 URL: https://issues.apache.org/jira/browse/KAFKA-8204 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Assignee: John Roesler Cached state stores may forward records during a flush call, so Streams should flush the stores in topological order. Otherwise, Streams may flush a downstream store before an upstream one, resulting in sink results being committed without the corresponding state changelog updates being committed. This behavior is partly responsible for the bug reported in KAFKA-7895. The fix is simply to flush the stores in topological order; then, when an upstream store forwards records to a downstream stateful processor, the corresponding state changes will be correctly flushed as well. An alternative would be to repeatedly call flush on all state stores until they report there is nothing left to flush, but this requires a public API change to enable state stores to report whether they need a flush or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reopened KAFKA-7895: - > Ktable supress operator emitting more than one record for the same key per > window > - > > Key: KAFKA-7895 > URL: https://issues.apache.org/jira/browse/KAFKA-7895 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.1.1 >Reporter: prasanthi >Assignee: John Roesler >Priority: Major > Fix For: 2.2.0, 2.1.2 > > > Hi, We are using kstreams to get the aggregated counts per vendor(key) within > a specified window. > Here's how we configured the suppress operator to emit one final record per > key/window. > {code:java} > KTable, Long> windowedCount = groupedStream > .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L))) > .count(Materialized.with(Serdes.Integer(),Serdes.Long())) > .suppress(Suppressed.untilWindowCloses(unbounded())); > {code} > But we are getting more than one record for the same key/window as shown > below. > {code:java} > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039 > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162 > [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584 > [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107 > [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315 > [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code} > Could you please take a look? > Thanks > > > Added by John: > Acceptance Criteria: > * add suppress to system tests, such that it's exercised with crash/shutdown > recovery, rebalance, etc. > ** [https://github.com/apache/kafka/pull/6278] > * make sure that there's some system test coverage with caching disabled. > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943 > * test with tighter time bounds with windows of say 30 seconds and use > system time without adding any extra time for verification > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8254) Suppress incorrectly passes a null topic to the serdes
John Roesler created KAFKA-8254: --- Summary: Suppress incorrectly passes a null topic to the serdes Key: KAFKA-8254 URL: https://issues.apache.org/jira/browse/KAFKA-8254 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.1.1, 2.2.0, 2.1.0 Reporter: John Roesler Fix For: 2.3.0, 2.1.2, 2.2.1 For example, in KTableSuppressProcessor, we do: {noformat} final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, key)); {noformat} This violates the contract of Serializer (and Deserializer), and breaks integration with known Serde implementations. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
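A sketch of the direction of a fix (not the actual patch; the changelog-topic parameter is illustrative): hand the serializer a real topic name rather than null.
{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;

public class SuppressKeySerializationSketch {
    static <K> Bytes serializeKey(final Serde<K> keySerde, final K key, final String changelogTopic) {
        // Contract-violating: serialize(null, key) breaks serdes (e.g. schema-registry
        // aware ones) that dispatch on the topic name.
        // Contract-respecting: pass the topic the serialized bytes are destined for.
        return Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
    }
}
{code}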
[jira] [Created] (KAFKA-8260) Flaky test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
John Roesler created KAFKA-8260: --- Summary: Flaky test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup Key: KAFKA-8260 URL: https://issues.apache.org/jira/browse/KAFKA-8260 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.3.0 Reporter: John Roesler I have seen this fail again just now. See also KAFKA-7965 and KAFKA-7936. https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3874/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/ {noformat} Error Message org.scalatest.junit.JUnitTestFailedError: Should have received an class org.apache.kafka.common.errors.GroupMaxSizeReachedException during the cluster roll Stacktrace org.scalatest.junit.JUnitTestFailedError: Should have received an class org.apache.kafka.common.errors.GroupMaxSizeReachedException during the cluster roll at org.scalatest.junit.AssertionsForJUnit.newAssertionFailedException(AssertionsForJUnit.scala:100) at org.scalatest.junit.AssertionsForJUnit.newAssertionFailedException$(AssertionsForJUnit.scala:99) at org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:71) at org.scalatest.Assertions.fail(Assertions.scala:1089) at org.scalatest.Assertions.fail$(Assertions.scala:1085) at org.scalatest.junit.JUnitSuite.fail(JUnitSuite.scala:71) at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:350) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8280) Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable
John Roesler created KAFKA-8280: --- Summary: Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable Key: KAFKA-8280 URL: https://issues.apache.org/jira/browse/KAFKA-8280 Project: Kafka Issue Type: Bug Affects Versions: 2.3.0 Reporter: John Roesler I saw this fail again on https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3979/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/ {noformat} Error Message java.lang.AssertionError: Unclean leader not elected Stacktrace java.lang.AssertionError: Unclean leader not elected at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:510) {noformat} {noformat} Standard Output Completed Updating config for entity: brokers '0'. Completed Updating config for entity: brokers '1'. Completed Updating config for entity: brokers '2'. [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=1] Error for partition testtopic-6 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=1] Error for partition testtopic-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition testtopic-7 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition testtopic-1 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:11,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=1] Error for partition testtopic-6 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:11,761] ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=1] Error for partition testtopic-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition testtopic-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition testtopic-9 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:13,779] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=1] Error for partition __consumer_offsets-8 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. 
[2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=1] Error for partition __consumer_offsets-38 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=1] Error for partition __consumer_offsets-2 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=1] Error for partition __consumer_offsets-14 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=1] Error for partition __consumer_offsets-20 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=1] Error for pa
[jira] [Created] (KAFKA-8307) Kafka Streams should provide some mechanism to determine topology equality and compatibility
John Roesler created KAFKA-8307: --- Summary: Kafka Streams should provide some mechanism to determine topology equality and compatibility Key: KAFKA-8307 URL: https://issues.apache.org/jira/browse/KAFKA-8307 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, Streams provides no mechanism to compare two topologies. This is a common operation when users want to have tests verifying that they don't accidentally alter their topology. They would save the known-good topology and then add a unit test verifying the current code against that known-good state. However, because there's no way to do this comparison properly, everyone is reduced to using the string format of the topology (from `Topology#describe().toString()`). The major drawback is that the string format is meant for human consumption. It is neither machine-parseable nor stable. So, these compatibility tests are doomed to fail when any minor, non-breaking change is made either to the application or to the library. This trains everyone to update the test whenever it fails, undermining its utility. We should fix this problem, and provide both a mechanism to serialize the topology and to compare two topologies for compatibility. All in all, I think we need:
# a way to serialize/deserialize topology structure in a machine-parseable format that is future-compatible. Offhand, I'd recommend serializing the topology structure as JSON, and establishing a policy that attributes should only be added to the object graph, never removed. Note, it's out of scope to be able to actually run a deserialized topology; we only want to save and load the structure (not the logic) to facilitate comparisons.
# a method to verify the *equality* of two topologies... This method tells you that the two topologies are structurally identical. We can't know if the logic of any operator has changed, only if the structure of the graph is changed. We can consider whether other graph properties, like serdes, should be included.
# a method to verify the *compatibility* of two topologies... This method tells you that moving from topology A to topology B does not require an application reset. Note that this operation is not commutative: `A.compatibleWith(B)` does not imply `B.compatibleWith(A)`. We can discuss whether `A.compatibleWith(B) && B.compatibleWith(A)` implies `A.equals(B)` (I think not necessarily, because we may want "equality" to be stricter than "compatibility").
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
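For illustration, here is a rough sketch of the string-comparison test pattern described in the ticket above; the one-operator topology and the placeholder for the saved description are made up for this example:
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class TopologyRegressionCheck {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").groupByKey().count().toStream().to("output");
        final Topology topology = builder.build();

        // The only comparison handle available today is the human-readable description.
        // "expected" stands in for a previously saved copy of describe().toString();
        // any cosmetic change to the description format breaks this check, even when
        // the topology is structurally unchanged and fully compatible.
        final String expected = "...";
        final String actual = topology.describe().toString();
        if (!actual.equals(expected)) {
            throw new AssertionError("Topology description changed:\n" + actual);
        }
    }
}
{code}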
[jira] [Created] (KAFKA-8318) Session Window Aggregations generate an extra tombstone
John Roesler created KAFKA-8318: --- Summary: Session Window Aggregations generate an extra tombstone Key: KAFKA-8318 URL: https://issues.apache.org/jira/browse/KAFKA-8318 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler See the discussion https://github.com/apache/kafka/pull/6654#discussion_r280231439 The session merging logic generates a tombstone in addition to an update when the session window already exists. It's not a correctness issue, just a small performance hit, because that tombstone is immediately invalidated by the update. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8396) Clean up Transformer API
John Roesler created KAFKA-8396: --- Summary: Clean up Transformer API Key: KAFKA-8396 URL: https://issues.apache.org/jira/browse/KAFKA-8396 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, KStream operators transformValues and flatTransformValues disable context forwarding, and force operators to just return the new values. The reason is that we wanted to prevent the key from changing, since the whole point of a `xValues` transformation is that we _do not_ change the key, and hence don't need to repartition. However, the chosen mechanism has some drawbacks: The Transform concept is basically a way to plug in a custom Processor within the Streams DSL, but these restrictions make it more like a MapValues with access to the context. For example, even though you can still schedule punctuations, there's no way to forward values as a result of them. So, as a user, it's hard to build a mental model of how to use a TransformValues (because it's not quite a Transformer and not quite a Mapper). Also, logically, a Transformer can call forward as much as it wants, so a Transformer and a FlatTransformer are effectively the same thing. Then, we also have TransformValues and FlatTransformValues that are also two more versions of the same thing, just to implement the key restrictions. Internally, some of these can send downstream by returning OR forwarding, and others can only return. It's a lot for users to keep in mind. We can clean up this API significantly by just allowing all transformers to call `forward`. In the `Values` case, we can wrap the ProcessorContext in one that checks the key is `equal` to the one that got passed in (i.e., saves a reference and enforces equality with that reference in any call to `forward`). Then, we can actually deprecate the `*ValueTransformer*` interfaces and remove the restriction about calling forward. We can consider a further cleanup (TBD) to deprecate the existing Transformer interface entirely, and replace it with one with a `void` return type. Then, the Transform and FlatTransform cases collapse together, and we just need Transform and TransformValues. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
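A minimal sketch of the key-enforcement idea from the ticket above; the interfaces and names here are simplified stand-ins, not the real ProcessorContext API:
{code:java}
// Hypothetical, simplified stand-in for ProcessorContext (the real interface has many
// more methods); it illustrates only the forward-time key check described above.
interface ForwardingContext<K, V> {
    void forward(K key, V value);
}

final class KeyEnforcingContext<K, V> implements ForwardingContext<K, V> {
    private final ForwardingContext<K, V> delegate;
    private K currentKey; // key of the record currently being processed

    KeyEnforcingContext(final ForwardingContext<K, V> delegate) {
        this.delegate = delegate;
    }

    void setCurrentKey(final K key) {
        this.currentKey = key;
    }

    @Override
    public void forward(final K key, final V value) {
        // A "Values" transformer may forward as often as it likes, including from
        // punctuators, but it must not change the key (which would require a repartition).
        if (!java.util.Objects.equals(currentKey, key)) {
            throw new IllegalStateException(
                "A TransformValues must not change the key: " + currentKey + " -> " + key);
        }
        delegate.forward(key, value);
    }
}
{code}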
[jira] [Created] (KAFKA-8403) Suppress needs a Materialized variant
John Roesler created KAFKA-8403: --- Summary: Suppress needs a Materialized variant Key: KAFKA-8403 URL: https://issues.apache.org/jira/browse/KAFKA-8403 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler The newly added KTable Suppress operator lacks a Materialized variant, which would be useful if you wanted to query the results of the suppression. Suppression results will eventually match the upstream results, but the intermediate distinction may be meaningful for some applications. For example, you could want to query only the final results of a windowed aggregation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
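For context, a rough sketch of today's suppress usage on a windowed count, with the Materialized-accepting variant proposed by this ticket shown only as a hypothetical comment (that overload does not exist):
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class SuppressFinalResults {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Emit only the final count per 5-minute window, once the grace period has passed.
        final KTable<Windowed<Object>, Long> finalCounts = builder
            .stream("events")
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
            .count()
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));

        // Hypothetical overload proposed by this ticket (it does not exist today):
        //   .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()),
        //             Materialized.as("final-counts"))
        // would name the suppressed view so that only the final results are queryable.

        builder.build();
    }
}
{code}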
[jira] [Created] (KAFKA-8410) Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as well
John Roesler created KAFKA-8410: --- Summary: Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as well Key: KAFKA-8410 URL: https://issues.apache.org/jira/browse/KAFKA-8410 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Presently, it's very difficult to have confidence when adding to or modifying processors in the DSL. There are a lot of raw types, duck-typing, and casting that contribute to this problem. The root, though, is that the generic types on `Processor` refer only to the _input_ key and value types. No information is captured or verified about what the _output_ types of a processor are. For example, this leads to widespread confusion in the code base about whether a processor produces `V`s or `Change`s. The type system actually makes matters worse, since we use casts to make the processors conform to declared types that are in fact wrong, but are never checked due to erasure. We can start to make some headway on this tech debt by adding some types to the ProcessorContext that bound the key and value types that may be passed to `context.forward`. Then, we can build on this by fully specifying the input and output types of the Processors, which in turn would let us eliminate the majority of unchecked casts in the DSL operators. I'm not sure whether adding these generic types to the existing ProcessorContext and Processor interfaces, which would also affect the PAPI, has any utility, or whether we should make this purely an internal change by introducing GenericProcessorContext and GenericProcessor peer interfaces for the DSL to use. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
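To make the idea concrete, a rough sketch of what fully typed processor interfaces might look like; all of the names below are hypothetical:
{code:java}
// Hypothetical interfaces carrying output type parameters; today's Processor and
// ProcessorContext only carry input types.
interface GenericProcessorContext<KOut, VOut> {
    // Only records matching the declared output types may be forwarded.
    void forward(KOut key, VOut value);
}

interface GenericProcessor<KIn, VIn, KOut, VOut> {
    void init(GenericProcessorContext<KOut, VOut> context);
    void process(KIn key, VIn value);
    default void close() { }
}

// Example: consumes <String, Long> and emits <String, String>. Wiring it to a downstream
// node that expects different types becomes a compile error instead of an unchecked cast
// that only fails (or silently corrupts data) at runtime.
class FormattingProcessor implements GenericProcessor<String, Long, String, String> {
    private GenericProcessorContext<String, String> context;

    @Override
    public void init(final GenericProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final String key, final Long value) {
        context.forward(key, "value=" + value); // checked against <String, String> at compile time
    }
}
{code}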
[jira] [Resolved] (KAFKA-7991) Add StreamsUpgradeTest for 2.2 release
[ https://issues.apache.org/jira/browse/KAFKA-7991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-7991. - Resolution: Duplicate Yep. Closing as a duplicate. > Add StreamsUpgradeTest for 2.2 release > -- > > Key: KAFKA-7991 > URL: https://issues.apache.org/jira/browse/KAFKA-7991 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: John Roesler >Priority: Blocker > Fix For: 2.3.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8452) Possible Suppress buffer optimization: de-duplicate prior value
John Roesler created KAFKA-8452: --- Summary: Possible Suppress buffer optimization: de-duplicate prior value Key: KAFKA-8452 URL: https://issues.apache.org/jira/browse/KAFKA-8452 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler As of KAFKA-8199, the suppression buffers have to track the "prior value" in addition to the "old" and "new" values for each record, to support transparent downstream views. In many cases, the prior value is actually the same as the old value, and we could avoid storing it separately. The challenge is that the old and new values are already serialized into a common array (as a Change via the FullChangeSerde), so the "prior" value would actually be a slice on the underlying array. But, of course, Java does not have array slices. To get around this, we either need to switch to ByteBuffers (which support slices) or break apart the serialized Change into just serialized old and new values. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
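A small sketch of the ByteBuffer option; the offset/length bookkeeping is assumed, and the actual FullChangeSerde layout may differ:
{code:java}
import java.nio.ByteBuffer;

public class PriorValueSlice {

    /**
     * Returns a zero-copy, read-only view of the "old value" region inside the serialized
     * Change array, for reuse as the "prior value" when the two are identical. The offset
     * and length of the old value within the combined array are assumed to be known from
     * the serialized layout.
     */
    static ByteBuffer priorValueView(final byte[] serializedChange,
                                     final int oldValueOffset,
                                     final int oldValueLength) {
        final ByteBuffer buffer = ByteBuffer.wrap(serializedChange);
        buffer.position(oldValueOffset);
        buffer.limit(oldValueOffset + oldValueLength);
        // slice() shares the underlying array: no copy, unlike Arrays.copyOfRange.
        return buffer.slice().asReadOnlyBuffer();
    }

    public static void main(final String[] args) {
        final byte[] change = {1, 2, 3, 4, 5}; // pretend bytes 0..2 are the old value
        final ByteBuffer prior = priorValueView(change, 0, 3);
        System.out.println(prior.remaining()); // prints 3; backed by the same array
    }
}
{code}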
[jira] [Created] (KAFKA-8455) Add NothingSerde to Serdes
John Roesler created KAFKA-8455: --- Summary: Add NothingSerde to Serdes Key: KAFKA-8455 URL: https://issues.apache.org/jira/browse/KAFKA-8455 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Often, when reading an input topic, the key is expected to be null, but there's actually no way to represent this fact in Consumed, leading to confusing type signatures down the topology. For example, you might use the BytesSerde, but then you have a `KStream<Bytes, ...>`. When maintaining a large application, this becomes a hazard, since you'd need to "be really careful" not to try and dereference the key at any point, even though you actually know it's always null. Much better would be to actually represent the fact that the key is null, using the Void type. One such example of this is the NothingSerde I wrote here: https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/test/java/io/confluent/examples/streams/IntegrationTestUtils.java#L465 After some conversations, I've come to believe this would actually be a useful addition to the main Serdes collection. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
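A minimal sketch of such a serde, modeled loosely on the linked NothingSerde; this is an illustration, not the exact class proposed for Serdes:
{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

/** A Serde<Void> that only accepts null, making "the key is always null" explicit in the types. */
public class VoidSerde implements Serde<Void>, Serializer<Void>, Deserializer<Void> {

    @Override
    public Void deserialize(final String topic, final byte[] data) {
        if (data != null) {
            throw new IllegalArgumentException("Expected null, but found bytes on topic " + topic);
        }
        return null;
    }

    @Override
    public byte[] serialize(final String topic, final Void data) {
        return null; // a Void value can only ever be null
    }

    @Override
    public Serializer<Void> serializer() {
        return this;
    }

    @Override
    public Deserializer<Void> deserializer() {
        return this;
    }

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) { }

    @Override
    public void close() { }
}
{code}
With something like this, `builder.stream("input", Consumed.with(new VoidSerde(), Serdes.String()))` would yield a `KStream<Void, String>`, making the always-null key visible in the types instead of a latent NullPointerException.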
[jira] [Created] (KAFKA-8478) Poll for more records before forced processing
John Roesler created KAFKA-8478: --- Summary: Poll for more records before forced processing Key: KAFKA-8478 URL: https://issues.apache.org/jira/browse/KAFKA-8478 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler While analyzing the algorithm of Streams's poll/process loop, I noticed the following: The algorithm of runOnce is:
{code}
loop0:
  long poll for records (100ms)
  loop1:
    loop2:
      for BATCH_SIZE iterations:
        process one record in each task that has data enqueued
    adjust BATCH_SIZE
    if loop2 processed any records, repeat loop1
    else, break loop1 and repeat loop0
{code}
There's potentially an unwanted interaction between "keep processing as long as any record is processed" and forcing processing after `max.task.idle.ms`. If there are two tasks, A and B, and A runs out of records on one input before B, then B could keep the processing loop running, and hence prevent A from getting any new records, until max.task.idle.ms expires, at which point A will force processing on its other input partition. The intent of idling is to at least give A a chance of getting more records on the empty input, but under this situation, we'd never even check for more records before forcing processing. I'm thinking we should only force processing if there was a completed poll since we noticed the task was missing inputs (otherwise, we may as well not bother idling at all). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
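A rough sketch of the proposed guard; the names are made up, and the real StreamThread bookkeeping is more involved:
{code:java}
import java.time.Duration;

/**
 * Illustrative sketch: only force processing of a task with an empty input partition
 * if a poll has completed since we first noticed the partition was empty, so the
 * task gets at least one chance to receive the missing records.
 */
class TaskIdleTracker {
    private long emptySinceMs = -1;      // when we first saw the task missing input
    private boolean polledSinceEmpty;    // has a consumer poll completed since then?

    void recordEmptyInput(final long nowMs) {
        if (emptySinceMs == -1) {
            emptySinceMs = nowMs;
            polledSinceEmpty = false;
        }
    }

    void recordPollCompleted() {
        polledSinceEmpty = true;
    }

    void recordDataReceived() {
        emptySinceMs = -1;
    }

    boolean shouldForceProcessing(final long nowMs, final Duration maxTaskIdle) {
        return emptySinceMs != -1
            && polledSinceEmpty
            && nowMs - emptySinceMs >= maxTaskIdle.toMillis();
    }
}
{code}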
[jira] [Resolved] (KAFKA-8452) Possible Suppress buffer optimization: de-duplicate prior value
[ https://issues.apache.org/jira/browse/KAFKA-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-8452. - Resolution: Fixed > Possible Suppress buffer optimization: de-duplicate prior value > --- > > Key: KAFKA-8452 > URL: https://issues.apache.org/jira/browse/KAFKA-8452 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > As of KAFKA-8199, the suppression buffers have to track the "prior value" in > addition to the "old" and "new" values for each record, to support > transparent downstream views. > In many cases, the prior value is actually the same as the old value, and we > could avoid storing it separately. The challenge is that the old and new > values are already serialized into a common array (as a Change via the > FullChangeSerde), so the "prior" value would actually be a slice on the > underlying array. But, of course, Java does not have array slices. > To get around this, we either need to switch to ByteBuffers (which support > slices) or break apart the serialized Change into just serialized old and new > values. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress
John Roesler created KAFKA-8582: --- Summary: Consider adding an ExpiredWindowRecordHandler to Suppress Key: KAFKA-8582 URL: https://issues.apache.org/jira/browse/KAFKA-8582 Project: Kafka Issue Type: Improvement Reporter: John Roesler I got some feedback on Suppress: {quote}Specifying how to handle events outside the grace period does seem like a business concern, and simply discarding them thus seems risky (for example imagine any situation where money is involved). This sort of situation is addressed by the late-triggering approach associated with watermarks (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given this I wondered if you were considering adding anything similar?{quote} It seems like, if a record has arrived past the grace period for its window, then the state of the windowed aggregation would already have been lost, so if we were to compute an aggregation result, it would be incorrect. Plus, since the window is already expired, we can't store the new (incorrect, but more importantly expired) aggregation result either, so any subsequent super-late records would also face the same blank-slate. I think this would wind up looking like this: if you have three timely records for a window, and then three more that arrive after the grace period, and you were doing a count aggregation, you'd see the counts emitted for the window as [1, 2, 3, 1, 1, 1]. I guess we could add a flag to the post-expiration results to indicate that they're broken, but this seems like the wrong approach. The post-expiration aggregation _results_ are meaningless, but I could see wanting to send the past-expiration _input records_ to a dead-letter queue or something instead of dropping them. Along this line of thinking, I wonder if we should add an optional past-expiration record handler interface to the suppression operator. Then, you could define your own logic, whether it's a dead-letter queue, sending it to some alerting pipeline, or even just crashing the application before it can do something wrong. This would be a similar pattern to how we allow custom logic to handle deserialization errors by supplying a org.apache.kafka.streams.errors.DeserializationExceptionHandler. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
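To make the proposal concrete, a hypothetical handler interface might look roughly like this, by analogy to DeserializationExceptionHandler (names and shape are made up):
{code:java}
import org.apache.kafka.common.Configurable;

/**
 * Hypothetical callback that the suppression operator could invoke for input records
 * arriving after their window's grace period, instead of silently dropping them.
 */
interface ExpiredWindowRecordHandler<K, V> extends Configurable {

    enum ExpiredWindowRecordHandlerResponse {
        CONTINUE, // drop the record (today's behavior) and keep processing
        FAIL      // shut the application down before it can do something wrong
    }

    /**
     * @param recordTimestamp the record's timestamp, which is already past the grace period
     */
    ExpiredWindowRecordHandlerResponse handle(K key, V value, long recordTimestamp);
}
{code}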
[jira] [Created] (KAFKA-8696) Clean up Sum/Count/Total metrics
John Roesler created KAFKA-8696: --- Summary: Clean up Sum/Count/Total metrics Key: KAFKA-8696 URL: https://issues.apache.org/jira/browse/KAFKA-8696 Project: Kafka Issue Type: Improvement Reporter: John Roesler Assignee: John Roesler Kafka has a family of metrics consisting of:
* org.apache.kafka.common.metrics.stats.Count
* org.apache.kafka.common.metrics.stats.Sum
* org.apache.kafka.common.metrics.stats.Total
* org.apache.kafka.common.metrics.stats.Rate.SampledTotal
* org.apache.kafka.streams.processor.internals.metrics.CumulativeCount
These metrics are all related to each other, but their relationship is obscure (one is redundant, and another is internal). I've recently been involved in a third recapitulation of trying to work out which metric does what. It seems like it's time to clean up the mess and save everyone from having to work out the mystery for themselves. I've proposed https://cwiki.apache.org/confluence/x/kkAyBw to fix it. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8769) Consider computing stream time independently per key
John Roesler created KAFKA-8769: --- Summary: Consider computing stream time independently per key Key: KAFKA-8769 URL: https://issues.apache.org/jira/browse/KAFKA-8769 Project: Kafka Issue Type: Improvement Reporter: John Roesler Currently, Streams uses a concept of "stream time", which is computed as the highest timestamp observed by stateful operators, per partition. This concept of time backs grace period, retention time, and suppression. For use cases in which data is produced to topics in roughly chronological order (as in db change capture), this reckoning is fine. Some use cases have a different pattern, though. For example, in IOT applications, it's common for sensors to save up quite a bit of data and then dump it all at once into the topic. See https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware for a concrete example of the use case. I have heard of cases where each sensor dumps 24 hours' worth of data at a time into the topic. This results in a pattern in which, when reading a single partition, the operators observe a lot of consecutive records for one key that increase in timestamp for 24 hours, then a bunch of consecutive records for another key that are also increasing in timestamp over the same 24 hour period. With our current stream-time definition, this means that the partition's stream time increases while reading the first key's data, but then stays paused while reading the second key's data, since the second batch of records all have timestamps in the "past". E.g.:
{noformat}
A@t0 (stream time: 0)
A@t1 (stream time: 1)
A@t2 (stream time: 2)
A@t3 (stream time: 3)
B@t0 (stream time: 3)
B@t1 (stream time: 3)
B@t2 (stream time: 3)
B@t3 (stream time: 3)
{noformat}
This pattern results in an unfortunate compromise in which folks are required to set the grace period to the max expected time skew, for example 24 hours, or Streams will just drop the second key's data (since it is late). But, this means that if they want to use Suppression for "final results", they have to wait 24 hours for the result. This tradeoff is not strictly necessary, though, because each key represents a logically independent sequence of events. Tracking by partition is simply convenient, but typically not logically meaningful. That is, the partitions are just physically independent sequences of events, so it's convenient to track stream time at this granularity. It would be just as correct, and more useful for IOT-like use cases, to track time independently for each key. However, before considering this change, we need to solve the testing/low-traffic problem. This is the opposite issue, where a partition doesn't get enough traffic to advance stream time and results remain "stuck" in the suppression buffers. We can provide some mechanism to force the advancement of time across all partitions, for use in testing when you want to flush out all results, or in production when some topic is low volume. We shouldn't consider tracking time _more_ granularly until this problem is solved, since it would just make the low-traffic problem worse. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
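A toy sketch contrasting the two reckonings on the interleaving above (partition-level stream time as a single long versus a hypothetical per-key map):
{code:java}
import java.util.HashMap;
import java.util.Map;

public class StreamTimeSketch {

    public static void main(final String[] args) {
        // Records as they appear in one partition: 4 timestamps for key A, then 4 for key B.
        final String[] keys = {"A", "A", "A", "A", "B", "B", "B", "B"};
        final long[] timestamps = {0, 1, 2, 3, 0, 1, 2, 3};

        long partitionStreamTime = Long.MIN_VALUE;                  // current behavior
        final Map<String, Long> perKeyStreamTime = new HashMap<>(); // hypothetical behavior

        for (int i = 0; i < keys.length; i++) {
            partitionStreamTime = Math.max(partitionStreamTime, timestamps[i]);
            perKeyStreamTime.merge(keys[i], timestamps[i], Math::max);

            // With partition-level time, every record of key B arrives 3 behind stream time
            // and is "late"; with per-key time, none of B's records are late.
            System.out.printf("%s@t%d partition=%d perKey=%d%n",
                keys[i], timestamps[i], partitionStreamTime, perKeyStreamTime.get(keys[i]));
        }
    }
}
{code}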
[jira] [Created] (KAFKA-8770) Either switch to or add an option for emit-on-change
John Roesler created KAFKA-8770: --- Summary: Either switch to or add an option for emit-on-change Key: KAFKA-8770 URL: https://issues.apache.org/jira/browse/KAFKA-8770 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, Streams offers two emission models:
* emit-on-window-close: (using Suppression)
* emit-on-update: (i.e., emit a new result whenever a new record is processed, regardless of whether the result has changed)
There is also an option to drop some intermediate results, either using caching or suppression. However, there is no support for emit-on-change, in which results would be forwarded only if the result has changed. This has been reported to be extremely valuable as a performance optimization for some high-traffic applications, and it reduces the computational burden both internally, for downstream Streams operations, and externally, for systems that consume the results and currently have to deal with a lot of "no-op" changes. It would be pretty straightforward to implement this, by loading the prior results before a stateful operation and comparing with the new result before persisting or forwarding. In many cases, we load the prior result anyway, so it may not be a significant performance impact either. One design challenge is what to do with timestamps. If we get one record at time 1 that produces a result, and then another at time 2 that produces a no-op, what should be the timestamp of the result, 1 or 2? emit-on-change would require us to say 1. Clearly, we'd need to do some serious benchmarks to evaluate any potential implementation of emit-on-change. Another design challenge is to decide if we should just automatically provide emit-on-change for stateful operators, or if it should be configurable. Configuration increases complexity, so unless the performance impact is high, we may just want to change the emission model without a configuration. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
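The core check is small; a rough sketch with a hypothetical helper (not the actual operator code):
{code:java}
import java.util.Objects;
import org.apache.kafka.streams.state.KeyValueStore;

/**
 * Sketch of emit-on-change for a stateful operator: load the prior result, and only
 * persist/forward when the newly computed result actually differs.
 */
class EmitOnChangeSketch<K, V> {

    interface Downstream<K, V> {
        void forward(K key, V value);
    }

    private final KeyValueStore<K, V> resultStore;
    private final Downstream<K, V> downstream;

    EmitOnChangeSketch(final KeyValueStore<K, V> resultStore, final Downstream<K, V> downstream) {
        this.resultStore = resultStore;
        this.downstream = downstream;
    }

    void maybeForward(final K key, final V newResult) {
        final V priorResult = resultStore.get(key); // often loaded anyway by the operator
        if (Objects.equals(priorResult, newResult)) {
            return;                                 // no-op update: suppress it
        }
        resultStore.put(key, newResult);
        downstream.forward(key, newResult);
    }
}
{code}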
[jira] [Created] (KAFKA-8868) Consider auto-generating Streams binary protocol messages
John Roesler created KAFKA-8868: --- Summary: Consider auto-generating Streams binary protocol messages Key: KAFKA-8868 URL: https://issues.apache.org/jira/browse/KAFKA-8868 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Assignee: John Roesler Rather than maintain hand coded protocol serialization code, Streams could use the same code-generation framework as Clients/Core. There isn't a perfect match, since the code generation framework includes an assumption that you're generating "protocol messages", rather than just arbitrary blobs, but I think it's close enough to justify using it, and improving it over time. Using the code generation allows us to drop a lot of detail-oriented, brittle, and hard-to-maintain serialization logic in favor of a schema spec. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Resolved] (KAFKA-9006) Flaky Test KTableKTableForeignKeyJoinIntegrationTest.doLeftJoinFilterOutRapidlyChangingForeignKeyValues
[ https://issues.apache.org/jira/browse/KAFKA-9006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-9006. - Resolution: Duplicate > Flaky Test > KTableKTableForeignKeyJoinIntegrationTest.doLeftJoinFilterOutRapidlyChangingForeignKeyValues > --- > > Key: KAFKA-9006 > URL: https://issues.apache.org/jira/browse/KAFKA-9006 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Bill Bejeck >Assignee: John Roesler >Priority: Major > Labels: flaky-test > > h3. > {noformat} > Error Message > array lengths differed, expected.length=2 actual.length=1; arrays first > differed at element [0]; expected: but > was: > Stacktrace > array lengths differed, expected.length=2 actual.length=1; arrays first > differed at element [0]; expected: but > was: at > org.junit.internal.ComparisonCriteria.arrayEquals(ComparisonCriteria.java:78) > at > org.junit.internal.ComparisonCriteria.arrayEquals(ComparisonCriteria.java:28) > at org.junit.Assert.internalArrayEquals(Assert.java:534) at > org.junit.Assert.assertArrayEquals(Assert.java:285) at > org.junit.Assert.assertArrayEquals(Assert.java:300) at > org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinIntegrationTest.doLeftJoinFilterOutRapidlyChangingForeignKeyValues(KTableKTableForeignKeyJoinIntegrationTest.java:585) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at > org.junit.rules.RunRules.evaluate(RunRules.java:20) at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at > org.junit.runners.ParentRunner.run(ParentRunner.java:412) at > org.junit.runners.Suite.runChild(Suite.java:128) at > org.junit.runners.Suite.runChild(Suite.java:27) at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at > 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at > org.junit.runners.ParentRunner.run(ParentRunner.java:412) at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Delega
[jira] [Resolved] (KAFKA-9022) Flaky Test KTableKTableForeignKeyJoinIntegrationTest. doInnerJoinFilterOutRapidlyChangingForeignKeyValues
[ https://issues.apache.org/jira/browse/KAFKA-9022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-9022. - Resolution: Duplicate > Flaky Test KTableKTableForeignKeyJoinIntegrationTest. > doInnerJoinFilterOutRapidlyChangingForeignKeyValues > - > > Key: KAFKA-9022 > URL: https://issues.apache.org/jira/browse/KAFKA-9022 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Sophie Blee-Goldman >Assignee: John Roesler >Priority: Critical > Labels: flaky-test > Fix For: 2.4.0 > > > array lengths differed, expected.length=2 actual.length=1; arrays first > differed at element [0]; expected: but > was: > > Failed locally for me -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9058) Foreign Key Join should not require a queriable store
John Roesler created KAFKA-9058: --- Summary: Foreign Key Join should not require a queriable store Key: KAFKA-9058 URL: https://issues.apache.org/jira/browse/KAFKA-9058 Project: Kafka Issue Type: Improvement Affects Versions: 2.4.0 Reporter: John Roesler Assignee: John Roesler While resolving KAFKA-9000, I uncovered a significant flaw in the implementation of foreign key joins. The join only works if supplied with a queriable store. I think this was simply an oversight during implementation and review. It would be better to fix this now before the release, since the restriction it places on users could represent a significant burden. If they don't otherwise need the store to be queriable, then they shouldn't be forced to allocate enough storage for a full copy of the join result, or add the changelog topic for it either. -- This message was sent by Atlassian Jira (v8.3.4#803005)
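For illustration, a rough sketch of the workaround this forces on users today: naming a queryable store via Materialized even when the result never needs to be queried (topic names and joiner logic are made up):
{code:java}
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ForeignKeyJoinExample {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> orders = builder.table("orders");       // value = customerId
        final KTable<String, String> customers = builder.table("customers"); // value = customer name

        // Each order's value is its foreign key into the customers table.
        final Function<String, String> foreignKeyExtractor = orderValue -> orderValue;

        // Per this ticket, at the time of writing the foreign-key join only behaves
        // correctly when given a named (queryable) store, so users must materialize
        // the result even if they never query it:
        final KTable<String, String> enriched = orders.join(
            customers,
            foreignKeyExtractor,
            (orderValue, customerName) -> orderValue + "/" + customerName,
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("orders-with-customers")
                .withValueSerde(Serdes.String()));

        builder.build();
    }
}
{code}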
[jira] [Resolved] (KAFKA-8902) Benchmark cooperative vs eager rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-8902. - Resolution: Fixed I wrote a simple Streams application using the Processor API to update 10 stores with every record seen.
{code}
final int numStores = 10;
final Topology topology = new Topology();
topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "table-in");
topology.addProcessor(
    "processor",
    (ProcessorSupplier<String, String>) () -> new Processor<String, String>() {
        private final List<KeyValueStore<String, String>> stores = new ArrayList<>(numStores);

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            for (int i = 0; i < numStores; i++) {
                stores.add(
                    i,
                    (KeyValueStore<String, String>) context.getStateStore("store" + i)
                );
            }
        }

        @Override
        public void process(final String key, final String value) {
            for (final KeyValueStore<String, String> store : stores) {
                store.put(key, value);
            }
        }

        @Override
        public void close() {
            stores.clear();
        }
    },
    "source"
);
{code}
I tested this topology using both in-memory and on-disk stores, with caching and logging enabled. My benchmark consisted of running one KafkaStreams instance and measuring its metrics, while simulating other nodes joining and leaving the cluster (by constructing the simulated nodes to participate in the consumer group protocol without actually doing any work). I tested three cluster rebalance scenarios:
* scale up: 100 partitions / 10 nodes = 10 tasks per node starting, run 4 minutes, add one node (each node loses one task), run 2 minutes, add one node (each node loses another task), run 2 minutes, add two nodes (each node loses one task), end the test at the 10 minute mark
* rolling bounce: 100 partitions / 10 nodes = 10 tasks per node starting, run 4 minutes, bounce each node in the cluster (waiting for it to join and all nodes to return to RUNNING before proceeding), end the test at the 10 minute mark
* full bounce: 100 partitions / 10 nodes = 10 tasks per node starting, run 4 minutes, bounce each node in the cluster (without waiting, so they all leave and join at once), end the test at the 10 minute mark
For input data, I randomly generated a dataset of 10,000 keys, and another with 100,000 keys, all with 1kB values. This data was pre-loaded into the broker, with compaction and retention disabled (so that every test iteration would get the same sequence of updates). I ran all the benchmarks on AWS i3.large instances, with a dedicated broker node running on a separate i3.large instance. For each test configuration and scenario, I ran 20 independent trials and discarded the high and low results (to exclude outliers), for 18 total data points. The key metric was the overall throughput of a single node during the test. I compared the above results from:
* 2.3.1-SNAPSHOT (the current head of the 2.3 branch) - Eager protocol
* 2.4.0-SNAPSHOT (the current head of the 2.4 branch) - Cooperative protocol
* a modified 2.4.0-SNAPSHOT with cooperative rebalancing disabled - Eager protocol
What I found is that under all scenarios, all three versions performed the same (within a 99.9% significance threshold) under the same data sets and the same configurations. I didn't see any marked improvement as a result of cooperative rebalancing alone, but this is only the foundation for several follow-on improvements. What is very good to know is that I also didn't find any regression as a result of the new protocol implementation. 
> Benchmark cooperative vs eager rebalancing > -- > > Key: KAFKA-8902 > URL: https://issues.apache.org/jira/browse/KAFKA-8902 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: John Roesler >Priority: Major > Fix For: 2.4.0 > > > Cause rebalance and measure: > * overall throughput > * paused time > * (also look at the metrics from > (https://issues.apache.org/jira/browse/KAFKA-8609)): > ** accumulated rebalance time > Cluster/topic sizing: > ** 10 instances > ** 100 tasks (each instance gets 10 tasks) > ** 1000 stores (each task gets 10 stores) > * standbys = [0 and 1] > Rolling bounce: > * with and without state loss > * shorter and faster than session timeout (shorter in particular should be > interesting) > Expand (from 9 to 10) > Contract (fro
[jira] [Created] (KAFKA-9103) KIP-441: Add TaskLags to SubscriptionInfo
John Roesler created KAFKA-9103: --- Summary: KIP-441: Add TaskLags to SubscriptionInfo Key: KAFKA-9103 URL: https://issues.apache.org/jira/browse/KAFKA-9103 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Assignee: John Roesler As described in KIP-441, we will add the TaskLags field to, and remove the ActiveTasks and StandbyTasks fields from, the SubscriptionInfo object. This change can be made independently of the new balancing algorithm, since we can transparently convert back and forth between the active/standby task sets and the TaskLags format, using a lag of 0 to denote active tasks and a lag > 0 to denote standbys. -- This message was sent by Atlassian Jira (v8.3.4#803005)
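The back-and-forth conversion mentioned above is mechanical; a toy sketch using plain strings in place of real TaskIds:
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class TaskLagConversion {

    /** Encode active/standby sets as a single task -> lag map: lag 0 means active. */
    static Map<String, Long> toTaskLags(final Set<String> activeTasks,
                                        final Map<String, Long> standbyTaskLags) {
        final Map<String, Long> taskLags = new HashMap<>(standbyTaskLags);
        for (final String active : activeTasks) {
            taskLags.put(active, 0L);
        }
        return taskLags;
    }

    /** Recover the active set from a task -> lag map (the reverse direction). */
    static Set<String> toActiveTasks(final Map<String, Long> taskLags) {
        final Set<String> active = new TreeSet<>();
        taskLags.forEach((task, lag) -> {
            if (lag == 0L) {
                active.add(task);
            }
        });
        return active;
    }
}
{code}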
[jira] [Created] (KAFKA-9138) Add system test covering Foreign Key joins (KIP-213)
John Roesler created KAFKA-9138: --- Summary: Add system test covering Foreign Key joins (KIP-213) Key: KAFKA-9138 URL: https://issues.apache.org/jira/browse/KAFKA-9138 Project: Kafka Issue Type: Test Components: streams Reporter: John Roesler Assignee: John Roesler There are unit and integration tests, but we should really have a system test as well. I plan to create a new test, since this feature is pretty different from the topology/data set of the existing smoke test. That said, it might be possible for the new test to subsume the smoke test. I'd give the new test a few releases to burn in before considering a merge, though. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9169) Standby Tasks point ask for incorrect offsets on resuming post suspension
[ https://issues.apache.org/jira/browse/KAFKA-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-9169. - Resolution: Fixed This was fixed in https://github.com/apache/kafka/pull/7681/ and merged to trunk (currently 2.5.0-SNAPSHOT) > Standby Tasks point ask for incorrect offsets on resuming post suspension > - > > Key: KAFKA-9169 > URL: https://issues.apache.org/jira/browse/KAFKA-9169 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2 >Reporter: Navinder Brar >Assignee: John Roesler >Priority: Critical > Fix For: 2.5.0 > > > In versions(check 2.0) where standby tasks are suspended on each rebalance > the checkpoint file is updated post the flush and the expected behaviour is > that post assignment the same standby task gets assigned back on the machine > it will start reading data from changelog from the same offset from it left > off. > > But there looks like a bug in the code, every time post rebalance it starts > reading from the offset from where it read the first time the task was > assigned on this machine. This has 2 repercussions: > # After every rebalance the standby tasks start restoring huge amount of > data which they have already restored earlier(Verified this via 300x increase > Network IO on all streams instances post rebalance even when no change in > assignment) . > # If changelog has time retention those offsets will not be available in the > changelog, which leads to offsetOutOfRange exceptions and the stores get > deleted and recreated again. > > I have gone through the code and I think I know the issue. > In TaskManager# updateNewAndRestoringTasks(), the function > assignStandbyPartitions() gets called for all the running standby tasks where > it populates the Map: checkpointedOffsets from the > standbyTask.checkpointedOffsets() which is only updated at the time of > initialization of a StandbyTask(i.e. in it's constructor). > > This has an easy fix. > Post resumption we are reading standbyTask.checkpointedOffsets() to know the > offset from where the standby task should start running and not from > stateMgr.checkpointed() which gets updated on every commit to the checkpoint > file. In the former case it's always reading from the same offset, even those > which it had already read earlier and in cases where changelog topic has a > retention time, it gives offsetOutOfRange exception. So, > standbyTask.checkpointedOffsets() is quite useless and we should use > stateMgr.checkpointed() instead to return offsets to task manager. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10086) Standby state isn't always re-used when transitioning to active
[ https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10086. -- Resolution: Fixed > Standby state isn't always re-used when transitioning to active > --- > > Key: KAFKA-10086 > URL: https://issues.apache.org/jira/browse/KAFKA-10086 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.6.0, 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.6.0, 2.7.0 > > > This ticket was initially just to write an integration test, but I escalated > it to a blocker and changed the title when the integration test actually > surfaced two bugs: > # Offset positions were not reported for in-memory stores, so tasks with > in-memory stores would never be considered as "caught up" and could not take > over active processing, preventing clusters from ever achieving balance. This > is a regression in 2.6 > # When the TaskAssignor decided to switch active processing from a former > owner to a new one that had a standby, the lower-level cooperative rebalance > protocol would first de-schedule the task completely, and only later would > assign it to the new owner. For in-memory stores, this causes the standby > state not to be re-used, and for persistent stores, it creates a window in > which the cleanup thread might delete the state directory. In both cases, > even though the instance previously had a standby, once it gets the active, > it still had to restore the entire state from the changelog. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10165) Percentiles metric leaking memory
[ https://issues.apache.org/jira/browse/KAFKA-10165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10165. -- Resolution: Fixed > Percentiles metric leaking memory > - > > Key: KAFKA-10165 > URL: https://issues.apache.org/jira/browse/KAFKA-10165 > Project: Kafka > Issue Type: Bug > Components: metrics, streams >Reporter: Sophie Blee-Goldman >Assignee: John Roesler >Priority: Blocker > Fix For: 2.6.0 > > > We've hit several OOMs in our soak cluster lately. We were finally able to get > a heap dump right after the OOM, and found over 3.5 GB of memory being > retained by the percentiles (or specifically by the 1MB float[] used by the > percentiles). > The leak does seem specific to the Percentiles class, as we see ~3000 > instances of the Percentiles object vs only ~500 instances of the Max object, > which is also used in the same sensor as the Percentiles. > We did recently lower the size from 1MB to 100kB, but it's clear there is a > leak of some kind and a "smaller leak" is not an acceptable solution. If the > cause of the leak is not immediately obvious, we should just revert the > percentiles in 2.6 and work on stabilizing them for 2.7. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10185) Streams should log summarized restoration information at info level
John Roesler created KAFKA-10185: Summary: Streams should log summarized restoration information at info level Key: KAFKA-10185 URL: https://issues.apache.org/jira/browse/KAFKA-10185 Project: Kafka Issue Type: Task Components: streams Reporter: John Roesler Assignee: John Roesler -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10200) MockProcessorContext doesn't work with WindowStores
John Roesler created KAFKA-10200: Summary: MockProcessorContext doesn't work with WindowStores Key: KAFKA-10200 URL: https://issues.apache.org/jira/browse/KAFKA-10200 Project: Kafka Issue Type: Bug Components: streams, streams-test-utils Reporter: John Roesler Assignee: John Roesler The recommended pattern for testing custom Processor implementations is to use the test-utils MockProcessorContext. If a Processor implementation needs a store, the store also has to be initialized with the same context. However, the existing (in-memory and persistent) Windowed store implementations perform internal casts that result in class cast exceptions if you attempt to initialize them with the MockProcessorContext. A workaround is to instead embed the processor in an application and use the TopologyTestDriver instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
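For reference, roughly the documented pattern for initializing a store against the mock context, which works for key-value stores but, per this ticket, fails with a ClassCastException for the windowed store builders:
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class MockContextStoreExample {

    public static void main(final String[] args) {
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        final MockProcessorContext context = new MockProcessorContext(config);

        // This works today for key-value stores:
        final KeyValueStore<String, Long> kvStore = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv-store"),
                                  Serdes.String(), Serdes.Long())
            .withLoggingDisabled() // the mock context does not support changelogging
            .build();
        kvStore.init(context, kvStore);
        context.register(kvStore, null);

        // Doing the same with Stores.windowStoreBuilder(...) is what this ticket is about:
        // the window store implementations cast the context internally and throw a
        // ClassCastException when initialized with the MockProcessorContext.
    }
}
{code}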
[jira] [Created] (KAFKA-10202) SmokeTest should provide a mechanism to generate continuously AND verify the result
John Roesler created KAFKA-10202: Summary: SmokeTest should provide a mechanism to generate continuously AND verify the result Key: KAFKA-10202 URL: https://issues.apache.org/jira/browse/KAFKA-10202 Project: Kafka Issue Type: Improvement Components: streams, system tests Reporter: John Roesler Several system tests use the SmokeTestDriver, but they have to choose between generating for a fixed period of time (say, two minutes) before verifying results OR generating until cancelled with no verification. It should be possible to implement "generate until cancelled and then verify", and doing so would both speed up tests that are just blocked by the two-minute generation period AND decrease flakiness in other tests that can't always complete all their operations within the deadline before verification begins. One idea is to revamp the smoke test driver's verification logic to make it consume the input and determine the expected results, so that the verifier can be run as a separate process instead of passing the expected results straight from the generator to the verifier. This is how the RelationalSmokeTest works. An alternative is to register a signal handler so that we can signal the driver to stop generating and start verifying. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10203) Rolling upgrade from 2.1.1 to trunk (2.7.x) doesn't work
John Roesler created KAFKA-10203: Summary: Rolling upgrade from 2.1.1 to trunk (2.7.x) doesn't work Key: KAFKA-10203 URL: https://issues.apache.org/jira/browse/KAFKA-10203 Project: Kafka Issue Type: Bug Reporter: John Roesler As part of KAFKA-10173, I converted the upgrade test to use the SmokeTest application and also added new upgrade paths, including from 2.1.1 to trunk. It is a rolling bounce scenario. After the first instance upgrades from 2.1.1 to trunk, I observe the following on _both_ instances that are still on 2.1.1: {code:java} org.apache.kafka.streams.errors.TaskAssignmentException: stream-thread [SmokeTest-ed7632fc-3465-4534-8a26-2f6ad76ff80f-StreamThread-2-consumer] Number of assigned partitions 11 is not equal to the number of active taskIds 7, assignmentInfo=[version=4, supported version=4, active tasks=[0_0, 0_2, 2_0, 1_1, 1_3, 2_2, 3_3], standby tasks={0_0=[data-0], 1_0=[max-0, min-0], 0_1=[data-1], 0_2=[data-2], 2_0=[sum-0, cnt-0], 1_1=[max-1, min-1], 1_2=[min-2, max-2], 0_3=[data-3], 2_1=[cnt-1, sum-1], 3_0=[SmokeTest-cntByCnt-repartition-0], 0_4=[data-4], 3_1=[SmokeTest-cntByCnt-repartition-1], 2_2=[cnt-2, sum-2], 1_3=[min-3, max-3], 3_2=[SmokeTest-cntByCnt-repartition-2], 1_4=[max-4, min-4], 2_3=[sum-3, cnt-3], 2_4=[sum-4, cnt-4], 3_3=[SmokeTest-cntByCnt-repartition-3], 3_4=[SmokeTest-cntByCnt-repartition-4]}, global assignment={}] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.processVersionOneAssignment(StreamsPartitionAssignor.java:892) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.processVersionTwoAssignment(StreamsPartitionAssignor.java:908) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.processVersionThreeAssignment(StreamsPartitionAssignor.java:925) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.processVersionFourAssignment(StreamsPartitionAssignor.java:932) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:872) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:281) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:406) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:341) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:913) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:818) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) {code} the other 2.1.1 instance reports the same exception, with "Number of assigned partitions 9 is not equal to the number of active taskIds 6" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10185) Streams should log summarized restoration information at info level
[ https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10185. -- Resolution: Fixed > Streams should log summarized restoration information at info level > --- > > Key: KAFKA-10185 > URL: https://issues.apache.org/jira/browse/KAFKA-10185 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.6.0, 2.5.1 > > > Currently, restoration progress is only visible at debug level in the > Consumer's Fetcher logs. Users can register a restoration listener and > implement their own logging, but it would substantially improve operability > to have some logs available at INFO level. > Logging each partition in each restore batch at info level would be too much, > though, so we should print summarized logs at a decreased interval, like > every 10 seconds. -- This message was sent by Atlassian Jira (v8.3.4#803005)