lsyldliu commented on code in PR #25108:
URL: https://github.com/apache/flink/pull/25108#discussion_r1685891811


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1520,6 +1520,58 @@ void 
testRefreshMaterializedTableWithInvalidParameterInContinuousMode() throws E
                                         .asSerializableString()));
     }
 
+    @Test
+    void testMaterializedTableDefinitionQueryContainsTemporaryResources() 
throws Exception {

Review Comment:
   This is an unrelated test with your fix?



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1417,7 +1417,7 @@ void 
testPeriodicRefreshMaterializedTableWithPartitionOptions() throws Exception
                         sessionHandle,
                         materializedTableIdentifier.asSerializableString(),
                         true,
-                        "2024-01-02 00:00:00",
+                        "2024-01-03 00:00:00",

Review Comment:
   I found this test should be optimized as follow:
   ```
           List<Row> data = new ArrayList<>();
           // create materialized table with partition formatter
           createAndVerifyCreateMaterializedTableWithData(
                   "my_materialized_table",
                   data,
                   Collections.singletonMap("ds", "yyyy-MM-dd"),
                   RefreshMode.FULL);
   
           ObjectIdentifier materializedTableIdentifier =
                   ObjectIdentifier.of(
                           fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"my_materialized_table");
   
           // add data to all data list
           data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
           data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
           data.add(Row.of(4L, 4L, 4L, "2024-01-02"));
   
           // refresh the materialized table with period schedule
           long startTime = System.currentTimeMillis();
           OperationHandle periodRefreshTableHandle =
                   service.refreshMaterializedTable(
                           sessionHandle,
                           materializedTableIdentifier.asSerializableString(),
                           true,
                           "2024-01-03 00:00:00",
                           Collections.emptyMap(),
                           Collections.emptyMap(),
                           Collections.emptyMap());
   ```
   Only then periodic refresh is matched with full mode, not continuous mode.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java:
##########
@@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() {
         assertThat(actualStatement).isEqualTo(expectedStatement);
     }
 
-    @Test
-    void testGetPeriodRefreshPartition() {
-        String schedulerTime = "2024-01-01 00:00:00";
-        Map<String, String> tableOptions = new HashMap<>();
-        tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
-        tableOptions.put("partition.fields.hour.date-formatter", "HH");
-
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("testData")
+    void testGetPeriodRefreshPartition(TestSpec testSpec) {
         ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", 
"database", "table");
 
-        Map<String, String> actualRefreshPartition =
-                MaterializedTableManager.getPeriodRefreshPartition(
-                        schedulerTime, objectIdentifier, tableOptions, 
ZoneId.systemDefault());
-        Map<String, String> expectedRefreshPartition = new HashMap<>();
-        expectedRefreshPartition.put("day", "2024-01-01");
-        expectedRefreshPartition.put("hour", "00");
+        if (testSpec.errorMessage == null) {
+            Map<String, String> actualRefreshPartition =
+                    MaterializedTableManager.getPeriodRefreshPartition(
+                            testSpec.schedulerTime,
+                            testSpec.freshness,
+                            objectIdentifier,
+                            testSpec.tableOptions,
+                            ZoneId.systemDefault());
 
-        assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition);
+            
assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition);
+        } else {
+            assertThatThrownBy(
+                            () ->
+                                    
MaterializedTableManager.getPeriodRefreshPartition(
+                                            testSpec.schedulerTime,
+                                            testSpec.freshness,
+                                            objectIdentifier,
+                                            testSpec.tableOptions,
+                                            ZoneId.systemDefault()))
+                    .hasMessage(testSpec.errorMessage);
+        }
     }
 
