1996fanrui commented on code in PR #865: URL: https://github.com/apache/flink-kubernetes-operator/pull/865#discussion_r1722780881
########## flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java: ########## @@ -430,6 +446,74 @@ void testScalingEventWithParallelismChange() throws Exception { }); } + @MethodSource("getExpiredEventHandlersCaseMatrix") + @ParameterizedTest + void testCleanExpiredEvents( + int expiredRecordsNum, Duration eventHandlerTtl, int unexpiredRecordsNum) + throws Exception { + try (Connection con = getConnection(); + PreparedStatement ps = + con.prepareStatement("delete from t_flink_autoscaler_event_handler")) { + ps.execute(); + } + // Init the expired records. + initTestingEventHandlerRecords( + eventHandlerTtl, + expiredRecordsNum, + (duration, index) -> + Clock.fixed( + createTime.minusMillis( + eventHandlerTtl.toMillis() + index * 1000L + 1L), + ZoneId.systemDefault())); + // Init the unexpired records. + initTestingEventHandlerRecords( + eventHandlerTtl, + unexpiredRecordsNum, + (duration, index) -> + Clock.fixed(createTime.plusMillis(index), ZoneId.systemDefault())); + + eventHandler.cleanExpiredEvents(); + + try (Connection connection = getConnection(); + PreparedStatement ps = + connection.prepareStatement( + "select count(1) from t_flink_autoscaler_event_handler"); + ResultSet countResultSet = ps.executeQuery()) { + countResultSet.next(); + assertThat(countResultSet.getInt(1)).isEqualTo(unexpiredRecordsNum); + } + } + + private void initTestingEventHandlerRecords( + Duration eventHandlerTtl, + int recordsNum, + BiFunction<Duration, Integer, Clock> clockGetter) { + + for (int i = 0; i < recordsNum; i++) { + jdbcEventInteractor.setClock(clockGetter.apply(eventHandlerTtl, i)); + eventHandler.handleEvent( + ctx, + AutoScalerEventHandler.Type.Normal, + "reason", + "message", + "messageKey", + null); + } + jdbcEventInteractor.setClock(defaultClock); + } + + private static Stream<Arguments> getExpiredEventHandlersCaseMatrix() { + return Stream.of( + Arguments.of(1024, Duration.ofMinutes(2), 1), + 
Arguments.of(1024, Duration.ofMinutes(2), 0), + Arguments.of(1024 * 5, Duration.ofMinutes(2), 2), + Arguments.of(1024 * 5, Duration.ofMinutes(2), 0), + Arguments.of(1024, Duration.ofMinutes(100), 3), + Arguments.of(1024, Duration.ofMinutes(100), 0), + Arguments.of(1024 * 5, Duration.ofMinutes(100), 100), + Arguments.of(1024 * 5, Duration.ofMinutes(100), 0)); Review Comment: You could try running the `unexpiredRecordsNum = 0` cases locally; you will see that the records cannot all be cleaned. In addition, a case with `expiredRecordsNum = 0` needs to be added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org