C0urante commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1175063429


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 
150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, 
tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, 
tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 
20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, 
tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 
0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 
100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 
200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, 
tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, 
tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 
0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, 
tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, 
tp, 1499));
+            assertEquals(OptionalLong.of(11000), 
store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), 
store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {
+            try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                int lastCount = 1;
+                store.start();
+                for (long offset = 0; offset <= iterations; offset += step) {
+                    store.sync(tp, offset, offset);
+                    // Invariant A: the latest sync is present
+                    assertEquals(offset, store.syncFor(tp, 
0).upstreamOffset());
+                    // Invariant D: the earliest sync is present
+                    assertEquals(0L, store.syncFor(tp, 63).upstreamOffset());
+                    int count = countDistinctStoredSyncs(store, tp);
+                    int diff = count - lastCount;
+                    assertTrue(diff >= -1,
+                            "Store expired too many syncs: " + diff + " after 
receiving offset " + offset);
+                    lastCount = count;
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, 
long previousOffset) {
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : 
previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+        assertEquals(OptionalLong.of(syncOffset), 
store.translateDownstream(null, tp, syncOffset));
+        assertEquals(OptionalLong.of(syncOffset + 1), 
store.translateDownstream(null, tp, syncOffset + 1));
+        assertEquals(OptionalLong.of(syncOffset + 1), 
store.translateDownstream(null, tp, syncOffset + 2));
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, 
TopicPartition topicPartition) {
+        int count = 1;
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != 
store.syncFor(topicPartition, i)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, 
TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i <= j; i++) {

Review Comment:
   Invariants B and C deal exclusively with values of `i` and `j` that are not 
equal; shouldn't this be reflected here (especially since the `jUpstream == 
iUpstream` branch below makes it unnecessary to test the case when `i == j`)?
   ```suggestion
               for (int i = 0; i < j; i++) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to