godfreyhe commented on code in PR #21182:
URL: https://github.com/apache/flink/pull/21182#discussion_r1043322430


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java:
##########
@@ -62,13 +61,10 @@ protected TimeIndicatorTypeInfo(boolean isEventTime) {
         this.isEventTime = isEventTime;
     }
 
-    // this replaces the effective serializer by a LongSerializer
-    // it is a hacky but efficient solution to keep the object creation overhead low but still
-    // be compatible with the corresponding SqlTimestampTypeInfo
     @Override
     @SuppressWarnings("unchecked")
     public TypeSerializer<Timestamp> createSerializer(ExecutionConfig executionConfig) {
-        return (TypeSerializer) LongSerializer.INSTANCE;
+        return (TypeSerializer) LocalDateTimeSerializer.INSTANCE;

Review Comment:
   Can we improve the conversion logic from `ResolvedSchema` to `TableSchema`?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java:
##########
@@ -346,6 +346,41 @@ public Schema toSchema() {
         return builder.build();
     }
 
+    /** Helps to migrate to the new {@link Schema} class, retain comments when needed. */
+    public Schema toSchema(Map<String, String> comments) {

Review Comment:
   Please let `toSchema()` call `toSchema(Map<String, String> comments)`.
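
   A rough sketch of the delegation being asked for, assuming an empty map means "no comments". `TableSchemaSketch` and its string output are hypothetical stand-ins, not the real `TableSchema` or `Schema` classes; only the shape of the two overloads is meant to carry over.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for TableSchema; only the overload delegation matters here. */
public class TableSchemaSketch {
    // Column name -> type name, kept in declaration order.
    private final Map<String, String> columns = new LinkedHashMap<>();

    public TableSchemaSketch column(String name, String type) {
        columns.put(name, type);
        return this;
    }

    /** The no-arg variant delegates, so the conversion logic lives in one place. */
    public String toSchema() {
        return toSchema(Collections.emptyMap());
    }

    /** Renders one line per column and appends a comment when one is supplied. */
    public String toSchema(Map<String, String> comments) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : columns.entrySet()) {
            sb.append(e.getKey()).append(' ').append(e.getValue());
            String comment = comments.get(e.getKey());
            if (comment != null) {
                sb.append(" COMMENT '").append(comment).append('\'');
            }
            sb.append('\n');
        }
        return sb.toString();
    }
}
```

   With this shape, any later change to the conversion only has to be made in the `Map`-taking overload.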



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java:
##########
@@ -49,7 +47,6 @@ public List<String> supportedProperties() {
     @Override
     public TableSource createTableSource(TableSourceFactory.Context context) {
         CatalogTable table = checkNotNull(context.getTable());
-        Preconditions.checkArgument(table instanceof CatalogTableImpl);

Review Comment:
   Why do we need to remove this check?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1600,23 +1619,34 @@ private Object[][] buildTableColumns(ResolvedSchema schema) {
                                                             "PRI(%s)",
                                                             String.join(", ", columns))));
                         });
-
+        boolean nonComments = isSchemaNonColumnComments(schema);
         return schema.getColumns().stream()
                 .map(
                         (c) -> {
                             final LogicalType logicalType = c.getDataType().getLogicalType();
-                            return new Object[] {
-                                c.getName(),
-                                logicalType.copy(true).asSummaryString(),
-                                logicalType.isNullable(),
-                                fieldToPrimaryKey.getOrDefault(c.getName(), null),
-                                c.explainExtras().orElse(null),
-                                fieldToWatermark.getOrDefault(c.getName(), null)
-                            };
+                            final ArrayList<Object> result =
+                                    new ArrayList<>(
+                                            Arrays.asList(
+                                                    c.getName(),
+                                                    logicalType.copy(true).asSummaryString(),
+                                                    logicalType.isNullable(),
+                                                    fieldToPrimaryKey.getOrDefault(
+                                                            c.getName(), null),
+                                                    c.explainExtras().orElse(null),
+                                                    fieldToWatermark.getOrDefault(
+                                                            c.getName(), null)));
+                            if (!nonComments) {
+                                result.add(c.getComment().orElse(null));
+                            }
+                            return result.toArray();
                         })
                 .toArray(Object[][]::new);
     }
 
+    private boolean isSchemaNonColumnComments(ResolvedSchema schema) {

Review Comment:
   Another solution: always print the comments, whether or not the schema has any.
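
   A minimal sketch of that alternative, assuming a missing comment is simply rendered as a `null` cell. `DescribeRowsSketch` and `Col` are hypothetical simplifications of `ResolvedSchema` and `Column`, and the real rows carry more cells (nullability, key, extras, watermark) than shown here.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified version of the row-building logic. */
public class DescribeRowsSketch {

    /** Stand-in for a resolved column: name, type summary, optional comment. */
    public static final class Col {
        final String name;
        final String type;
        final Optional<String> comment;

        public Col(String name, String type, String comment) {
            this.name = name;
            this.type = type;
            this.comment = Optional.ofNullable(comment);
        }
    }

    /** Every row has the same width; the comment cell is null when absent. */
    public static Object[][] buildTableColumns(List<Col> columns) {
        return columns.stream()
                .map(c -> new Object[] {c.name, c.type, c.comment.orElse(null)})
                .toArray(Object[][]::new);
    }
}
```

   This keeps the plain array literal and drops the `isSchemaNonColumnComments` check, at the cost of an empty comment column for schemas that have no comments.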



