LadyForest commented on a change in pull request #13011:
URL: https://github.com/apache/flink/pull/13011#discussion_r615712845



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1293,6 +1328,121 @@ private TableResult buildShowResult(String columnName, String[] objects) {
                 Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new));
     }
 
+    private String buildShowCreateTableRow(
+            ResolvedCatalogBaseTable<?> table,
+            ObjectIdentifier sqlIdentifier,
+            boolean isTemporary) {
+        CatalogBaseTable.TableKind kind = table.getTableKind();
+        if (kind == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException(
+                    String.format(
+                            "SHOW CREATE TABLE does not support showing CREATE VIEW statement with identifier %s.",
+                            sqlIdentifier.asSerializableString()));
+        }
+        StringBuilder sb =
+                new StringBuilder(
+                        String.format(
+                                "CREATE %sTABLE %s (\n",
+                                isTemporary ? "TEMPORARY " : "",
+                                sqlIdentifier.asSerializableString()));
+        ResolvedSchema schema = table.getResolvedSchema();
+        // append columns
+        sb.append(
+                schema.getColumns().stream()
+                        .map(column -> String.format("%s%s", printIndent, getColumnString(column)))
+                        .collect(Collectors.joining(",\n")));
+        // append watermark spec
+        if (!schema.getWatermarkSpecs().isEmpty()) {
+            sb.append(",\n");
+            sb.append(
+                    schema.getWatermarkSpecs().stream()
+                            .map(
+                                    watermarkSpec ->
+                                            String.format(
+                                                    "%sWATERMARK FOR %s AS %s",
+                                                    printIndent,
+                                                    String.join(
+                                                            ".",
+                                                            EncodingUtils.escapeIdentifier(
+                                                                    watermarkSpec
+                                                                            .getRowtimeAttribute())),
+                                                    watermarkSpec
+                                                            .getWatermarkExpression()
+                                                            .asSummaryString()))
+                            .collect(Collectors.joining("\n")));
+        }
+        // append constraint
+        if (schema.getPrimaryKey().isPresent()) {
+            sb.append(",\n");
+            sb.append(String.format("%s%s", printIndent, schema.getPrimaryKey().get()));
+        }
+        sb.append("\n) ");
+        // append comment
+        String comment = table.getComment();
+        if (StringUtils.isNotEmpty(comment)) {
+            sb.append(String.format("COMMENT '%s'\n", comment));
+        }
+        // append partitions
+        ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table;
+        if (catalogTable.isPartitioned()) {
+            sb.append("PARTITIONED BY (")
+                    .append(
+                            catalogTable.getPartitionKeys().stream()
+                                    .map(key -> String.format("`%s`", key))
+                                    .collect(Collectors.joining(", ")))
+                    .append(")\n");
+        }
+        // append `with` properties
+        Map<String, String> options = table.getOptions();
+        sb.append("WITH (\n")
+                .append(
+                        options.entrySet().stream()
+                                .map(
+                                        entry ->
+                                                String.format(
+                                                        "%s'%s' = '%s'",
+                                                        printIndent,
+                                                        entry.getKey(),
+                                                        entry.getValue()))
+                                .collect(Collectors.joining(",\n")))
+                .append("\n)\n");
+        return sb.toString();
+    }
+
+    private String getColumnString(Column column) {
+        final StringBuilder sb = new StringBuilder();
+        sb.append(EncodingUtils.escapeIdentifier(column.getName()));
+        sb.append(" ");
+        // skip data type for computed column
+        if (column instanceof Column.ComputedColumn) {
+            sb.append(
+                    column.explainExtras()
+                            .orElseThrow(
+                                    () ->
+                                            new TableException(
+                                                    String.format(
+                                                            "Column expression can not be null for computed column '%s'",
+                                                            column.getName()))));
+        } else {
+            DataType dataType = column.getDataType();
+            String type = dataType.toString();

Review comment:
   > Use `dataType#asSerializableString` instead of `toString`/`asSummaryString`. `asSerializableString` doesn't show timestamp kind and we don't need the following special logic then.
   >
   > Actually, we should always use `asSerializableString`, rather than `asSummaryString`, because only the serialized string is fully supported to be parsed, see `LogicalTypeParser`.
   >
   > Besides, it would be better to avoid using `toString` and explicitly use `asSerializableString`, otherwise, it's hard to trace where the `asSerializableString`/`asSummaryString` are used.
   
   Hi Jark, I agree that `asSerializableString` should be preferred. Just one thing to confirm: `LogicalTypeParser#parseTypeByKeyword` uses `VarCharType.MAX_LENGTH` as the varchar length, so `VarCharType#asSerializableString` will always print the string data type as `VARCHAR(2147483647)`.
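
   To make the consequence concrete, here is a minimal, self-contained sketch of the two string forms. It does not use Flink's classes: `VarcharStringSketch` and its two static methods are hypothetical stand-ins. It also assumes that the summary form abbreviates the maximum-length case to `STRING`, while the serializable form always spells out the length. Only the `2147483647` value (`Integer.MAX_VALUE`) comes from this thread.

```java
// Hypothetical stand-in for illustration only; this is NOT Flink's VarCharType.
final class VarcharStringSketch {

    // Assumed to mirror VarCharType.MAX_LENGTH (2147483647, i.e. Integer.MAX_VALUE).
    static final int MAX_LENGTH = Integer.MAX_VALUE;

    // Serializable form: always writes the explicit length so a parser can read it back.
    static String asSerializableString(int length, boolean nullable) {
        return "VARCHAR(" + length + ")" + (nullable ? "" : " NOT NULL");
    }

    // Summary form: ASSUMED to abbreviate the maximum-length case to STRING.
    static String asSummaryString(int length, boolean nullable) {
        String base = (length == MAX_LENGTH) ? "STRING" : "VARCHAR(" + length + ")";
        return base + (nullable ? "" : " NOT NULL");
    }

    public static void main(String[] args) {
        // A column declared with the maximum length, printed both ways.
        System.out.println(asSerializableString(MAX_LENGTH, true));
        System.out.println(asSummaryString(MAX_LENGTH, true));
    }
}
```

   Under these assumptions, a column declared as `STRING` would appear as `VARCHAR(2147483647)` in the `SHOW CREATE TABLE` output once `asSerializableString` is used everywhere, which is the behavior I would like to confirm is acceptable.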





