yuxiqian commented on code in PR #3819: URL: https://github.com/apache/flink-cdc/pull/3819#discussion_r1900670113
########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java: ##########
@@ -128,6 +138,61 @@ private static int ymdToJulian(int year, int month, int day) {
         return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
     }
+    // --------------------------------------------------------------------------------------------
+    // UNIX TIME
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+     * "yyyy-MM-dd HH:mm:ss" format.
+     */
+    public static String formatUnixTimestamp(long unixtime, TimeZone tz) {
+        return formatUnixTimestamp(unixtime, TIMESTAMP_FORMAT_STRING, tz);
+    }
+
+    /**
+     * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+     * given format.
+     */
+    public static String formatUnixTimestamp(long unixtime, String format, TimeZone tz) {
+        SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
+        formatter.setTimeZone(tz);
+        Date date = new Date(unixtime * 1000);
+        try {
+            return formatter.format(date);
+        } catch (Exception e) {
+            LOG.error("Exception when formatting.", e);
+            return null;
+        }
+    }
+
+    /** Returns the value of the timestamp to seconds since '1970-01-01 00:00:00' UTC. */
+    public static long unixTimestamp(long ts) {
+        return ts / 1000;
+    }

Review Comment:
This method seems meaningless and undocumented.
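For reference, here is a minimal sketch of what the two reviewed helpers compute. It uses `java.time` instead of the PR's cached `SimpleDateFormat`; that is an illustrative substitution, not the PR's code, and `UnixTimeSketch`, `formatUnixTime` and `millisToSeconds*` are hypothetical names. The `FROM_UNIXTIME(44)` values match the UTC and `Asia/Tokyo` examples in the documentation row of this PR. The sketch also makes one detail of `ts / 1000` concrete: Java integer division truncates toward zero, so for pre-epoch millisecond values it differs from `Math.floorDiv`.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class UnixTimeSketch {

    // Hypothetical helper: formats seconds since 1970-01-01 00:00:00 UTC in the given zone.
    // java.time is used here for illustration; the PR itself uses a cached SimpleDateFormat.
    static String formatUnixTime(long unixTime, String pattern, ZoneId timeZone) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(timeZone)
                .format(Instant.ofEpochSecond(unixTime));
    }

    // Same arithmetic as the reviewed unixTimestamp(long ts): milliseconds -> seconds.
    // The '/' operator truncates toward zero.
    static long millisToSecondsTruncating(long ts) {
        return ts / 1000;
    }

    // Alternative that rounds toward negative infinity, shown only for comparison.
    static long millisToSecondsFloor(long ts) {
        return Math.floorDiv(ts, 1000L);
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd HH:mm:ss";
        System.out.println(formatUnixTime(44, pattern, ZoneId.of("UTC")));        // 1970-01-01 00:00:44
        System.out.println(formatUnixTime(44, pattern, ZoneId.of("Asia/Tokyo"))); // 1970-01-01 09:00:44
        System.out.println(millisToSecondsTruncating(-1500)); // -1
        System.out.println(millisToSecondsFloor(-1500));      // -2
    }
}
```

Both variants agree for non-negative inputs, so the difference only matters if pre-1970 timestamps can reach this helper.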
########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java: ##########
@@ -128,6 +138,61 @@ private static int ymdToJulian(int year, int month, int day) {
         return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
     }
+    // --------------------------------------------------------------------------------------------
+    // UNIX TIME
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+     * "yyyy-MM-dd HH:mm:ss" format.
+     */
+    public static String formatUnixTimestamp(long unixtime, TimeZone tz) {
+        return formatUnixTimestamp(unixtime, TIMESTAMP_FORMAT_STRING, tz);
+    }
+
+    /**
+     * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+     * given format.
+     */
+    public static String formatUnixTimestamp(long unixtime, String format, TimeZone tz) {

Review Comment:
Please name these arguments `unixTime` and `timeZone`.

########## docs/content.zh/docs/core-concept/transform.md: ##########
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
 | TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
 | TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
 | TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|

Review Comment:
```suggestion
| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config. If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config.
If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
```

There should be a newline.

########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java: ##########
@@ -109,7 +119,7 @@ private static long internalParseTimestampMillis(String dateStr, String format,
         } catch (ParseException e) {
             LOG.error(
                     String.format(
-                            "Exception when parsing datetime string '%s' in format '%s'",
+                            "Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",

Review Comment:
`SystemFunctionUtils` (and `DateTimeUtils`) are evaluated inside the Janino engine. Will these error messages ever be printed?

########## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java: ##########

Review Comment:
Consider adding cases in `TransformE2eITCase#testTemporalFunctions`.

########## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java: ##########
@@ -780,6 +795,76 @@ void testTimestampTransform() throws Exception {
         transformFunctionEventEventOperatorTestHarness.close();
     }
+    @Test
+    void testUnixTimestampTransform() throws Exception {

Review Comment:
Please parameterize this test over multiple time zones. See `MySqlTimezoneITCase` for an example.

########## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java: ##########
@@ -252,9 +263,9 @@ public void testTranslateFilterToJaninoExpression() {
                 "TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\", __time_zone__)");
         testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, __time_zone__)");
         testFilterExpression("TIMESTAMP_DIFF('DAY', dt1, dt2)", "timestampDiff(\"DAY\", dt1, dt2)");
-        testFilterExpression("IF(a>b,a,b)", "a > b ? a : b");
-        testFilterExpression("NULLIF(a,b)", "nullif(a, b)");
-        testFilterExpression("COALESCE(a,b,c)", "coalesce(a, b, c)");
+        testFilterExpression("IF(a>b, a, b)", "a > b ? a : b");
+        testFilterExpression("NULLIF(a, b)", "nullif(a, b)");
+        testFilterExpression("COALESCE(a, b, c)", "coalesce(a, b, c)");

Review Comment:
Irrelevant changes can be split into a separate PR.

########## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java: ##########
@@ -780,6 +795,76 @@ void testTimestampTransform() throws Exception {
         transformFunctionEventEventOperatorTestHarness.close();
     }
+    @Test
+    void testUnixTimestampTransform() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                UNIX_TIMESTAMP_TABLEID.identifier(),
+                                "col1, FROM_UNIXTIME(44) as from_unixtime,"

Review Comment:
One suite of test cases may not be sufficient. I still suggest something like https://github.com/apache/flink-cdc/pull/3698#discussion_r1899936303

########## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java: ##########
@@ -780,6 +795,76 @@ void testTimestampTransform() throws Exception {
         transformFunctionEventEventOperatorTestHarness.close();
     }
+    @Test
+    void testUnixTimestampTransform() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                UNIX_TIMESTAMP_TABLEID.identifier(),
+                                "col1, FROM_UNIXTIME(44) as from_unixtime,"
+                                        + " FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss') as from_unixtime_format,"
+                                        + " IF(UNIX_TIMESTAMP() = UNIX_TIMESTAMP(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss')), 1, 0) as current_unix_timestamp,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01') as unix_timestamp,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_tz,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS') as unix_timestamp_format,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_error",
+                                null)
+                        // .addTimezone("UTC")

Review Comment:
Remove this.

########## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java: ##########
@@ -780,6 +795,76 @@ void testTimestampTransform() throws Exception {
         transformFunctionEventEventOperatorTestHarness.close();
     }
+    @Test
+    void testUnixTimestampTransform() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                UNIX_TIMESTAMP_TABLEID.identifier(),
+                                "col1, FROM_UNIXTIME(44) as from_unixtime,"
+                                        + " FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss') as from_unixtime_format,"
+                                        + " IF(UNIX_TIMESTAMP() = UNIX_TIMESTAMP(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss')), 1, 0) as current_unix_timestamp,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01') as unix_timestamp,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_tz,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS') as unix_timestamp_format,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_error",
+                                null)
+                        // .addTimezone("UTC")
+                        .addTimezone("Europe/Berlin")
+                        .build();
+        RegularEventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null
+                                }));

Review Comment:
I wonder what the point is of using the same schema in the source and the sink. These fields are all `null` in the source anyway, and all test data are hard-coded in the transform rules. Please consider a clearer way to organize test cases in the future.
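Related to the requests above to parameterize the test over several time zones and to organize the cases more clearly, one option is to derive the expected values for each session time zone rather than hard-coding results for a single zone. This is only a sketch using `java.time`. The class name, helper names and zone list are hypothetical, and it does not use Flink CDC's test harness. It assumes `FROM_UNIXTIME` and `UNIX_TIMESTAMP` follow the semantics in this PR's documentation rows: the result of `FROM_UNIXTIME` is rendered in the session time zone, and a date time string without an offset is interpreted in the session time zone.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;

public class UnixTimestampExpectations {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Expected FROM_UNIXTIME(unixTime), rendered in the session time zone (assumed semantics).
    static String expectedFromUnixTime(long unixTime, ZoneId sessionZone) {
        return FORMAT.withZone(sessionZone).format(Instant.ofEpochSecond(unixTime));
    }

    // Expected UNIX_TIMESTAMP(dateTime) for a string without an offset:
    // the local date time is interpreted in the session time zone (assumed semantics).
    static long expectedUnixTimestamp(String dateTime, ZoneId sessionZone) {
        return LocalDateTime.parse(dateTime, FORMAT).atZone(sessionZone).toEpochSecond();
    }

    public static void main(String[] args) {
        // Hypothetical zones a parameterized test could iterate over.
        List<String> zones =
                List.of("UTC", "Asia/Shanghai", "Europe/Berlin", "America/Los_Angeles");
        for (String id : zones) {
            ZoneId zone = ZoneId.of(id);
            System.out.printf(
                    "%-20s FROM_UNIXTIME(44)=%s UNIX_TIMESTAMP('1970-01-01 08:00:01')=%d%n",
                    id,
                    expectedFromUnixTime(44, zone),
                    expectedUnixTimestamp("1970-01-01 08:00:01", zone));
        }
    }
}
```

For example, `UNIX_TIMESTAMP('1970-01-01 08:00:01')` should yield 28801 in UTC, 1 in `Asia/Shanghai` and 25201 in `Europe/Berlin`. A JUnit 5 `@ParameterizedTest` could compare the operator output against these helpers for each zone, so cases no longer depend on a single hard-coded zone.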