1996fanrui commented on code in PR #865: URL: https://github.com/apache/flink-kubernetes-operator/pull/865#discussion_r1716685312
########## flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java: ########## @@ -430,6 +447,75 @@ void testScalingEventWithParallelismChange() throws Exception { }); } + @MethodSource("getExpiredEventHandlersCaseMatrix") + @ParameterizedTest + void testCleanExpiredEvents(int expiredRecordsNum, Duration eventHandlerTtl) throws Exception { + + final Clock clock = Clock.systemDefaultZone(); + initExpiredEventHandlerRecords(eventHandlerTtl, expiredRecordsNum, clock); + final Timestamp createTimeOfUnExpiredRecord = Timestamp.from(clock.instant()); + jdbcEventInteractor.createEvent( + "jobKey1", + "reason", + AutoScalerEventHandler.Type.Normal, + "message", + "eventKey", + createTimeOfUnExpiredRecord); Review Comment: unexpired is a word, right? ########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java: ########## @@ -38,13 +47,34 @@ * @param <Context> The job autoscaler context. */ @Experimental +@Slf4j public class JdbcAutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> implements AutoScalerEventHandler<KEY, Context> { private final JdbcEventInteractor jdbcEventInteractor; + private final Duration eventHandlerTtl; + @Nullable private final ScheduledExecutorService scheduledEventHandlerCleaner; - public JdbcAutoScalerEventHandler(JdbcEventInteractor jdbcEventInteractor) { + public JdbcAutoScalerEventHandler( + JdbcEventInteractor jdbcEventInteractor, Duration eventHandlerTtl) { this.jdbcEventInteractor = jdbcEventInteractor; + this.eventHandlerTtl = Preconditions.checkNotNull(eventHandlerTtl); + + if (eventHandlerTtl.toMillis() <= 0) { + this.scheduledEventHandlerCleaner = null; + return; + } + this.scheduledEventHandlerCleaner = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("jdbc-autoscaler-events-cleaner") + .setDaemon(false) Review Comment: Why is it false here? 
########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java: ########## @@ -152,4 +165,49 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws void setClock(@Nonnull Clock clock) { this.clock = Preconditions.checkNotNull(clock); } + + @Nullable + ExpiredEventsResult queryExpiredEventsAndMaxId(Duration eventHandlerTtl) throws Exception { + var query = + "SELECT COUNT(1) records_num, max(id) max_target_id " + + "FROM t_flink_autoscaler_event_handler " + + "WHERE create_time < ? AND id < (" + + " SELECT id FROM t_flink_autoscaler_event_handler " + + " WHERE create_time >= ? ORDER BY id ASC LIMIT 1)"; + var date = Timestamp.from(clock.instant().minusMillis(eventHandlerTtl.toMillis())); + try (var pstmt = conn.prepareStatement(query)) { + pstmt.setObject(1, date); + pstmt.setObject(2, date); + ResultSet resultSet = pstmt.executeQuery(); + if (!resultSet.next()) { + return null; + } + var result = new ExpiredEventsResult(resultSet.getInt(1), resultSet.getLong(2)); + resultSet.close(); Review Comment: We don't need it; the quote below is copied from the Javadoc of `java.sql.ResultSet#close`. Let us keep all queries in the same style. 
`Note: A ResultSet object is automatically closed by the Statement object that generated it when that Statement object is closed, re-executed, or is used to retrieve the next result from a sequence of multiple results.` ########## flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java: ########## @@ -92,12 +94,23 @@ public void createEvent( String jobKey, String reason, AutoScalerEventHandler.Type type, String message, String eventKey) throws Exception { + createEvent(jobKey, reason, type, message, eventKey, Timestamp.from(clock.instant())); + } + + @VisibleForTesting + void createEvent( + String jobKey, + String reason, + AutoScalerEventHandler.Type type, + String message, + String eventKey, + Timestamp createTime) + throws Exception { Review Comment: There is no need to introduce a new method for testing; you could use setClock to control the timestamp. Check `AbstractJdbcAutoscalerEventHandlerITCase#testEventIntervalWithoutMessageKey` ########## flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java: ########## @@ -430,6 +447,75 @@ void testScalingEventWithParallelismChange() throws Exception { }); } + @MethodSource("getExpiredEventHandlersCaseMatrix") + @ParameterizedTest + void testCleanExpiredEvents(int expiredRecordsNum, Duration eventHandlerTtl) throws Exception { + + final Clock clock = Clock.systemDefaultZone(); + initExpiredEventHandlerRecords(eventHandlerTtl, expiredRecordsNum, clock); + final Timestamp createTimeOfUnExpiredRecord = Timestamp.from(clock.instant()); + jdbcEventInteractor.createEvent( + "jobKey1", + "reason", + AutoScalerEventHandler.Type.Normal, + "message", + "eventKey", + createTimeOfUnExpiredRecord); + + eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl); + eventHandler.cleanExpiredEvents(); + + // Check + try (ResultSet countResultSet = + quickQuery("select count(1) from t_flink_autoscaler_event_handler"); + ResultSet createTimeResultSet = 
quickQuery("select create_time from t_flink_autoscaler_event_handler")) { + assertThat(countResultSet).isNotNull(); + assertThat(countResultSet.next()).isTrue(); + assertThat(countResultSet.getInt(1)).isOne(); + + assertThat(createTimeResultSet).isNotNull(); + assertThat(createTimeResultSet.next()).isTrue(); + assertThat(createTimeResultSet.getTimestamp(1)).isEqualTo(createTimeOfUnExpiredRecord); + } catch (SQLException e) { + throw new RuntimeException(e); Review Comment: I don't understand why this catch is needed. ########## flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java: ########## @@ -430,6 +447,75 @@ void testScalingEventWithParallelismChange() throws Exception { }); } + @MethodSource("getExpiredEventHandlersCaseMatrix") + @ParameterizedTest + void testCleanExpiredEvents(int expiredRecordsNum, Duration eventHandlerTtl) throws Exception { + + final Clock clock = Clock.systemDefaultZone(); + initExpiredEventHandlerRecords(eventHandlerTtl, expiredRecordsNum, clock); + final Timestamp createTimeOfUnExpiredRecord = Timestamp.from(clock.instant()); + jdbcEventInteractor.createEvent( + "jobKey1", + "reason", + AutoScalerEventHandler.Type.Normal, + "message", + "eventKey", + createTimeOfUnExpiredRecord); + + eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl); + eventHandler.cleanExpiredEvents(); + + // Check + try (ResultSet countResultSet = + quickQuery("select count(1) from t_flink_autoscaler_event_handler"); + ResultSet createTimeResultSet = + quickQuery("select create_time from t_flink_autoscaler_event_handler")) { + assertThat(countResultSet).isNotNull(); + assertThat(countResultSet.next()).isTrue(); + assertThat(countResultSet.getInt(1)).isOne(); Review Comment: Could we extract the unexpired record number into one parameter? It could cover more cases. 
########## flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java: ########## @@ -430,6 +447,75 @@ void testScalingEventWithParallelismChange() throws Exception { }); } + @MethodSource("getExpiredEventHandlersCaseMatrix") + @ParameterizedTest + void testCleanExpiredEvents(int expiredRecordsNum, Duration eventHandlerTtl) throws Exception { + + final Clock clock = Clock.systemDefaultZone(); + initExpiredEventHandlerRecords(eventHandlerTtl, expiredRecordsNum, clock); + final Timestamp createTimeOfUnExpiredRecord = Timestamp.from(clock.instant()); + jdbcEventInteractor.createEvent( Review Comment: Why not insert the event via the event handler? It's a little weird to create the event handler after inserting data. In addition, it could cover more cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org