yuxiqian commented on code in PR #3819:
URL: https://github.com/apache/flink-cdc/pull/3819#discussion_r1900670113


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java:
##########
@@ -128,6 +138,61 @@ private static int ymdToJulian(int year, int month, int day) {
         return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
     }
 
+    // --------------------------------------------------------------------------------------------
+    // UNIX TIME
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+     * "yyyy-MM-dd HH:mm:ss" format.
+     */
+    public static String formatUnixTimestamp(long unixtime, TimeZone tz) {
+        return formatUnixTimestamp(unixtime, TIMESTAMP_FORMAT_STRING, tz);
+    }
+
+    /**
+     * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+     * given format.
+     */
+    public static String formatUnixTimestamp(long unixtime, String format, TimeZone tz) {
+        SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
+        formatter.setTimeZone(tz);
+        Date date = new Date(unixtime * 1000);
+        try {
+            return formatter.format(date);
+        } catch (Exception e) {
+            LOG.error("Exception when formatting.", e);
+            return null;
+        }
+    }
+
+    /** Returns the value of the timestamp to seconds since '1970-01-01 00:00:00' UTC. */
+    public static long unixTimestamp(long ts) {
+        return ts / 1000;
+    }

Review Comment:
   This method seems meaningless and undocumented.



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java:
##########
@@ -128,6 +138,61 @@ private static int ymdToJulian(int year, int month, int day) {
         return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
     }
 
+    // --------------------------------------------------------------------------------------------
+    // UNIX TIME
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+     * "yyyy-MM-dd HH:mm:ss" format.
+     */
+    public static String formatUnixTimestamp(long unixtime, TimeZone tz) {
+        return formatUnixTimestamp(unixtime, TIMESTAMP_FORMAT_STRING, tz);
+    }
+
+    /**
+     * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+     * given format.
+     */
+    public static String formatUnixTimestamp(long unixtime, String format, TimeZone tz) {

Review Comment:
   Name the arguments `unixTime` and `timeZone`.
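
For illustration, the method with the suggested parameter names might look like the sketch below. This is not the PR's code: the class name `UnixTimeFormatSketch` is hypothetical, and the sketch creates a fresh `SimpleDateFormat` on each call instead of going through `FORMATTER_CACHE`.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical sketch; not the actual DateTimeUtils implementation. */
final class UnixTimeFormatSketch {

    /** Formats seconds since '1970-01-01 00:00:00' UTC using the given pattern and time zone. */
    static String formatUnixTimestamp(long unixTime, String format, TimeZone timeZone) {
        // Assumption: a new formatter per call, so no mutable formatter is shared.
        SimpleDateFormat formatter = new SimpleDateFormat(format, Locale.ROOT);
        formatter.setTimeZone(timeZone);
        // java.util.Date expects milliseconds, so scale the seconds value.
        return formatter.format(new Date(unixTime * 1000L));
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd HH:mm:ss";
        // Same instant, rendered in two different time zones.
        System.out.println(formatUnixTimestamp(44L, pattern, TimeZone.getTimeZone("UTC")));
        System.out.println(formatUnixTimestamp(44L, pattern, TimeZone.getTimeZone("Asia/Tokyo")));
    }
}
```

The two calls in `main` mirror the `FROM_UNIXTIME(44)` example from the documentation change in this PR.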



##########
docs/content.zh/docs/core-concept/transform.md:
##########
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
 | TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
 | TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
 | TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|

Review Comment:
   ```suggestion
   | UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.
   
   If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
   ```
   
   There should be a newline.
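
The parsing rules described in this table row can be sketched as follows. The class and method below are hypothetical stand-ins rather than the PR's `DateTimeUtils` code; the sketch assumes `SimpleDateFormat` semantics, where an offset parsed through the `X` pattern letter takes precedence over the formatter's configured time zone.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical sketch of the documented UNIX_TIMESTAMP(string1, string2) behavior. */
final class UnixTimestampParseSketch {

    static long unixTimestamp(String dateStr, String format, TimeZone timeZone) {
        SimpleDateFormat formatter = new SimpleDateFormat(format, Locale.ROOT);
        // The configured zone is used only when the input carries no parsed offset.
        formatter.setTimeZone(timeZone);
        try {
            // Milliseconds to seconds.
            return formatter.parse(dateStr).getTime() / 1000L;
        } catch (ParseException e) {
            // Documented fallback for unparseable input.
            return Long.MIN_VALUE;
        }
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        TimeZone berlin = TimeZone.getTimeZone("Europe/Berlin");
        // No offset in the string: interpreted in the configured zone.
        System.out.println(unixTimestamp("1970-01-01 08:00:01", "yyyy-MM-dd HH:mm:ss", shanghai));
        // Offset in the string: the configured zone (Berlin) is ignored.
        System.out.println(
                unixTimestamp("1970-01-01 08:00:01.001 +0800", "yyyy-MM-dd HH:mm:ss.SSS X", berlin));
        // Unparseable input: falls back to Long.MIN_VALUE.
        System.out.println(unixTimestamp("not a date", "yyyy-MM-dd HH:mm:ss", berlin));
    }
}
```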



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java:
##########
@@ -109,7 +119,7 @@ private static long internalParseTimestampMillis(String dateStr, String format,
         } catch (ParseException e) {
             LOG.error(
                     String.format(
-                            "Exception when parsing datetime string '%s' in format '%s'",
+                            "Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",

Review Comment:
   `SystemFunctionUtils` (and `DateTimeUtils`) are evaluated inside the Janino engine. Will these error messages ever be printed?



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java:
##########


Review Comment:
   Consider adding cases to `TransformE2eITCase#testTemporalFunctions`.



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java:
##########
@@ -780,6 +795,76 @@ void testTimestampTransform() throws Exception {
         transformFunctionEventEventOperatorTestHarness.close();
     }
 
+    @Test
+    void testUnixTimestampTransform() throws Exception {

Review Comment:
   Please parameterize this test over multiple time zones.
   
   See `MySqlTimezoneITCase` for an example.
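
As a rough illustration of why a single time zone is not enough, the sketch below checks one wall-clock string against a table of zones and expected epoch seconds. It is plain Java with `java.time` rather than the project's test harness, and all names in it (`TimeZoneCasesSketch`, `toEpochSeconds`, `expectedByZone`) are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical table-driven check; a real test would use the project's parameterized setup. */
final class TimeZoneCasesSketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    /** Interprets a zone-less date time string in the given zone and returns epoch seconds. */
    static long toEpochSeconds(String dateTime, String zoneId) {
        return LocalDateTime.parse(dateTime, FORMAT).atZone(ZoneId.of(zoneId)).toEpochSecond();
    }

    /** Expected results for '1970-01-01 08:00:01' in each zone. */
    static Map<String, Long> expectedByZone() {
        Map<String, Long> expected = new LinkedHashMap<>();
        expected.put("UTC", 28801L);           // 8h 0m 1s after the epoch
        expected.put("Asia/Shanghai", 1L);     // UTC+8
        expected.put("Asia/Tokyo", -3599L);    // UTC+9, one hour before the Shanghai case
        expected.put("Europe/Berlin", 25201L); // UTC+1 in January 1970
        return expected;
    }

    static boolean allCasesPass() {
        for (Map.Entry<String, Long> entry : expectedByZone().entrySet()) {
            if (toEpochSeconds("1970-01-01 08:00:01", entry.getKey()) != entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allCasesPass());
    }
}
```

Each map entry corresponds to one parameter set, so a regression that only appears in one zone is not hidden by a single hard-coded `addTimezone` call.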



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java:
##########
@@ -252,9 +263,9 @@ public void testTranslateFilterToJaninoExpression() {
                 "TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\", __time_zone__)");
         testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, __time_zone__)");
         testFilterExpression("TIMESTAMP_DIFF('DAY', dt1, dt2)", "timestampDiff(\"DAY\", dt1, dt2)");
-        testFilterExpression("IF(a>b,a,b)", "a > b ? a : b");
-        testFilterExpression("NULLIF(a,b)", "nullif(a, b)");
-        testFilterExpression("COALESCE(a,b,c)", "coalesce(a, b, c)");
+        testFilterExpression("IF(a>b, a, b)", "a > b ? a : b");
+        testFilterExpression("NULLIF(a, b)", "nullif(a, b)");
+        testFilterExpression("COALESCE(a, b, c)", "coalesce(a, b, c)");

Review Comment:
   This unrelated change can be split into a separate PR.



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java:
##########
@@ -780,6 +795,76 @@ void testTimestampTransform() throws Exception {
         transformFunctionEventEventOperatorTestHarness.close();
     }
 
+    @Test
+    void testUnixTimestampTransform() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                UNIX_TIMESTAMP_TABLEID.identifier(),
+                                "col1, FROM_UNIXTIME(44) as from_unixtime,"

Review Comment:
   One suite of test cases may not be sufficient. I still suggest something like https://github.com/apache/flink-cdc/pull/3698#discussion_r1899936303



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java:
##########
@@ -780,6 +795,76 @@ void testTimestampTransform() throws Exception {
         transformFunctionEventEventOperatorTestHarness.close();
     }
 
+    @Test
+    void testUnixTimestampTransform() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                UNIX_TIMESTAMP_TABLEID.identifier(),
+                                "col1, FROM_UNIXTIME(44) as from_unixtime,"
+                                        + " FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss') as from_unixtime_format,"
+                                        + " IF(UNIX_TIMESTAMP() = UNIX_TIMESTAMP(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss')), 1, 0) as current_unix_timestamp,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01') as unix_timestamp,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_tz,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS') as unix_timestamp_format,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_error",
+                                null)
+                        //                        .addTimezone("UTC")

Review Comment:
   Remove this commented-out line.



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java:
##########
@@ -780,6 +795,76 @@ void testTimestampTransform() throws Exception {
         transformFunctionEventEventOperatorTestHarness.close();
     }
 
+    @Test
+    void testUnixTimestampTransform() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                UNIX_TIMESTAMP_TABLEID.identifier(),
+                                "col1, FROM_UNIXTIME(44) as from_unixtime,"
+                                        + " FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss') as from_unixtime_format,"
+                                        + " IF(UNIX_TIMESTAMP() = UNIX_TIMESTAMP(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss')), 1, 0) as current_unix_timestamp,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01') as unix_timestamp,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_tz,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS') as unix_timestamp_format,"
+                                        + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_error",
+                                null)
+                        //                        .addTimezone("UTC")
+                        .addTimezone("Europe/Berlin")
+                        .build();
+        RegularEventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null
+                                }));

Review Comment:
   What is the point of using the same schema for the source and the sink? These fields are all `null` in the source anyway, and all test data is hard-coded in the transform rules. Please consider a clearer way to organize test cases in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
