1996fanrui commented on code in PR #865: URL: https://github.com/apache/flink-kubernetes-operator/pull/865#discussion_r1722589069
########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java: ########## @@ -152,4 +154,53 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws void setClock(@Nonnull Clock clock) { this.clock = Preconditions.checkNotNull(clock); } + + @Nullable + ExpiredEventsResult queryExpiredEventsAndMaxId(Duration eventHandlerTtl) throws Exception { + var query = + "SELECT COUNT(1) records_num, min(id) min_existed_id, max(id) max_target_id " + + "FROM t_flink_autoscaler_event_handler " + + "WHERE create_time < ? AND id < (" + + " SELECT id FROM t_flink_autoscaler_event_handler " + + " WHERE create_time >= ? ORDER BY id ASC LIMIT 1)"; + var date = Timestamp.from(clock.instant().minusMillis(eventHandlerTtl.toMillis())); + try (var pstmt = conn.prepareStatement(query)) { + pstmt.setObject(1, date); + pstmt.setObject(2, date); + ResultSet resultSet = pstmt.executeQuery(); + if (!resultSet.next()) { + return null; + } + var result = + new ExpiredEventsResult( + resultSet.getInt(1), resultSet.getLong(2), resultSet.getLong(3)); + resultSet.close(); + return result; + } + } + + void deleteExpiredEventsByIdRange(long startId, long endId) throws Exception { + var query = "delete from t_flink_autoscaler_event_handler where id >= ? and id <= ?"; + try (var pstmt = conn.prepareStatement(query)) { + pstmt.setObject(1, startId); + pstmt.setObject(2, endId); + pstmt.execute(); + } + } + + /** + * The class to represent the query result of the max id in the expired records and the number + * of the expired event handlers. + */ + static class ExpiredEventsResult { + int expiredRecords; + long minId; + long maxId; + + public ExpiredEventsResult(int expiredRecords, long minId, long maxId) { + this.expiredRecords = expiredRecords; Review Comment: After introducing `minId`, `expiredRecords` is no longer needed, right? Also, please update the Javadoc of `ExpiredEventsResult` accordingly. 
########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java: ########## @@ -104,6 +134,41 @@ public void handleScalingEvent( } } + @Override + public void close() { + if (Objects.nonNull(scheduledEventHandlerCleaner) + && !scheduledEventHandlerCleaner.isShutdown()) { + scheduledEventHandlerCleaner.shutdownNow(); + } + } + + @VisibleForTesting + void cleanExpiredEvents() { + try { + JdbcEventInteractor.ExpiredEventsResult expiredResult = + jdbcEventInteractor.queryExpiredEventsAndMaxId(eventHandlerTtl); + if (Objects.isNull(expiredResult)) { + log.warn("No expired event handlers queried at {}", new Date()); + return; + } + var numToClean = expiredResult.expiredRecords; + var batch = 4098; + var restInterval = 10L; + while (numToClean > 0) { + jdbcEventInteractor.deleteExpiredEventsByMaxIdAndBatch(expiredResult.maxId, batch); + numToClean -= batch; + log.debug("Deleted expired event handler records by batch successfully."); + Thread.sleep(restInterval); + } + log.info( + "Deleted expired {} event handler records successfully whose id is smaller than {}.", Review Comment: Should this say "smaller than or equal to" instead of "smaller than"? 
########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java: ########## @@ -104,6 +134,43 @@ public void handleScalingEvent( } } + @Override + public void close() { + if (Objects.nonNull(scheduledEventHandlerCleaner) + && !scheduledEventHandlerCleaner.isShutdown()) { + scheduledEventHandlerCleaner.shutdownNow(); + } + } + + @VisibleForTesting + void cleanExpiredEvents() { + try { + JdbcEventInteractor.ExpiredEventsResult expiredResult = + jdbcEventInteractor.queryExpiredEventsAndMaxId(eventHandlerTtl); + if (Objects.isNull(expiredResult)) { + log.warn("No expired event handlers queried at {}", new Date()); + return; + } + var batch = 4098; + var startId = expiredResult.minId; + var endId = Math.min(startId + batch, expiredResult.maxId); + while (startId <= expiredResult.maxId) { + jdbcEventInteractor.deleteExpiredEventsByIdRange(startId, endId); + startId += batch; + endId = Math.min(endId + batch, expiredResult.maxId); + log.debug("Deleted expired event handler records by batch successfully."); + Thread.sleep(10L); + } Review Comment: It seems a `for` loop is much simpler than this `while` loop, and it avoids introducing a series of extra variables. 
```suggestion var batch = 4098; for (var startId = expiredResult.minId; startId <= expiredResult.maxId; startId += batch) { jdbcEventInteractor.deleteExpiredEventsByIdRange(startId, Math.min(startId + batch, expiredResult.maxId)); log.debug("Deleted expired event handler records by batch successfully."); Thread.sleep(10L); } ``` ########## flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java: ########## @@ -430,6 +446,70 @@ void testScalingEventWithParallelismChange() throws Exception { }); } + @MethodSource("getExpiredEventHandlersCaseMatrix") + @ParameterizedTest + void testCleanExpiredEvents( + int expiredRecordsNum, Duration eventHandlerTtl, int unexpiredRecordsNum) + throws Exception { + try (Connection con = getConnection(); + PreparedStatement ps = + con.prepareStatement("delete from t_flink_autoscaler_event_handler")) { + ps.execute(); + } Review Comment: Did you find that the `t_flink_autoscaler_event_handler` table already contains data at this point? ########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java: ########## @@ -152,4 +165,49 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws void setClock(@Nonnull Clock clock) { this.clock = Preconditions.checkNotNull(clock); } + + @Nullable + ExpiredEventsResult queryExpiredEventsAndMaxId(Duration eventHandlerTtl) throws Exception { + var query = + "SELECT COUNT(1) records_num, max(id) max_target_id " + + "FROM t_flink_autoscaler_event_handler " + + "WHERE create_time < ? AND id < (" + + " SELECT id FROM t_flink_autoscaler_event_handler " + + " WHERE create_time >= ? 
ORDER BY id ASC LIMIT 1)"; + var date = Timestamp.from(clock.instant().minusMillis(eventHandlerTtl.toMillis())); + try (var pstmt = conn.prepareStatement(query)) { + pstmt.setObject(1, date); + pstmt.setObject(2, date); + ResultSet resultSet = pstmt.executeQuery(); + if (!resultSet.next()) { + return null; + } + var result = new ExpiredEventsResult(resultSet.getInt(1), resultSet.getLong(2)); + resultSet.close(); Review Comment: Has this comment not been addressed yet? ########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java: ########## @@ -152,4 +154,53 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws void setClock(@Nonnull Clock clock) { this.clock = Preconditions.checkNotNull(clock); } + + @Nullable + ExpiredEventsResult queryExpiredEventsAndMaxId(Duration eventHandlerTtl) throws Exception { + var query = + "SELECT COUNT(1) records_num, min(id) min_existed_id, max(id) max_target_id " Review Comment: Could you make the aliases `min_existed_id` and `max_target_id` follow a consistent naming pattern? ########## flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java: ########## @@ -430,6 +446,70 @@ void testScalingEventWithParallelismChange() throws Exception { }); } + @MethodSource("getExpiredEventHandlersCaseMatrix") + @ParameterizedTest + void testCleanExpiredEvents( + int expiredRecordsNum, Duration eventHandlerTtl, int unexpiredRecordsNum) + throws Exception { + try (Connection con = getConnection(); + PreparedStatement ps = + con.prepareStatement("delete from t_flink_autoscaler_event_handler")) { + ps.execute(); + } + // Init the expired records. + initTestingEventHandlerRecords( + eventHandlerTtl, + expiredRecordsNum, + (duration, index) -> + Clock.fixed( + createTime.minusMillis( + eventHandlerTtl.toMillis() + index * 1000L + 1L), + ZoneId.systemDefault())); + // Init the unexpired records. 
+ initTestingEventHandlerRecords( + eventHandlerTtl, + unexpiredRecordsNum, + (duration, index) -> + Clock.fixed(createTime.plusMillis(index), ZoneId.systemDefault())); + + eventHandler.cleanExpiredEvents(); + + try (Connection connection = getConnection(); + PreparedStatement ps = + connection.prepareStatement( + "select count(1) from t_flink_autoscaler_event_handler"); + ResultSet countResultSet = ps.executeQuery()) { + countResultSet.next(); + assertThat(countResultSet.getInt(1)).isEqualTo(unexpiredRecordsNum); + } + } + + private void initTestingEventHandlerRecords( + Duration eventHandlerTtl, + int recordsNum, + BiFunction<Duration, Integer, Clock> clockGetter) { + + for (int i = 0; i < recordsNum; i++) { + jdbcEventInteractor.setClock(clockGetter.apply(eventHandlerTtl, i)); + eventHandler.handleEvent( + ctx, + AutoScalerEventHandler.Type.Normal, + "reason", + "message", + "messageKey", + null); + } + jdbcEventInteractor.setClock(defaultClock); + } + + private static Stream<Arguments> getExpiredEventHandlersCaseMatrix() { + return Stream.of( + Arguments.of(1024, Duration.ofMinutes(2), 1), + Arguments.of(1024 * 5, Duration.ofMinutes(2), 2), + Arguments.of(1024, Duration.ofMinutes(100), 3), + Arguments.of(1024 * 5, Duration.ofMinutes(100), 100)); Review Comment: This test doesn't cover the case where `unexpiredRecordsNum` is 0. That case matters: when there are no unexpired records, the subquery `SELECT id ... WHERE create_time >= ? ORDER BY id ASC LIMIT 1` returns no row, so `id < (...)` evaluates to NULL and none of the expired records would be selected for cleanup. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org