[ https://issues.apache.org/jira/browse/KAFKA-5140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212812#comment-16212812 ]
Guozhang Wang commented on KAFKA-5140:
--------------------------------------
I discovered that the root cause of this flaky test is this issue:
https://issues.apache.org/jira/browse/KAFKA-6098
More specifically, here are the (augmented) log trails that expose this issue:
1. After the reset tool is called and
{{assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC)}} has passed (even
though the ZK path has not yet been deleted, due to the issue above), the
Streams app resumes executing, and in the first rebalance the
StreamPartitionAssignor tries to create the deleted topic again:
{code}
org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithIntermediateUserTopic STANDARD_OUT
[2017-10-19 18:28:12,027] INFO [GroupCoordinator 0]: Preparing to rebalance group cleanup-integration-test1 with old generation 4 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator:72)
[2017-10-19 18:28:12,028] INFO [GroupCoordinator 0]: Stabilized group cleanup-integration-test1 generation 5 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator:72)
[2017-10-19 18:28:12,029] WARN stream-thread [cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10] Constructed client metadata {e86cdf4e-781a-408a-8414-1115d9558914=ClientMetadata{hostInfo=null, consumers=[cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10-consumer-bd575ab5-c159-4c8a-9130-1ac896c23595], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}} from the member subscriptions. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:241)
[2017-10-19 18:28:12,029] WARN stream-thread [cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10] Starting to validate internal topics in partition assignor. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:236)
{code}
2. Then the following entry can be found on the broker side:
{code}
[2017-10-19 18:28:12,280] INFO [Admin Manager on Broker 0]: Error processing create topic request for topic cleanup-integration-test1-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition with arguments (numPartitions=1, replicationFactor=1, replicasAssignments={}, configs={cleanup.policy=delete}) (kafka.server.AdminManager:80)
org.apache.kafka.common.errors.TopicExistsException: Topic 'cleanup-integration-test1-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition' already exists.
{code}
3. {{StreamsKafkaClient}} treats this error code as OK and moves on to the
validation phase, which then blocks forever because {{allTopicsCreated()}}
never returns true for the deleted topic:
{code}
// wait until each one of the topic metadata has been propagated to at least one broker
// (note: there is no deadline, so this spins forever if allTopicsCreated() never returns true)
while (!allTopicsCreated(topicNamesToMakeReady, topicsToMakeReady)) {
    try {
        Thread.sleep(50L);
    } catch (InterruptedException e) {
        // ignore
    }
}
{code}
4. After 2 seconds (the session timeout value), the stream thread's consumer
is kicked out of the group because it is still blocked in the phase above:
{code}
org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithIntermediateUserTopic STANDARD_OUT
[2017-10-19 18:28:14,030] INFO [GroupCoordinator 0]: Member cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10-consumer-bd575ab5-c159-4c8a-9130-1ac896c23595 in group cleanup-integration-test1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator:72)
[2017-10-19 18:28:14,032] INFO [GroupCoordinator 0]: Preparing to rebalance group cleanup-integration-test1 with old generation 5 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator:72)
[2017-10-19 18:28:14,032] INFO [GroupCoordinator 0]: Group cleanup-integration-test1 with generation 6 is now empty (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator:72)
{code}
Finally, after 30 seconds, the test fails.
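To make the chain in steps 2 and 3 concrete, here is a toy Java model of it. This is a minimal sketch under stated assumptions, not Kafka code: the class {{StaleTopicRace}}, its methods, and the "NONE" / "TOPIC_ALREADY_EXISTS" strings are hypothetical stand-ins, and it assumes the broker's create path checks for a lingering ZooKeeper topic path while clients only see the metadata cache. It also swaps the unbounded polling loop from step 3 for a deadline-based {{waitUntil}}, which is one possible mitigation, not necessarily what was adopted upstream.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical, heavily simplified model of the failure chain (not Kafka code).
 * Assumption: create checks ZooKeeper paths; clients only see the metadata cache.
 */
public class StaleTopicRace {
    // Topics visible to clients via metadata requests.
    private final Set<String> metadataTopics = new HashSet<>();
    // Topic paths still registered in ZooKeeper.
    private final Set<String> zkTopicPaths = new HashSet<>();

    /** Broker-side create: fails if a ZK path is still registered (step 2). */
    public String createTopic(String topic) {
        if (zkTopicPaths.contains(topic)) {
            return "TOPIC_ALREADY_EXISTS";
        }
        zkTopicPaths.add(topic);
        metadataTopics.add(topic);
        return "NONE";
    }

    /** Deletion that has left the metadata cache but not yet ZooKeeper (the KAFKA-6098 window). */
    public void deleteLeavingZkPath(String topic) {
        metadataTopics.remove(topic);
    }

    public boolean visibleInMetadata(String topic) {
        return metadataTopics.contains(topic);
    }

    /**
     * Bounded variant of the polling loop in step 3: polls every pollMs and gives up
     * after timeoutMs. On interrupt it restores the interrupt flag and returns false.
     */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out instead of spinning forever
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        StaleTopicRace broker = new StaleTopicRace();
        String topic = "cleanup-integration-test1-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition";

        broker.createTopic(topic);         // original run creates the repartition topic
        broker.deleteLeavingZkPath(topic); // reset tool deletes it; the ZK path lingers

        String error = broker.createTopic(topic); // assignor tries to re-create it
        // The client treats "already exists" like success and moves on to validation.
        boolean treatedAsOk = error.equals("NONE") || error.equals("TOPIC_ALREADY_EXISTS");

        // The unbounded loop would never exit here; this version gives up after 500 ms.
        boolean ready = waitUntil(() -> broker.visibleInMetadata(topic), 500L, 50L);
        System.out.println(error + " treatedAsOk=" + treatedAsOk + " ready=" + ready);
        // prints "TOPIC_ALREADY_EXISTS treatedAsOk=true ready=false"
    }
}
```

With a deadline, the caller could fail fast (for example, by throwing) rather than staying blocked past the 2-second session timeout seen in step 4; whether that is the right remedy for Streams is a design choice this sketch does not settle.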
> Flaky ResetIntegrationTest
> --------------------------
>
> Key: KAFKA-5140
> URL: https://issues.apache.org/jira/browse/KAFKA-5140
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Affects Versions: 0.10.2.0
> Reporter: Matthias J. Sax
> Assignee: Guozhang Wang
> Fix For: 0.11.0.0
>
>
> {noformat}
> org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
> java.lang.AssertionError:
> Expected: <[KeyValue(2986681642095, 1), KeyValue(2986681642055, 1),
> KeyValue(2986681642075, 1), KeyValue(2986681642035, 1),
> KeyValue(2986681642095, 1), KeyValue(2986681642055, 1),
> KeyValue(2986681642115, 1), KeyValue(2986681642075, 1),
> KeyValue(2986681642075, 2), KeyValue(2986681642095, 2),
> KeyValue(2986681642115, 1), KeyValue(2986681642135, 1),
> KeyValue(2986681642095, 2), KeyValue(2986681642115, 2),
> KeyValue(2986681642155, 1), KeyValue(2986681642135, 1),
> KeyValue(2986681642115, 2), KeyValue(2986681642135, 2),
> KeyValue(2986681642155, 1), KeyValue(2986681642175, 1),
> KeyValue(2986681642135, 2), KeyValue(2986681642155, 2),
> KeyValue(2986681642175, 1), KeyValue(2986681642195, 1),
> KeyValue(2986681642135, 3), KeyValue(2986681642155, 2),
> KeyValue(2986681642175, 2), KeyValue(2986681642195, 1),
> KeyValue(2986681642155, 3), KeyValue(2986681642175, 2),
> KeyValue(2986681642195, 2), KeyValue(2986681642155, 3),
> KeyValue(2986681642175, 3), KeyValue(2986681642195, 2),
> KeyValue(2986681642155, 4), KeyValue(2986681642175, 3),
> KeyValue(2986681642195, 3)]>
> but: was <[KeyValue(2986681642095, 1), KeyValue(2986681642055, 1),
> KeyValue(2986681642075, 1), KeyValue(2986681642035, 1),
> KeyValue(2986681642095, 1), KeyValue(2986681642055, 1),
> KeyValue(2986681642115, 1), KeyValue(2986681642075, 1),
> KeyValue(2986681642075, 2), KeyValue(2986681642095, 2),
> KeyValue(2986681642115, 1), KeyValue(2986681642135, 1),
> KeyValue(2986681642095, 2), KeyValue(2986681642115, 2),
> KeyValue(2986681642155, 1), KeyValue(2986681642135, 1),
> KeyValue(2986681642115, 2), KeyValue(2986681642135, 2),
> KeyValue(2986681642155, 1), KeyValue(2986681642175, 1),
> KeyValue(2986681642135, 2), KeyValue(2986681642155, 2),
> KeyValue(2986681642175, 1), KeyValue(2986681642195, 1),
> KeyValue(2986681642135, 3), KeyValue(2986681642155, 2),
> KeyValue(2986681642175, 2), KeyValue(2986681642195, 1),
> KeyValue(2986681642155, 3), KeyValue(2986681642175, 2),
> KeyValue(2986681642195, 2), KeyValue(2986681642155, 3)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190)
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)