[jira] [Created] (KAFKA-7493) Rewrite test_broker_type_bounce_at_start

2018-10-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7493:
---

 Summary: Rewrite test_broker_type_bounce_at_start
 Key: KAFKA-7493
 URL: https://issues.apache.org/jira/browse/KAFKA-7493
 Project: Kafka
  Issue Type: Improvement
  Components: streams, system tests
Reporter: John Roesler


Currently, the test test_broker_type_bounce_at_start in 
streams_broker_bounce_test.py is ignored.

As written, there are a couple of race conditions that lead to flakiness.

It should be possible to re-write the test to wait on log messages, as the 
other tests do, instead of just sleeping, so that the test transitions more 
deterministically from one state to the next.

Once the test is fixed, the fix should be back-ported to all prior branches, 
back to 0.10.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7539) ConsumerBounceTest.testClose transient failure

2018-10-24 Thread John Roesler (JIRA)
John Roesler created KAFKA-7539:
---

 Summary: ConsumerBounceTest.testClose transient failure
 Key: KAFKA-7539
 URL: https://issues.apache.org/jira/browse/KAFKA-7539
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: Fangmin Lv
 Fix For: 0.9.0.0


{code}
kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures FAILED
java.lang.AssertionError: expected:<1000> but was:<976>
at org.junit.Assert.fail(Assert.java:92)
at org.junit.Assert.failNotEquals(Assert.java:689)
at org.junit.Assert.assertEquals(Assert.java:127)
at org.junit.Assert.assertEquals(Assert.java:514)
at org.junit.Assert.assertEquals(Assert.java:498)
at 
kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117)
at 
kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98)

kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures FAILED
java.lang.AssertionError: expected:<1000> but was:<913>
at org.junit.Assert.fail(Assert.java:92)
at org.junit.Assert.failNotEquals(Assert.java:689)
at org.junit.Assert.assertEquals(Assert.java:127)
at org.junit.Assert.assertEquals(Assert.java:514)
at org.junit.Assert.assertEquals(Assert.java:498)
at 
kafka.api.ConsumerBounceTest.seekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:117)
at 
kafka.api.ConsumerBounceTest.testSeekAndCommitWithBrokerFailures(ConsumerBounceTest.scala:98)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7539) ConsumerBounceTest.testClose transient failure

2018-10-24 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7539.
-
Resolution: Invalid
  Reviewer:   (was: Jason Gustafson)

Cloning to create this issue was a mistake. I'm going to create a fresh one.

> ConsumerBounceTest.testClose transient failure
> --
>
> Key: KAFKA-7539
> URL: https://issues.apache.org/jira/browse/KAFKA-7539
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Major
>  Labels: flaky-test
>
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at 
> kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146)
>   at 
> kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238)
>   at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
>   at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConne

[jira] [Created] (KAFKA-7540) Transient failure: kafka.api.ConsumerBounceTest.testClose

2018-10-24 Thread John Roesler (JIRA)
John Roesler created KAFKA-7540:
---

 Summary: Transient failure: kafka.api.ConsumerBounceTest.testClose
 Key: KAFKA-7540
 URL: https://issues.apache.org/jira/browse/KAFKA-7540
 Project: Kafka
  Issue Type: Bug
Reporter: John Roesler


Observed on Java 8: 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/]

 

Stacktrace:
{noformat}
java.lang.ArrayIndexOutOfBoundsException: -1
at 
kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146)
at 
kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238)
at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
at 
org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
at 
org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
at 
org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPoli

[jira] [Created] (KAFKA-7541) Transient Failure: kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable

2018-10-24 Thread John Roesler (JIRA)
John Roesler created KAFKA-7541:
---

 Summary: Transient Failure: 
kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable
 Key: KAFKA-7541
 URL: https://issues.apache.org/jira/browse/KAFKA-7541
 Project: Kafka
  Issue Type: Bug
Reporter: John Roesler


Observed on Java 11: 
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/264/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/]

 

Stacktrace:
{noformat}
java.lang.AssertionError: Unclean leader not elected
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:487)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnec

[jira] [Created] (KAFKA-7542) Transient Failure: kafka.server.GssapiAuthenticationTest.testServerAuthenticationFailure

2018-10-24 Thread John Roesler (JIRA)
John Roesler created KAFKA-7542:
---

 Summary: Transient Failure: 
kafka.server.GssapiAuthenticationTest.testServerAuthenticationFailure
 Key: KAFKA-7542
 URL: https://issues.apache.org/jira/browse/KAFKA-7542
 Project: Kafka
  Issue Type: Bug
Reporter: John Roesler


Observed on Java 11: 
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/264/testReport/junit/kafka.server/GssapiAuthenticationTest/testServerAuthenticationFailure/]

at 
[https://github.com/apache/kafka/pull/5795/commits/5bdcd0e023c6f406d585155399f6541bb6a9f9c2]

 

Stacktrace:
{noformat}
org.scalatest.junit.JUnitTestFailedError: 
at 
org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:100)
at 
org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:71)
at org.scalatest.Assertions$class.fail(Assertions.scala:1075)
at org.scalatest.junit.JUnitSuite.fail(JUnitSuite.scala:71)
at 
kafka.server.GssapiAuthenticationTest.testServerAuthenticationFailure(GssapiAuthenticationTest.scala:128)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.i

[jira] [Created] (KAFKA-7544) Transient Failure: org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFails

2018-10-24 Thread John Roesler (JIRA)
John Roesler created KAFKA-7544:
---

 Summary: Transient Failure: 
org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFails
 Key: KAFKA-7544
 URL: https://issues.apache.org/jira/browse/KAFKA-7544
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: John Roesler


Observed on Java 11: 
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/264/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFails/]

at 
[https://github.com/apache/kafka/pull/5795/commits/5bdcd0e023c6f406d585155399f6541bb6a9f9c2]

 

Stacktrace:
{noformat}
java.lang.AssertionError: 
Expected: <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 2), KeyValue(0, 3), 
KeyValue(0, 4), KeyValue(0, 5), KeyValue(0, 6), KeyValue(0, 7), KeyValue(0, 8), 
KeyValue(0, 9)]>
 but: was <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 2), KeyValue(0, 3), 
KeyValue(0, 4), KeyValue(0, 5), KeyValue(0, 6), KeyValue(0, 7), KeyValue(0, 8), 
KeyValue(0, 9), KeyValue(0, 10), KeyValue(0, 11), KeyValue(0, 12), KeyValue(0, 
13), KeyValue(0, 14)]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
at 
org.apache.kafka.streams.integration.EosIntegrationTest.checkResultPerKey(EosIntegrationTest.java:218)
at 
org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFails(EosIntegrationTest.java:360)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImp

[jira] [Created] (KAFKA-7553) Jenkins PR tests hung

2018-10-25 Thread John Roesler (JIRA)
John Roesler created KAFKA-7553:
---

 Summary: Jenkins PR tests hung
 Key: KAFKA-7553
 URL: https://issues.apache.org/jira/browse/KAFKA-7553
 Project: Kafka
  Issue Type: Bug
Reporter: John Roesler
 Attachments: consoleText.txt

I wouldn't worry about this unless it continues to happen, but I wanted to 
document it.

This was a Java 11 build: 
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/266/]

It was for this PR: [https://github.com/apache/kafka/pull/5795]

And this commit: 
[https://github.com/apache/kafka/pull/5795/commits/5bdcd0e023c6f406d585155399f6541bb6a9f9c2]

 

It looks like the tests just hung after 46 minutes, until the build timed out 
at 180 minutes.

End of the output:
{noformat}
...
00:46:27.275 kafka.server.ServerGenerateBrokerIdTest > 
testConsistentBrokerIdFromUserConfigAndMetaProps STARTED
00:46:29.775 
00:46:29.775 kafka.server.ServerGenerateBrokerIdTest > 
testConsistentBrokerIdFromUserConfigAndMetaProps PASSED
03:00:51.124 Build timed out (after 180 minutes). Marking the build as aborted.
03:00:51.440 Build was aborted
03:00:51.492 [FINDBUGS] Skipping publisher since build result is ABORTED
03:00:51.492 Recording test results
03:00:51.495 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:58.017 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:59.330 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:59.331 Adding one-line test results to commit status...
03:00:59.332 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:59.334 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:59.335 Setting status of 5bdcd0e023c6f406d585155399f6541bb6a9f9c2 to 
FAILURE with url https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/266/ 
and message: 'FAILURE
03:00:59.335  9053 tests run, 1 skipped, 0 failed.'
03:00:59.335 Using context: JDK 11 and Scala 2.12
03:00:59.541 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:59.542 Finished: ABORTED{noformat}
 

I did find one test that started but did not finish:
{noformat}
00:23:29.576 kafka.api.PlaintextConsumerTest > 
testLowMaxFetchSizeForRequestAndPartition STARTED
{noformat}
But note that the tests continued to run for another 23 minutes after this one 
started.

 

Just for completeness, there were 4 failures:
{noformat}
00:22:06.875 kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsNotExistingGroup FAILED
00:22:06.875 java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
00:22:06.875 at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
00:22:06.875 at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
00:22:06.875 at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
00:22:06.876 at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
00:22:06.876 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:307)
00:22:06.876 at 
kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89)
00:22:06.876 
00:22:06.876 Caused by:
00:22:06.876 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.{noformat}
 
{noformat}
00:25:22.175 kafka.api.CustomQuotaCallbackTest > testCustomQuotaCallback FAILED
00:25:22.175 java.lang.AssertionError: Partition [group1_largeTopic,69] 
metadata not propagated after 15000 ms
00:25:22.176 at kafka.utils.TestUtils$.fail(TestUtils.scala:351)
00:25:22.176 at 
kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:741)
00:25:22.176 at 
kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:831)
00:25:22.176 at 
kafka.utils.TestUtils$$anonfun$createTopic$2.apply(TestUtils.scala:330)
00:25:22.176 at 
kafka.utils.TestUtils$$anonfun$createTopic$2.apply(TestUtils.scala:329)
00:25:22.176 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
00:25:22.176 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
00:25:22.176 at 
scala.collection.Iterator$class.foreach(Iterator.scala:891)
00:25:22.176 at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
00:25:22.176 at 
scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
00:25:22.176 at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
00:25:22.176 at 
scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
00:25:22.176 at scala.collection.SetLike$class.map(SetLike.scala:92)
00:25:22.17

[jira] [Created] (KAFKA-7806) Windowed Aggregations should wrap default key serde if none is specified

2019-01-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7806:
---

 Summary: Windowed Aggregations should wrap default key serde if 
none is specified
 Key: KAFKA-7806
 URL: https://issues.apache.org/jira/browse/KAFKA-7806
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


In Streams, windowing a stream by either time or session windows causes the 
stream's keys to be transformed from `K` to `Windowed<K>`.

Since this is a well-defined transition, it's not necessary for developers to 
explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which 
already knows the key serde (`Serde<K>`), automatically wraps it in case it's 
needed by downstream operators.

However, this automatic wrapping only takes place if the key serde has been 
explicitly provided in the topology. If the topology relies on the 
`default.key.serde` configuration, no wrapping takes place, and downstream 
operators will encounter a ClassCastException trying to cast a `Windowed<K>` (the 
windowed key) to whatever type the default serde handles (which is the key 
wrapped inside the windowed key).

Specifically, the key serde forwarding logic is:

in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`:

`materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null`

and in `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`:

`materializedInternal.keySerde() != null ? new 
WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null`

 

This pattern of not "solidifying" the default key serde is common in Streams. 
Not all operators need a serde, and the default serde may not be applicable to 
all operators. So, it would be a mistake for arbitrary operators to grab the 
default serde and pass it downstream as if it had been explicitly set.

 

However, in this case specifically, all windowed aggregations are stateful, so 
if we don't have an explicit key serde at this point, we know that we have used 
the default serde in the window store. If the default serde were incorrect, an 
exception would be thrown by the windowed aggregation itself. So it actually is 
safe to wrap the default serde in a windowed serde and pass it downstream, 
which would result in a better development experience.

 

Unfortunately, the default serde is set via config, but the windowed serde 
wrapping happens during DSL building, when the config is not generally 
available. Therefore, we would need a special windowed serde wrapper that 
signals that it wraps the default serde, which would be fully resolved during 
operators' init call.

For example, something of this nature:

`materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
FullTimeWindowedSerde.wrapDefault(windows.size())`

etc.

 

Complicating the situation slightly, all the windowed serializers and 
deserializers will resolve a runtime inner class using 
`default.windowed.key.serde.inner` if given a null inner serde to wrap. 
However, at this point in the topology build, we do know that the windowed 
aggregation has specifically used `default.key.serde`, not 
`default.windowed.key.serde.inner`, to persist its state to the window store. 
Therefore, it should be correct to wrap the default key serde specifically and 
not use `default.windowed.key.serde.inner`.

 

In addition to fixing this for TimeWindowed and SessionWindowed streams, we 
need to have good test coverage of the new code. There is clearly a blind spot 
in the tests, or we would have noticed this sooner.
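
To make the idea concrete, here is a minimal sketch of one possible shape of the 
"wrap the default serde lazily" idea, shown for the session-windowed case. This is 
not the actual Streams code; the class name, the configure-time resolution point, 
and the error handling are assumptions for illustration only.

{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

// Stays unresolved until configure() runs, then wraps whatever default.key.serde
// the application configured (deliberately NOT default.windowed.key.serde.inner,
// because the windowed aggregation persisted its state with default.key.serde).
public class LazyDefaultSessionWindowedSerde<K> implements Serde<Windowed<K>> {

    private Serde<Windowed<K>> resolved; // null until configure()

    @SuppressWarnings("unchecked")
    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        final Object configured = configs.get(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG);
        if (configured == null) {
            throw new IllegalStateException("default.key.serde is not set");
        }
        try {
            final Class<?> serdeClass =
                configured instanceof Class ? (Class<?>) configured : Class.forName(configured.toString());
            final Serde<K> inner = (Serde<K>) Utils.newInstance(serdeClass);
            inner.configure(configs, isKey);
            resolved = new WindowedSerdes.SessionWindowedSerde<>(inner);
            resolved.configure(configs, isKey);
        } catch (final ClassNotFoundException e) {
            throw new IllegalStateException("Cannot load default.key.serde class " + configured, e);
        }
    }

    @Override
    public Serializer<Windowed<K>> serializer() {
        if (resolved == null) {
            throw new IllegalStateException("Serde used before configure()");
        }
        return resolved.serializer();
    }

    @Override
    public Deserializer<Windowed<K>> deserializer() {
        if (resolved == null) {
            throw new IllegalStateException("Serde used before configure()");
        }
        return resolved.deserializer();
    }

    @Override
    public void close() {
        if (resolved != null) {
            resolved.close();
        }
    }
}
{code}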



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-7741) Bad dependency via SBT

2019-01-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reopened KAFKA-7741:
-

> Bad dependency via SBT
> --
>
> Key: KAFKA-7741
> URL: https://issues.apache.org/jira/browse/KAFKA-7741
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1, 2.1.0
> Environment: Windows 10 professional, IntelliJ IDEA 2017.1
>Reporter: sacha barber
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> I am using the Kafka-Streams-Scala 2.1.0 JAR.
> And if I create a new Scala project using SBT with these dependencies 
> {code}
> name := "ScalaKafkaStreamsDemo"
> version := "1.0"
> scalaVersion := "2.12.1"
> libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.0"
> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"
> libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.0.0"
> libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
> //TEST
> libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test
> libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % 
> "2.0.0" % Test
> {code}
> I get this error
>  
> {code}
> SBT 'ScalaKafkaStreamsDemo' project refresh failed
> Error:Error while importing SBT project:...[info] Resolving 
> jline#jline;2.14.1 ...
> [warn] [FAILED ] 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}: (0ms)
> [warn]  local: tried
> [warn] 
> C:\Users\sacha\.ivy2\local\javax.ws.rs\javax.ws.rs-api\2.1.1\${packaging.type}s\javax.ws.rs-api.${packaging.type}
> [warn]  public: tried
> [warn] 
> https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.${packaging.type}
> [info] downloading 
> https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-test-utils/2.1.0/kafka-streams-test-utils-2.1.0.jar
>  ...
> [info] [SUCCESSFUL ] 
> org.apache.kafka#kafka-streams-test-utils;2.1.0!kafka-streams-test-utils.jar 
> (344ms)
> [warn] ::
> [warn] :: FAILED DOWNLOADS ::
> [warn] :: ^ see resolution messages for details ^ ::
> [warn] ::
> [warn] :: javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [warn] ::
> [trace] Stack trace suppressed: run 'last *:ssExtractDependencies' for the 
> full output.
> [trace] Stack trace suppressed: run 'last *:update' for the full output.
> [error] (*:ssExtractDependencies) sbt.ResolveException: download failed: 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [error] (*:update) sbt.ResolveException: download failed: 
> javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}
> [error] Total time: 8 s, completed 16-Dec-2018 19:27:21
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=384M; 
> support was removed in 8.0. See complete log in 
> file:/C:/Users/sacha/.IdeaIC2017.1/system/log/sbt.last.log
> {code}
> This seems to be a common issue with a bad dependency from Kafka to 
> javax.ws.rs-api.
> If I drop the Kafka version down to 2.0.0 and add this line to my SBT file, 
> this error goes away:
> {code}
> libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" 
> artifacts(Artifact("javax.ws.rs-api", "jar", "jar"))
> {code}
>  
> However, I would like to work with the 2.1.0 version.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4325) Improve processing of late records for window operations

2019-01-14 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-4325.
-
   Resolution: Fixed
Fix Version/s: 2.1.0

As [~mjsax] noted, this need was satisfied by KAFKA-7222.

> Improve processing of late records for window operations
> 
>
> Key: KAFKA-4325
> URL: https://issues.apache.org/jira/browse/KAFKA-4325
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
> Fix For: 2.1.0
>
>
> Windows are kept until their retention time has passed. If a late-arriving record 
> is processed that is older than any window kept, a new window is created 
> containing this single late-arriving record, the aggregation is computed, and 
> the window is immediately discarded afterward (as it is older than the retention 
> time).
> This behavior might cause problems for downstream applications, as the original 
> window aggregate might be overwritten with the late single-record aggregate 
> value. Thus, we should rather not process the late-arriving record in this 
> case.
> However, data loss might not be acceptable for all use cases. In order to 
> enable users not to lose any data, window operators should allow registering 
> a handler function that is called instead of just dropping the late-arriving 
> record.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop

2018-05-29 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reopened KAFKA-5697:
-
  Assignee: John Roesler

> StreamThread.shutdown() need to interrupt the stream threads to break the loop
> --
>
> Key: KAFKA-5697
> URL: https://issues.apache.org/jira/browse/KAFKA-5697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: John Roesler
>Priority: Major
>  Labels: newbie
> Fix For: 2.0.0
>
>
> In {{StreamThread.shutdown()}} we currently do nothing but set the state, 
> hoping the stream thread may eventually check it and shut itself down. 
> However, under certain scenarios the thread may get blocked within a single 
> loop and hence will never check on this state enum. For example, its 
> {{consumer.poll}} call triggers {{ensureCoordinatorReady()}}, which will block 
> until the coordinator can be found. If the coordinator broker is never up and 
> running, then the Streams instance will be blocked forever.
> A simple way to reproduce this issue is to start the word count demo without 
> starting the ZK / Kafka broker; it will then get stuck in a single loop, 
> and even `ctrl-C` will not stop it since the state it sets will never be read by 
> the thread:
> {code:java}
> [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6783) consumer poll(timeout) blocked infinitely when no available bootstrap server

2018-05-29 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-6783.
-
Resolution: Duplicate

Hey [~koqizhao],

We've merged 
[https://github.com/apache/kafka/commit/c470ff70d3e829c8b12f6eb6cc812c4162071a1f]
 under KAFKA-5697, which should fix this issue. In retrospect, your ticket 
would have been a more appropriate scope for the work, but it's too late to 
change the commit title now.

I'm marking this ticket as a duplicate just to provide a pointer to the ticket 
under which the issue is fixed.

If you feel the issue still persists in some form, please feel free to re-open 
the ticket or start a new one.

Also note, there is a similar issue with several other consumer methods, which 
we plan to fix under KIP-266. Poll is likely to be the only one to make it into 
2.0, though.

Thanks,

-John

> consumer poll(timeout) blocked infinitely when no available bootstrap server
> 
>
> Key: KAFKA-6783
> URL: https://issues.apache.org/jira/browse/KAFKA-6783
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0
>Reporter: Qiang Zhao
>Priority: Major
>  Labels: features
> Fix For: 2.0.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> {code:java}
> @Test
> public void testPollWithAllBootstrapServersDown() throws Exception {
> ExecutorService executor = Executors.newSingleThreadExecutor();
> try {
> final long pollTimeout = 1000;
> final AtomicBoolean pollComplete = new AtomicBoolean();
> executor.submit(new Runnable() {
> @Override
> public void run() {
> Properties props = new Properties();
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:29092");
> try (KafkaConsumer consumer = 
> newConsumer(props)) {
> consumer.subscribe(Arrays.asList(topic));
> try {
> consumer.poll(pollTimeout);
> } catch (Exception ex) {
> ex.printStackTrace();
> } finally {
> pollComplete.set(true);
> }
> }
> }
> });
> Thread.sleep(pollTimeout * 2);
> Assert.assertTrue("poll timeout not work when all servers down", 
> pollComplete.get());
> } finally {
> executor.shutdown();
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6978) Make Streams Window retention time strict

2018-05-31 Thread John Roesler (JIRA)
John Roesler created KAFKA-6978:
---

 Summary: Make Streams Window retention time strict
 Key: KAFKA-6978
 URL: https://issues.apache.org/jira/browse/KAFKA-6978
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, the configured retention time for windows is a lower bound. We 
actually keep the window around until it's time to roll a new segment. At that 
time, we drop all windows in the oldest segment.

As long as a window is still in a segment, we will continue to add 
late-arriving records to it and also serve IQ queries from it. This is sort of 
nice, because it makes optimistic use of the fact that the windows live for 
some time after their retention expires. However, it is also a source of 
(apparent) non-determinism, and it's arguably better for programability if we 
adhere strictly to the configured constraints.

Therefore, the new behavior will be:
 * once the retention time for a window passes, Streams will drop any 
later-arriving records (with a warning log and a metric)

 * likewise, IQ will first check whether the window is younger than its 
retention time before answering queries.

No changes need to be made to the underlying segment management; this is purely 
to make the behavior stricter with respect to the configuration.
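
For illustration, the intended check is roughly the following. This is a standalone 
sketch with made-up names, not the actual Streams implementation:

{code:java}
// Minimal, illustrative sketch of the proposed strict-retention check; the class and
// field names are made up and this is not the actual Streams implementation.
public class StrictRetentionGate {

    private final long retentionMs;
    private long observedStreamTime = Long.MIN_VALUE; // max record timestamp seen so far
    private long lateRecordDrops = 0;                 // stand-in for a real metric sensor

    public StrictRetentionGate(final long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // True if the window may still be updated or queried under strict retention.
    // IQ would apply the same check before answering a fetch for this window.
    public boolean withinRetention(final long windowEndMs) {
        return windowEndMs + retentionMs > observedStreamTime;
    }

    // Returns true if the record should be processed, false if it must be dropped.
    public boolean admit(final long recordTimestampMs, final long windowEndMs) {
        observedStreamTime = Math.max(observedStreamTime, recordTimestampMs);
        if (withinRetention(windowEndMs)) {
            return true;
        }
        lateRecordDrops++; // in Streams this would also be a warning log plus a metric
        return false;
    }

    public long lateRecordDrops() {
        return lateRecordDrops;
    }
}
{code}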



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7000) KafkaConsumer.position should wait for assignment metadata

2018-06-05 Thread John Roesler (JIRA)
John Roesler created KAFKA-7000:
---

 Summary: KafkaConsumer.position should wait for assignment metadata
 Key: KAFKA-7000
 URL: https://issues.apache.org/jira/browse/KAFKA-7000
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler
Assignee: John Roesler


While updating Kafka Streams to stop using the deprecated Consumer.poll(long), 
I found that this code unexpectedly throws an exception:
{code:java}
consumer.subscribe(topics);
// consumer.poll(0); <- I've removed this line, which shouldn't be necessary 
here.

final Set<TopicPartition> partitions = new HashSet<>();
for (final String topic : topics) {
for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(partition.topic(), 
partition.partition()));
}
}

for (final TopicPartition tp : partitions) {
final long offset = consumer.position(tp);
committedOffsets.put(tp, offset);
}{code}
Here is the exception:
{code:java}
Exception in thread "main" java.lang.IllegalStateException: You can only check 
the position for partitions assigned to this consumer.
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586)
   at 
org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275)
   at 
org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148)
   at 
org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69){code}
 

As you can see in the commented code in my snippet, we used to block for 
assignment with a poll(0), which is now deprecated.

It seems reasonable to me for position() to do the same thing that poll() does, 
which is to call `coordinator.poll(timeout.toMillis())` early in processing to 
ensure an up-to-date assignment.
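
In the meantime, a caller-side workaround is possible by driving the consumer until 
the assignment arrives. This is only a sketch under the assumption that the consumer 
is the sole member of its group (as in the test driver above); the retry loop and the 
timeout value are made up:

{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public final class PositionWorkaround {

    static Map<TopicPartition, Long> currentPositions(final Consumer<byte[], byte[]> consumer,
                                                      final Set<String> topics) {
        consumer.subscribe(topics);

        final Set<TopicPartition> partitions = new HashSet<>();
        for (final String topic : topics) {
            for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
        }

        final Map<TopicPartition, Long> offsets = new HashMap<>();
        for (final TopicPartition tp : partitions) {
            while (true) {
                try {
                    offsets.put(tp, consumer.position(tp));
                    break;
                } catch (final IllegalStateException notAssignedYet) {
                    // The group assignment has not arrived yet: drive the consumer and retry.
                    // Assumes this consumer is the only member of its group, as in the test driver.
                    consumer.poll(Duration.ofMillis(100));
                }
            }
        }
        return offsets;
    }
}
{code}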



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7072) Kafka Streams may drop RocksDB window segments before they expire

2018-06-18 Thread John Roesler (JIRA)
John Roesler created KAFKA-7072:
---

 Summary: Kafka Streams may drop RocksDB window segments before they 
expire
 Key: KAFKA-7072
 URL: https://issues.apache.org/jira/browse/KAFKA-7072
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.1, 1.1.0, 0.11.0.2, 1.0.0, 0.11.0.1, 0.11.0.0, 2.0.0
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.1.0


The current implementation of Segments used by the RocksDB Session and Time window 
stores is in conflict with our current timestamp management model.

The current segmentation approach allows configuration of a fixed number of 
segments (let's say *4*) and a fixed retention time. We essentially divide up 
the retention time into the available number of segments:
{noformat}
<--------|------------------------------------------|
  expiration date                           right now
         \--------------retention time--------------/
         |  seg 0   |  seg 1   |  seg 2   |  seg 3  |
{noformat}
Note that we keep one extra segment so that we can record new events, while 
some events in seg 0 are actually expired (but we only drop whole segments, so 
they just get to hang around):
{noformat}
<------------|------------------------------------------|
      expiration date                           right now
             \--------------retention time--------------/
         |  seg 0   |  seg 1   |  seg 2   |  seg 3  |
{noformat}
When it's time to provision segment 4, we know that segment 0 is completely 
expired, so we drop it:
{noformat}
<------------------|------------------------------------------|
            expiration date                           right now
                   \--------------retention time--------------/
                   |  seg 1   |  seg 2   |  seg 3   |  seg 4  |
{noformat}
 

However, the current timestamp management model allows for records from the 
future. Namely, because we define stream time as the minimum buffered timestamp 
(nondecreasing), we can have a buffer like this: [ 5, 2, 6 ], and our stream 
time will be 2, but we'll handle a record with timestamp 5 next. Referring to 
the example, this means we could wind up having to provision segment 4 before 
segment 0 expires!

Let's say "f" is our future event:
{noformat}
<------------------|------------------------------------------|--f-->
            expiration date                           right now
                   \--------------retention time--------------/
                   |  seg 1   |  seg 2   |  seg 3   |  seg 4  |
{noformat}
Should we drop segment 0 prematurely? Or should we crash and refuse to 
process "f"?

Today, we do the former, and this is probably the better choice. If we refuse 
to process "f", then we cannot make progress ever again.

Dropping segment 0 prematurely is a bummer, but users could also set the 
retention time high enough that they don't expect to get any events late enough 
to need segment 0. Worst case, we could have many future events that do not 
advance stream time but are sparse enough to each require their own segment, 
which would eat deeply into the retention time and drop many segments that 
should still be live.
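
To illustrate the arithmetic with made-up numbers (this is a simplification of the 
real Segments logic, not the actual code; the formula for the segment interval is an 
approximation):

{code:java}
// Simplified arithmetic only, not the real Segments class; retention, segment count,
// and timestamps are made-up numbers.
public class SegmentMathExample {

    public static void main(final String[] args) {
        final long retentionMs = 4 * 60_000L;                // 4 minutes of retention
        final int numSegments = 4;
        final long segmentIntervalMs = retentionMs / (numSegments - 1); // 80,000 ms per segment

        final long streamTimeMs = 10 * 60_000L;              // "right now"
        final long futureRecordMs = streamTimeMs + 3 * segmentIntervalMs; // a far-future event "f"

        // The segment a timestamp falls into:
        final long currentMaxSegment = streamTimeMs / segmentIntervalMs;   // 7
        final long futureSegment = futureRecordMs / segmentIntervalMs;     // 10

        // Only `numSegments` segments are kept, so provisioning the future segment drops
        // everything below this id, even segments that have not reached their retention yet.
        final long minLiveSegmentAfterFuture = futureSegment - numSegments + 1; // 7

        System.out.printf("current max segment: %d, future segment: %d, min live segment: %d%n",
                          currentMaxSegment, futureSegment, minLiveSegmentAfterFuture);
    }
}
{code}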



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore

2018-06-20 Thread John Roesler (JIRA)
John Roesler created KAFKA-7080:
---

 Summary: WindowStoreBuilder incorrectly initializes 
CachingWindowStore
 Key: KAFKA-7080
 URL: https://issues.apache.org/jira/browse/KAFKA-7080
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.1, 1.1.0, 1.0.0, 2.0.0
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.1.0


When caching is enabled on the WindowStoreBuilder, it creates a 
CachingWindowStore. However, it incorrectly passes storeSupplier.segments() 
(the number of segments) to the segmentInterval argument.

 

The impact is low, since any valid number of segments is also a valid segment 
size, but it likely results in much smaller segments than intended. For 
example, the segments may be sized 3ms instead of 60,000ms.

 

Ideally the WindowBytesStoreSupplier interface would allow suppliers to 
advertise their segment size instead of segment count. I plan to create a KIP 
to propose this.
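
To illustrate the mix-up with the numbers from the example above (the "intended" 
formula here is only an approximation of how the segment interval relates to 
retention and segment count, and the values are made up):

{code:java}
// Made-up numbers matching the example above; the "intended" formula is only an
// approximation of how the segment interval is derived from retention and count.
public class SegmentIntervalBugExample {

    public static void main(final String[] args) {
        final long retentionMs = 3 * 60_000L; // 180,000 ms of retention
        final int numSegments = 3;

        final long intendedSegmentIntervalMs = retentionMs / numSegments; // 60,000 ms
        final long buggySegmentIntervalMs = numSegments;                  // 3 ms: the count, not an interval

        System.out.println("intended segment interval: " + intendedSegmentIntervalMs + " ms");
        System.out.println("buggy segment interval:    " + buggySegmentIntervalMs + " ms");
    }
}
{code}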



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6978) Make Streams Window retention time strict

2018-06-25 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-6978.
-
   Resolution: Fixed
Fix Version/s: 2.1.0

This feature was merged in 
https://github.com/apache/kafka/commit/954be11bf2d3dc9fa11a69830d2ef5ff580ff533

> Make Streams Window retention time strict
> -
>
> Key: KAFKA-6978
> URL: https://issues.apache.org/jira/browse/KAFKA-6978
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently, the configured retention time for windows is a lower bound. We 
> actually keep the window around until it's time to roll a new segment. At 
> that time, we drop all windows in the oldest segment.
> As long as a window is still in a segment, we will continue to add 
> late-arriving records to it and also serve IQ queries from it. This is sort 
> of nice, because it makes optimistic use of the fact that the windows live 
> for some time after their retention expires. However, it is also a source of 
> (apparent) non-determinism, and it's arguably better for programmability if we 
> adhere strictly to the configured constraints.
> Therefore, the new behavior will be:
>  * once the retention time for a window passes, Streams will drop any 
> later-arriving records (with a warning log and a metric)
>  * likewise, IQ will first check whether the window is younger than its 
> retention time before answering queries.
> No changes need to be made to the underlying segment management; this is 
> purely to make the behavior stricter with respect to the configuration.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7106) Remove segment/segmentInterval from Window definition

2018-06-26 Thread John Roesler (JIRA)
John Roesler created KAFKA-7106:
---

 Summary: Remove segment/segmentInterval from Window definition
 Key: KAFKA-7106
 URL: https://issues.apache.org/jira/browse/KAFKA-7106
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, Window configures segment and segmentInterval properties, but these 
aren't truly properties of a window in general.

Rather, they are properties of the particular implementation that we currently 
have: a segmented store. Therefore, these properties should be moved to 
configure only that implementation.

 

This may be related to KAFKA-4730, since an in-memory window store wouldn't 
necessarily need to be segmented.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7107) Ability to configure state store for JoinWindows in KStream-KStream join

2018-06-26 Thread John Roesler (JIRA)
John Roesler created KAFKA-7107:
---

 Summary: Ability to configure state store for JoinWindows in 
KStream-KStream join
 Key: KAFKA-7107
 URL: https://issues.apache.org/jira/browse/KAFKA-7107
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, the KStream-KStream join operation internally provisions window 
stores to support the JoinWindows configuration.

 

However, unlike all the other stateful processors, it does not allow 
configuration of the stores. We should consider adding DSL methods taking 
Materialized configs for these stores.
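
For context, this is roughly what a windowed KStream-KStream join looks like with 
the current DSL (a sketch only; the topic names and serdes are made up). Note that 
there is no parameter through which the two internal window stores can be configured, 
which is the gap this ticket describes:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

public class JoinStoreExample {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> left = builder.stream("left-topic");
        final KStream<String, String> right = builder.stream("right-topic");

        final KStream<String, String> joinedStream = left.join(
            right,
            (leftValue, rightValue) -> leftValue + rightValue,
            JoinWindows.of(300_000L), // 5-minute join window, backed by two internal window stores
            Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));
        // There is no parameter here through which the internal join stores could be
        // configured (e.g. via a Materialized config); that is the gap described above.

        joinedStream.to("joined-topic");
        builder.build();
    }
}
{code}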



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore

2018-07-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7080.
-
Resolution: Fixed

Fixed in https://github.com/apache/kafka/pull/5257

> WindowStoreBuilder incorrectly initializes CachingWindowStore
> -
>
> Key: KAFKA-7080
> URL: https://issues.apache.org/jira/browse/KAFKA-7080
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0, 1.0.1, 1.1.0, 2.0.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> When caching is enabled on the WindowStoreBuilder, it creates a 
> CachingWindowStore. However, it incorrectly passes storeSupplier.segments() 
> (the number of segments) to the segmentInterval argument.
>  
> The impact is low, since any valid number of segments is also a valid segment 
> size, but it likely results in much smaller segments than intended. For 
> example, the segments may be sized 3ms instead of 60,000ms.
>  
> Ideally the WindowBytesStoreSupplier interface would allow suppliers to 
> advertise their segment size instead of segment count. I plan to create a KIP 
> to propose this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7161) KTable Reduce should check for invalid conditions

2018-07-13 Thread John Roesler (JIRA)
John Roesler created KAFKA-7161:
---

 Summary: KTable Reduce should check for invalid conditions
 Key: KAFKA-7161
 URL: https://issues.apache.org/jira/browse/KAFKA-7161
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler
Assignee: John Roesler


KTableReduce has the opportunity to explicitly check if the state is 
inconsistent with the oldValues arriving from the stream. If it did so, it 
could help detect topology changes that needed an app reset and fail fast 
before any data corruption occurs.
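
For example, a hedged sketch of the kind of check that could be added (names and message
are illustrative, not the actual KTableReduce code):

{code:java}
final class ReduceSanityCheck {
    // Illustrative only: fail fast if the stream's oldValue disagrees with the stored aggregate.
    static <V> void checkInvariant(final V storedAggregate, final V oldValueFromStream) {
        if (oldValueFromStream != null && storedAggregate == null) {
            throw new IllegalStateException(
                "Received an oldValue for a key with no prior aggregate in the store; "
                    + "this likely indicates a topology change that requires an application reset.");
        }
    }
}
{code}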



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7222) KIP-328: Add Window Grace Period (and deprecate Window Retention)

2018-07-31 Thread John Roesler (JIRA)
John Roesler created KAFKA-7222:
---

 Summary: KIP-328: Add Window Grace Period (and deprecate Window 
Retention)
 Key: KAFKA-7222
 URL: https://issues.apache.org/jira/browse/KAFKA-7222
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler
Assignee: John Roesler


As described in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables]

 

This ticket only covers the grace period portion of the work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7223) KIP-328: Add in-memory Suppression

2018-07-31 Thread John Roesler (JIRA)
John Roesler created KAFKA-7223:
---

 Summary: KIP-328: Add in-memory Suppression
 Key: KAFKA-7223
 URL: https://issues.apache.org/jira/browse/KAFKA-7223
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


As described in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]

 

This ticket is to implement Suppress, but only for in-memory buffers.

(depends on KAFKA-7222)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7224) KIP-328: Add spill-to-disk for Suppression

2018-07-31 Thread John Roesler (JIRA)
John Roesler created KAFKA-7224:
---

 Summary: KIP-328: Add spill-to-disk for Suppression
 Key: KAFKA-7224
 URL: https://issues.apache.org/jira/browse/KAFKA-7224
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler
Assignee: John Roesler


As described in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables]

Following on KAFKA-7223, implement the spill-to-disk buffering strategy.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7267) KafkaStreams Scala DSL process method should accept a ProcessorSupplier

2018-08-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7267:
---

 Summary: KafkaStreams Scala DSL process method should accept a 
ProcessorSupplier
 Key: KAFKA-7267
 URL: https://issues.apache.org/jira/browse/KAFKA-7267
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


scala.KafkaStreams#process currently expects a ()=>Processor, which is 
semantically equivalent to a ProcessorSupplier, but it's the only such method 
to do so.

All the similar methods in the Scala DSL take a Supplier like their Java 
counterparts.

Note that on Scala 2.12+, SAM conversion allows callers to pass either a 
function or a supplier when the parameter is a ProcessorSupplier. (But if the 
parameter is a function, you must pass a function)

But on Scala 2.11-, callers will have to pass a function if the parameter is a 
function and a supplier if the parameter is a supplier. This means that 
currently, 2.11 users are confronted with an api that demands they construct 
suppliers for all the methods *except* process, which demands a function.

Mitigating factor: we have some implicits available to convert a Function0 to a 
supplier, and we could add an implicit from ProcessorSupplier to Function0 to 
smooth over the API.

 

What to do about it?

We could just change the existing method to take a ProcessorSupplier instead.
 * 2.12+ users would not notice a difference during compilation, as SAM 
conversion would kick in. However, if they just swap in the new jar without 
recompiling, I think they'd get a NoSuchMethodError.
 * 2.11- users would not be able to compile their existing code. They'd have to 
swap their function out for a ProcessorSupplier or pull the implicit conversion 
into scope.
 * Note that we can delay this action until we drop 2.11 support, and we would 
break no one.

We could deprecate the existing method and add a new one taking a 
ProcessorSupplier.
 * All scala users would be able to compile their existing code and also swap 
in the new version at runtime.
 * Anyone explicitly passing a function would get a deprecation warning, 
though, regardless of SAM conversion or implicit conversion, since neither 
conversion will kick in if there's actually a method overload expecting a 
function. This would drive everyone to explicitly create a supplier 
(unnecessarily)

We could leave the existing method without deprecating it and add a new one 
taking a ProcessorSupplier.
 * All scala users would be able to compile their existing code and also swap 
in the new version at runtime.
 * There would be no unfortunate deprecation warnings.
 * The interface would list two process methods, which is untidy.
 * Once we drop 2.11 support, we would just drop the function variant.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7269) KStream.merge is not documented

2018-08-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7269:
---

 Summary: KStream.merge is not documented
 Key: KAFKA-7269
 URL: https://issues.apache.org/jira/browse/KAFKA-7269
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


If I understand the operator correctly, it should be documented as a stateless 
transformation at 
https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations
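
For reference, a minimal usage example of the operator in question (topic names are
placeholders):

{code:java}
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> left = builder.stream("left-topic");
final KStream<String, String> right = builder.stream("right-topic");
// merge is stateless: records from both streams are interleaved as they arrive
final KStream<String, String> merged = left.merge(right);
{code}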



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7270) Add latest releases to streams_upgrade_test.py

2018-08-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7270:
---

 Summary: Add latest releases to streams_upgrade_test.py
 Key: KAFKA-7270
 URL: https://issues.apache.org/jira/browse/KAFKA-7270
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


namely, 2.0, 1.1.1, 1.0.2, 0.11.0.3, 0.10.2.2

Note that the relevant older branches need to add the relevant bugfix versions 
for themselves and their ancestor versions.

The 2.0 branch actually needs to add the 2.0 version!

Trunk needs to add all the versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2018-08-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7271:
---

 Summary: Fix ignored test in streams_upgrade_test.py: 
test_upgrade_downgrade_brokers
 Key: KAFKA-7271
 URL: https://issues.apache.org/jira/browse/KAFKA-7271
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


Fix in the oldest branch that ignores the test and cherry-pick forward.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7272) Fix ignored test in streams_upgrade_test.py: test_simple_upgrade_downgrade

2018-08-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7272:
---

 Summary: Fix ignored test in streams_upgrade_test.py: 
test_simple_upgrade_downgrade
 Key: KAFKA-7272
 URL: https://issues.apache.org/jira/browse/KAFKA-7272
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


Fix starting from the oldest version that ignores the test



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7275) Prototype lock-free metrics

2018-08-10 Thread John Roesler (JIRA)
John Roesler created KAFKA-7275:
---

 Summary: Prototype lock-free metrics
 Key: KAFKA-7275
 URL: https://issues.apache.org/jira/browse/KAFKA-7275
 Project: Kafka
  Issue Type: Improvement
  Components: metrics, streams
Reporter: John Roesler
Assignee: John Roesler


Currently, we have to be a little conservative in how granularly we measure 
things to avoid heavy synchronization costs in the metrics.

It should be possible to refactor the thread-safe implementation to use 
volatile and java.util.concurrent.atomic instead and realize a pretty large 
performance improvement.

However, before investing too much time in it, we should run some benchmarks to 
gauge how much improvement we can expect.

I'd propose to run the benchmarks on trunk with debug turned on, and then to 
just remove all synchronization and run again to get an upper-bound performance 
improvement.

If the results are promising, we can start prototyping a lock-free 
implementation.
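
As a rough illustration of the direction (a sketch only, not the actual metrics/Sensor
code):

{code:java}
import java.util.concurrent.atomic.LongAdder;

// A lock-free counter: recording takes no lock on the hot path, and readers may observe a
// slightly stale value, which is acceptable for metrics.
final class LockFreeCounter {
    private final LongAdder count = new LongAdder();

    void record() {
        count.increment();
    }

    long sample() {
        return count.sum();
    }
}
{code}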



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times

2018-08-10 Thread John Roesler (JIRA)
John Roesler created KAFKA-7277:
---

 Summary: Migrate Streams API to Duration instead of longMs times
 Key: KAFKA-7277
 URL: https://issues.apache.org/jira/browse/KAFKA-7277
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Right now, the Streams API universally represents time as ms-since-unix-epoch.

There's nothing wrong, per se, with this, but Duration is more ergonomic for an 
API.

What we don't want is to present a heterogeneous API, so we need to make sure 
the whole Streams API is in terms of Duration.

 

Implementation note: Durations potentially worsen memory pressure and gc 
performance, so internally, we will still use longMs as the representation. 
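
For illustration, the difference in ergonomics at a call site (assuming a `KStream` named
`stream` is in scope; the Duration-accepting overload is an assumption about the future
API, not the current one):

{code:java}
// current style: raw milliseconds are easy to misread
stream.groupByKey().windowedBy(TimeWindows.of(300_000L));

// proposed style: java.time.Duration makes the intent explicit (hypothetical overload)
stream.groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(5)));
{code}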



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7284) Producer getting fenced may cause Streams to shut down

2018-08-13 Thread John Roesler (JIRA)
John Roesler created KAFKA-7284:
---

 Summary: Producer getting fenced may cause Streams to shut down
 Key: KAFKA-7284
 URL: https://issues.apache.org/jira/browse/KAFKA-7284
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.0.0
Reporter: John Roesler
Assignee: John Roesler


As part of the investigation, I will determine what other versions are affected.

 

In StreamTask, we catch a `ProducerFencedException` and throw a 
`TaskMigratedException`. However, in this case, the `RecordCollectorImpl` is 
throwing a `StreamsException`, caused by `KafkaException` caused by 
`ProducerFencedException`.

In response to a TaskMigratedException, we would rebalance, but when we get a 
StreamsException, streams shuts itself down.

In other words, we intended to do a rebalance in response to a producer fence, 
but actually, we shut down (when the fence happens inside the record collector).
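
A hedged sketch of the intended handling (heavily simplified; the exception constructors
are shown schematically and this is not the actual RecordCollectorImpl/StreamTask code):

{code:java}
static void handleProducerException(final KafkaException e) {
    if (e instanceof ProducerFencedException
            || e.getCause() instanceof ProducerFencedException) {
        // a fenced producer should trigger a rebalance, not a shutdown
        throw new TaskMigratedException("producer was fenced while sending", e);
    }
    // everything else remains fatal for the stream thread
    throw new StreamsException(e);
}
{code}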


Coincidentally, Guozhang noticed and fixed this in a recent PR: 
[https://github.com/apache/kafka/pull/5428/files#diff-4e5612eeba09dabf30d0b8430f269ff6]

 

The scope of this ticket is to extract that fix and associated tests, and send 
a separate PR to trunk and 2.0, and also to determine what other versions, if 
any, are affected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7222) KIP-328: Add Window Grace Period (and deprecate Window Retention)

2018-08-14 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7222.
-
   Resolution: Fixed
Fix Version/s: 2.1.0

[https://github.com/apache/kafka/pull/5369] is merged

> KIP-328: Add Window Grace Period (and deprecate Window Retention)
> -
>
> Key: KAFKA-7222
> URL: https://issues.apache.org/jira/browse/KAFKA-7222
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables]
>  
> This ticket only covers the grace period portion of the work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7315) Streams serialization docs contain a broken link for Avro

2018-08-20 Thread John Roesler (JIRA)
John Roesler created KAFKA-7315:
---

 Summary: Streams serialization docs contain a broken link for Avro
 Key: KAFKA-7315
 URL: https://issues.apache.org/jira/browse/KAFKA-7315
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


https://kafka.apache.org/documentation/streams/developer-guide/datatypes.html#avro



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6033) Kafka Streams does not work with musl-libc (alpine linux)

2018-08-31 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-6033.
-
Resolution: Won't Fix

This is unfortunately out of our hands.

If you think I'm wrong about this, please reopen the ticket.

Thanks,

-John

> Kafka Streams does not work with musl-libc (alpine linux)
> -
>
> Key: KAFKA-6033
> URL: https://issues.apache.org/jira/browse/KAFKA-6033
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: Alpine 3.6
>Reporter: Jeffrey Zampieron
>Priority: Major
>
> Using the released version of kafka fails on alpine based images b/c of 
> rocksdb using the jni and failing to load.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4988) JVM crash when running on Alpine Linux

2018-08-31 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-4988.
-
Resolution: Won't Fix

I think this issue is out of our hands.

If you think there is something for us to do, please feel free to reopen the 
ticket and comment.

Thanks,

-John

> JVM crash when running on Alpine Linux
> --
>
> Key: KAFKA-4988
> URL: https://issues.apache.org/jira/browse/KAFKA-4988
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Vincent Rischmann
>Priority: Minor
>
> I'm developing my Kafka Streams application using Docker and I run my jars 
> using the official openjdk:8-jre-alpine image.
> I'm just starting to use windowing and now the JVM crashes because of an 
> issue with RocksDB I think.
> It's trivial to fix on my part, just use the debian jessie based image. 
> However, it would be cool if alpine was supported too since its docker images 
> are quite a bit less heavy
> {quote}
> Exception in thread "StreamThread-1" java.lang.UnsatisfiedLinkError: 
> /tmp/librocksdbjni3285995384052305662.so: Error loading shared library 
> ld-linux-x86-64.so.2: No such file or directory (needed by 
> /tmp/librocksdbjni3285995384052305662.so)
>   at java.lang.ClassLoader$NativeLibrary.load(Native Method)
>   at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
>   at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
>   at java.lang.Runtime.load0(Runtime.java:809)
>   at java.lang.System.load(System.java:1086)
>   at 
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
>   at 
> org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
>   at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
>   at org.rocksdb.RocksDB.(RocksDB.java:35)
>   at org.rocksdb.Options.(Options.java:22)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:148)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:141)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7f60f34ce088, pid=1, tid=0x7f60f3705ab0
> #
> # JRE version: OpenJDK Runtime Environment (8.0_121-b13) (build 1.8.0_121-b13)
> # Java VM: OpenJDK 64-Bit Server VM (25.121-b13 mixed mode linux-amd64 
> compressed oops)
> # Derivati

[jira] [Created] (KAFKA-7367) Verify that Streams never creates RocksDB stores unless they are needed

2018-08-31 Thread John Roesler (JIRA)
John Roesler created KAFKA-7367:
---

 Summary: Verify that Streams never creates RocksDB stores unless 
they are needed
 Key: KAFKA-7367
 URL: https://issues.apache.org/jira/browse/KAFKA-7367
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


We have gotten some reports of Streams creating RocksDB stores unnecessarily 
for stateless processes.

We can and should verify this doesn't happen by creating integration tests for 
*every* stateless operator that verify that after processing, the state 
directory is still empty.

These tests could potentially be backported as far as we care to so that we can 
identify and fix potential unnecessary stores in past versions as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7368) Support joining Windowed KTables

2018-08-31 Thread John Roesler (JIRA)
John Roesler created KAFKA-7368:
---

 Summary: Support joining Windowed KTables
 Key: KAFKA-7368
 URL: https://issues.apache.org/jira/browse/KAFKA-7368
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, there is no good way to join two `KTable<Windowed<K>, V>`, aka 
windowed KTables.

They are KTables, so they have a `join` operator available, but it currently 
will use a regular KeyValue store instead of a Windowed store, so it will grow 
without bound as new windows enter.

One option is to convert both KTables into KStream, and join them (which is a 
windowed join), and then convert them back into KTables for further processing, 
but this is an awkward way to accomplish an apparently straightforward task.

It should instead be possible to directly support it, but the trick will be to 
make it impossible to accidentally use a window store for normal (aka 
non-windowed) KTables.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7386) Streams Scala wrapper should not cache serdes

2018-09-07 Thread John Roesler (JIRA)
John Roesler created KAFKA-7386:
---

 Summary: Streams Scala wrapper should not cache serdes
 Key: KAFKA-7386
 URL: https://issues.apache.org/jira/browse/KAFKA-7386
 Project: Kafka
  Issue Type: Bug
Reporter: John Roesler
Assignee: John Roesler


for example, 
[https://github.com/apache/kafka/blob/trunk/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala#L28]
 invokes Serdes.String() once and caches the result.

However, the implementation of the String serde has a non-empty configure 
method whose behavior depends on whether it's used as a key or value serde. So we 
won't get correct execution if we create one serde and use it for both keys and 
values.

The fix is simple: change all the `val` declarations in scala.Serdes to `def`. 
Thanks to the referential transparency for parameterless methods in scala, no 
user-facing code will break.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7393) Consider making suppression node names independent of topologically numbered names.

2018-09-10 Thread John Roesler (JIRA)
John Roesler created KAFKA-7393:
---

 Summary: Consider making suppression node names independent of 
topologically numbered names.
 Key: KAFKA-7393
 URL: https://issues.apache.org/jira/browse/KAFKA-7393
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


See [https://github.com/apache/kafka/pull/5567#discussion_r214188984] (please 
keep the discussion on this Jira ticket).

There is an opportunity to use a slightly different naming strategy for 
suppression nodes so that they can be moved around the topology without 
upsetting other nodes' names.

The main downside is that it might be confusing to have a separate naming 
strategy for just this one kind of node.

Then again, this could create important opportunities for topology-compatible 
optimizations.

 

At a higher design/implementation cost, but a lower ongoing complexity, we 
could consider this naming approach for all node types.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7399) FunctionConversions in Streams-Scala should be private

2018-09-11 Thread John Roesler (JIRA)
John Roesler created KAFKA-7399:
---

 Summary: FunctionConversions in Streams-Scala should be private
 Key: KAFKA-7399
 URL: https://issues.apache.org/jira/browse/KAFKA-7399
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


FunctionConversions defines several implicit conversions for internal use, but 
the class was made public accidentally.

We should deprecate it and replace it with an equivalent private class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7402) Kafka Streams should implement AutoCloseable where appropriate

2018-09-11 Thread John Roesler (JIRA)
John Roesler created KAFKA-7402:
---

 Summary: Kafka Streams should implement AutoCloseable where 
appropriate
 Key: KAFKA-7402
 URL: https://issues.apache.org/jira/browse/KAFKA-7402
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Various components in Streams have close methods but do not implement 
AutoCloseable. This means that they can't be used in try-with-resources blocks.

Remedying that would simplify our tests and make life easier for users as well.

KafkaStreams itself is a notable example of this, but we can take the 
opportunity to look for other components that make sense as AutoCloseable as 
well.
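
For example, with KafkaStreams implementing AutoCloseable, test code could look like this
(assuming a `topology` and `props` defined elsewhere):

{code:java}
try (final KafkaStreams streams = new KafkaStreams(topology, props)) {
    streams.start();
    // ... exercise the application ...
} // close() runs automatically, even if the body throws
{code}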



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7435) Consider standardizing the config object pattern on interface/implementation.

2018-09-24 Thread John Roesler (JIRA)
John Roesler created KAFKA-7435:
---

 Summary: Consider standardizing the config object pattern on 
interface/implementation.
 Key: KAFKA-7435
 URL: https://issues.apache.org/jira/browse/KAFKA-7435
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
 Fix For: 3.0.0


Currently, the majority of Streams's config objects are structured as a 
"external" builder class (with protected state) and an "internal" subclass 
exposing getters to the state. This is serviceable, but there is an alternative 
we can consider: to use an interface for the external API and the 
implementation class for the internal one.
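
For concreteness, a hedged sketch of the proposed shape (the names are made up for
illustration and are not taken from the actual API):

{code:java}
// public API: an interface with factory methods and no mutable protected state
public interface ExampleConfig {
    static ExampleConfig withTimeLimit(final Duration timeLimit) {
        return new ExampleConfigImpl(timeLimit);
    }
}

// internal implementation: private state and getters defined in one place
final class ExampleConfigImpl implements ExampleConfig {
    private final Duration timeLimit;

    ExampleConfigImpl(final Duration timeLimit) {
        this.timeLimit = timeLimit;
    }

    Duration timeLimit() {
        return timeLimit;
    }
}
{code}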

Advantages:
 * we could use private state, which improves maintainability
 * the setters and getters would all be defined in the same class, improving 
readability
 * users browsing the public API would be able to look at an interface that 
contains less extraneous internal details than the current class
 * there is more flexibility in implementation

Alternatives
 * instead of external-class/internal-subclass, we could use an external 
*final* class with package-protected state and an internal accessor class (not 
a subclass, obviously). This would make it impossible for users to try and 
create custom subclasses of our config objects, which is generally not allowed 
already, but currently only fails at runtime with a ClassCastException.

Example implementation: [https://github.com/apache/kafka/pull/5677]

This change would break binary, but not source, compatibility, so the earliest 
we could consider it is 3.0.

To be clear, I'm *not* saying this *should* be done, just calling for a 
discussion. Otherwise, I'd make a KIP.

Thoughts?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7477) Improve Streams close timeout semantics

2018-10-03 Thread John Roesler (JIRA)
John Roesler created KAFKA-7477:
---

 Summary: Improve Streams close timeout semantics
 Key: KAFKA-7477
 URL: https://issues.apache.org/jira/browse/KAFKA-7477
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


See [https://github.com/apache/kafka/pull/5682#discussion_r221473451]

The current timeout semantics are a little "magical":
 * 0 means to block forever
 * negative numbers cause the close to complete immediately without checking 
the state

I think this would make more sense:
 * reject negative numbers
 * make 0 just signal and return immediately (after checking the state once)
 * if I want to wait "forever", I can use {{ofYears(1)}} or 
{{ofMillis(Long.MAX_VALUE)}} or some other intuitively "long enough to be 
forever" value instead of a magic value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7393) Consider making suppression node names independent of topologically numbered names.

2018-10-03 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7393.
-
Resolution: Won't Fix

We realized that Suppressed needs to support a name parameter in keeping with 
KIP-372, which provides a way to solve this problem. If you supply `withName` 
to Suppressed, it will be independent of topologically numbered names, as 
desired.

> Consider making suppression node names independent of topologically numbered 
> names.
> ---
>
> Key: KAFKA-7393
> URL: https://issues.apache.org/jira/browse/KAFKA-7393
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> See [https://github.com/apache/kafka/pull/5567#discussion_r214188984] (please 
> keep the discussion on this Jira ticket).
> There is an opportunity to use a slightly different naming strategy for 
> suppression nodes so that they can be moved around the topology without 
> upsetting other nodes' names.
> The main downside is that it might be confusing to have a separate naming 
> strategy for just this one kind of node.
> Then again, this could create important opportunities for topology-compatible 
> optimizations.
>  
> At a higher design/implementation cost, but a lower ongoing complexity, we 
> could consider this naming approach for all node types.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7852) Add public version of EmbeddedKafkaCluster

2019-01-22 Thread John Roesler (JIRA)
John Roesler created KAFKA-7852:
---

 Summary: Add public version of EmbeddedKafkaCluster
 Key: KAFKA-7852
 URL: https://issues.apache.org/jira/browse/KAFKA-7852
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Currently, Kafka Client and Streams applications do not have good support for 
writing integration tests.

Streams added the TopologyTestDriver, which is a much more efficient approach 
for testing Streams applications, specifically, but it's still more in the 
domain of unit testing.

 

For integration tests, the current state is that people import test artifacts 
from Kafka, which is not a well controlled public API.

 

It might be possible to offer a shim implementation of Kafka for testing, but 
the API is so large and complicated that this seems like a huge effort that's 
not likely to achieve or maintain perfect fidelity.

Therefore, it seems like the best thing would just be to clean up the 
EmbeddedKafkaCluster and offer it in a public test-utils module.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7852) Add public version of EmbeddedKafkaCluster

2019-01-23 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7852.
-
Resolution: Duplicate

> Add public version of EmbeddedKafkaCluster
> --
>
> Key: KAFKA-7852
> URL: https://issues.apache.org/jira/browse/KAFKA-7852
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Priority: Major
>
> Currently, Kafka Client and Streams applications do not have good support for 
> writing integration tests.
> Streams added the TopologyTestDriver, which is a much more efficient approach 
> for testing Streams applications, specifically, but it's still more in the 
> domain of unit testing.
>  
> For integration tests, the current state is that people import test artifacts 
> from Kafka, which is not a well controlled public API.
>  
> It might be possible to offer a shim implementation of Kafka for testing, but 
> the API is so large and complicated that this seems like a huge effort that's 
> not likely to achieve or maintain perfect fidelity.
> Therefore, it seems like the best thing would just be to clean up the 
> EmbeddedKafkaCluster and offer it in a public test-utils module.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7223) KIP-328: Add in-memory Suppression

2019-01-24 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7223.
-
Resolution: Fixed

> KIP-328: Add in-memory Suppression
> --
>
> Key: KAFKA-7223
> URL: https://issues.apache.org/jira/browse/KAFKA-7223
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7916) Streams store cleanup: unify wrapping

2019-02-11 Thread John Roesler (JIRA)
John Roesler created KAFKA-7916:
---

 Summary: Streams store cleanup: unify wrapping
 Key: KAFKA-7916
 URL: https://issues.apache.org/jira/browse/KAFKA-7916
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler
Assignee: John Roesler


The internal store handling in Streams has become quite complex, with many 
layers of wrapping for different bookkeeping operations.

The first thing we can do about this is to at least unify the wrapping 
strategy, such that *all* store wrappers extend WrappedStateStore. This would 
make the code easier to understand, since all wrappers would have the same 
basic shape.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-11 Thread John Roesler (JIRA)
John Roesler created KAFKA-7917:
---

 Summary: Streams store cleanup: collapse layers
 Key: KAFKA-7917
 URL: https://issues.apache.org/jira/browse/KAFKA-7917
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Following on KAFKA-7916, we can consider collapsing the "streams management 
layers" into one.

Right now, we have:
 * metering (also handles moving from pojo world to bytes world)
 * change-logging
 * caching

This is good compositional style, but we also have some runtime overhead of 
calling through all these layers, as well as some mental overhead of 
understanding how many and which layers we are going through.

Also, there are dependencies between the caching and change-logging layers.

I _think_ it would simplify the code if we collapsed these into one layer with 
boolean switches to turn on or off the different aspects. (rather than wrapping 
the store with the different layers or not depending on the same boolean 
conditions)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-02-11 Thread John Roesler (JIRA)
John Roesler created KAFKA-7918:
---

 Summary: Streams store cleanup: inline byte-store generic 
parameters
 Key: KAFKA-7918
 URL: https://issues.apache.org/jira/browse/KAFKA-7918
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, the fundamental layer of stores in Streams is the "bytes store".

The easiest way to identify this is in `org.apache.kafka.streams.state.Stores`: 
all the `StoreBuilder`s require a `XXBytesStoreSupplier`. 

We provide several implementations of these bytes stores, typically an 
in-memory one and a persistent one (aka RocksDB).

Inside these bytes stores, the key is always `Bytes` and the value is always 
`byte[]` (serialization happens at a higher level). However, the store 
implementations are generically typed, just `K` and `V`.

This is good for flexibility, but it makes the code a little harder to 
understand. I think that we used to do serialization at a lower level, so the 
generics are a hold-over from that.

It would simplify the code if we just inlined the actual k/v types and maybe 
even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
`InMemoryKeyValueBytesStore`, and so forth.
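
A hedged sketch of the proposed inlining and renaming (class shapes are illustrative, not
the actual store code):

{code:java}
// before: generically typed, even though K is always Bytes and V is always byte[]
class InMemoryKeyValueStore<K, V> { /* ... */ }

// after: the byte types are inlined and the name says what the store really is
class InMemoryKeyValueBytesStore {
    private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
    /* ... */
}
{code}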



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7919) Reorganize Stores builders

2019-02-11 Thread John Roesler (JIRA)
John Roesler created KAFKA-7919:
---

 Summary: Reorganize Stores builders
 Key: KAFKA-7919
 URL: https://issues.apache.org/jira/browse/KAFKA-7919
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


I have heard complaints from a few people that they find the whole process of 
using `Materialized`, `Stores`, `StoreBuilder`s, and `StoreSupplier`s confusing.

I think it would help if we separated Stores into separate StoreBuilders and 
BytesStoreSuppliers factory classes. Or maybe even break the suppliers factory 
down further into `KeyValueBytesStoreSuppliers`, etc. Then, the javadocs in 
`Materialized.as` can point people to the right factory class to use, and, when 
they get there, they'll only be confronted with options they can make use of.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-13 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7917.
-
Resolution: Won't Fix

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7928) Deprecate WindowStore.put(key, value)

2019-02-13 Thread John Roesler (JIRA)
John Roesler created KAFKA-7928:
---

 Summary: Deprecate WindowStore.put(key, value)
 Key: KAFKA-7928
 URL: https://issues.apache.org/jira/browse/KAFKA-7928
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Specifically, `org.apache.kafka.streams.state.WindowStore#put(K, V)`

This method is strange... A window store needs to have a timestamp associated 
with the key, so if you do a put without a timestamp, it's up to the store to 
just make one up.

Even the javadoc on the method recommends not to use it, due to this confusing 
behavior.

We should just deprecate it.
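
For reference, the two variants side by side (assuming a `WindowStore<String, Long>` named
`windowStore` and a `ProcessorContext` named `context` in scope):

{code:java}
windowStore.put("key", 1L);                       // no timestamp: the store has to invent one
windowStore.put("key", 1L, context.timestamp());  // explicit window start timestamp: unambiguous
{code}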



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7916) Streams store cleanup: unify wrapping

2019-02-14 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7916.
-
Resolution: Fixed

> Streams store cleanup: unify wrapping
> -
>
> Key: KAFKA-7916
> URL: https://issues.apache.org/jira/browse/KAFKA-7916
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> The internal store handling in Streams has become quite complex, with many 
> layers of wrapping for different bookkeeping operations.
> The first thing we can do about this is to at least unify the wrapping 
> strategy, such that *all* store wrappers extend WrappedStateStore. This would 
> make the code easier to understand, since all wrappers would have the same 
> basic shape.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7932) Streams needs to handle new Producer exceptions

2019-02-14 Thread John Roesler (JIRA)
John Roesler created KAFKA-7932:
---

 Summary: Streams needs to handle new Producer exceptions
 Key: KAFKA-7932
 URL: https://issues.apache.org/jira/browse/KAFKA-7932
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Following on KAFKA-7763, Streams needs to handle the new behavior.

See also https://github.com/apache/kafka/pull/6066

Streams code (StreamTask.java) needs to be modified to handle the new exception.

Also, from another upstream change, `initTxn` can also throw TimeoutException 
now: default `MAX_BLOCK_MS_CONFIG` in producer is 60 seconds, so I think just 
wrapping it as StreamsException should be reasonable, similar to what we do for 
`producer#send`'s TimeoutException 
([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L220-L225]
 ).

 

Note we need to handle in three functions: init/commit/abortTxn.
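
A hedged sketch of the wrapping described above (not the actual StreamTask code; the same
pattern would apply to the commit and abort calls):

{code:java}
try {
    producer.initTransactions();
} catch (final TimeoutException e) {
    // mirror the handling of producer#send timeouts: surface as a StreamsException
    throw new StreamsException("Timed out while initializing transactions", e);
}
{code}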



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7943) Add Suppress system test with caching disabled.

2019-02-18 Thread John Roesler (JIRA)
John Roesler created KAFKA-7943:
---

 Summary: Add Suppress system test with caching disabled.
 Key: KAFKA-7943
 URL: https://issues.apache.org/jira/browse/KAFKA-7943
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7944) Add more natural Suppress test

2019-02-18 Thread John Roesler (JIRA)
John Roesler created KAFKA-7944:
---

 Summary: Add more natural Suppress test
 Key: KAFKA-7944
 URL: https://issues.apache.org/jira/browse/KAFKA-7944
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Can be integration or system test.

The idea is to test with tighter time bounds (windows of, say, 30 seconds) and to 
use system time without artificially advancing it for verification.

Currently, all the tests rely on artificially advancing system time, which 
should be reliable, but you never know. So, we want to add a test that works 
exactly like production code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8040) Streams needs to retry initTransactions

2019-03-04 Thread John Roesler (JIRA)
John Roesler created KAFKA-8040:
---

 Summary: Streams needs to retry initTransactions
 Key: KAFKA-8040
 URL: https://issues.apache.org/jira/browse/KAFKA-8040
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.3.0


Following on KAFKA-7763, Streams needs to handle the new behavior.

See also [https://github.com/apache/kafka/pull/6066]

Streams code (StreamTask.java) needs to be modified to handle the new exception.

Also, from another upstream change, `initTxn` can also throw TimeoutException 
now: default `MAX_BLOCK_MS_CONFIG` in producer is 60 seconds, so I think just 
wrapping it as StreamsException should be reasonable, similar to what we do for 
`producer#send`'s TimeoutException 
([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L220-L225]
 ).

 

Note we need to handle in three functions: init/commit/abortTxn.

 

See also https://github.com/apache/kafka/pull/6066#issuecomment-464403448



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8080) Remove streams_eos_test system test

2019-03-08 Thread John Roesler (JIRA)
John Roesler created KAFKA-8080:
---

 Summary: Remove streams_eos_test system test
 Key: KAFKA-8080
 URL: https://issues.apache.org/jira/browse/KAFKA-8080
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


After KAFKA-7944 / [https://github.com/apache/kafka/pull/6382] , the system 
test streams_eos_test.py is mostly redundant.

Quoting my analysis from 
[https://github.com/apache/kafka/pull/6382#discussion_r263536548,] 
{quote}Ok, so the smoke test and the eos test are similar, but not identical.

The smoke test application has more features in it, though. So, we have more 
feature coverage under eos when we test with the smoke test.

The eos test evaluates two topologies: one with no repartition, and one with a 
repartition. The smoke test topology contains repartitions, so it only tests 
_with_ repartition. I think that it should be sufficient to test only _with_ 
repartition.

The eos test verification specifically checks that "all transactions finished" 
({{org.apache.kafka.streams.tests.EosTestDriver#verifyAllTransactionFinished}}).
 I'm not clear on exactly what we're looking for here. It looks like we create 
a transactional producer and send a record to each partition and then expect to 
get all those records back, without seeing any other records. But I'm not sure 
why we're doing this. If we want to drop the eos test, then we might need to 
add this to the smoke test verification.
{quote}
And [~guozhang]'s reply:
{quote}{{verifyAllTransactionFinished}} aimed to avoid a situation that some 
dangling txn is open forever, without committing or aborting. Because the 
consumer needs to guarantee offset ordering when returning data, with 
read-committed they will also be blocked on those open txn's data forever (this 
usually indicates a broker-side issue, not streams though, but still).

I think we should still retain this check if we want to merge in the EosTest to 
SmokeTest.
{quote}
 

As described above, the task is simply to add a similar check to the end of the 
verification logic in the SmokeTestDriver. Then, we can remove the EOS system 
test, as well as all the Java code that supports it.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7944) Add more natural Suppress test

2019-03-12 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7944.
-
Resolution: Fixed

> Add more natural Suppress test
> --
>
> Key: KAFKA-7944
> URL: https://issues.apache.org/jira/browse/KAFKA-7944
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Can be integration or system test.
> The idea is to test with tighter time bounds with windows of say 30 seconds 
> and use system time without adding any extra time for verification.
> Currently, all the tests rely on artificially advancing system time, which 
> should be reliable, but you never know. So, we want to add a test that works 
> exactly like production code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8111) KafkaProducer can't produce data

2019-03-15 Thread John Roesler (JIRA)
John Roesler created KAFKA-8111:
---

 Summary: KafkaProducer can't produce data
 Key: KAFKA-8111
 URL: https://issues.apache.org/jira/browse/KAFKA-8111
 Project: Kafka
  Issue Type: Bug
  Components: clients, core
Affects Versions: 2.3.0
Reporter: John Roesler


Using a Producer from the current trunk (a6691fb79), I'm unable to produce data 
to a 2.2 broker.

tl;dr;, I narrowed down the problem to 
[https://github.com/apache/kafka/commit/a42f16f98] . My hypothesis is that some 
part of that commit broke backward compatibility with older brokers.

 

Repro steps:

I'm using this Producer config:
{noformat}
final Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getCanonicalName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getCanonicalName());
return properties;{noformat}
 # create a simple Producer to produce test data to a broker
 # build against commit a42f16f98 
 # start an older broker. (I was using 2.1, and someone else reproduced it with 
2.2)
 # run your producer and note that it doesn't produce data (seems to hang, I 
see it produce 2 records in 1 minute)
 # build against the predecessor commit 65aea1f36
 # run your producer and note that it DOES produce data (I see it produce 1M 
records every 15 seconds)

I've also confirmed that if I check out the current trunk (a6691fb79e2c55b3) 
and revert a42f16f98, I also observe that it produces as expected (1M every 15 
seconds).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8155) Update Streams system tests for 2.2.0 and 2.1.1 releases

2019-03-25 Thread John Roesler (JIRA)
John Roesler created KAFKA-8155:
---

 Summary: Update Streams system tests for 2.2.0 and 2.1.1 releases
 Key: KAFKA-8155
 URL: https://issues.apache.org/jira/browse/KAFKA-8155
 Project: Kafka
  Issue Type: Task
  Components: streams, system tests
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8167) Document named stateful operators

2019-03-27 Thread John Roesler (JIRA)
John Roesler created KAFKA-8167:
---

 Summary: Document named stateful operators
 Key: KAFKA-8167
 URL: https://issues.apache.org/jira/browse/KAFKA-8167
 Project: Kafka
  Issue Type: Task
  Components: documentation, streams
Affects Versions: 2.1.1, 2.2.0, 2.1.0, 2.3.0, 2.1.2, 2.2.1
Reporter: John Roesler


In KIP-372 / KAFKA-7406, we added the ability to name all persistent resources 
in support of topology compatibility. We missed documenting it, though, or at 
least, I couldn't find the docs.

 

Since this feature is a higher-level, cross-cutting concern, we should add a 
section to the docs describing the compatibility problem and the full set of 
practices that you can employ to achieve compatibility by naming persistent 
resources.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8189) Streams should have an option to require names for stateful components

2019-04-04 Thread John Roesler (JIRA)
John Roesler created KAFKA-8189:
---

 Summary: Streams should have an option to require names for 
stateful components
 Key: KAFKA-8189
 URL: https://issues.apache.org/jira/browse/KAFKA-8189
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


With the introduction of 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping]
 , in conjunction with naming state via Materialized and Suppressed, Streams 
users have the ability to alter their topologies and restart without losing 
track of operator state or repartition topics.

It would be a robust pattern to always name stateful components, but this 
pattern is vulnerable to simple coding and configuration mistakes. If 
developers lose vigilance even once and deploy a topology with *any* state not 
named, the only way to correct it is with an application reset.

Streams can support topology compatibility by offering a config option to 
require names on all stateful components. Then, if someone accidentally adds an 
anonymous stateful operator, Streams would throw an exception instead of 
generating a name, preserving the integrity of the application.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8204) Streams may flush state stores in the incorrect order

2019-04-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-8204:
---

 Summary: Streams may flush state stores in the incorrect order
 Key: KAFKA-8204
 URL: https://issues.apache.org/jira/browse/KAFKA-8204
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


Cached state stores may forward records during a flush call, so Streams should 
flush the stores in topological order. Otherwise, Streams may flush a 
downstream store before an upstream one, resulting in sink results being 
committed without the corresponding state changelog updates being committed.

This behavior is partly responsible for the bug reported in KAFKA-7895 .

The fix is simply to flush the stores in topological order; then, when an 
upstream store forwards records to a downstream stateful processor, the 
corresponding state changes will be correctly flushed as well.
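
A minimal sketch of the fix (the topologically ordered collection is an assumption for
illustration; this is not the actual state manager code):

{code:java}
// flush upstream stores first, so that any records they forward on flush land in
// downstream stores that have not been flushed yet
for (final StateStore store : storesInTopologicalOrder) {
    store.flush();
}
{code}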

An alternative would be to repeatedly call flush on all state stores until they 
report there is nothing left to flush, but this requires a public API change to 
enable state stores to report whether they need a flush or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-04-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reopened KAFKA-7895:
-

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8254) Suppress incorrectly passes a null topic to the serdes

2019-04-18 Thread John Roesler (JIRA)
John Roesler created KAFKA-8254:
---

 Summary: Suppress incorrectly passes a null topic to the serdes
 Key: KAFKA-8254
 URL: https://issues.apache.org/jira/browse/KAFKA-8254
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.1, 2.2.0, 2.1.0
Reporter: John Roesler
 Fix For: 2.3.0, 2.1.2, 2.2.1


For example, in KTableSuppressProcessor, we do:
{noformat}
final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, 
key));
{noformat}

This violates the contract of Serializer (and Deserializer), and breaks 
integration with known Serde implementations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8260) Flaky test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-04-18 Thread John Roesler (JIRA)
John Roesler created KAFKA-8260:
---

 Summary: Flaky test 
ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
 Key: KAFKA-8260
 URL: https://issues.apache.org/jira/browse/KAFKA-8260
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.3.0
Reporter: John Roesler


I have seen this fail again just now. See also KAFKA-7965 and KAFKA-7936.

https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3874/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/

{noformat}
Error Message
org.scalatest.junit.JUnitTestFailedError: Should have received an class 
org.apache.kafka.common.errors.GroupMaxSizeReachedException during the cluster 
roll
Stacktrace
org.scalatest.junit.JUnitTestFailedError: Should have received an class 
org.apache.kafka.common.errors.GroupMaxSizeReachedException during the cluster 
roll
at 
org.scalatest.junit.AssertionsForJUnit.newAssertionFailedException(AssertionsForJUnit.scala:100)
at 
org.scalatest.junit.AssertionsForJUnit.newAssertionFailedException$(AssertionsForJUnit.scala:99)
at 
org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:71)
at org.scalatest.Assertions.fail(Assertions.scala:1089)
at org.scalatest.Assertions.fail$(Assertions.scala:1085)
at org.scalatest.junit.JUnitSuite.fail(JUnitSuite.scala:71)
at 
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:350)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8280) Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable

2019-04-23 Thread John Roesler (JIRA)
John Roesler created KAFKA-8280:
---

 Summary: Flaky Test 
DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable
 Key: KAFKA-8280
 URL: https://issues.apache.org/jira/browse/KAFKA-8280
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.3.0
Reporter: John Roesler


I saw this fail again on 
https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3979/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/

{noformat}
Error Message
java.lang.AssertionError: Unclean leader not elected
Stacktrace
java.lang.AssertionError: Unclean leader not elected
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:510)
{noformat}

{noformat}
Standard Output
Completed Updating config for entity: brokers '0'.
Completed Updating config for entity: brokers '1'.
Completed Updating config for entity: brokers '2'.
[2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=1] Error for partition testtopic-6 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=1] Error for partition testtopic-0 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
fetcherId=0] Error for partition testtopic-7 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
fetcherId=0] Error for partition testtopic-1 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:11,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
fetcherId=1] Error for partition testtopic-6 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:11,761] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
fetcherId=1] Error for partition testtopic-0 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
fetcherId=0] Error for partition testtopic-3 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
fetcherId=0] Error for partition testtopic-9 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:13,779] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=1] Error for partition __consumer_offsets-8 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=1] Error for partition __consumer_offsets-38 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=1] Error for partition __consumer_offsets-2 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=1] Error for partition __consumer_offsets-14 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=1] Error for partition __consumer_offsets-20 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=1] Error for pa

[jira] [Created] (KAFKA-8307) Kafka Streams should provide some mechanism to determine topology equality and compatibility

2019-04-30 Thread John Roesler (JIRA)
John Roesler created KAFKA-8307:
---

 Summary: Kafka Streams should provide some mechanism to determine 
topology equality and compatibility
 Key: KAFKA-8307
 URL: https://issues.apache.org/jira/browse/KAFKA-8307
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, Streams provides no mechanism to compare two topologies. This is a 
common operation when users want to have tests verifying that they don't 
accidentally alter their topology. They would save the known-good topology and 
then add a unit test verifying the current code against that known-good state.

However, because there's no way to do this comparison properly, everyone is 
reduced to using the string format of the topology (from 
`Topology#describe().toString()`). The major drawback is that the string format 
is meant for human consumption. It is neither machine-parseable nor stable. So, 
these compatibility tests are doomed to fail when any minor, non-breaking, 
change is made either to the application, or to the library. This trains 
everyone to update the test whenever it fails, undermining its utility.
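
For example, today such a compatibility test is typically a brittle golden-file 
comparison along these lines (sketch only; the file path and topology builder 
are placeholders):

{code:java}
import static org.junit.Assert.assertEquals;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.junit.Test;

public class TopologyCompatibilityTest {

    @Test
    public void topologyShouldNotChange() throws Exception {
        // Compare the human-readable description against a saved "known good"
        // copy. Any cosmetic change to the description format breaks this test.
        final String current = buildMyTopology().describe().toString();
        final String knownGood = new String(
            Files.readAllBytes(Paths.get("src/test/resources/known-good-topology.txt")),
            StandardCharsets.UTF_8);
        assertEquals(knownGood, current);
    }

    private Topology buildMyTopology() {
        // Placeholder for the application's real topology
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output");
        return builder.build();
    }
}
{code}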

We should fix this problem, and provide both a mechanism to serialize the 
topology and to compare two topologies for compatibility. All in all, I think 
we need:
# a way to serialize/deserialize topology structure in a machine-parseable 
format that is future-compatible. Offhand, I'd recommend serializing the 
topology structure as JSON, and establishing a policy that attributes should 
only be added to the object graph, never removed. Note, it's out of scope to be 
able to actually run a deserialized topology; we only want to save and load the 
structure (not the logic) to facilitate comparisons.
# a method to verify the *equality* of two topologies... This method tells you 
that the two topologies are structurally identical. We can't know if the logic 
of any operator has changed, only if the structure of the graph is changed. We 
can consider whether other graph properties, like serdes, should be included.
# a method to verify the *compatibility* of two topologies... This method tells 
you that moving from topology A to topology B does not require an application 
reset. Note that this operation is not commutative: `A.compatibleWith(B)` does 
not imply `B.compatibleWith(A)`. We can discuss whether `A.compatibleWith(B) && 
B.compatibleWith(A)` implies `A.equals(B)` (I think not necessarily, because we 
may want "equality" to be stricter than "compatibility").




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8318) Session Window Aggregations generate an extra tombstone

2019-05-03 Thread John Roesler (JIRA)
John Roesler created KAFKA-8318:
---

 Summary: Session Window Aggregations generate an extra tombstone
 Key: KAFKA-8318
 URL: https://issues.apache.org/jira/browse/KAFKA-8318
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


See the discussion 
https://github.com/apache/kafka/pull/6654#discussion_r280231439

The session merging logic generates a tombstone in addition to an update when 
the session window already exists. It's not a correctness issue, just a small 
performance hit, because that tombstone is immediately invalidated by the 
update.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8396) Clean up Transformer API

2019-05-20 Thread John Roesler (JIRA)
John Roesler created KAFKA-8396:
---

 Summary: Clean up Transformer API
 Key: KAFKA-8396
 URL: https://issues.apache.org/jira/browse/KAFKA-8396
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, KStream operators transformValues and flatTransformValues disable 
context forwarding, and force operators to just return the new values.

The reason is that we wanted to prevent the key from changing, since the whole 
point of a `xValues` transformation is that we _do not_ change the key, and 
hence don't need to repartition.

However, the chosen mechanism has some drawbacks: The Transform concept is 
basically a way to plug in a custom Processor within the Streams DSL, but these 
restrictions make it more like a MapValues with access to the context. For 
example, even though you can still schedule punctuations, there's no way to 
forward values as a result of them. So, as a user, it's hard to build a mental 
model of how to use a TransformValues (because it's not quite a Transformer and 
not quite a Mapper).

Also, logically, a Transformer can call forward as much as it wants, so a 
Transformer and a FlatTransformer are effectively the same thing. Then, we also 
have TransformValues and FlatTransformValues that are also two more versions of 
the same thing, just to implement the key restrictions. Internally, some of 
these can send downstream by returning OR forwarding, and others can only 
return. It's a lot for users to keep in mind.

We can clean up this API significantly by just allowing all transformers to 
call `forward`. In the `Values` case, we can wrap the ProcessorContext in one 
that checks the key is `equal` to the one that got passed in (i.e., saves a 
reference and enforces equality with that reference in any call to `forward`). 
Then, we can actually deprecate the `*ValueTransformer*` interfaces and remove 
the restriction about calling forward.
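
As a rough sketch of the wrapping idea (the names here are made up; a real 
version would implement the full ProcessorContext interface and delegate every 
other method to the wrapped context):

{code:java}
import java.util.Objects;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch of enforcing "xValues must not change the key" at forward() time,
// instead of forbidding forward() entirely.
public class KeyEnforcingForwarder {
    private final ProcessorContext delegate;
    private Object currentInputKey;

    public KeyEnforcingForwarder(final ProcessorContext delegate) {
        this.delegate = delegate;
    }

    // The framework would call this before handing each record to the transformer.
    public void setCurrentInputKey(final Object key) {
        this.currentInputKey = key;
    }

    public <K, V> void forward(final K key, final V value) {
        if (!Objects.equals(key, currentInputKey)) {
            throw new IllegalArgumentException(
                "A *Values transformer must not change the key: "
                    + currentInputKey + " -> " + key);
        }
        delegate.forward(key, value);
    }
}
{code}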

We can consider a further cleanup (TBD) to deprecate the existing Transformer 
interface entirely, and replace it with one with a `void` return type. Then, 
the Transform and FlatTransform cases collapse together, and we just need 
Transform and TransformValues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8403) Suppress needs a Materialized variant

2019-05-21 Thread John Roesler (JIRA)
John Roesler created KAFKA-8403:
---

 Summary: Suppress needs a Materialized variant
 Key: KAFKA-8403
 URL: https://issues.apache.org/jira/browse/KAFKA-8403
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


The newly added KTable Suppress operator lacks a Materialized variant, which 
would be useful if you wanted to query the results of the suppression.

Suppression results will eventually match the upstream results, but the 
intermediate distinction may be meaningful for some applications. For example, 
you could want to query only the final results of a windowed aggregation.
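
Purely as a sketch of what is being requested (this overload does not exist 
today), the API might look like:

{code:java}
// Hypothetical overload on KTable (sketched signature only, not a real API):
//
//   KTable<K, V> suppress(final Suppressed<? super K> suppressed,
//                         final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
//
// which would allow, for example:
//
//   windowedCounts.suppress(
//       Suppressed.untilWindowCloses(BufferConfig.unbounded()),
//       Materialized.as("final-window-counts")); // then queryable via Interactive Queries
{code}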



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8410) Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as well

2019-05-22 Thread John Roesler (JIRA)
John Roesler created KAFKA-8410:
---

 Summary: Strengthen the types of Processors, at least in the DSL, 
maybe in the PAPI as well
 Key: KAFKA-8410
 URL: https://issues.apache.org/jira/browse/KAFKA-8410
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Presently, it's very difficult to have confidence when adding to or modifying 
processors in the DSL. There's a lot of raw types, duck-typing, and casting 
that contribute to this problem.

The root, though, is that the generic types on `Processor` refer only to 
the _input_ key and value types. No information is captured or verified about 
what the _output_ types of a processor are. For example, this leads to 
widespread confusion in the code base about whether a processor produces `V`s 
or `Change<V>`s. The type system actually makes matters worse, since we use 
casts to make the processors conform to declared types that are in fact wrong, 
but are never checked due to erasure.

We can start to make some headway on this tech debt by adding some types to the 
ProcessorContext that bound the `<K, V>` types that may be passed to 
`context.forward`. Then, we can build on this by fully specifying the input and 
output types of the Processors, which in turn would let us eliminate the 
majority of unchecked casts in the DSL operators.

I'm not sure whether adding these generic types to the existing 
ProcessorContext and Processor interfaces, which would also affect the PAPI, has 
any utility, or whether we should make this purely an internal change by 
introducing GenericProcessorContext and GenericProcessor peer interfaces for 
the DSL to use.
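
For illustration only, the internal peer interfaces mentioned above might look 
roughly like this (a sketch, not an existing or proposed API surface):

{code:java}
// Sketch: interfaces that also capture the *output* key and value types, so the
// compiler can check what a processor is allowed to forward.
interface GenericProcessorContext<KOut, VOut> {
    <K extends KOut, V extends VOut> void forward(K key, V value);
}

interface GenericProcessor<KIn, VIn, KOut, VOut> {
    void init(GenericProcessorContext<KOut, VOut> context);
    void process(KIn key, VIn value);
    void close();
}
{code}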



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7991) Add StreamsUpgradeTest for 2.2 release

2019-05-28 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7991.
-
Resolution: Duplicate

Yep. Closing as a duplicate.

> Add StreamsUpgradeTest for 2.2 release
> --
>
> Key: KAFKA-7991
> URL: https://issues.apache.org/jira/browse/KAFKA-7991
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.3.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8452) Possible Suppress buffer optimization: de-duplicate prior value

2019-05-30 Thread John Roesler (JIRA)
John Roesler created KAFKA-8452:
---

 Summary: Possible Suppress buffer optimization: de-duplicate prior 
value
 Key: KAFKA-8452
 URL: https://issues.apache.org/jira/browse/KAFKA-8452
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


As of KAFKA-8199, the suppression buffers have to track the "prior value" in 
addition to the "old" and "new" values for each record, to support transparent 
downstream views.

In many cases, the prior value is actually the same as the old value, and we 
could avoid storing it separately. The challenge is that the old and new values 
are already serialized into a common array (as a Change via the 
FullChangeSerde), so the "prior" value would actually be a slice on the 
underlying array. But, of course, Java does not have array slices.

To get around this, we either need to switch to ByteBuffers (which support 
slices) or break apart the serialized Change into just serialized old and new 
values.
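
For reference, a ByteBuffer slice would let the prior value be a zero-copy view 
over the shared array, roughly like this (standalone sketch; the byte layout 
here is made up):

{code:java}
import java.nio.ByteBuffer;

public class SliceSketch {
    public static void main(final String[] args) {
        // Pretend this is the serialized Change, laid out as [oldValue | newValue]
        final byte[] serializedChange = new byte[] {1, 2, 3, 4, 5, 6};
        final int oldValueLength = 2;

        final ByteBuffer buffer = ByteBuffer.wrap(serializedChange);
        buffer.position(0).limit(oldValueLength);
        final ByteBuffer priorView = buffer.slice(); // a view over bytes [0, 2), no copy

        System.out.println(priorView.remaining()); // prints 2
    }
}
{code}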



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8455) Add NothingSerde to Serdes

2019-05-31 Thread John Roesler (JIRA)
John Roesler created KAFKA-8455:
---

 Summary: Add NothingSerde to Serdes
 Key: KAFKA-8455
 URL: https://issues.apache.org/jira/browse/KAFKA-8455
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Often, when reading an input topic, the key is expected to be null, but there's 
actually no way to represent this fact in Consumed, leading to confusing type 
signatures down the topology.

For example, you might use the BytesSerde, but then you have a 
KStream with a Bytes key type. When maintaining a large application, this 
becomes a hazard, since you'd need to "be really careful" never to dereference 
the key at any point, since you know it's actually always null.

Much better would be to actually represent the fact that the key is null, using 
the Void type. One such example of this is the NothingSerde I wrote here: 
https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/test/java/io/confluent/examples/streams/IntegrationTestUtils.java#L465

After some conversations, I've come to believe this would actually be a useful 
addition to the main Serdes collection.
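
A minimal sketch of such a serde, modeled loosely on the linked NothingSerde 
(details may differ from the linked version; this assumes a clients version 
where Serializer/Deserializer have default configure()/close() implementations):

{code:java}
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// A Serde<Void> that asserts keys really are null, instead of silently carrying
// an opaque Bytes key around the topology.
public class NothingSerde extends Serdes.WrapperSerde<Void> {
    public NothingSerde() {
        super(
            new Serializer<Void>() {
                @Override
                public byte[] serialize(final String topic, final Void data) {
                    return null; // nothing to serialize
                }
            },
            new Deserializer<Void>() {
                @Override
                public Void deserialize(final String topic, final byte[] data) {
                    if (data != null) {
                        throw new IllegalArgumentException(
                            "Expected a null key on topic " + topic
                                + " but got " + data.length + " bytes");
                    }
                    return null;
                }
            }
        );
    }
}
{code}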



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8478) Poll for more records before forced processing

2019-06-04 Thread John Roesler (JIRA)
John Roesler created KAFKA-8478:
---

 Summary: Poll for more records before forced processing
 Key: KAFKA-8478
 URL: https://issues.apache.org/jira/browse/KAFKA-8478
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


While analyzing the algorithm of Streams's poll/process loop, I noticed the 
following:
The algorithm of runOnce is:

{code}
loop0:
  long poll for records (100ms)
  loop1:
    loop2: for BATCH_SIZE iterations:
      process one record in each task that has data enqueued
    adjust BATCH_SIZE
    if loop2 processed any records, repeat loop1
    else, break loop1 and repeat loop0
{code}

There's potentially an unwanted interaction between "keep processing as long as 
any record is processed" and forcing processing after `max.task.idle.ms`.

If there are two tasks, A and B, and A runs out of records on one input before 
B, then B could keep the processing loop running, and hence prevent A from 
getting any new records, until max.task.idle.ms expires, at which point A will 
force processing on its other input partition. The intent of idling is to at 
least give A a chance of getting more records on the empty input, but under 
this situation, we'd never even check for more records before forcing 
processing.

I'm thinking we should only enforce processing if there was a completed poll 
since we noticed the task was missing inputs (otherwise, we may as well not 
bother idling at all).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8452) Possible Suppress buffer optimization: de-duplicate prior value

2019-06-20 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-8452.
-
Resolution: Fixed

> Possible Suppress buffer optimization: de-duplicate prior value
> ---
>
> Key: KAFKA-8452
> URL: https://issues.apache.org/jira/browse/KAFKA-8452
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> As of KAFKA-8199, the suppression buffers have to track the "prior value" in 
> addition to the "old" and "new" values for each record, to support 
> transparent downstream views.
> In many cases, the prior value is actually the same as the old value, and we 
> could avoid storing it separately. The challenge is that the old and new 
> values are already serialized into a common array (as a Change via the 
> FullChangeSerde), so the "prior" value would actually be a slice on the 
> underlying array. But, of course, Java does not have array slices.
> To get around this, we either need to switch to ByteBuffers (which support 
> slices) or break apart the serialized Change into just serialized old and new 
> values.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress

2019-06-21 Thread John Roesler (JIRA)
John Roesler created KAFKA-8582:
---

 Summary: Consider adding an ExpiredWindowRecordHandler to Suppress
 Key: KAFKA-8582
 URL: https://issues.apache.org/jira/browse/KAFKA-8582
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


I got some feedback on Suppress:
{quote}Specifying how to handle events outside the grace period does seem like 
a business concern, and simply discarding them thus seems risky (for example 
imagine any situation where money is involved).

This sort of situation is addressed by the late-triggering approach associated 
with watermarks 
(https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given 
this I wondered if you were considering adding anything similar?{quote}

It seems like, if a record has arrived past the grace period for its window, 
then the state of the windowed aggregation would already have been lost, so if 
we were to compute an aggregation result, it would be incorrect. Plus, since 
the window is already expired, we can't store the new (incorrect, but more 
importantly expired) aggregation result either, so any subsequent super-late 
records would also face the same blank-slate. I think this would wind up 
looking like this: if you have three timely records for a window, and then 
three more that arrive after the grace period, and you were doing a count 
aggregation, you'd see the counts emitted for the window as [1, 2, 3, 1, 1, 1]. 
I guess we could add a flag to the post-expiration results to indicate that 
they're broken, but this seems like the wrong approach. The post-expiration 
aggregation _results_ are meaningless, but I could see wanting to send the 
past-expiration _input records_ to a dead-letter queue or something instead of 
dropping them.

Along this line of thinking, I wonder if we should add an optional 
past-expiration record handler interface to the suppression operator. Then, you 
could define your own logic, whether it's a dead-letter queue, sending it to 
some alerting pipeline, or even just crashing the application before it can do 
something wrong. This would be a similar pattern to how we allow custom logic 
to handle deserialization errors by supplying a 
org.apache.kafka.streams.errors.DeserializationExceptionHandler.
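
Hypothetically (nothing like this exists in the API today), such a handler 
could be shaped like the following sketch:

{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical callback, analogous to DeserializationExceptionHandler: invoked
// for input records that arrive after their window's grace period has passed,
// instead of silently dropping them.
public interface ExpiredWindowRecordHandler<K, V> {

    enum ExpiredWindowHandlerResponse {
        CONTINUE, // drop the record (e.g., after forwarding it to a dead-letter queue)
        FAIL      // shut the application down before it can do something wrong
    }

    ExpiredWindowHandlerResponse handle(ProcessorContext context,
                                        K key,
                                        V value,
                                        long recordTimestamp);
}
{code}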




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8696) Clean up Sum/Count/Total metrics

2019-07-22 Thread John Roesler (JIRA)
John Roesler created KAFKA-8696:
---

 Summary: Clean up Sum/Count/Total metrics
 Key: KAFKA-8696
 URL: https://issues.apache.org/jira/browse/KAFKA-8696
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler
Assignee: John Roesler


Kafka has a family of metrics consisting of:

org.apache.kafka.common.metrics.stats.Count
org.apache.kafka.common.metrics.stats.Sum
org.apache.kafka.common.metrics.stats.Total
org.apache.kafka.common.metrics.stats.Rate.SampledTotal
org.apache.kafka.streams.processor.internals.metrics.CumulativeCount
These metrics are all related to each other, but their relationship is obscure; 
one of them is redundant, and another is internal.

I've recently been involved in a third round of trying to work out 
which metric does what. It seems like it's time to clean up the mess and save 
everyone from having to work out the mystery for themselves.

I've proposed https://cwiki.apache.org/confluence/x/kkAyBw to fix it.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8769) Consider computing stream time independently per key

2019-08-08 Thread John Roesler (JIRA)
John Roesler created KAFKA-8769:
---

 Summary: Consider computing stream time independently per key
 Key: KAFKA-8769
 URL: https://issues.apache.org/jira/browse/KAFKA-8769
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Currently, Streams uses a concept of "stream time", which is computed as the 
highest timestamp observed by stateful operators, per partition. This concept 
of time backs grace period, retention time, and suppression.

For use cases in which data is produced to topics in roughly chronological 
order (as in db change capture), this reckoning is fine.

Some use cases have a different pattern, though. For example, in IOT 
applications, it's common for sensors to save up quite a bit of data and then 
dump it all at once into the topic. See 
https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
 for a concrete example of the use case.

I have heard of cases where each sensor dumps 24 hours' worth of data at a time 
into the topic. This results in a pattern in which, when reading a single 
partition, the operators observe a lot of consecutive records for one key that 
increase in timestamp for 24 hours, then a bunch of consecutive records for 
another key that are also increasing in timestamp over the same 24 hour period. 
With our current stream-time definition, this means that the partition's stream 
time increases while reading the first key's data, but then stays paused while 
reading the second key's data, since the second batch of records all have 
timestamps in the "past".

E.g:
{noformat}
A@t0 (stream time: 0)
A@t1 (stream time: 1)
A@t2 (stream time: 2)
A@t3 (stream time: 3)
B@t0 (stream time: 3)
B@t1 (stream time: 3)
B@t2 (stream time: 3)
B@t3 (stream time: 3)
{noformat}

This pattern results in an unfortunate compromise in which folks are required 
to set the grace period to the max expected time skew, for example 24 hours, or 
Streams will just drop the second key's data (since it is late). But, this 
means that if they want to use Suppression for "final results", they have to 
wait 24 hours for the result.

This tradeoff is not strictly necessary, though, because each key represents a 
logically independent sequence of events. Tracking by partition is simply 
convenient, but typically not logically meaningful. That is, the partitions are 
just physically independent sequences of events, so it's convenient to track 
stream time at this granularity. It would be just as correct, and more useful 
for IOT-like use cases, to track time independently for each key.

However, before considering this change, we need to solve the 
testing/low-traffic problem. This is the opposite issue, where a partition 
doesn't get enough traffic to advance stream time and results remain "stuck" in 
the suppression buffers. We can provide some mechanism to force the advancement 
of time across all partitions, for use in testing when you want to flush out 
all results, or in production when some topic is low volume. We shouldn't 
consider tracking time _more_ granularly until this problem is solved, since it 
would just make the low-traffic problem worse.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8770) Either switch to or add an option for emit-on-change

2019-08-08 Thread John Roesler (JIRA)
John Roesler created KAFKA-8770:
---

 Summary: Either switch to or add an option for emit-on-change
 Key: KAFKA-8770
 URL: https://issues.apache.org/jira/browse/KAFKA-8770
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, Streams offers two emission models:
* emit-on-window-close: (using Suppression)
* emit-on-update: (i.e., emit a new result whenever a new record is processed, 
regardless of whether the result has changed)

There is also an option to drop some intermediate results, either using caching 
or suppression.

However, there is no support for emit-on-change, in which results would be 
forwarded only if the result has changed. This has been reported to be 
extremely valuable as a performance optimization for some high-traffic 
applications, since it reduces the computational burden both for 
downstream Streams operations and for external systems that consume the 
results, which currently have to deal with a lot of "no-op" changes.

It would be pretty straightforward to implement this, by loading the prior 
results before a stateful operation and comparing with the new result before 
persisting or forwarding. In many cases, we load the prior result anyway, so it 
may not be a significant performance impact either.
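
In other words, the core of the change would be a check roughly like this 
sketch (the store, result, and context here are stand-ins, not the actual 
Streams internals):

{code:java}
import java.util.Objects;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class EmitOnChangeSketch {

    // Sketch of an emit-on-change update step for a stateful operator.
    public static <K, V> void updateAndMaybeForward(final K key,
                                                    final V newResult,
                                                    final KeyValueStore<K, V> store,
                                                    final ProcessorContext context) {
        final V oldResult = store.get(key);
        if (Objects.equals(newResult, oldResult)) {
            return; // no-op update: nothing is persisted or forwarded
        }
        store.put(key, newResult);
        context.forward(key, newResult); // emit only when the result actually changed
    }
}
{code}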

One design challenge is what to do with timestamps. If we get one record at 
time 1 that produces a result, and then another at time 2 that produces a 
no-op, what should be the timestamp of the result, 1 or 2? emit-on-change would 
require us to say 1.

Clearly, we'd need to do some serious benchmarks to evaluate any potential 
implementation of emit-on-change.

Another design challenge is to decide if we should just automatically provide 
emit-on-change for stateful operators, or if it should be configurable. 
Configuration increases complexity, so unless the performance impact is high, 
we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8868) Consider auto-generating Streams binary protocol messages

2019-09-04 Thread John Roesler (Jira)
John Roesler created KAFKA-8868:
---

 Summary: Consider auto-generating Streams binary protocol messages
 Key: KAFKA-8868
 URL: https://issues.apache.org/jira/browse/KAFKA-8868
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


Rather than maintain hand coded protocol serialization code, Streams could use 
the same code-generation framework as Clients/Core.

There isn't a perfect match, since the code generation framework includes an 
assumption that you're generating "protocol messages", rather than just 
arbitrary blobs, but I think it's close enough to justify using it, and 
improving it over time.

Using the code generation allows us to drop a lot of detail-oriented, brittle, 
and hard-to-maintain serialization logic in favor of a schema spec.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Resolved] (KAFKA-9006) Flaky Test KTableKTableForeignKeyJoinIntegrationTest.doLeftJoinFilterOutRapidlyChangingForeignKeyValues

2019-10-14 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-9006.
-
Resolution: Duplicate

> Flaky Test 
> KTableKTableForeignKeyJoinIntegrationTest.doLeftJoinFilterOutRapidlyChangingForeignKeyValues
> ---
>
> Key: KAFKA-9006
> URL: https://issues.apache.org/jira/browse/KAFKA-9006
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Bill Bejeck
>Assignee: John Roesler
>Priority: Major
>  Labels: flaky-test
>
> h3.  
> {noformat}
> Error Message
> array lengths differed, expected.length=2 actual.length=1; arrays first 
> differed at element [0]; expected: but 
> was:
> Stacktrace
> array lengths differed, expected.length=2 actual.length=1; arrays first 
> differed at element [0]; expected: but 
> was: at 
> org.junit.internal.ComparisonCriteria.arrayEquals(ComparisonCriteria.java:78) 
> at 
> org.junit.internal.ComparisonCriteria.arrayEquals(ComparisonCriteria.java:28) 
> at org.junit.Assert.internalArrayEquals(Assert.java:534) at 
> org.junit.Assert.assertArrayEquals(Assert.java:285) at 
> org.junit.Assert.assertArrayEquals(Assert.java:300) at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinIntegrationTest.doLeftJoinFilterOutRapidlyChangingForeignKeyValues(KTableKTableForeignKeyJoinIntegrationTest.java:585)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20) at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:412) at 
> org.junit.runners.Suite.runChild(Suite.java:128) at 
> org.junit.runners.Suite.runChild(Suite.java:27) at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:412) at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>  at jdk.internal.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Delega

[jira] [Resolved] (KAFKA-9022) Flaky Test KTableKTableForeignKeyJoinIntegrationTest. doInnerJoinFilterOutRapidlyChangingForeignKeyValues

2019-10-15 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-9022.
-
Resolution: Duplicate

> Flaky Test KTableKTableForeignKeyJoinIntegrationTest. 
> doInnerJoinFilterOutRapidlyChangingForeignKeyValues
> -
>
> Key: KAFKA-9022
> URL: https://issues.apache.org/jira/browse/KAFKA-9022
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Sophie Blee-Goldman
>Assignee: John Roesler
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.4.0
>
>
> array lengths differed, expected.length=2 actual.length=1; arrays first 
> differed at element [0]; expected: but 
> was:
>  
> Failed locally for me



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9058) Foreign Key Join should not require a queriable store

2019-10-16 Thread John Roesler (Jira)
John Roesler created KAFKA-9058:
---

 Summary: Foreign Key Join should not require a queriable store
 Key: KAFKA-9058
 URL: https://issues.apache.org/jira/browse/KAFKA-9058
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.4.0
Reporter: John Roesler
Assignee: John Roesler


While resolving KAFKA-9000, I uncovered a significant flaw in the 
implementation of foreign key joins. The join only works if supplied with a 
queriable store. I think this was simply an oversight during implementation and 
review.

It would be better to fix this now before the release, since the restriction it 
places on users could represent a significant burden. If they don't otherwise 
need the store to be queriable, then they shouldn't be forced to allocate 
enough storage for a full copy of the join result, or add the changelog topic 
for it either.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8902) Benchmark cooperative vs eager rebalancing

2019-10-17 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-8902.
-
Resolution: Fixed

I wrote a simple Streams application using the Processor API to update 10 
stores with every record seen.

{code}
final int numStores = 10;
final Topology topology = new Topology();

topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "table-in");

topology.addProcessor(
    "processor",
    (ProcessorSupplier<String, String>) () -> new Processor<String, String>() {
        private final List<KeyValueStore<String, String>> stores = new ArrayList<>(numStores);

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            for (int i = 0; i < numStores; i++) {
                stores.add(
                    i,
                    (KeyValueStore<String, String>) context.getStateStore("store" + i)
                );
            }
        }

        @Override
        public void process(final String key, final String value) {
            for (final KeyValueStore<String, String> store : stores) {
                store.put(key, value);
            }
        }

        @Override
        public void close() {
            stores.clear();
        }
    },
    "source"
);
{code}

I tested this topology using both in-memory and on-disk stores, with caching 
and logging enabled.

My benchmark consisted of running one KafkaStreams instance and measuring its 
metrics, while simulating other nodes joining and leaving the cluster (by 
constructing the simulated nodes to participate in the consumer group protocol 
without actually doing any work). I tested three cluster rebalance scenarios:
* scale up: 100 partitions / 10 nodes = 10 tasks per node starting, run 4 
minutes, add one node (each node loses one task), run 2 minutes, add one node 
(each node loses another task), run two minutes, add two nodes (each node loses 
one task), end the test at the 10 minute mark
* rolling bounce: 100 partitions / 10 nodes = 10 tasks per node starting, run 4 
minutes, bounce each node in the cluster (waiting for it to join and all nodes 
to return to RUNNING before proceeding), end the test at the 10 minute mark
* full bounce: 100 partitions / 10 nodes = 10 tasks per node starting, run 4 
minutes, bounce each node in the cluster (without waiting, so they all leave 
and join at once), end the test at the 10 minute mark

For input data, I randomly generated a dataset of 10,000 keys, and another with 
100,000 keys, all with 1kB values. This data was pre-loaded into the broker, 
with compaction and retention disabled (so that every test iteration would get 
the same sequence of updates)

I ran all the benchmarks on AWS i3.large instances, with a dedicated broker 
node running on a separate i3.large instance.

For each test configuration and scenario, I ran 20 independent trials and 
discarded the high and low results (to exclude outliers), for 18 total data 
points. The key metric was the overall throughput of a single node during the 
test.

I compared the above results from:
* 2.3.1-SNAPSHOT (the current head of the 2.3 branch) - Eager protocol
* 2.4.0-SNAPSHOT (the current head of the 2.4 branch) - Cooperative protocol
* a modified 2.4.0-SNAPSHOT with cooperative rebalancing disabled - Eager 
protocol

What I found is that under all scenarios, all three versions performed the same 
(within a 99.9% significance threshold) under the same data sets and the same 
configurations.

I didn't see any marked improvement as a result of cooperative rebalancing 
alone, but this is only the foundation for several follow-on improvements. What 
is very good to know is that I also didn't find any regression as a result of 
the new protocol implementation.

> Benchmark cooperative vs eager rebalancing
> --
>
> Key: KAFKA-8902
> URL: https://issues.apache.org/jira/browse/KAFKA-8902
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.4.0
>
>
> Cause rebalance and measure:
> * overall throughput
> * paused time
> * (also look at the metrics from 
> (https://issues.apache.org/jira/browse/KAFKA-8609)):
> ** accumulated rebalance time
> Cluster/topic sizing:
> ** 10 instances
> ** 100 tasks (each instance gets 10 tasks)
> ** 1000 stores (each task gets 10 stores)
> * standbys = [0 and 1]
> Rolling bounce:
> * with and without state loss
> * shorter and faster than session timeout (shorter in particular should be 
> interesting)
> Expand (from 9 to 10)
> Contract (fro

[jira] [Created] (KAFKA-9103) KIP-441: Add TaskLags to SubscriptionInfo

2019-10-24 Thread John Roesler (Jira)
John Roesler created KAFKA-9103:
---

 Summary: KIP-441: Add TaskLags to SubscriptionInfo
 Key: KAFKA-9103
 URL: https://issues.apache.org/jira/browse/KAFKA-9103
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


As described in KIP-441, we will add the TaskLags field to, and remove the 
ActiveTasks and StandbyTasks fields from, the SubscriptionInfo object.

This change can be made independent of the new balancing algorithm, since we 
can transparently convert back and forth between active/standby tasks and the 
tasklags formats, using a lag of 0 to denote active tasks and a lag > 0 to 
denote standbys.
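
For illustration, the conversion from the old format to the new one could be as 
simple as this sketch (the class and method names are made up, not the real 
internals):

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.processor.TaskId;

public final class TaskLagEncodingSketch {

    // Encode the old active/standby sets as lags: 0 marks an active task,
    // any positive value marks a standby.
    public static Map<TaskId, Long> toTaskLags(final Set<TaskId> activeTasks,
                                               final Set<TaskId> standbyTasks) {
        final Map<TaskId, Long> taskLags = new HashMap<>();
        for (final TaskId standby : standbyTasks) {
            taskLags.put(standby, 1L);
        }
        for (final TaskId active : activeTasks) {
            taskLags.put(active, 0L);
        }
        return taskLags;
    }

    private TaskLagEncodingSketch() {}
}
{code}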



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9138) Add system test covering Foreign Key joins (KIP-213)

2019-11-04 Thread John Roesler (Jira)
John Roesler created KAFKA-9138:
---

 Summary: Add system test covering Foreign Key joins (KIP-213)
 Key: KAFKA-9138
 URL: https://issues.apache.org/jira/browse/KAFKA-9138
 Project: Kafka
  Issue Type: Test
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


There are unit and integration tests, but we should really have a system test 
as well.

I plan to create a new test, since this feature is pretty different than the 
existing topology/data set of smoke test. Although, it might be possible for 
the new test to subsume smoke test. I'd give the new test a few releases to 
burn in before considering a merge, though.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9169) Standby Tasks point ask for incorrect offsets on resuming post suspension

2019-11-13 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-9169.
-
Resolution: Fixed

This was fixed in https://github.com/apache/kafka/pull/7681/ and merged to 
trunk (currently 2.5.0-SNAPSHOT)

> Standby Tasks point ask for incorrect offsets on resuming post suspension
> -
>
> Key: KAFKA-9169
> URL: https://issues.apache.org/jira/browse/KAFKA-9169
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Navinder Brar
>Assignee: John Roesler
>Priority: Critical
> Fix For: 2.5.0
>
>
> In versions (e.g. 2.0) where standby tasks are suspended on each rebalance, 
> the checkpoint file is updated after the flush, and the expected behaviour is 
> that when the same standby task is assigned back to the same machine after 
> the rebalance, it resumes reading the changelog from the offset where it 
> left off.
>  
> But there appears to be a bug in the code: after every rebalance, the task 
> starts reading from the offset it used the first time it was ever assigned 
> to this machine. This has two repercussions:
>  # After every rebalance, the standby tasks start restoring a huge amount of 
> data which they have already restored earlier (verified via a 300x increase 
> in network IO on all Streams instances post rebalance, even when there was no 
> change in assignment).
>  # If the changelog has time-based retention, those offsets will no longer be 
> available in the changelog, which leads to offsetOutOfRange exceptions, and 
> the stores get deleted and recreated again.
>  
> I have gone through the code and I think I know the issue.
> In TaskManager#updateNewAndRestoringTasks(), the function 
> assignStandbyPartitions() gets called for all the running standby tasks, and 
> it populates the checkpointedOffsets map from 
> standbyTask.checkpointedOffsets(), which is only updated at the time of 
> initialization of a StandbyTask (i.e. in its constructor).
>  
> This has an easy fix.
> Post resumption, we read standbyTask.checkpointedOffsets() to determine the 
> offset from which the standby task should resume, rather than 
> stateMgr.checkpointed(), which is updated on every commit to the checkpoint 
> file. In the former case, the task always starts from the same offset, even 
> for data it has already read, and when the changelog topic has a retention 
> time, it hits offsetOutOfRange exceptions. So 
> standbyTask.checkpointedOffsets() is of little use here, and we should use 
> stateMgr.checkpointed() instead to return offsets to the task manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10086) Standby state isn't always re-used when transitioning to active

2020-06-11 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10086.
--
Resolution: Fixed

> Standby state isn't always re-used when transitioning to active
> ---
>
> Key: KAFKA-10086
> URL: https://issues.apache.org/jira/browse/KAFKA-10086
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.6.0, 2.7.0
>
>
> This ticket was initially just to write an integration test, but I escalated 
> it to a blocker and changed the title when the integration test actually 
> surfaced two bugs:
>  # Offset positions were not reported for in-memory stores, so tasks with 
> in-memory stores would never be considered as "caught up" and could not take 
> over active processing, preventing clusters from ever achieving balance. This 
> is a regression in 2.6
>  # When the TaskAssignor decided to switch active processing from a former 
> owner to a new one that had a standby, the lower-level cooperative rebalance 
> protocol would first de-schedule the task completely, and only later would 
> assign it to the new owner. For in-memory stores, this causes the standby 
> state not to be re-used, and for persistent stores, it creates a window in 
> which the cleanup thread might delete the state directory. In both cases, 
> even though the instance previously had a standby, once it gets the active, 
> it still had to restore the entire state from the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10165) Percentiles metric leaking memory

2020-06-17 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10165.
--
Resolution: Fixed

> Percentiles metric leaking memory
> -
>
> Key: KAFKA-10165
> URL: https://issues.apache.org/jira/browse/KAFKA-10165
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Reporter: Sophie Blee-Goldman
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.6.0
>
>
> We've hit several OOMs in our soak cluster lately. We were finally able to get 
> a heap dump right after the OOM, and found over 3.5 GB of memory being 
> retained by the percentiles (or specifically by the 1MB float[] used by the 
> percentiles). 
> The leak does seem specific to the Percentiles class, as we see ~3000 
> instances of the Percentiles object vs only ~500 instances of the Max object, 
> which is also used in the same sensor as the Percentiles
> We did recently lower the size from 1MB to 100kB, but it's clear there is a 
> leak of some kind and a "smaller leak" is not an acceptable solution. If the 
> cause of the leak is not immediately obvious, we should just revert the 
> percentiles in 2.6 and work on stabilizing them for 2.7



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10185) Streams should log summarized restoration information at info level

2020-06-18 Thread John Roesler (Jira)
John Roesler created KAFKA-10185:


 Summary: Streams should log summarized restoration information at 
info level
 Key: KAFKA-10185
 URL: https://issues.apache.org/jira/browse/KAFKA-10185
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10200) MockProcessorContext doesn't work with WindowStores

2020-06-25 Thread John Roesler (Jira)
John Roesler created KAFKA-10200:


 Summary: MockProcessorContext doesn't work with WindowStores
 Key: KAFKA-10200
 URL: https://issues.apache.org/jira/browse/KAFKA-10200
 Project: Kafka
  Issue Type: Bug
  Components: streams, streams-test-utils
Reporter: John Roesler
Assignee: John Roesler


The recommended pattern for testing custom Processor implementations is to use 
the test-utils MockProcessorContext. If a Processor implementation needs a 
store, the store also has to be initialized with the same context. However, the 
existing (in-memory and persistent) Windowed store implementations perform 
internal casts that result in class cast exceptions if you attempt to 
initialize them with the MockProcessorContext.

A workaround is to instead embed the processor in an application and use the 
TopologyTestDriver instead.
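
For reference, a sketch of the pattern that currently fails (the store name, 
retention, and serdes here are arbitrary):

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

public class WindowStoreInitSketch {
    public static void main(final String[] args) {
        final MockProcessorContext context = new MockProcessorContext();

        final WindowStore<String, Long> store = Stores.windowStoreBuilder(
                Stores.inMemoryWindowStore("counts", Duration.ofHours(1), Duration.ofMinutes(5), false),
                Serdes.String(),
                Serdes.Long())
            .withLoggingDisabled()
            .build();

        // This is the recommended pattern for testing a Processor that uses the
        // store, but the store's init() casts the context internally and throws
        // a ClassCastException when given a MockProcessorContext.
        store.init(context, store);
    }
}
{code}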



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10202) SmokeTest should provide a mechanism to generate continuously AND verify the result

2020-06-25 Thread John Roesler (Jira)
John Roesler created KAFKA-10202:


 Summary: SmokeTest should provide a mechanism to generate 
continuously AND verify the result
 Key: KAFKA-10202
 URL: https://issues.apache.org/jira/browse/KAFKA-10202
 Project: Kafka
  Issue Type: Improvement
  Components: streams, system tests
Reporter: John Roesler


Several system tests use the SmokeTestDriver, but they have to choose between 
generating for a fixed period of time (say, two minutes) before verifying 
results OR generating until cancelled with no verification.

It's not impossible to implement "generate until cancelled and then verify", 
and doing so would both speed up tests that are just blocked by the two-minute 
generation period AND decrease flakiness in other tests, when they can't 
complete all their operations within the deadline before verification begins.

One idea is to revamp the smoke test driver's verification logic to make it 
consume the input and determine the expected results, so that the verifier can 
be run as a separate process instead of passing the expected results straight 
from the generator to the verifier. This is how the RelationalSmokeTest works.

An alternative is to register a signal handler so that we can signal the driver 
to stop generating and start verifying.
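
A bare-bones sketch of the signal-handler alternative (using the JDK-internal 
sun.misc.Signal API as one possible mechanism; generateOneRecord() and verify() 
are placeholders for the driver's own logic):

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import sun.misc.Signal;

public class GenerateUntilSignalledSketch {
    private static final AtomicBoolean STOP_GENERATING = new AtomicBoolean(false);

    public static void main(final String[] args) {
        // The system test would send SIGUSR1 when it wants generation to stop.
        Signal.handle(new Signal("USR1"), signal -> STOP_GENERATING.set(true));

        while (!STOP_GENERATING.get()) {
            generateOneRecord(); // placeholder: produce one input record
        }
        verify(); // placeholder: consume the outputs and check them against the inputs
    }

    private static void generateOneRecord() { /* ... */ }

    private static void verify() { /* ... */ }
}
{code}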

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10203) Rolling upgrade from 2.1.1 to trunk (2.7.x) doesn't work

2020-06-25 Thread John Roesler (Jira)
John Roesler created KAFKA-10203:


 Summary: Rolling upgrade from 2.1.1 to trunk (2.7.x) doesn't work
 Key: KAFKA-10203
 URL: https://issues.apache.org/jira/browse/KAFKA-10203
 Project: Kafka
  Issue Type: Bug
Reporter: John Roesler


As part of KAFKA-10173, I converted the upgrade test to use the SmokeTest 
application and also added new upgrade paths, including from 2.1.1 to trunk.

It is a rolling bounce scenario.

After the first instance upgrades from 2.1.1 to trunk, I observe the following 
on _both_ instances that are still on 2.1.1:
{code:java}
org.apache.kafka.streams.errors.TaskAssignmentException: stream-thread 
[SmokeTest-ed7632fc-3465-4534-8a26-2f6ad76ff80f-StreamThread-2-consumer] Number 
of assigned partitions 11 is not equal to the number of active taskIds 7, 
assignmentInfo=[version=4, supported version=4, active tasks=[0_0, 0_2, 2_0, 
1_1, 1_3, 2_2, 3_3], standby tasks={0_0=[data-0], 1_0=[max-0, min-0], 
0_1=[data-1], 0_2=[data-2], 2_0=[sum-0, cnt-0], 1_1=[max-1, min-1], 1_2=[min-2, 
max-2], 0_3=[data-3], 2_1=[cnt-1, sum-1], 
3_0=[SmokeTest-cntByCnt-repartition-0], 0_4=[data-4], 
3_1=[SmokeTest-cntByCnt-repartition-1], 2_2=[cnt-2, sum-2], 1_3=[min-3, max-3], 
3_2=[SmokeTest-cntByCnt-repartition-2], 1_4=[max-4, min-4], 2_3=[sum-3, cnt-3], 
2_4=[sum-4, cnt-4], 3_3=[SmokeTest-cntByCnt-repartition-3], 
3_4=[SmokeTest-cntByCnt-repartition-4]}, global assignment={}]
at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.processVersionOneAssignment(StreamsPartitionAssignor.java:892)
at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.processVersionTwoAssignment(StreamsPartitionAssignor.java:908)
at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.processVersionThreeAssignment(StreamsPartitionAssignor.java:925)
at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.processVersionFourAssignment(StreamsPartitionAssignor.java:932)
at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:872)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:281)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:406)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:341)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:913)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:818)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
{code}
the other 2.1.1 instance reports the same exception, with "Number of assigned 
partitions 9 is not equal to the number of active taskIds 6"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10185) Streams should log summarized restoration information at info level

2020-06-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10185.
--
Resolution: Fixed

> Streams should log summarized restoration information at info level
> ---
>
> Key: KAFKA-10185
> URL: https://issues.apache.org/jira/browse/KAFKA-10185
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> Currently, restoration progress is only visible at debug level in the 
> Consumer's Fetcher logs. Users can register a restoration listener and 
> implement their own logging, but it would substantially improve operability 
> to have some logs available at INFO level.
> Logging each partition in each restore batch at info level would be too much, 
> though, so we should print summarized logs at a decreased interval, like 
> every 10 seconds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

