1996fanrui commented on code in PR #865:
URL: https://github.com/apache/flink-kubernetes-operator/pull/865#discussion_r1716685312


##########
flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java:
##########
@@ -430,6 +447,75 @@ void testScalingEventWithParallelismChange() throws Exception {
                         });
     }
 
+    @MethodSource("getExpiredEventHandlersCaseMatrix")
+    @ParameterizedTest
+    void testCleanExpiredEvents(int expiredRecordsNum, Duration eventHandlerTtl) throws Exception {
+
+        final Clock clock = Clock.systemDefaultZone();
+        initExpiredEventHandlerRecords(eventHandlerTtl, expiredRecordsNum, clock);
+        final Timestamp createTimeOfUnExpiredRecord = Timestamp.from(clock.instant());
+        jdbcEventInteractor.createEvent(
+                "jobKey1",
+                "reason",
+                AutoScalerEventHandler.Type.Normal,
+                "message",
+                "eventKey",
+                createTimeOfUnExpiredRecord);

Review Comment:
   `unexpired` is a single word, right? If so, the variable should be `createTimeOfUnexpiredRecord`.



##########
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java:
##########
@@ -38,13 +47,34 @@
  * @param <Context> The job autoscaler context.
  */
 @Experimental
+@Slf4j
 public class JdbcAutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
         implements AutoScalerEventHandler<KEY, Context> {
 
     private final JdbcEventInteractor jdbcEventInteractor;
+    private final Duration eventHandlerTtl;
+    @Nullable private final ScheduledExecutorService scheduledEventHandlerCleaner;
 
-    public JdbcAutoScalerEventHandler(JdbcEventInteractor jdbcEventInteractor) {
+    public JdbcAutoScalerEventHandler(
+            JdbcEventInteractor jdbcEventInteractor, Duration eventHandlerTtl) {
         this.jdbcEventInteractor = jdbcEventInteractor;
+        this.eventHandlerTtl = Preconditions.checkNotNull(eventHandlerTtl);
+
+        if (eventHandlerTtl.toMillis() <= 0) {
+            this.scheduledEventHandlerCleaner = null;
+            return;
+        }
+        this.scheduledEventHandlerCleaner =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ThreadFactoryBuilder()
+                                .setNameFormat("jdbc-autoscaler-events-cleaner")
+                                .setDaemon(false)

Review Comment:
   Why is `setDaemon` set to `false` here?
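
For context on the daemon flag: a non-daemon worker thread keeps the JVM alive until the executor is shut down, while a daemon thread does not. Below is a minimal sketch using only the JDK. The PR itself uses a `ThreadFactoryBuilder`; the `CleanerThreads` class and its method names are hypothetical and exist only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of the PR: shows how the daemon flag reaches the worker thread. */
final class CleanerThreads {
    private CleanerThreads() {}

    /** Builds a factory for named threads; the daemon flag decides whether they block JVM exit. */
    static ThreadFactory namedFactory(String name, boolean daemon) {
        return runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(daemon);
            return thread;
        };
    }

    /** Runs one probe task on a single-thread scheduler and reports the worker's daemon flag. */
    static boolean workerIsDaemon(boolean daemon) {
        ScheduledExecutorService cleaner =
                Executors.newSingleThreadScheduledExecutor(
                        namedFactory("jdbc-autoscaler-events-cleaner", daemon));
        try {
            Callable<Boolean> probe = () -> Thread.currentThread().isDaemon();
            return cleaner.schedule(probe, 0, TimeUnit.MILLISECONDS).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("probe task failed", e);
        } finally {
            // With daemon=false, skipping this shutdown would keep the JVM from exiting.
            cleaner.shutdownNow();
        }
    }
}
```

With `daemon=false`, whoever owns the handler has to shut the executor down explicitly (for example in a `close()` method); with `daemon=true`, the cleaner thread never prevents process exit.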



##########
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java:
##########
@@ -152,4 +165,49 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws
     void setClock(@Nonnull Clock clock) {
         this.clock = Preconditions.checkNotNull(clock);
     }
+
+    @Nullable
+    ExpiredEventsResult queryExpiredEventsAndMaxId(Duration eventHandlerTtl) throws Exception {
+        var query =
+                "SELECT COUNT(1) records_num, max(id) max_target_id "
+                        + "FROM t_flink_autoscaler_event_handler "
+                        + "WHERE create_time < ? AND id < ("
+                        + "   SELECT id FROM t_flink_autoscaler_event_handler "
+                        + "   WHERE create_time >= ? ORDER BY id ASC LIMIT 1)";
+        var date = Timestamp.from(clock.instant().minusMillis(eventHandlerTtl.toMillis()));
+        try (var pstmt = conn.prepareStatement(query)) {
+            pstmt.setObject(1, date);
+            pstmt.setObject(2, date);
+            ResultSet resultSet = pstmt.executeQuery();
+            if (!resultSet.next()) {
+                return null;
+            }
+            var result = new ExpiredEventsResult(resultSet.getInt(1), resultSet.getLong(2));
+            resultSet.close();

Review Comment:
   This `close()` call isn't needed. Let's keep all queries in the same style.
   
   From the Javadoc of `java.sql.ResultSet#close`:
   
   `Note: A ResultSet object is automatically closed by the Statement object that generated it when that Statement object is closed, re-executed, or is used to retrieve the next result from a sequence of multiple results.`



##########
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java:
##########
@@ -92,12 +94,23 @@ public void createEvent(
             String message,
             String eventKey)
             throws Exception {
+        createEvent(jobKey, reason, type, message, eventKey, Timestamp.from(clock.instant()));
+    }
+
+    @VisibleForTesting
+    void createEvent(
+            String jobKey,
+            String reason,
+            AutoScalerEventHandler.Type type,
+            String message,
+            String eventKey,
+            Timestamp createTime)
+            throws Exception {

Review Comment:
   There's no need to introduce a new method just for testing; you could use `setClock` to control the timestamp.
   
   See `AbstractJdbcAutoscalerEventHandlerITCase#testEventIntervalWithoutMessageKey` for an example.
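
A minimal sketch of the `setClock` approach, assuming a simplified in-memory store. `ClockedEventStore` and `ClockedEventStoreDemo` are hypothetical stand-ins, not the real `JdbcEventInteractor`, and `Instant` is stored instead of `Timestamp` to keep the example free of JDBC types. The idea is to pin the clock in the past to write an expired record, then move it to "now" for the unexpired one.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for the interactor: stamps each event using an injectable clock. */
final class ClockedEventStore {
    private Clock clock = Clock.systemUTC();
    private final List<Instant> createTimes = new ArrayList<>();

    void setClock(Clock clock) {
        this.clock = Objects.requireNonNull(clock);
    }

    /** The only creation path: the timestamp always comes from the current clock. */
    void createEvent() {
        createTimes.add(clock.instant());
    }

    List<Instant> createTimes() {
        return new ArrayList<>(createTimes);
    }
}

/** Hypothetical demo: writes one expired and one unexpired record without a test-only overload. */
final class ClockedEventStoreDemo {
    private ClockedEventStoreDemo() {}

    static List<Instant> expiredThenFresh(Instant now, Duration ttl) {
        ClockedEventStore store = new ClockedEventStore();
        // One second older than the TTL, so this record counts as expired.
        store.setClock(Clock.fixed(now.minus(ttl).minusSeconds(1), ZoneOffset.UTC));
        store.createEvent();
        // Back to "now" for the record that must survive cleanup.
        store.setClock(Clock.fixed(now, ZoneOffset.UTC));
        store.createEvent();
        return store.createTimes();
    }
}
```

This keeps the production API to a single `createEvent` path while tests still decide every timestamp.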



##########
flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java:
##########
@@ -430,6 +447,75 @@ void testScalingEventWithParallelismChange() throws Exception {
                         });
     }
 
+    @MethodSource("getExpiredEventHandlersCaseMatrix")
+    @ParameterizedTest
+    void testCleanExpiredEvents(int expiredRecordsNum, Duration eventHandlerTtl) throws Exception {
+
+        final Clock clock = Clock.systemDefaultZone();
+        initExpiredEventHandlerRecords(eventHandlerTtl, expiredRecordsNum, clock);
+        final Timestamp createTimeOfUnExpiredRecord = Timestamp.from(clock.instant());
+        jdbcEventInteractor.createEvent(
+                "jobKey1",
+                "reason",
+                AutoScalerEventHandler.Type.Normal,
+                "message",
+                "eventKey",
+                createTimeOfUnExpiredRecord);
+
+        eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl);
+        eventHandler.cleanExpiredEvents();
+
+        // Check
+        try (ResultSet countResultSet =
+                        quickQuery("select count(1) from t_flink_autoscaler_event_handler");
+                ResultSet createTimeResultSet =
+                        quickQuery("select create_time from t_flink_autoscaler_event_handler")) {
+            assertThat(countResultSet).isNotNull();
+            assertThat(countResultSet.next()).isTrue();
+            assertThat(countResultSet.getInt(1)).isOne();
+
+            assertThat(createTimeResultSet).isNotNull();
+            assertThat(createTimeResultSet.next()).isTrue();
+            assertThat(createTimeResultSet.getTimestamp(1)).isEqualTo(createTimeOfUnExpiredRecord);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);

Review Comment:
   I don't understand why this catch is needed. The test method already declares `throws Exception`.



##########
flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java:
##########
@@ -430,6 +447,75 @@ void testScalingEventWithParallelismChange() throws Exception {
                         });
     }
 
+    @MethodSource("getExpiredEventHandlersCaseMatrix")
+    @ParameterizedTest
+    void testCleanExpiredEvents(int expiredRecordsNum, Duration eventHandlerTtl) throws Exception {
+
+        final Clock clock = Clock.systemDefaultZone();
+        initExpiredEventHandlerRecords(eventHandlerTtl, expiredRecordsNum, clock);
+        final Timestamp createTimeOfUnExpiredRecord = Timestamp.from(clock.instant());
+        jdbcEventInteractor.createEvent(
+                "jobKey1",
+                "reason",
+                AutoScalerEventHandler.Type.Normal,
+                "message",
+                "eventKey",
+                createTimeOfUnExpiredRecord);
+
+        eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl);
+        eventHandler.cleanExpiredEvents();
+
+        // Check
+        try (ResultSet countResultSet =
+                        quickQuery("select count(1) from t_flink_autoscaler_event_handler");
+                ResultSet createTimeResultSet =
+                        quickQuery("select create_time from t_flink_autoscaler_event_handler")) {
+            assertThat(countResultSet).isNotNull();
+            assertThat(countResultSet.next()).isTrue();
+            assertThat(countResultSet.getInt(1)).isOne();

Review Comment:
   Could we extract the number of unexpired records into a test parameter as well? That would cover more cases.
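
A rough sketch of what such a case matrix could look like, assuming a plain in-memory stand-in for the table rather than the real JDBC test. `ExpiryCaseMatrix`, `Case`, and `survivorsAfterCleanup` are hypothetical names, and a plain loop replaces the `@MethodSource` wiring so the example runs without JUnit.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: varies both the expired and the unexpired record counts. */
final class ExpiryCaseMatrix {
    private ExpiryCaseMatrix() {}

    /** One case: records older than the TTL, records within the TTL, and the TTL itself. */
    static final class Case {
        final int expiredRecordsNum;
        final int unexpiredRecordsNum;
        final Duration ttl;

        Case(int expiredRecordsNum, int unexpiredRecordsNum, Duration ttl) {
            this.expiredRecordsNum = expiredRecordsNum;
            this.unexpiredRecordsNum = unexpiredRecordsNum;
            this.ttl = ttl;
        }
    }

    /** Cross product of expired and unexpired counts; the values are illustrative only. */
    static List<Case> cases() {
        List<Case> cases = new ArrayList<>();
        for (int expired : new int[] {0, 1, 1024}) {
            for (int unexpired : new int[] {0, 1, 16}) {
                cases.add(new Case(expired, unexpired, Duration.ofDays(1)));
            }
        }
        return cases;
    }

    /** In-memory stand-in for the cleanup: returns how many records remain afterwards. */
    static int survivorsAfterCleanup(Case c, Instant now) {
        List<Instant> createTimes = new ArrayList<>();
        for (int i = 0; i < c.expiredRecordsNum; i++) {
            // Strictly older than the TTL cutoff.
            createTimes.add(now.minus(c.ttl).minusSeconds(i + 1));
        }
        for (int i = 0; i < c.unexpiredRecordsNum; i++) {
            // Within the TTL window.
            createTimes.add(now.minusSeconds(i));
        }
        Instant cutoff = now.minus(c.ttl);
        createTimes.removeIf(createTime -> createTime.isBefore(cutoff));
        return createTimes.size();
    }
}
```

In the real test, the assertion would then compare the remaining row count against the unexpired parameter instead of the hard-coded `isOne()`.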



##########
flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java:
##########
@@ -430,6 +447,75 @@ void testScalingEventWithParallelismChange() throws Exception {
                         });
     }
 
+    @MethodSource("getExpiredEventHandlersCaseMatrix")
+    @ParameterizedTest
+    void testCleanExpiredEvents(int expiredRecordsNum, Duration eventHandlerTtl) throws Exception {
+
+        final Clock clock = Clock.systemDefaultZone();
+        initExpiredEventHandlerRecords(eventHandlerTtl, expiredRecordsNum, clock);
+        final Timestamp createTimeOfUnExpiredRecord = Timestamp.from(clock.instant());
+        jdbcEventInteractor.createEvent(

Review Comment:
   Why not insert the event via the event handler? It's a little weird to create the event handler after inserting the data. In addition, it could cover more cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org