-    @Test
-    void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() {
-        // scheduler time is null
-        Map<String, String> tableOptions = new HashMap<>();
-        tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
-        tableOptions.put("partition.fields.hour.date-formatter", "HH");
+    static Stream<TestSpec> testData() {
+        return Stream.of(
+                // The interval of freshness match the partition specified by 
the 'date-formatter'.
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofDay("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2023-12-31"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-02 00:00:00")
+                        .freshness(IntervalFreshness.ofDay("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2024-01-01"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-02 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2024-01-01")
+                        .expectedRefreshPartition("hour", "23"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-02 01:00:00")
+                        .freshness(IntervalFreshness.ofHour("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2024-01-02")
+                        .expectedRefreshPartition("hour", "00"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("2"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "22"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("4"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "20"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("8"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "16"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("12"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "12"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "59"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("2"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "58"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("4"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "56"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("5"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "55"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("6"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "54"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("10"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "50"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("12"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "48"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("15"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "45"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("30"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "30"),
 
-        ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", 
"database", "table");
+                // The interval of freshness is larger than the partition 
specified by the
+                // 'date-formatter'.
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofDay("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "00"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofDay("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "00")
+                        .expectedRefreshPartition("minute", "00"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "00"),

Review Comment:
   Add test case:
   ```
                   TestSpec.create()
                           .schedulerTime("2024-01-01 01:00:00")
                           .freshness(IntervalFreshness.ofHour("1"))
                           .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                           
.tableOptions("partition.fields.hour.date-formatter", "HH")
                           
.tableOptions("partition.fields.minute.date-formatter", "mm")
                           .expectedRefreshPartition("day", "2024-01-01")
                           .expectedRefreshPartition("hour", "00")
                           .expectedRefreshPartition("minute", "00"),
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java:
##########
@@ -735,12 +736,26 @@ public static String formatTimestampMillis(long ts, 
String format, TimeZone tz)
 
     public static String formatTimestampString(
             String dateStr, String fromFormat, String toFormat, TimeZone tz) {
+        return formatTimestampStringWithOffset(dateStr, fromFormat, toFormat, 
tz, 0);
+    }
+
+    public static String formatTimestampStringWithOffset(
+            String dateStr, String fromFormat, String toFormat, TimeZone tz, 
long offsetMills) {
         SimpleDateFormat fromFormatter = FORMATTER_CACHE.get(fromFormat);
         fromFormatter.setTimeZone(tz);
         SimpleDateFormat toFormatter = FORMATTER_CACHE.get(toFormat);
         toFormatter.setTimeZone(tz);
         try {
-            return toFormatter.format(fromFormatter.parse(dateStr));
+            Date date = fromFormatter.parse(dateStr);

Review Comment:
   Can we change the code as follows:
   ```
               Date date = fromFormatter.parse(dateStr);
   
               if (offsetMills != 0) {
                   date = new Date(date.getTime() + offsetMills);
               }
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java:
##########
@@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() {
         assertThat(actualStatement).isEqualTo(expectedStatement);
     }
 
-    @Test
-    void testGetPeriodRefreshPartition() {
-        String schedulerTime = "2024-01-01 00:00:00";
-        Map<String, String> tableOptions = new HashMap<>();
-        tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
-        tableOptions.put("partition.fields.hour.date-formatter", "HH");
-
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("testData")
+    void testGetPeriodRefreshPartition(TestSpec testSpec) {
         ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", 
"database", "table");
 
-        Map<String, String> actualRefreshPartition =
-                MaterializedTableManager.getPeriodRefreshPartition(
-                        schedulerTime, objectIdentifier, tableOptions, 
ZoneId.systemDefault());
-        Map<String, String> expectedRefreshPartition = new HashMap<>();
-        expectedRefreshPartition.put("day", "2024-01-01");
-        expectedRefreshPartition.put("hour", "00");
+        if (testSpec.errorMessage == null) {
+            Map<String, String> actualRefreshPartition =
+                    MaterializedTableManager.getPeriodRefreshPartition(
+                            testSpec.schedulerTime,
+                            testSpec.freshness,
+                            objectIdentifier,
+                            testSpec.tableOptions,
+                            ZoneId.systemDefault());
 
-        assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition);
+            
assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition);
+        } else {
+            assertThatThrownBy(
+                            () ->
+                                    
MaterializedTableManager.getPeriodRefreshPartition(
+                                            testSpec.schedulerTime,
+                                            testSpec.freshness,
+                                            objectIdentifier,
+                                            testSpec.tableOptions,
+                                            ZoneId.systemDefault()))
+                    .hasMessage(testSpec.errorMessage);
+        }
     }
 
-    @Test
-    void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() {
-        // scheduler time is null
-        Map<String, String> tableOptions = new HashMap<>();
-        tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
-        tableOptions.put("partition.fields.hour.date-formatter", "HH");
+    static Stream<TestSpec> testData() {
+        return Stream.of(
+                // The interval of freshness match the partition specified by 
the 'date-formatter'.
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofDay("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2023-12-31"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-02 00:00:00")
+                        .freshness(IntervalFreshness.ofDay("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2024-01-01"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-02 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2024-01-01")
+                        .expectedRefreshPartition("hour", "23"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-02 01:00:00")
+                        .freshness(IntervalFreshness.ofHour("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2024-01-02")
+                        .expectedRefreshPartition("hour", "00"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("2"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "22"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("4"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "20"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("8"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "16"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("12"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "12"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "59"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("2"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "58"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("4"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "56"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("5"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "55"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("6"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "54"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("10"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "50"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("12"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "48"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("15"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "45"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("30"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "30"),

Review Comment:
   Please add a current day test case:
   ```
                   TestSpec.create()
                           .schedulerTime("2024-01-01 00:30:00")
                           .freshness(IntervalFreshness.ofMinute("30"))
                           .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
                           
.tableOptions("partition.fields.hour.date-formatter", "HH")
                           
.tableOptions("partition.fields.minute.date-formatter", "mm")
                           .expectedRefreshPartition("day", "2024-01-01")
                           .expectedRefreshPartition("hour", "00")
                           .expectedRefreshPartition("minute", "00"),
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java:
##########
@@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() {
         assertThat(actualStatement).isEqualTo(expectedStatement);
     }
 
-    @Test
-    void testGetPeriodRefreshPartition() {
-        String schedulerTime = "2024-01-01 00:00:00";
-        Map<String, String> tableOptions = new HashMap<>();
-        tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
-        tableOptions.put("partition.fields.hour.date-formatter", "HH");
-
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("testData")
+    void testGetPeriodRefreshPartition(TestSpec testSpec) {
         ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", 
"database", "table");
 
-        Map<String, String> actualRefreshPartition =
-                MaterializedTableManager.getPeriodRefreshPartition(
-                        schedulerTime, objectIdentifier, tableOptions, 
ZoneId.systemDefault());
-        Map<String, String> expectedRefreshPartition = new HashMap<>();
-        expectedRefreshPartition.put("day", "2024-01-01");
-        expectedRefreshPartition.put("hour", "00");
+        if (testSpec.errorMessage == null) {
+            Map<String, String> actualRefreshPartition =
+                    MaterializedTableManager.getPeriodRefreshPartition(
+                            testSpec.schedulerTime,
+                            testSpec.freshness,
+                            objectIdentifier,
+                            testSpec.tableOptions,
+                            ZoneId.systemDefault());
 
-        assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition);
+            
assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition);
+        } else {
+            assertThatThrownBy(
+                            () ->
+                                    
MaterializedTableManager.getPeriodRefreshPartition(
+                                            testSpec.schedulerTime,
+                                            testSpec.freshness,
+                                            objectIdentifier,
+                                            testSpec.tableOptions,
+                                            ZoneId.systemDefault()))
+                    .hasMessage(testSpec.errorMessage);
+        }
     }
 
-    @Test
-    void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() {
-        // scheduler time is null
-        Map<String, String> tableOptions = new HashMap<>();
-        tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
-        tableOptions.put("partition.fields.hour.date-formatter", "HH");
+    static Stream<TestSpec> testData() {
+        return Stream.of(
+                // The interval of freshness match the partition specified by 
the 'date-formatter'.
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofDay("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2023-12-31"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-02 00:00:00")
+                        .freshness(IntervalFreshness.ofDay("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2024-01-01"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-02 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2024-01-01")
+                        .expectedRefreshPartition("hour", "23"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-02 01:00:00")
+                        .freshness(IntervalFreshness.ofHour("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2024-01-02")
+                        .expectedRefreshPartition("hour", "00"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("2"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "22"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("4"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "20"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("8"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "16"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("12"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "12"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "59"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("2"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "58"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("4"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "56"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("5"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "55"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("6"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "54"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("10"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "50"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("12"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "48"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("15"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "45"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("30"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "30"),
 
-        ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", 
"database", "table");
+                // The interval of freshness is larger than the partition 
specified by the
+                // 'date-formatter'.
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofDay("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "00"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofDay("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "00")
+                        .expectedRefreshPartition("minute", "00"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        
.tableOptions("partition.fields.minute.date-formatter", "mm")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23")
+                        .expectedRefreshPartition("minute", "00"),
+                // The interval of freshness is less than the partition 
specified by the
+                // 'date-formatter'.
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2023-12-31"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 01:00:00")
+                        .freshness(IntervalFreshness.ofHour("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2024-01-01"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("2"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2023-12-31"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 02:00:00")
+                        .freshness(IntervalFreshness.ofHour("2"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2024-01-01"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofHour("4"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2023-12-31"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 04:00:00")
+                        .freshness(IntervalFreshness.ofHour("4"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2024-01-01"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("2"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("4"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("15"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .expectedRefreshPartition("day", "2023-12-31")
+                        .expectedRefreshPartition("hour", "23"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:00:00")
+                        .freshness(IntervalFreshness.ofMinute("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2023-12-31"),
+                TestSpec.create()
+                        .schedulerTime("2024-01-01 00:01:00")
+                        .freshness(IntervalFreshness.ofMinute("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .expectedRefreshPartition("day", "2024-01-01"),
+
+                // Invalid test case.
+                TestSpec.create()
+                        .schedulerTime(null)
+                        .freshness(IntervalFreshness.ofDay("1"))
+                        .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+                        .tableOptions("partition.fields.hour.date-formatter", 
"HH")
+                        .errorMessage(
+                                "Scheduler time not properly set for periodic 
refresh of materialized table `catalog`.`database`.`table`."),

Review Comment:
   I think this error message is not clear for use, we should emphasize the 
scheduler time is null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to