Hi all,

## SupportsParallelismReport

Now that FLIP-95 [1] is ready, only Hive and Filesystem are still using the
old interfaces.

We are considering migrating to the new interface.

However, one problem is that in the old interface implementation,
connectors infer parallelism by itself instead of a global parallelism
configuration. Hive & filesystem determines the parallelism size according
to the number of files and the size of the file. In this way, large tables
may use thousands of parallelisms, while small tables only have 10
parallelisms, which can minimize the consumption of task scheduling.

This situation is very common in batch computing. For example, in the star
model, a large table needs to be joined with multiple small tables.

So we should give this ability to new table source interfaces. The
interface can be:

/**
 * Enables to give source the ability to report parallelism.
 *
 * <p>After filtering push down and partition push down, the source
can have more information,
 * which can help it infer more effective parallelism.
 */
@Internal
public interface SupportsParallelismReport {

   /**
    * Report parallelism from source or sink. The parallelism of an
operator must be at least 1,
    * or -1 (use system default).
    */
   int reportParallelism();
}


Rejected Alternatives:
- SupportsSplitReport: What is the relationship between this split and the
split of FLIP-27? Do we have to match them one by one? I think they are two
independent things. In fact, the design of FLIP-27, split and parallelism
are not bound one by one.
- SupportsPartitionReport: What is partition? Actually, in table/SQL,
partition is a special concept of table. It should not be mixed with
parallelism.

## SupportsStatisticsReport

As with parallelism, statistics information from source will be more
appropriate and accurate. After filtering push down and partition push
down, the source can have more information, which can help it infer more
effective statistics. However, if we only infer from the planner itself, it
may lead to a big gap between the statistics information and the real
situation.

The interface:

/**
 * Enables to give {@link ScanTableSource} the ability to report table
statistics.
 *
 * <p>Statistics can be inferred from real data in real time,  it is
more accurate than the
 * statistics in the catalog.
 *
 * <p>After filtering push down and partition push down, the source
can have more information,
 * which can help it infer more effective table statistics.
 */
@Internal
public interface SupportsStatisticsReport {

   /**
    * Reports {@link TableStats} from old table stats.
    */
   TableStats reportTableStatistics(TableStats oldStats);
}


When to invoke reported statistics to the planner?
- First of all, this call can be expensive (to view the metadata of the
files), so it can't be called repeatedly.
- We need to call after FilterPushdown, because that's the most accurate
information. We also need to call before CBO (Like JoinReorder and choose
BroadcastJoin or ShuffleJoin), because that's where statistics are used.

Rejected Alternatives:
- Using CatalogTableStatistics: CatalogTableStatistics or TableStats? I
lean to TableStats, because TableStats is the class used by planner,
but CatalogTableStatistics may contains some catalog information which is
not related to planner optimizer.

## Internal or Public

I personally lean to internal, these interfaces are only used Hive and
Filesystem, another way is: SupportsParallelismReport(Internal, I haven't
seen this requirement from outside.) and SupportsStatisticsReport(Public,
maybe Apache Iceberg Flink connector can use it).

What do you think?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

Best,
Jingsong Lee

Reply via email to