yuxiqian commented on code in PR #3819: URL: https://github.com/apache/flink-cdc/pull/3819#discussion_r1908120223
########## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java: ########## @@ -780,6 +814,320 @@ void testTimestampTransform() throws Exception { transformFunctionEventEventOperatorTestHarness.close(); } + @Test + void testFromUnixTimeTransform() throws Exception { + // In UTC, from_unix_time(0s) ==> 1970-01-01 00:00:00 + testFromUnixTimeTransformWithTimeZone("UTC", 0L, "1970-01-01 00:00:00"); + // In UTC, from_unix_time(44s) ==> 1970-01-01 00:00:44 + testFromUnixTimeTransformWithTimeZone("UTC", 44L, "1970-01-01 00:00:44"); + // In Berlin, the time zone is +1:00, from_unix_time(44s) ==> 1970-01-01 01:00:44 + testFromUnixTimeTransformWithTimeZone("Europe/Berlin", 44L, "1970-01-01 01:00:44"); + // In Shanghai, the time zone is +8:00, from_unix_time(44s) ==> 1970-01-01 08:00:44 + testFromUnixTimeTransformWithTimeZone("Asia/Shanghai", 44L, "1970-01-01 08:00:44"); + } + + private void testFromUnixTimeTransformWithTimeZone( + String timeZone, Long seconds, String unixTimeStr) throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + FROM_UNIX_TIME_TABLEID.identifier(), + "col1, FROM_UNIXTIME(seconds) as from_unix_time," + + " FROM_UNIXTIME(seconds, format_str) as from_unix_time_format", + null) + .addTimezone(timeZone) + .build(); + RegularEventOperatorTestHarness<PostTransformOperator, Event> + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(FROM_UNIX_TIME_TABLEID, FROM_UNIX_TIME_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) FROM_UNIX_TIME_SCHEMA.toRowDataType())); + BinaryRecordDataGenerator expectedRecordDataGenerator = + new BinaryRecordDataGenerator( + ((RowType) 
EXPECTED_FROM_UNIX_TIME_SCHEMA.toRowDataType())); + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + FROM_UNIX_TIME_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + seconds, + new BinaryStringData("yyyy-MM-dd HH:mm:ss") + })); + DataChangeEvent insertEventExpect = + DataChangeEvent.insertEvent( + FROM_UNIX_TIME_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData(unixTimeStr), + new BinaryStringData(unixTimeStr) + })); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent( + FROM_UNIX_TIME_TABLEID, EXPECTED_FROM_UNIX_TIME_SCHEMA))); + transform.processElement(new StreamRecord<>(insertEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect)); + transformFunctionEventEventOperatorTestHarness.close(); + } + + /* + Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), + using the specified timezone in table config. + + If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, + this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, + the default value Long.MIN_VALUE(-9223372036854775808) will be returned. 
+ */ + @Test + void testUnixTimestampTransformInBerlin() throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + UNIX_TIMESTAMP_TABLEID.identifier(), + "col1," + + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp," + + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format", + null) + .addTimezone("Europe/Berlin") + .build(); + RegularEventOperatorTestHarness<PostTransformOperator, Event> + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType())); + BinaryRecordDataGenerator expectedRecordDataGenerator = + new BinaryRecordDataGenerator( + ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType())); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent( + UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA))); + + // In Berlin, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==> 25201L Review Comment: nit: formated -> formatted ########## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java: ########## @@ -780,6 +814,320 @@ void testTimestampTransform() throws Exception { transformFunctionEventEventOperatorTestHarness.close(); } + @Test + void testFromUnixTimeTransform() throws Exception { + // In UTC, from_unix_time(0s) ==> 1970-01-01 00:00:00 + testFromUnixTimeTransformWithTimeZone("UTC", 0L, "1970-01-01 00:00:00"); + // In UTC, from_unix_time(44s) ==> 1970-01-01 
00:00:44 + testFromUnixTimeTransformWithTimeZone("UTC", 44L, "1970-01-01 00:00:44"); + // In Berlin, the time zone is +1:00, from_unix_time(44s) ==> 1970-01-01 01:00:44 + testFromUnixTimeTransformWithTimeZone("Europe/Berlin", 44L, "1970-01-01 01:00:44"); + // In Shanghai, the time zone is +8:00, from_unix_time(44s) ==> 1970-01-01 08:00:44 + testFromUnixTimeTransformWithTimeZone("Asia/Shanghai", 44L, "1970-01-01 08:00:44"); + } + + private void testFromUnixTimeTransformWithTimeZone( + String timeZone, Long seconds, String unixTimeStr) throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + FROM_UNIX_TIME_TABLEID.identifier(), + "col1, FROM_UNIXTIME(seconds) as from_unix_time," + + " FROM_UNIXTIME(seconds, format_str) as from_unix_time_format", + null) + .addTimezone(timeZone) + .build(); + RegularEventOperatorTestHarness<PostTransformOperator, Event> + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(FROM_UNIX_TIME_TABLEID, FROM_UNIX_TIME_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) FROM_UNIX_TIME_SCHEMA.toRowDataType())); + BinaryRecordDataGenerator expectedRecordDataGenerator = + new BinaryRecordDataGenerator( + ((RowType) EXPECTED_FROM_UNIX_TIME_SCHEMA.toRowDataType())); + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + FROM_UNIX_TIME_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + seconds, + new BinaryStringData("yyyy-MM-dd HH:mm:ss") + })); + DataChangeEvent insertEventExpect = + DataChangeEvent.insertEvent( + FROM_UNIX_TIME_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData(unixTimeStr), + new BinaryStringData(unixTimeStr) 
+ })); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent( + FROM_UNIX_TIME_TABLEID, EXPECTED_FROM_UNIX_TIME_SCHEMA))); + transform.processElement(new StreamRecord<>(insertEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect)); + transformFunctionEventEventOperatorTestHarness.close(); + } + + /* + Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), + using the specified timezone in table config. + + If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, + this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, + the default value Long.MIN_VALUE(-9223372036854775808) will be returned. 
+ */ + @Test + void testUnixTimestampTransformInBerlin() throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + UNIX_TIMESTAMP_TABLEID.identifier(), + "col1," + + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp," + + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format", + null) + .addTimezone("Europe/Berlin") + .build(); + RegularEventOperatorTestHarness<PostTransformOperator, Event> + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType())); + BinaryRecordDataGenerator expectedRecordDataGenerator = + new BinaryRecordDataGenerator( + ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType())); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent( + UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA))); + + // In Berlin, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==> 25201L + DataChangeEvent insertEvent1 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("1970-01-01 08:00:01.001"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS") + })); + DataChangeEvent insertEventExpect1 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] {new BinaryStringData("1"), 25201L, 25201L})); + transform.processElement(new StreamRecord<>(insertEvent1)); + 
Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect1)); + + // In Berlin, "1970-01-01 08:00:01.001 +0800" formated by "yyyy-MM-dd HH:mm:ss.SSS X" ==> 1L + DataChangeEvent insertEvent2 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("2"), + new BinaryStringData("1970-01-01 08:00:01.001 +0800"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X") + })); + DataChangeEvent insertEventExpect2 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] {new BinaryStringData("2"), 25201L, 1L})); + transform.processElement(new StreamRecord<>(insertEvent2)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect2)); + + // In Berlin, "1970-01-01 08:00:01.001 +0800" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==> + // 25201L + DataChangeEvent insertEvent3 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("3"), + new BinaryStringData("1970-01-01 08:00:01.001 +0800"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS") + })); + DataChangeEvent insertEventExpect3 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] {new BinaryStringData("3"), 25201L, 25201L})); + transform.processElement(new StreamRecord<>(insertEvent3)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect3)); + + // In Berlin, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS X" ==> + // -9223372036854775808L + DataChangeEvent insertEvent4 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { 
+ new BinaryStringData("4"), + new BinaryStringData("1970-01-01 08:00:01.001"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X") + })); + DataChangeEvent insertEventExpect4 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] { + new BinaryStringData("4"), 25201L, -9223372036854775808L + })); + transform.processElement(new StreamRecord<>(insertEvent4)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect4)); + transformFunctionEventEventOperatorTestHarness.close(); + } + + @Test + void testUnixTimestampTransformInShanghai() throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + UNIX_TIMESTAMP_TABLEID.identifier(), + "col1," + + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp," + + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format", + null) + .addTimezone("Asia/Shanghai") + .build(); + RegularEventOperatorTestHarness<PostTransformOperator, Event> + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType())); + BinaryRecordDataGenerator expectedRecordDataGenerator = + new BinaryRecordDataGenerator( + ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType())); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent( + UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA))); + + 
// In Shanghai, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==> 1L + DataChangeEvent insertEvent1 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("1970-01-01 08:00:01.001"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS") + })); + DataChangeEvent insertEventExpect1 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] {new BinaryStringData("1"), 1L, 1L})); + transform.processElement(new StreamRecord<>(insertEvent1)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect1)); + + // In Shanghai, "1970-01-01 08:00:01.001 +0800" formated by "yyyy-MM-dd HH:mm:ss.SSS X" ==> + // 1L Review Comment: Since Asia/Shanghai is equivalent to +08:00, the "+0800" offset embedded in the date time string coincides with the configured time zone, so this case yields 1L regardless of which of the two is actually applied. Maybe we can specify another time zone to make our test more robust? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org