[
https://issues.apache.org/jira/browse/KAFKA-9512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031156#comment-17031156
]
Vinoth Chandar commented on KAFKA-9512:
---------------------------------------
{code:java}
restartedStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
    @Override
    public void onRestoreStart(final TopicPartition topicPartition,
                               final String storeName,
                               final long startingOffset,
                               final long endingOffset) {
        restoreStartLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition,
                                final String storeName,
                                final long batchEndOffset,
                                final long numRestored) {
    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition,
                             final String storeName,
                             final long totalRestored) {
        restoreEndLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
    }
});
restartedStreams.start();
TestUtils.waitForCondition(
    () -> restartedStreams.allLocalStorePartitionLags().get(stateStoreName).get(0).offsetLag() == 0,
    "Standby should eventually catchup and have zero lag.");
final LagInfo fullLagInfo = restoreStartLagInfo.get(stateStoreName).get(0);
assertThat(fullLagInfo.currentOffsetPosition(), equalTo(0L));
assertThat(fullLagInfo.endOffsetPosition(), equalTo(5L));
assertThat(fullLagInfo.offsetLag(), equalTo(5L));
assertThat(restoreEndLagInfo.get(stateStoreName).get(0), equalTo(zeroLagInfo)); // <-- NPE line
{code}
The NPE can happen only if an empty lag map is captured in `onRestoreEnd`, i.e.
when restoration finishes there is no entry for the changelog topic? This should
not be possible, especially for a standby, since there should be a standby task.
The test clearly waits until we reach zero lag, using the same stateStoreName,
and that seems to be working.
So I wonder if there is some race between restoration ending and the standby
task creation? In any case, the problematic line seems redundant anyway, since
it just checks for the same thing as the waitForCondition().
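
To make the suspected failure mode concrete, here is a minimal, self-contained sketch that does not use Kafka at all. It assumes the snapshot map has the shape store name -> (partition -> lag), with a plain `Long` standing in for `LagInfo`. The class `LagSnapshotSketch` and the helper `lagFor` are hypothetical names used only for illustration. If the listener captures an empty map, the chained `get(stateStoreName).get(0)` dereferences null, which would match the reported NPE; this is a guess at the mechanism, not a confirmed root cause.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class LagSnapshotSketch {

    // Hypothetical null-safe lookup: returns null instead of throwing when the
    // store (or the partition) has no entry in the captured snapshot.
    static Long lagFor(final Map<String, Map<Integer, Long>> snapshot,
                       final String storeName,
                       final int partition) {
        final Map<Integer, Long> perPartition = snapshot.get(storeName);
        return perPartition == null ? null : perPartition.get(partition);
    }

    public static void main(final String[] args) {
        final Map<String, Map<Integer, Long>> restoreEndLagInfo = new HashMap<>();

        // Simulate the suspected race: the callback fires while the lag map
        // is still empty, so nothing is recorded for the store.
        restoreEndLagInfo.putAll(Collections.emptyMap());

        try {
            // Same access pattern as the failing assertion in the test.
            restoreEndLagInfo.get("store").get(0);
        } catch (final NullPointerException e) {
            System.out.println("NPE: snapshot has no entry for the store");
        }

        // The null-safe variant lets a test fail with a readable assertion instead.
        System.out.println("lag = " + lagFor(restoreEndLagInfo, "store", 0));
    }
}
```

If the race is real, either dropping the redundant assertion or asserting on a null-safe lookup like the one above would avoid the NPE; which is preferable depends on whether the snapshot taken in `onRestoreEnd` is something the test actually needs to verify.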
> Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration
> -------------------------------------------------------------------
>
> Key: KAFKA-9512
> URL: https://issues.apache.org/jira/browse/KAFKA-9512
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Affects Versions: 2.5.0
> Reporter: Matthias J. Sax
> Assignee: Vinoth Chandar
> Priority: Critical
> Labels: flaky-test
> Fix For: 2.5.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/497/testReport/junit/org.apache.kafka.streams.integration/LagFetchIntegrationTest/shouldFetchLagsDuringRestoration/]
> {quote}java.lang.NullPointerException at
> org.apache.kafka.streams.integration.LagFetchIntegrationTest.shouldFetchLagsDuringRestoration(LagFetchIntegrationTest.java:306){quote}