showuon edited a comment on pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#issuecomment-915712946


   @guozhangwang , those are good questions. Let me answer them below:
   
   >     1. Do you know why the original test cases in 
AbstractWindowBytesStoreTest, like `shouldGetBackwardAll` and 
`testBackwardFetchRange` did not capture this bug? This test class is leveraged 
by the in-memory stores as well.
   
   That's right, those tests also cover the in-memory stores, but they didn't
   test the case of multiple records in the same window. Currently, in the
   window store, we store records as [segments -> [records]].
   
   For example:
   window size = 500,
   input records:
   
   key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
   key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
   key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window
   
   So, internally, "a" and "b" will be in the same segment, and "c" in
   another segment.
   segments: [0 /* window start */, records], [500, records].
   The records for window start 0 will be "a" and "b",
   and the records for window start 500 will be "c".
   
   Before this change, we did have a reverse iterator for the segments, but
   not for the records within each segment. So, when doing `backwardFetchAll`,
   the records were returned in the order "c", "a", "b", when it should have
   been "c", "b", "a".
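
To make the ordering problem concrete, here is a minimal sketch in plain Java. It is not the actual store code: the nested `TreeMap` layout and the names `BackwardFetchSketch`, `sampleStore`, `segmentsOnlyReversed` and `fullyReversed` are all made up for illustration, assuming a "window start -> (key -> value)" layout like the one described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class BackwardFetchSketch {

    // Hypothetical model of the layout: window start -> (key -> value),
    // with both levels sorted in ascending order.
    static NavigableMap<Long, NavigableMap<String, String>> sampleStore() {
        NavigableMap<Long, NavigableMap<String, String>> segments = new TreeMap<>();
        segments.computeIfAbsent(0L, k -> new TreeMap<>()).put("a", "aa");
        segments.computeIfAbsent(0L, k -> new TreeMap<>()).put("b", "bb");
        segments.computeIfAbsent(500L, k -> new TreeMap<>()).put("c", "cc");
        return segments;
    }

    // Old behavior in this model: segments are walked in reverse,
    // but the records inside each segment are still walked forward.
    static List<String> segmentsOnlyReversed(
            NavigableMap<Long, NavigableMap<String, String>> segments) {
        List<String> keys = new ArrayList<>();
        for (NavigableMap<String, String> records : segments.descendingMap().values()) {
            keys.addAll(records.keySet());
        }
        return keys;
    }

    // Intended behavior in this model: both levels are walked in reverse.
    static List<String> fullyReversed(
            NavigableMap<Long, NavigableMap<String, String>> segments) {
        List<String> keys = new ArrayList<>();
        for (NavigableMap<String, String> records : segments.descendingMap().values()) {
            keys.addAll(records.descendingKeySet());
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(segmentsOnlyReversed(sampleStore())); // [c, a, b]
        System.out.println(fullyReversed(sampleStore()));        // [c, b, a]
    }
}
```

With only one record per segment, both methods return the same list, which is why a test whose inputs all land in different segments cannot tell them apart.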
   
   So, back to the question: why did the original test cases not catch this
   issue?
   It's because the test inputs all had different window start timestamps,
   so each record ended up in a different segment:
   ```
       private void putFirstBatch(final WindowStore<Integer, String> store,
                                  @SuppressWarnings("SameParameterValue") final long startTime,
                                  final InternalMockProcessorContext context) {
           context.setRecordContext(createRecordContext(startTime));
           store.put(0, "zero", startTime);
           store.put(1, "one", startTime + 1L);
           store.put(2, "two", startTime + 2L);
           // <-- this is the new record I added, to test the case of
           //     multiple records in the same segment
           store.put(3, "three", startTime + 2L);

           store.put(4, "four", startTime + 4L);
           store.put(5, "five", startTime + 5L);
       }
   ```
   
   >     2. Related to 1), what additional coverage does the new 
`WindowStoreFetchTest` provides in addition to the above two test cases?
   
   I think I've covered that above: I added an additional record to the
   `AbstractWindowBytesStoreTest` test. In `WindowStoreFetchTest`, we produce
   records with timestamps 0, 1, 500, 501, 502, which are put into windows
   [0, 500] (2 records) and [500, 1000] (3 records). We then fetch them
   forward and backward to check that the results are as expected, i.e. in
   reverse order they should be 502, 501, 500, 1, 0.
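
As a rough illustration of that expectation (again a sketch, not the test itself), the snippet below buckets the five timestamps by window start and reads them back newest first. The class `WindowBucketSketch`, its methods, and the `timestamp - timestamp % 500` rule for picking the window start are assumptions made only for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class WindowBucketSketch {

    static final long WINDOW_SIZE = 500L;

    // Assumed rule for this sketch: align each timestamp down to a
    // multiple of the window size to get its window start.
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE);
    }

    // Group timestamps by window start, keeping windows sorted ascending.
    static NavigableMap<Long, List<Long>> bucket(long... timestamps) {
        NavigableMap<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    // Backward read: newest window first, and newest record first within it.
    static List<Long> backward(NavigableMap<Long, List<Long>> windows) {
        List<Long> result = new ArrayList<>();
        for (List<Long> records : windows.descendingMap().values()) {
            List<Long> newestFirst = new ArrayList<>(records);
            newestFirst.sort(Comparator.reverseOrder());
            result.addAll(newestFirst);
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableMap<Long, List<Long>> windows = bucket(0L, 1L, 500L, 501L, 502L);
        System.out.println(windows);           // {0=[0, 1], 500=[500, 501, 502]}
        System.out.println(backward(windows)); // [502, 501, 500, 1, 0]
    }
}
```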
   
   The behavior works as expected in `RocksDBWindowStore`. 
   
   
   Hope that's clear. 
   I also updated the PR description.
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

