lincoln-lil commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r934082373


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1356,6 +1379,15 @@ public TableResultInternal executeInternal(Operation operation) {
             return (TableResultInternal) compiledPlan.execute();
         } else if (operation instanceof NopOperation) {
             return TableResultImpl.TABLE_RESULT_OK;
+        } else if (operation instanceof AnalyzeTableOperation) {
+            if (isStreamingMode) {
+                throw new TableException("AnalyzeTable is not supported for 
streaming mode now");

Review Comment:
   'AnalyzeTable' -> 'ANALYZE TABLE'
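
   A minimal sketch of the suggested wording, assuming the exception type stays the same (suggested text only, not the final form):

   ```java
   throw new TableException("ANALYZE TABLE is not supported for streaming mode now");
   ```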



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1356,6 +1379,15 @@ public TableResultInternal executeInternal(Operation operation) {
             return (TableResultInternal) compiledPlan.execute();
         } else if (operation instanceof NopOperation) {
             return TableResultImpl.TABLE_RESULT_OK;
+        } else if (operation instanceof AnalyzeTableOperation) {

Review Comment:
   nit: maybe better to move this new meaningful operation before `NopOperation`
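
   A sketch of the suggested ordering, assuming the surrounding branches stay as in the PR (the ANALYZE TABLE branch is abbreviated here; the real one also checks streaming mode and wraps exceptions):

   ```java
   } else if (operation instanceof AnalyzeTableOperation) {
       // handle the meaningful operation before the trivial no-op branch
       return analyzeTable((AnalyzeTableOperation) operation);
   } else if (operation instanceof NopOperation) {
       return TableResultImpl.TABLE_RESULT_OK;
   }
   ```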



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
    public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =
+                catalogManager.getCatalog(operation.getTableIdentifier().getCatalogName()).get();
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (table.isPartitioned()) {
+            List<CatalogPartitionSpec> targetPartitions =
+                    operation.getPartitionSpecs().orElse(catalog.listPartitions(objectPath));
+            for (CatalogPartitionSpec partitionSpec : targetPartitions) {
+                String statSql =
+                        generateAnalyzeSql(operation.getTableIdentifier(), partitionSpec, columns);
+                TableResult tableResult = executeSql(statSql);
+                List<Row> result = 
CollectionUtil.iteratorToList(tableResult.collect());
+                Preconditions.checkArgument(result.size() == 1);
+                Row row = result.get(0);
+                CatalogTableStatistics tableStat = convertToTableStatistics(row);
+                catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
+                if (!columns.isEmpty()) {
+                    CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                    catalog.alterPartitionColumnStatistics(
+                            objectPath, partitionSpec, columnStat, false);
+                }
+            }
+        } else {
+            String statSql = generateAnalyzeSql(operation.getTableIdentifier(), null, columns);
+            TableResult tableResult = executeSql(statSql);
+            List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+            Preconditions.checkArgument(result.size() == 1);
+            Row row = result.get(0);
+            CatalogTableStatistics tableStat = convertToTableStatistics(row);
+            catalog.alterTableStatistics(objectPath, tableStat, false);
+            if (!columns.isEmpty()) {
+                CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+            }
+        }
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    private String generateAnalyzeSql(
+            ObjectIdentifier tableIdentifier,
+            @Nullable CatalogPartitionSpec partitionSpec,
+            List<Column> columns) {
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        Preconditions.checkArgument(
+                optionalCatalogTable.isPresent(), tableIdentifier + " does not exist");
+
+        String partitionFilter;
+        if (partitionSpec != null) {
+            partitionFilter =
+                    " WHERE "
+                            + partitionSpec.getPartitionSpec().entrySet().stream()
+                                    .map(e -> e.getKey() + "=" + e.getValue())
+                                    .collect(Collectors.joining(" AND "));
+        } else {
+            partitionFilter = "";
+        }
+
+        final String columnStatsSelects;
+        if (columns.isEmpty()) {
+            columnStatsSelects = "";
+        } else {
+            columnStatsSelects = ", " + getColumnStatsSelects(columns);
+        }
+
+        return String.format(
+                "SELECT COUNT(1) AS %s %s FROM %s %s",
+                getRowCountColumn(), columnStatsSelects, tableIdentifier, partitionFilter);
+    }
+
+    private String getColumnStatsSelects(List<Column> columns) {
+        return columns.stream()
+                .flatMap(
+                        f -> {
+                            String c = f.getName();
+                            List<String> columnStatSelect = new ArrayList<>();
+                            String computeNullCount =
+                                    String.format(
+                                            "(COUNT(1) - COUNT(`%s`)) AS %s",
+                                            c, getNullCountColumn(c));
+                            columnStatSelect.add(computeNullCount);
+
+                            String computeNdv =
+                                    String.format(
+                                            "APPROX_COUNT_DISTINCT(`%s`) AS 
%s",
+                                            c, getNdvColumn(c));
+
+                            switch (f.getDataType().getLogicalType().getTypeRoot()) {
+                                case BOOLEAN:
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE 
`%s` IS TRUE) AS %s",
+                                                    c, c, 
getTrueCountColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE 
`%s` IS FALSE) AS %s",
+                                                    c, c, 
getFalseCountColumn(c)));
+                                    break;
+                                case TINYINT:
+                                case SMALLINT:
+                                case INTEGER:
+                                case FLOAT:
+                                case DATE:
+                                case TIME_WITHOUT_TIME_ZONE:
+                                case BIGINT:
+                                case DOUBLE:
+                                case DECIMAL:
+                                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format("MAX(`%s`) AS %s", 
c, getMaxColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format("MIN(`%s`) AS %s", 
c, getMinColumn(c)));
+                                    break;
+                                case CHAR:
+                                case VARCHAR:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "AVG(CAST(CHAR_LENGTH(`%s`) AS DOUBLE)) AS %s",
+                                                    c, getAvgLenColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "MAX(CAST(CHAR_LENGTH(`%s`) AS BIGINT)) AS %s",
+                                                    c, getMaxLenColumn(c)));
+                                    break;
+                                default:
+                                    break;
+                            }
+                            return columnStatSelect.stream();
+                        })
+                .collect(Collectors.joining(", "));
+    }
+
+    private String getRowCountColumn() {
+        return "rowCount";
+    }
+
+    private String getNullCountColumn(String column) {
+        return String.format("%s_nullCount", column);
+    }
+
+    private String getNdvColumn(String column) {
+        return String.format("%s_ndv", column);
+    }
+
+    private String getTrueCountColumn(String column) {
+        return String.format("%s_trueCount", column);
+    }
+
+    private String getFalseCountColumn(String column) {
+        return String.format("%s_falseCount", column);
+    }
+
+    private String getMaxColumn(String column) {
+        return String.format("%s_max", column);
+    }
+
+    private String getMinColumn(String column) {
+        return String.format("%s_min", column);
+    }
+
+    private String getAvgLenColumn(String column) {
+        return String.format("%s_avgLen", column);
+    }
+
+    private String getMaxLenColumn(String column) {
+        return String.format("%s_maxLen", column);
+    }
+
+    private CatalogTableStatistics convertToTableStatistics(Row row) {
+        Long rowCount = row.getFieldAs(getRowCountColumn());
+        return new CatalogTableStatistics(rowCount, -1, -1, -1);
+    }
+
+    private CatalogColumnStatistics convertToColumnStatistics(Row row, List<Column> columns) {
+        Preconditions.checkArgument(!columns.isEmpty());
+        Map<String, CatalogColumnStatisticsDataBase> columnStatMap = new HashMap<>();
+        for (Column column : columns) {
+            CatalogColumnStatisticsDataBase columnStat = convertToColumnStatisticsData(row, column);
+            if (columnStat != null) {
+                columnStatMap.put(column.getName(), columnStat);
+            }
+        }
+        return new CatalogColumnStatistics(columnStatMap);
+    }
+
+    private CatalogColumnStatisticsDataBase convertToColumnStatisticsData(Row row, Column column) {
+        String c = column.getName();
+        Long nullCount = row.getFieldAs(getNullCountColumn(c));
+        switch (column.getDataType().getLogicalType().getTypeRoot()) {
+            case BOOLEAN:
+                Long trueCount = row.getFieldAs(getTrueCountColumn(c));
+                Long falseCount = row.getFieldAs(getFalseCountColumn(c));
+                return new CatalogColumnStatisticsDataBoolean(trueCount, falseCount, nullCount);
+            case TINYINT:
+                Byte maxByte = row.getFieldAs(getMaxColumn(c));
+                Byte minByte = row.getFieldAs(getMinColumn(c));
+                Long ndvByte = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minByte != null ? minByte.longValue() : null,
+                        maxByte != null ? maxByte.longValue() : null,
+                        ndvByte,
+                        nullCount);
+            case SMALLINT:
+                Short maxShort = row.getFieldAs(getMaxColumn(c));
+                Short minShort = row.getFieldAs(getMinColumn(c));
+                Long ndvShort = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minShort != null ? minShort.longValue() : null,
+                        maxShort != null ? maxShort.longValue() : null,
+                        ndvShort,
+                        nullCount);
+            case INTEGER:
+                Integer maxInt = row.getFieldAs(getMaxColumn(c));
+                Integer minInt = row.getFieldAs(getMinColumn(c));
+                Long ndvInt = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minInt != null ? minInt.longValue() : null,
+                        maxInt != null ? maxInt.longValue() : null,
+                        ndvInt,
+                        nullCount);
+            case BIGINT:
+                Long ndvLong = row.getFieldAs(getNdvColumn(c));
+                Long maxLong = row.getFieldAs(getMaxColumn(c));
+                Long minLong = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(minLong, maxLong, ndvLong, nullCount);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                Long ndvTs = row.getFieldAs(getNdvColumn(c));
+                LocalDateTime maxTs = row.getFieldAs(getMaxColumn(c));
+                LocalDateTime minTs = row.getFieldAs(getMinColumn(c));
+
+                return new CatalogColumnStatisticsDataLong(
+                        minTs != null ? minTs.toEpochSecond(ZoneOffset.UTC) : null,
+                        maxTs != null ? maxTs.toEpochSecond(ZoneOffset.UTC) : null,
+                        ndvTs,
+                        nullCount);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                Long ndvTsLtz = row.getFieldAs(getNdvColumn(c));
+                Instant maxTsLtz = row.getFieldAs(getMaxColumn(c));
+                Instant minTsLtz = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minTsLtz != null ? minTsLtz.toEpochMilli() : null,
+                        maxTsLtz != null ? maxTsLtz.toEpochMilli() : null,
+                        ndvTsLtz,
+                        nullCount);
+            case FLOAT:
+                Long ndvFloat = row.getFieldAs(getNdvColumn(c));
+                Float maxFloat = row.getFieldAs(getMaxColumn(c));
+                Float minFloat = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minFloat != null ? minFloat.doubleValue() : null,
+                        maxFloat != null ? maxFloat.doubleValue() : null,
+                        ndvFloat,
+                        nullCount);
+            case DOUBLE:
+                Long ndvDouble = row.getFieldAs(getNdvColumn(c));
+                Double maxDouble = row.getFieldAs(getMaxColumn(c));
+                Double minDouble = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minDouble, maxDouble, ndvDouble, nullCount);
+            case DECIMAL:
+                Long ndvDecimal = row.getFieldAs(getNdvColumn(c));
+                BigDecimal maxDecimal = row.getFieldAs(getMaxColumn(c));
+                BigDecimal minDecimal = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minDecimal != null ? minDecimal.doubleValue() : null,
+                        maxDecimal != null ? maxDecimal.doubleValue() : null,
+                        ndvDecimal,
+                        nullCount);
+            case DATE:
+                Long ndvDate = row.getFieldAs(getNdvColumn(c));
+                LocalDate maxDate = row.getFieldAs(getMaxColumn(c));
+                LocalDate minDate = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDate(
+                        minDate != null ? new Date(minDate.toEpochDay()) : null,
+                        maxDate != null ? new Date(maxDate.toEpochDay()) : null,
+                        ndvDate,
+                        nullCount);
+            case TIME_WITHOUT_TIME_ZONE:
+                Long ndvTime = row.getFieldAs(getNdvColumn(c));
+                LocalTime maxTime = row.getFieldAs(getMaxColumn(c));
+                LocalTime minTime = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minTime != null ? minTime.toNanoOfDay() : null,
+                        maxTime != null ? maxTime.toNanoOfDay() : null,
+                        ndvTime,
+                        nullCount);
+            case CHAR:
+            case VARCHAR:
+                Long ndvString = row.getFieldAs(getNdvColumn(c));
+                Double avgLen = row.getFieldAs(getAvgLenColumn(c));
+                Long maxLen = row.getFieldAs(getMaxLenColumn(c));
+                return new CatalogColumnStatisticsDataString(maxLen, avgLen, ndvString, nullCount);
+            case BINARY:

Review Comment:
   This is not consistent with the set of types handled in `getColumnStatsSelects`
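
   A sketch of one way to align them, assuming `getColumnStatsSelects` intentionally selects only the null count for binary types, so only that statistic can be read back here (using `CatalogColumnStatisticsDataBinary(maxLen, avgLen, nullCount)` from `org.apache.flink.table.catalog.stats`):

   ```java
   case BINARY:
   case VARBINARY:
       // getColumnStatsSelects emits no NDV/length aggregates for binary types,
       // so max length and average length are left unset here
       return new CatalogColumnStatisticsDataBinary(null, null, nullCount);
   ```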



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not 
allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {

Review Comment:
   Do we require a strict order of partition columns? If so, the error message could highlight it.
   Also, it would be good to add a test case that covers only a mismatch in the partition columns' order.
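
   If strict order is not intended, a set-based comparison would accept any key order; a sketch under that assumption (requires `java.util.HashSet`):

   ```java
   // compare partition keys as sets so that only missing/extra keys fail, not ordering
   if (!new HashSet<>(partitions.keySet()).equals(new HashSet<>(table.getPartitionKeys()))) {
       throw new ValidationException(
               String.format(
                       "For a partitioned table, all partition keys must be specified explicitly. "
                               + "Given partition keys: [%s]; expected partition keys: [%s].",
                       String.join(",", partitions.keySet()),
                       String.join(",", table.getPartitionKeys())));
   }
   ```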



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1356,6 +1379,15 @@ public TableResultInternal executeInternal(Operation operation) {
             return (TableResultInternal) compiledPlan.execute();
         } else if (operation instanceof NopOperation) {
             return TableResultImpl.TABLE_RESULT_OK;
+        } else if (operation instanceof AnalyzeTableOperation) {
+            if (isStreamingMode) {
+                throw new TableException("AnalyzeTable is not supported for 
streaming mode now");
+            }
+            try {
+                return analyzeTable((AnalyzeTableOperation) operation);
+            } catch (Exception e) {
+                throw new TableException("Failed to execute AnalyzeTable 
command", e);

Review Comment:
   ditto
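
   A sketch of the branch with both messages adjusted (suggested wording only):

   ```java
   if (isStreamingMode) {
       throw new TableException("ANALYZE TABLE is not supported for streaming mode now");
   }
   try {
       return analyzeTable((AnalyzeTableOperation) operation);
   } catch (Exception e) {
       throw new TableException("Failed to execute ANALYZE TABLE command", e);
   }
   ```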



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not 
allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {
+                throw new ValidationException(
+                        String.format(
+                                "For partition table, all partition keys should be specified explicitly. "
+                                        + "The given partition keys: [%s] are not match the target partition keys: [%s]",
+                                String.join(",", partitions.keySet()),
+                                String.join(",", table.getPartitionKeys())));
+            }
+
+            try {
+                targetPartitionSpecs = getPartitionSpecs(tableIdentifier, schema, partitions);
+            } catch (Exception e) {
+                throw new ValidationException(e.getMessage(), e);
+            }
+        } else if (!partitions.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table: %s is not a partition table, while 
partition values is given",

Review Comment:
   ditto
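
   For example (suggested wording only):

   ```java
   throw new ValidationException(
           String.format(
                   "Table %s is not a partitioned table, but a partition specification was given.",
                   tableIdentifier));
   ```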


