lsyldliu commented on code in PR #25108: URL: https://github.com/apache/flink/pull/25108#discussion_r1685891811
########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -1520,6 +1520,58 @@ void testRefreshMaterializedTableWithInvalidParameterInContinuousMode() throws E .asSerializableString())); } + @Test + void testMaterializedTableDefinitionQueryContainsTemporaryResources() throws Exception { Review Comment: Is this test unrelated to your fix? ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -1417,7 +1417,7 @@ void testPeriodicRefreshMaterializedTableWithPartitionOptions() throws Exception sessionHandle, materializedTableIdentifier.asSerializableString(), true, - "2024-01-02 00:00:00", + "2024-01-03 00:00:00", Review Comment: I found that this test should be optimized as follows: ``` List<Row> data = new ArrayList<>(); // create materialized table with partition formatter createAndVerifyCreateMaterializedTableWithData( "my_materialized_table", data, Collections.singletonMap("ds", "yyyy-MM-dd"), RefreshMode.FULL); ObjectIdentifier materializedTableIdentifier = ObjectIdentifier.of( fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table"); // add data to all data list data.add(Row.of(1L, 1L, 1L, "2024-01-01")); data.add(Row.of(2L, 2L, 2L, "2024-01-02")); data.add(Row.of(4L, 4L, 4L, "2024-01-02")); // refresh the materialized table with period schedule long startTime = System.currentTimeMillis(); OperationHandle periodRefreshTableHandle = service.refreshMaterializedTable( sessionHandle, materializedTableIdentifier.asSerializableString(), true, "2024-01-03 00:00:00", Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); ``` Only then is the periodic refresh matched with full mode, not continuous mode.
########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java: ########## @@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() { assertThat(actualStatement).isEqualTo(expectedStatement); } - @Test - void testGetPeriodRefreshPartition() { - String schedulerTime = "2024-01-01 00:00:00"; - Map<String, String> tableOptions = new HashMap<>(); - tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd"); - tableOptions.put("partition.fields.hour.date-formatter", "HH"); - + @ParameterizedTest(name = "{index}: {0}") + @MethodSource("testData") + void testGetPeriodRefreshPartition(TestSpec testSpec) { ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", "database", "table"); - Map<String, String> actualRefreshPartition = - MaterializedTableManager.getPeriodRefreshPartition( - schedulerTime, objectIdentifier, tableOptions, ZoneId.systemDefault()); - Map<String, String> expectedRefreshPartition = new HashMap<>(); - expectedRefreshPartition.put("day", "2024-01-01"); - expectedRefreshPartition.put("hour", "00"); + if (testSpec.errorMessage == null) { + Map<String, String> actualRefreshPartition = + MaterializedTableManager.getPeriodRefreshPartition( + testSpec.schedulerTime, + testSpec.freshness, + objectIdentifier, + testSpec.tableOptions, + ZoneId.systemDefault()); - assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition); + assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition); + } else { + assertThatThrownBy( + () -> + MaterializedTableManager.getPeriodRefreshPartition( + testSpec.schedulerTime, + testSpec.freshness, + objectIdentifier, + testSpec.tableOptions, + ZoneId.systemDefault())) + .hasMessage(testSpec.errorMessage); + } } - @Test - void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() { - // scheduler time is null - Map<String, String> tableOptions = new HashMap<>(); - 
tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd"); - tableOptions.put("partition.fields.hour.date-formatter", "HH"); + static Stream<TestSpec> testData() { + return Stream.of( + // The interval of freshness match the partition specified by the 'date-formatter'. + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-02 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + TestSpec.create() + .schedulerTime("2024-01-02 00:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2024-01-01") + .expectedRefreshPartition("hour", "23"), + TestSpec.create() + .schedulerTime("2024-01-02 01:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2024-01-02") + .expectedRefreshPartition("hour", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "22"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + 
.expectedRefreshPartition("hour", "20"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("8")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "16"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("12")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "12"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "59"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "58"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", 
"56"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("5")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "55"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("6")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "54"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("10")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "50"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("12")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "48"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("15")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + 
.tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "45"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("30")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "30"), - ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", "database", "table"); + // The interval of freshness is larger than the partition specified by the + // 'date-formatter'. + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "00") + .expectedRefreshPartition("minute", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + 
.expectedRefreshPartition("minute", "00"), Review Comment: Add test case: ``` TestSpec.create() .schedulerTime("2024-01-01 01:00:00") .freshness(IntervalFreshness.ofHour("1")) .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") .tableOptions("partition.fields.hour.date-formatter", "HH") .tableOptions("partition.fields.minute.date-formatter", "mm") .expectedRefreshPartition("day", "2024-01-01") .expectedRefreshPartition("hour", "00") .expectedRefreshPartition("minute", "00"), ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java: ########## @@ -735,12 +736,26 @@ public static String formatTimestampMillis(long ts, String format, TimeZone tz) public static String formatTimestampString( String dateStr, String fromFormat, String toFormat, TimeZone tz) { + return formatTimestampStringWithOffset(dateStr, fromFormat, toFormat, tz, 0); + } + + public static String formatTimestampStringWithOffset( + String dateStr, String fromFormat, String toFormat, TimeZone tz, long offsetMills) { SimpleDateFormat fromFormatter = FORMATTER_CACHE.get(fromFormat); fromFormatter.setTimeZone(tz); SimpleDateFormat toFormatter = FORMATTER_CACHE.get(toFormat); toFormatter.setTimeZone(tz); try { - return toFormatter.format(fromFormatter.parse(dateStr)); + Date date = fromFormatter.parse(dateStr); Review Comment: Can we change the code as follows: ``` Date date = fromFormatter.parse(dateStr); if (offsetMills != 0) { date = new Date(date.getTime() + offsetMills); } ``` ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java: ########## @@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() { assertThat(actualStatement).isEqualTo(expectedStatement); } - @Test - void testGetPeriodRefreshPartition() { - String schedulerTime = "2024-01-01 00:00:00"; - Map<String, String> tableOptions = new HashMap<>(); - 
tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd"); - tableOptions.put("partition.fields.hour.date-formatter", "HH"); - + @ParameterizedTest(name = "{index}: {0}") + @MethodSource("testData") + void testGetPeriodRefreshPartition(TestSpec testSpec) { ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", "database", "table"); - Map<String, String> actualRefreshPartition = - MaterializedTableManager.getPeriodRefreshPartition( - schedulerTime, objectIdentifier, tableOptions, ZoneId.systemDefault()); - Map<String, String> expectedRefreshPartition = new HashMap<>(); - expectedRefreshPartition.put("day", "2024-01-01"); - expectedRefreshPartition.put("hour", "00"); + if (testSpec.errorMessage == null) { + Map<String, String> actualRefreshPartition = + MaterializedTableManager.getPeriodRefreshPartition( + testSpec.schedulerTime, + testSpec.freshness, + objectIdentifier, + testSpec.tableOptions, + ZoneId.systemDefault()); - assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition); + assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition); + } else { + assertThatThrownBy( + () -> + MaterializedTableManager.getPeriodRefreshPartition( + testSpec.schedulerTime, + testSpec.freshness, + objectIdentifier, + testSpec.tableOptions, + ZoneId.systemDefault())) + .hasMessage(testSpec.errorMessage); + } } - @Test - void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() { - // scheduler time is null - Map<String, String> tableOptions = new HashMap<>(); - tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd"); - tableOptions.put("partition.fields.hour.date-formatter", "HH"); + static Stream<TestSpec> testData() { + return Stream.of( + // The interval of freshness match the partition specified by the 'date-formatter'. 
+ TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-02 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + TestSpec.create() + .schedulerTime("2024-01-02 00:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2024-01-01") + .expectedRefreshPartition("hour", "23"), + TestSpec.create() + .schedulerTime("2024-01-02 01:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2024-01-02") + .expectedRefreshPartition("hour", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "22"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "20"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("8")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + 
.expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "16"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("12")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "12"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "59"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "58"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "56"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("5")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + 
.tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "55"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("6")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "54"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("10")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "50"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("12")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "48"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("15")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "45"), + TestSpec.create() + .schedulerTime("2024-01-01 
00:00:00") + .freshness(IntervalFreshness.ofMinute("30")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "30"), Review Comment: Please add a current day test case: ``` TestSpec.create() .schedulerTime("2024-01-01 00:30:00") .freshness(IntervalFreshness.ofMinute("30")) .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") .tableOptions("partition.fields.hour.date-formatter", "HH") .tableOptions("partition.fields.minute.date-formatter", "mm") .expectedRefreshPartition("day", "2024-01-01") .expectedRefreshPartition("hour", "00") .expectedRefreshPartition("minute", "00"), ``` ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java: ########## @@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() { assertThat(actualStatement).isEqualTo(expectedStatement); } - @Test - void testGetPeriodRefreshPartition() { - String schedulerTime = "2024-01-01 00:00:00"; - Map<String, String> tableOptions = new HashMap<>(); - tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd"); - tableOptions.put("partition.fields.hour.date-formatter", "HH"); - + @ParameterizedTest(name = "{index}: {0}") + @MethodSource("testData") + void testGetPeriodRefreshPartition(TestSpec testSpec) { ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", "database", "table"); - Map<String, String> actualRefreshPartition = - MaterializedTableManager.getPeriodRefreshPartition( - schedulerTime, objectIdentifier, tableOptions, ZoneId.systemDefault()); - Map<String, String> expectedRefreshPartition = new HashMap<>(); - expectedRefreshPartition.put("day", "2024-01-01"); - 
expectedRefreshPartition.put("hour", "00"); + if (testSpec.errorMessage == null) { + Map<String, String> actualRefreshPartition = + MaterializedTableManager.getPeriodRefreshPartition( + testSpec.schedulerTime, + testSpec.freshness, + objectIdentifier, + testSpec.tableOptions, + ZoneId.systemDefault()); - assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition); + assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition); + } else { + assertThatThrownBy( + () -> + MaterializedTableManager.getPeriodRefreshPartition( + testSpec.schedulerTime, + testSpec.freshness, + objectIdentifier, + testSpec.tableOptions, + ZoneId.systemDefault())) + .hasMessage(testSpec.errorMessage); + } } - @Test - void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() { - // scheduler time is null - Map<String, String> tableOptions = new HashMap<>(); - tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd"); - tableOptions.put("partition.fields.hour.date-formatter", "HH"); + static Stream<TestSpec> testData() { + return Stream.of( + // The interval of freshness match the partition specified by the 'date-formatter'. 
+ TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-02 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + TestSpec.create() + .schedulerTime("2024-01-02 00:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2024-01-01") + .expectedRefreshPartition("hour", "23"), + TestSpec.create() + .schedulerTime("2024-01-02 01:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2024-01-02") + .expectedRefreshPartition("hour", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "22"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "20"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("8")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + 
.expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "16"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("12")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "12"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "59"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "58"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "56"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("5")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + 
.tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "55"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("6")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "54"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("10")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "50"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("12")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "48"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("15")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "45"), + TestSpec.create() + .schedulerTime("2024-01-01 
00:00:00") + .freshness(IntervalFreshness.ofMinute("30")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "30"), - ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", "database", "table"); + // The interval of freshness is larger than the partition specified by the + // 'date-formatter'. + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "00") + .expectedRefreshPartition("minute", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "00"), + // The interval of freshness is less than the partition specified by the + // 'date-formatter'. 
+ TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-01 01:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-01 02:00:00") + .freshness(IntervalFreshness.ofHour("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-01 04:00:00") + .freshness(IntervalFreshness.ofHour("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23"), + 
TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("15")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-01 00:01:00") + .freshness(IntervalFreshness.ofMinute("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + + // Invalid test case. + TestSpec.create() + .schedulerTime(null) + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .errorMessage( + "Scheduler time not properly set for periodic refresh of materialized table `catalog`.`database`.`table`."), Review Comment: I think this error message is not clear for users; we should emphasize that the scheduler time is null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org