[ 
https://issues.apache.org/jira/browse/KAFKA-9512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031156#comment-17031156
 ] 

Vinoth Chandar commented on KAFKA-9512:
---------------------------------------

{code:java}
restartedStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
    @Override
    public void onRestoreStart(final TopicPartition topicPartition,
                               final String storeName,
                               final long startingOffset,
                               final long endingOffset) {
        restoreStartLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition,
                                final String storeName,
                                final long batchEndOffset,
                                final long numRestored) {
    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition,
                             final String storeName,
                             final long totalRestored) {
        restoreEndLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
    }
});

restartedStreams.start();
TestUtils.waitForCondition(
    () -> restartedStreams.allLocalStorePartitionLags().get(stateStoreName).get(0).offsetLag() == 0,
    "Standby should eventually catchup and have zero lag.");
final LagInfo fullLagInfo = restoreStartLagInfo.get(stateStoreName).get(0);
assertThat(fullLagInfo.currentOffsetPosition(), equalTo(0L));
assertThat(fullLagInfo.endOffsetPosition(), equalTo(5L));
assertThat(fullLagInfo.offsetLag(), equalTo(5L));

assertThat(restoreEndLagInfo.get(stateStoreName).get(0), equalTo(zeroLagInfo)); // <-- NPE line
{code}
 

The NPE can happen only if an empty lag map is added in `onRestoreEnd`, i.e. 
when restoration finishes there is no entry for the changelog topic? That 
should not be possible, especially for a standby, since there should be a 
standby task. The test clearly waits until we reach zero lag, using the same 
stateStoreName, and that seems to be working.
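
To make the failure mode concrete, here is a minimal sketch that uses plain `java.util` maps as stand-ins for the `Map<String, Map<Integer, LagInfo>>` returned by `allLocalStorePartitionLags()`. The class name `EmptyLagSnapshotSketch`, both helper methods, and the store name "store" are hypothetical, and lag is modeled as a `Long` rather than a `LagInfo`. It only shows that copying an empty map with `putAll` leaves no entry for the store, so the chained `.get(storeName).get(0)` dereferences null.

```java
import java.util.HashMap;
import java.util.Map;

public class EmptyLagSnapshotSketch {

    // Mirrors the chained lookup on the failing line: snapshot.get(storeName).get(0).
    // Throws NullPointerException when the snapshot has no entry for the store.
    static Long firstPartitionLag(final Map<String, Map<Integer, Long>> snapshot,
                                  final String storeName) {
        return snapshot.get(storeName).get(0);
    }

    // Returns true if the chained lookup throws an NPE for this snapshot.
    static boolean lookupThrowsNpe(final Map<String, Map<Integer, Long>> snapshot,
                                   final String storeName) {
        try {
            firstPartitionLag(snapshot, storeName);
            return false;
        } catch (final NullPointerException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        // Assumption: the live lag view was empty when the callback fired.
        final Map<String, Map<Integer, Long>> emptyLiveView = new HashMap<>();

        final Map<String, Map<Integer, Long>> restoreEndLagInfo = new HashMap<>();
        restoreEndLagInfo.putAll(emptyLiveView); // copies nothing

        System.out.println("NPE on lookup: " + lookupThrowsNpe(restoreEndLagInfo, "store"));
    }
}
```

Under these assumptions the NPE comes from the outer `get` returning null, not from anything inside the lag values themselves.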

 

So I wonder if there is some race between restoration ending and the standby 
task creation? In any case, the problematic line seems redundant, since 
it just checks for the same thing as the waitForCondition().
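
The suspected ordering can be modeled deterministically. This is only a sketch of the hypothesis, not of how Kafka Streams actually schedules restoration and task creation: the class `SnapshotVersusPollSketch` and its methods are hypothetical, and lag is again a plain `Long`. It shows how a one-time snapshot taken in a callback can stay empty while a later poll of the live view still observes zero lag.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SnapshotVersusPollSketch {

    // Hypothetical stand-in for the live view behind allLocalStorePartitionLags().
    private final Map<String, Map<Integer, Long>> liveLags = new ConcurrentHashMap<>();

    // One-time copy, taken the way the onRestoreEnd callback uses putAll.
    final Map<String, Map<Integer, Long>> restoreEndSnapshot = new HashMap<>();

    // Models the callback: copies whatever the live view holds right now.
    void onRestoreEnd() {
        restoreEndSnapshot.putAll(liveLags);
    }

    // Models the task becoming visible in the live view (partition 0 only).
    void registerTask(final String storeName, final long lag) {
        final Map<Integer, Long> partitionLags = new HashMap<>();
        partitionLags.put(0, lag);
        liveLags.put(storeName, partitionLags);
    }

    // Models the waitForCondition check: reads the live view, null-safe here.
    boolean pollZeroLag(final String storeName) {
        final Map<Integer, Long> partitionLags = liveLags.get(storeName);
        return partitionLags != null && Long.valueOf(0L).equals(partitionLags.get(0));
    }

    public static void main(final String[] args) {
        final SnapshotVersusPollSketch sketch = new SnapshotVersusPollSketch();
        sketch.onRestoreEnd();             // callback fires before the task is visible
        sketch.registerTask("store", 0L);  // task shows up afterwards, already caught up
        System.out.println("poll sees zero lag: " + sketch.pollZeroLag("store"));
        System.out.println("snapshot has entry: " + sketch.restoreEndSnapshot.containsKey("store"));
    }
}
```

If the real ordering can look like this, the poll passes while the snapshot lookup fails, which is consistent with the final assertion adding nothing beyond what waitForCondition() already verifies.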

 

 

 

> Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration
> -------------------------------------------------------------------
>
>                 Key: KAFKA-9512
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9512
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>    Affects Versions: 2.5.0
>            Reporter: Matthias J. Sax
>            Assignee: Vinoth Chandar
>            Priority: Critical
>              Labels: flaky-test
>             Fix For: 2.5.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/497/testReport/junit/org.apache.kafka.streams.integration/LagFetchIntegrationTest/shouldFetchLagsDuringRestoration/]
> {quote}java.lang.NullPointerException at 
> org.apache.kafka.streams.integration.LagFetchIntegrationTest.shouldFetchLagsDuringRestoration(LagFetchIntegrationTest.java:306){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
