wuchong commented on a change in pull request #15658:
URL: https://github.com/apache/flink/pull/15658#discussion_r616333796
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
##########
@@ -174,27 +209,264 @@ public static void printAsTableauForm(
         printWriter.flush();
     }
 
-    public static String[] rowToString(Row row) {
-        return rowToString(row, NULL_COLUMN, false);
+    public static String[] rowToString(
+            Row row, ResolvedSchema resolvedSchema, ZoneId sessionTimeZone) {
+        return rowToString(row, NULL_COLUMN, false, resolvedSchema, sessionTimeZone);
     }
 
-    public static String[] rowToString(Row row, String nullColumn, boolean printRowKind) {
+    public static String[] rowToString(
+            Row row,
+            String nullColumn,
+            boolean printRowKind,
+            ResolvedSchema resolvedSchema,
+            ZoneId sessionTimeZone) {
         final int len = printRowKind ? row.getArity() + 1 : row.getArity();
         final List<String> fields = new ArrayList<>(len);
         if (printRowKind) {
             fields.add(row.getKind().shortString());
         }
         for (int i = 0; i < row.getArity(); i++) {
             final Object field = row.getField(i);
+            final LogicalType fieldType =
+                    resolvedSchema.getColumnDataTypes().get(i).getLogicalType();
             if (field == null) {
                 fields.add(nullColumn);
             } else {
-                fields.add(StringUtils.arrayAwareToString(field));
+                fields.add(
+                        StringUtils.arrayAwareToString(
+                                formattedTimestamp(field, fieldType, sessionTimeZone)));
             }
         }
         return fields.toArray(new String[0]);
     }
 
+    /**
+     * Normalizes field that contains TIMESTAMP and TIMESTAMP_LTZ type data.
+     *
+     * <p>This method also supports nested type ARRAY, ROW, MAP.
+     */
+    private static Object formattedTimestamp(
+            Object field, LogicalType fieldType, ZoneId sessionTimeZone) {
+        final LogicalTypeRoot typeRoot = fieldType.getTypeRoot();
+        if (field == null) {
+            return "null";
+        }
+        switch (typeRoot) {
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return formatTimestampData(field, fieldType, sessionTimeZone);
+            case ARRAY:
+                LogicalType elementType = ((ArrayType) fieldType).getElementType();
+                if (field instanceof List) {
+                    List<?> array = (List<?>) field;
+                    Object[] formattedArray = new Object[array.size()];
+                    for (int i = 0; i < array.size(); i++) {
+                        formattedArray[i] =
+                                formattedTimestamp(array.get(i), elementType, sessionTimeZone);
+                    }
+                    return formattedArray;
+                } else if (field instanceof ArrayData) {
+                    ArrayData array = (ArrayData) field;
+                    Object[] formattedArray = new Object[array.size()];
+                    for (int i = 0; i < array.size(); i++) {
+                        formattedArray[i] =
+                                formattedTimestamp(
+                                        array.getTimestamp(i, getPrecision(elementType)),

Review comment:
   This will throw an exception if it's not an array of timestamps. I think we can ignore `ArrayData` for now.
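If `ArrayData` handling is kept (or added back later), the cast this comment points out can be avoided by reading elements through a type-agnostic element getter instead of `ArrayData#getTimestamp`. A minimal sketch, assuming `ArrayData.createElementGetter` from `flink-table-common` is acceptable in this class; the helper class and method names are illustrative, not part of the PR:

```java
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.types.logical.LogicalType;

/** Sketch: turn an ArrayData of any element type into Object[] without assuming timestamps. */
final class ArrayDataElements {

    static Object[] toObjectArray(ArrayData array, LogicalType elementType) {
        // Type-agnostic accessor, analogous to the RowData.createFieldGetter call used for ROW below.
        final ArrayData.ElementGetter getter = ArrayData.createElementGetter(elementType);
        final Object[] elements = new Object[array.size()];
        for (int i = 0; i < array.size(); i++) {
            elements[i] = getter.getElementOrNull(array, i);
        }
        return elements;
    }
}
```

Each extracted element could then be passed back through `formattedTimestamp` with the element type, exactly as the `List` branch does.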
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
##########
@@ -174,27 +209,264 @@ public static void printAsTableauForm(
         printWriter.flush();
     }
 
-    public static String[] rowToString(Row row) {
-        return rowToString(row, NULL_COLUMN, false);
+    public static String[] rowToString(
+            Row row, ResolvedSchema resolvedSchema, ZoneId sessionTimeZone) {
+        return rowToString(row, NULL_COLUMN, false, resolvedSchema, sessionTimeZone);
     }
 
-    public static String[] rowToString(Row row, String nullColumn, boolean printRowKind) {
+    public static String[] rowToString(
+            Row row,
+            String nullColumn,
+            boolean printRowKind,
+            ResolvedSchema resolvedSchema,
+            ZoneId sessionTimeZone) {
         final int len = printRowKind ? row.getArity() + 1 : row.getArity();
         final List<String> fields = new ArrayList<>(len);
         if (printRowKind) {
             fields.add(row.getKind().shortString());
         }
         for (int i = 0; i < row.getArity(); i++) {
             final Object field = row.getField(i);
+            final LogicalType fieldType =
+                    resolvedSchema.getColumnDataTypes().get(i).getLogicalType();
             if (field == null) {
                 fields.add(nullColumn);
             } else {
-                fields.add(StringUtils.arrayAwareToString(field));
+                fields.add(
+                        StringUtils.arrayAwareToString(
+                                formattedTimestamp(field, fieldType, sessionTimeZone)));
             }
         }
         return fields.toArray(new String[0]);
     }
 
+    /**
+     * Normalizes field that contains TIMESTAMP and TIMESTAMP_LTZ type data.
+     *
+     * <p>This method also supports nested type ARRAY, ROW, MAP.
+     */
+    private static Object formattedTimestamp(
+            Object field, LogicalType fieldType, ZoneId sessionTimeZone) {
+        final LogicalTypeRoot typeRoot = fieldType.getTypeRoot();
+        if (field == null) {
+            return "null";
+        }
+        switch (typeRoot) {
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return formatTimestampData(field, fieldType, sessionTimeZone);
+            case ARRAY:
+                LogicalType elementType = ((ArrayType) fieldType).getElementType();
+                if (field instanceof List) {
+                    List<?> array = (List<?>) field;
+                    Object[] formattedArray = new Object[array.size()];
+                    for (int i = 0; i < array.size(); i++) {
+                        formattedArray[i] =
+                                formattedTimestamp(array.get(i), elementType, sessionTimeZone);
+                    }
+                    return formattedArray;
+                } else if (field instanceof ArrayData) {
+                    ArrayData array = (ArrayData) field;
+                    Object[] formattedArray = new Object[array.size()];
+                    for (int i = 0; i < array.size(); i++) {
+                        formattedArray[i] =
+                                formattedTimestamp(
+                                        array.getTimestamp(i, getPrecision(elementType)),
+                                        elementType,
+                                        sessionTimeZone);
+                    }
+                    return new GenericArrayData(formattedArray).toObjectArray();
+                } else if (field.getClass().isArray()) {
+                    // primitive type
+                    if (field.getClass() == byte[].class) {
+                        byte[] array = (byte[]) field;
+                        Object[] formattedArray = new Object[array.length];
+                        for (int i = 0; i < array.length; i++) {
+                            formattedArray[i] =
+                                    formattedTimestamp(array[i], elementType, sessionTimeZone);
+                        }
+                        return formattedArray;
+                    } else if (field.getClass() == short[].class) {
+                        short[] array = (short[]) field;
+                        Object[] formattedArray = new Object[array.length];
+                        for (int i = 0; i < array.length; i++) {
+                            formattedArray[i] =
+                                    formattedTimestamp(array[i], elementType, sessionTimeZone);
+                        }
+                        return formattedArray;
+                    } else if (field.getClass() == int[].class) {
+                        int[] array = (int[]) field;
+                        Object[] formattedArray = new Object[array.length];
+                        for (int i = 0; i < array.length; i++) {
+                            formattedArray[i] =
+                                    formattedTimestamp(array[i], elementType, sessionTimeZone);
+                        }
+                        return formattedArray;
+                    } else if (field.getClass() == long[].class) {
+                        long[] array = (long[]) field;
+                        Object[] formattedArray = new Object[array.length];
+                        for (int i = 0; i < array.length; i++) {
+                            formattedArray[i] =
+                                    formattedTimestamp(array[i], elementType, sessionTimeZone);
+                        }
+                        return formattedArray;
+                    } else if (field.getClass() == float[].class) {
+                        float[] array = (float[]) field;
+                        Object[] formattedArray = new Object[array.length];
+                        for (int i = 0; i < array.length; i++) {
+                            formattedArray[i] =
+                                    formattedTimestamp(array[i], elementType, sessionTimeZone);
+                        }
+                        return formattedArray;
+                    } else if (field.getClass() == double[].class) {
+                        double[] array = (double[]) field;
+                        Object[] formattedArray = new Object[array.length];
+                        for (int i = 0; i < array.length; i++) {
+                            formattedArray[i] =
+                                    formattedTimestamp(array[i], elementType, sessionTimeZone);
+                        }
+                        return formattedArray;
+                    } else if (field.getClass() == boolean[].class) {
+                        boolean[] array = (boolean[]) field;
+                        Object[] formattedArray = new Object[array.length];
+                        for (int i = 0; i < array.length; i++) {
+                            formattedArray[i] =
+                                    formattedTimestamp(array[i], elementType, sessionTimeZone);
+                        }
+                        return formattedArray;
+                    } else if (field.getClass() == char[].class) {
+                        char[] array = (char[]) field;
+                        Object[] formattedArray = new Object[array.length];
+                        for (int i = 0; i < array.length; i++) {
+                            formattedArray[i] =
+                                    formattedTimestamp(array[i], elementType, sessionTimeZone);
+                        }
+                        return formattedArray;
+                    } else {
+                        // non-primitive type
+                        Object[] array = (Object[]) field;
+                        Object[] formattedArray = new Object[array.length];
+                        for (int i = 0; i < array.length; i++) {
+                            formattedArray[i] =
+                                    formattedTimestamp(array[i], elementType, sessionTimeZone);
+                        }
+                        return formattedArray;
+                    }
+                } else {
+                    return field;
+                }
+            case ROW:
+                if (fieldType instanceof RowType && field instanceof Row) {
+                    Row row = (Row) field;
+                    Row formattedRow = new Row(row.getKind(), row.getArity());
+                    for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) {
+                        LogicalType type = ((RowType) fieldType).getFields().get(i).getType();
+                        formattedRow.setField(
+                                i, formattedTimestamp(row.getField(i), type, sessionTimeZone));
+                    }
+                    return formattedRow;
+
+                } else if (fieldType instanceof RowType && field instanceof RowData) {
+                    RowData rowData = (RowData) field;
+                    Row formattedRow = new Row(rowData.getRowKind(), rowData.getArity());
+                    for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) {
+                        LogicalType type = ((RowType) fieldType).getFields().get(i).getType();
+                        RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, i);
+                        formattedRow.setField(
+                                i,
+                                formattedTimestamp(
+                                        fieldGetter.getFieldOrNull(rowData),
+                                        type,
+                                        sessionTimeZone));
+                    }
+                    return formattedRow;
+                } else {
+                    return field;
+                }
+            case MAP:
+                LogicalType keyType = ((MapType) fieldType).getKeyType();
+                LogicalType valueType = ((MapType) fieldType).getValueType();
+                if (fieldType instanceof MapType && field instanceof Map) {
+                    Map<Object, Object> map = ((Map) field);
+                    Map<Object, Object> formattedMap = new HashMap<>(map.size());
+                    for (Object key : map.keySet()) {
+                        formattedMap.put(
+                                formattedTimestamp(key, keyType, sessionTimeZone),
+                                formattedTimestamp(map.get(key), valueType, sessionTimeZone));
+                    }
+                    return formattedMap;
+                } else if (fieldType instanceof MapType && field instanceof MapData) {
+                    MapData map = ((MapData) field);
+                    Map<Object, Object> formattedMap = new HashMap<>(map.size());
+                    Object[] keyArray =
+                            (Object[]) formattedTimestamp(map.keyArray(), keyType, sessionTimeZone);
+                    Object[] valueArray =
+                            (Object[])
+                                    formattedTimestamp(
+                                            map.valueArray(), valueType, sessionTimeZone);
+                    for (int i = 0; i < keyArray.length; i++) {
+                        formattedMap.put(keyArray[i], valueArray[i]);
+                    }
+                    return formattedMap;
+                } else {
+                    return field;
+                }
+            default:
+                return field;
+        }
+    }
+
+    /**
+     * Formats the print content of TIMESTAMP and TIMESTAMP_LTZ type data, consider the user
+     * configured time zone.
+     */
+    private static Object formatTimestampData(

Review comment:
   It would be better to call this method `formatTimestampField`; the current name sounds like it formats a `TimestampData` parameter.
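For readers following the rename: the body of `formatTimestampData`/`formatTimestampField` is not shown in this hunk, but its contract can be inferred from the select.q expectations later in this thread (TIMESTAMP prints its wall-clock value unchanged, while TIMESTAMP_LTZ is shifted into the configured session time zone, e.g. `1970-01-01 08:00:01.000` under `Asia/Shanghai`). A rough sketch of that rule, assuming external `LocalDateTime`/`Instant` values or internal `TimestampData`; this is not the PR's actual implementation:

```java
import org.apache.flink.table.data.TimestampData;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Sketch of the time-zone rule for printing TIMESTAMP vs. TIMESTAMP_LTZ fields. */
final class TimestampFieldSketch {

    static LocalDateTime toDisplayTime(Object field, boolean isLocalZoned, ZoneId sessionTimeZone) {
        Object value = field;
        if (value instanceof TimestampData) {
            // Internal format: unwrap to the matching external class first.
            value = isLocalZoned
                    ? ((TimestampData) value).toInstant()
                    : ((TimestampData) value).toLocalDateTime();
        }
        if (value instanceof Instant) {
            // TIMESTAMP_LTZ: an absolute instant, rendered in the session time zone.
            return ((Instant) value).atZone(sessionTimeZone).toLocalDateTime();
        }
        // TIMESTAMP: already a wall-clock value, rendered as-is.
        return (LocalDateTime) value;
    }

    public static void main(String[] args) {
        Instant epochPlusOneSecond = Instant.ofEpochSecond(1);
        // Prints 1970-01-01T08:00:01 for Asia/Shanghai, matching the select.q expectation.
        System.out.println(toDisplayTime(epochPlusOneSecond, true, ZoneId.of("Asia/Shanghai")));
    }
}
```

The TIMESTAMP_LTZ value could also arrive as an `OffsetDateTime` depending on the conversion class; the sketch only covers the default `Instant` case.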
##########
File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java
##########
@@ -81,7 +188,10 @@ public void testStringDisplayWidth() {
     @Test
     public void testPrintWithEmptyResult() {
         PrintUtils.printAsTableauForm(
-                getSchema(), Collections.<Row>emptyList().iterator(), new PrintWriter(outContent));
+                getSchema(),
+                Collections.<Row>emptyList().iterator(),
+                new PrintWriter(outContent),
+                ZoneId.of("UTC"));

Review comment:
   Use `UTC_ZONE_ID` instead?


##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
##########
@@ -239,9 +511,9 @@ public static String genBorderLine(int[] colWidths) {
         // determine proper column width based on types
         for (int i = 0; i < columns.size(); ++i) {
-            LogicalType type = columns.get(i).getDataType().getLogicalType();
+            DataType type = columns.get(i).getDataType();

Review comment:
   Can we revert the changes in this method now?


##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TimestampStringUtils.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.LocalDateTime;
+
+/** Utils to represent a LocalDateTime to String, considered the precision. */
+@Internal
+public class TimestampStringUtils {
+
+    // TODO this method is copied from org.apache.flink.table.runtime.functions.SqlDDateTimeUtils,

Review comment:
   ```suggestion
       // TODO this method is copied from org.apache.flink.table.runtime.functions.SqlDateTimeUtils,
   ```
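Since the copied method itself is not visible in this hunk, here is a rough sketch of the precision handling the class javadoc and the TODO describe. It is an assumption based on the select.q expectations, where `TIMESTAMP '2021-04-13 21:12:11.1'` is displayed as `2021-04-13 21:12:11.100000000` for precision 9 and with no fraction for precision 0; the class and method names below are illustrative only:

```java
import java.time.LocalDateTime;

/** Sketch of precision-aware rendering of a LocalDateTime, e.g. "2021-04-13 21:12:11.100000000". */
final class LocalDateTimeFormatting {

    static String format(LocalDateTime ldt, int precision) {
        String base = String.format(
                "%04d-%02d-%02d %02d:%02d:%02d",
                ldt.getYear(), ldt.getMonthValue(), ldt.getDayOfMonth(),
                ldt.getHour(), ldt.getMinute(), ldt.getSecond());
        if (precision <= 0) {
            return base;
        }
        // Nanos padded to 9 digits, then truncated to the declared precision.
        String nanos = String.format("%09d", ldt.getNano());
        return base + "." + nanos.substring(0, Math.min(precision, 9));
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2021, 4, 13, 21, 12, 11, 100_000_000);
        System.out.println(format(ts, 0)); // 2021-04-13 21:12:11
        System.out.println(format(ts, 9)); // 2021-04-13 21:12:11.100000000
    }
}
```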
##########
File path: flink-table/flink-sql-client/src/test/resources/sql/select.q
##########
@@ -59,14 +59,61 @@ GROUP BY id;
 +----+-------------+----------------------+----------------------+-------------------------------+
 | op | id | cnt | uv | max_ts |
 +----+-------------+----------------------+----------------------+-------------------------------+
-| +I | 1 | 1 | 1 | 2021-04-13T20:12:11.123456789 |
-| +I | 2 | 1 | 1 | 2021-04-13T19:12:11.123456789 |
-| -U | 2 | 1 | 1 | 2021-04-13T19:12:11.123456789 |
-| +U | 2 | 2 | 1 | 2021-04-13T21:12:11.123456789 |
+| +I | 1 | 1 | 1 | 2021-04-13 20:12:11.123456789 |
+| +I | 2 | 1 | 1 | 2021-04-13 19:12:11.123456789 |
+| -U | 2 | 1 | 1 | 2021-04-13 19:12:11.123456789 |
+| +U | 2 | 2 | 1 | 2021-04-13 21:12:11.123456789 |
 +----+-------------+----------------------+----------------------+-------------------------------+
 Received a total of 4 rows
 !ok

+# ==========================================================================
+# test TIMESTAMP and TIMESTAMP_LTZ type display with tableau result mode
+# ==========================================================================
+
+SET table.local-time-zone = Asia/Shanghai;
+[INFO] Session property has been set.
+!info
+
+SELECT id, ts0, ts3, ts9,
+  TO_TIMESTAMP_LTZ(1, 0) AS ts_ltz0,
+  TO_TIMESTAMP_LTZ(1, 3) AS ts_ltz3,
+  CAST(ts9 as TIMESTAMP_LTZ(9)) AS ts_ltz9
+FROM (VALUES
+  (1, TIMESTAMP '2021-04-13 20:12:11', TIMESTAMP '2021-04-13 20:12:11.123', TIMESTAMP '2021-04-13 20:12:11.123456789'),
+  (2, TIMESTAMP '2021-04-13 21:12:11', TIMESTAMP '2021-04-13 21:12:11.001', TIMESTAMP '2021-04-13 21:12:11.1'))
+  as T(id, ts0, ts3, ts9);
++----+-------------+---------------------+-------------------------+-------------------------------+-------------------------+-------------------------+-------------------------------+
+| op | id | ts0 | ts3 | ts9 | ts_ltz0 | ts_ltz3 | ts_ltz9 |
++----+-------------+---------------------+-------------------------+-------------------------------+-------------------------+-------------------------+-------------------------------+
+| +I | 1 | 2021-04-13 20:12:11 | 2021-04-13 20:12:11.123 | 2021-04-13 20:12:11.123456789 | 1970-01-01 08:00:01.000 | 1970-01-01 08:00:00.001 | 2021-04-13 20:12:11.123456789 |
+| +I | 2 | 2021-04-13 21:12:11 | 2021-04-13 21:12:11.001 | 2021-04-13 21:12:11.100000000 | 1970-01-01 08:00:01.000 | 1970-01-01 08:00:00.001 | 2021-04-13 21:12:11.100000000 |
++----+-------------+---------------------+-------------------------+-------------------------------+-------------------------+-------------------------+-------------------------------+
+Received a total of 2 rows
+!ok
+
+SET table.local-time-zone = UTC;
+[INFO] Session property has been set.
+!info
+
+SELECT id, ts0, ts3, ts9,
+  TO_TIMESTAMP_LTZ(1, 0) AS ts_ltz0,
+  TO_TIMESTAMP_LTZ(1, 3) AS ts_ltz3,
+  CAST(ts9 as TIMESTAMP_LTZ(9)) AS ts_ltz9
+FROM (VALUES
+  (1, TIMESTAMP '2021-04-13 20:12:11', TIMESTAMP '2021-04-13 20:12:11.123', TIMESTAMP '2021-04-13 20:12:11.123456789'),
+  (2, TIMESTAMP '2021-04-13 21:12:11', TIMESTAMP '2021-04-13 21:12:11.001', TIMESTAMP '2021-04-13 21:12:11.1'))
+  as T(id, ts0, ts3, ts9);
++----+-------------+---------------------+-------------------------+-------------------------------+-------------------------+-------------------------+-------------------------------+
+| op | id | ts0 | ts3 | ts9 | ts_ltz0 | ts_ltz3 | ts_ltz9 |
++----+-------------+---------------------+-------------------------+-------------------------------+-------------------------+-------------------------+-------------------------------+
+| +I | 1 | 2021-04-13 20:12:11 | 2021-04-13 20:12:11.123 | 2021-04-13 20:12:11.123456789 | 1970-01-01 00:00:01.000 | 1970-01-01 00:00:00.001 | 2021-04-13 20:12:11.123456789 |
+| +I | 2 | 2021-04-13 21:12:11 | 2021-04-13 21:12:11.001 | 2021-04-13 21:12:11.100000000 | 1970-01-01 00:00:01.000 | 1970-01-01 00:00:00.001 | 2021-04-13 21:12:11.100000000 |

Review comment:
   Could you also add tests for batch queries?
##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
##########
@@ -91,9 +90,10 @@
         File dir = firstFile.getParentFile();
         final List<String> paths = new ArrayList<>();
         final FilenameFilter filter = new PatternFilenameFilter(".*\\.q$");
-        for (File f : Util.first(dir.listFiles(filter), new File[0])) {
-            paths.add(f.getAbsolutePath().substring(commonPrefixLength));
-        }
+        // for (File f : Util.first(dir.listFiles(filter), new File[0])) {
+        //     paths.add(f.getAbsolutePath().substring(commonPrefixLength));
+        // }
+        paths.add("sql/select.q");

Review comment:
   Please revert this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org