1996fanrui commented on code in PR #865:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/865#discussion_r1722780881


##########
flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java:
##########
@@ -430,6 +446,74 @@ void testScalingEventWithParallelismChange() throws 
Exception {
                         });
     }
 
+    @MethodSource("getExpiredEventHandlersCaseMatrix")
+    @ParameterizedTest
+    void testCleanExpiredEvents(
+            int expiredRecordsNum, Duration eventHandlerTtl, int 
unexpiredRecordsNum)
+            throws Exception {
+        try (Connection con = getConnection();
+                PreparedStatement ps =
+                        con.prepareStatement("delete from 
t_flink_autoscaler_event_handler")) {
+            ps.execute();
+        }
+        // Init the expired records.
+        initTestingEventHandlerRecords(
+                eventHandlerTtl,
+                expiredRecordsNum,
+                (duration, index) ->
+                        Clock.fixed(
+                                createTime.minusMillis(
+                                        eventHandlerTtl.toMillis() + index * 
1000L + 1L),
+                                ZoneId.systemDefault()));
+        // Init the unexpired records.
+        initTestingEventHandlerRecords(
+                eventHandlerTtl,
+                unexpiredRecordsNum,
+                (duration, index) ->
+                        Clock.fixed(createTime.plusMillis(index), 
ZoneId.systemDefault()));
+
+        eventHandler.cleanExpiredEvents();
+
+        try (Connection connection = getConnection();
+                PreparedStatement ps =
+                        connection.prepareStatement(
+                                "select count(1) from 
t_flink_autoscaler_event_handler");
+                ResultSet countResultSet = ps.executeQuery()) {
+            countResultSet.next();
+            
assertThat(countResultSet.getInt(1)).isEqualTo(unexpiredRecordsNum);
+        }
+    }
+
+    private void initTestingEventHandlerRecords(
+            Duration eventHandlerTtl,
+            int recordsNum,
+            BiFunction<Duration, Integer, Clock> clockGetter) {
+
+        for (int i = 0; i < recordsNum; i++) {
+            jdbcEventInteractor.setClock(clockGetter.apply(eventHandlerTtl, 
i));
+            eventHandler.handleEvent(
+                    ctx,
+                    AutoScalerEventHandler.Type.Normal,
+                    "reason",
+                    "message",
+                    "messageKey",
+                    null);
+        }
+        jdbcEventInteractor.setClock(defaultClock);
+    }
+
+    private static Stream<Arguments> getExpiredEventHandlersCaseMatrix() {
+        return Stream.of(
+                Arguments.of(1024, Duration.ofMinutes(2), 1),
+                Arguments.of(1024, Duration.ofMinutes(2), 0),
+                Arguments.of(1024 * 5, Duration.ofMinutes(2), 2),
+                Arguments.of(1024 * 5, Duration.ofMinutes(2), 0),
+                Arguments.of(1024, Duration.ofMinutes(100), 3),
+                Arguments.of(1024, Duration.ofMinutes(100), 0),
+                Arguments.of(1024 * 5, Duration.ofMinutes(100), 100),
+                Arguments.of(1024 * 5, Duration.ofMinutes(100), 0));

Review Comment:
   You could try running the `unexpiredRecordsNum = 0` case locally: not all of the
   records get cleaned.
   
   In addition, a case with `expiredRecordsNum = 0` needs to be added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to