xzw0223 created FLINK-31494:
-------------------------------
Summary: Introduce OperationExecutor for SqlToOperationConverter
Key: FLINK-31494
URL: https://issues.apache.org/jira/browse/FLINK-31494
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: xzw0223
Introduce an OperationExecutor interface for SqlToOperationConverter, following
Timo's idea in FLINK-31368. The interface would look like:
```java
public interface OperationExecutor {
    // The Operation supported by the current OperationExecutor
    Class<? extends Operation> supportedOperation();

    // The SqlNode type supported by the current OperationExecutor
    Class<? extends SqlNode> supportedSqlNode();

    // OperationExecutor is used to convert the validated sqlNode into Operation
    Operation execute(SqlNode validated);
}
```
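As a rough illustration of what a single implementation might look like, the sketch below implements the interface for CREATE CATALOG. `SqlNode`, `Operation`, `SqlCreateCatalog` and `CreateCatalogOperation` are simplified stand-ins declared locally so the snippet compiles on its own; they are not the real Calcite/Flink classes, and the `catalogName` field is an assumption made for the example.

```java
// Stand-ins for the Calcite/Flink types so the sketch compiles on its own.
abstract class SqlNode {}

interface Operation {}

interface OperationExecutor {
    Class<? extends Operation> supportedOperation();

    Class<? extends SqlNode> supportedSqlNode();

    Operation execute(SqlNode validated);
}

// Hypothetical, heavily simplified node and operation (assumed shape).
final class SqlCreateCatalog extends SqlNode {
    final String catalogName;

    SqlCreateCatalog(String catalogName) {
        this.catalogName = catalogName;
    }
}

final class CreateCatalogOperation implements Operation {
    final String catalogName;

    CreateCatalogOperation(String catalogName) {
        this.catalogName = catalogName;
    }
}

final class CreateCatalogExecutor implements OperationExecutor {
    @Override
    public Class<? extends Operation> supportedOperation() {
        return CreateCatalogOperation.class;
    }

    @Override
    public Class<? extends SqlNode> supportedSqlNode() {
        return SqlCreateCatalog.class;
    }

    @Override
    public Operation execute(SqlNode validated) {
        // The cast is safe only if the caller dispatches on supportedSqlNode().
        SqlCreateCatalog node = (SqlCreateCatalog) validated;
        return new CreateCatalogOperation(node.catalogName);
    }
}
```

With this shape, each statement type keeps its conversion logic in its own small class instead of in one large method.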
Add an OperationExecutors class that manages all OperationExecutor instances and
is responsible for converting a validated SqlNode into an Operation.
```java
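import java.util.HashMap;
import java.util.Map;

public class OperationExecutors {
    // Maps each supported SqlNode class to the executor that converts it.
    private static final Map<Class<? extends SqlNode>, OperationExecutor> executors =
            new HashMap<>();

    static {
        addExecutor(SqlCreateCatalog.class, CreateCatalogExecutor.class);
        // .....
    }

    private static void addExecutor(
            Class<? extends SqlNode> sqlNodeClass,
            Class<? extends OperationExecutor> executorClass) {
        try {
            executors.put(sqlNodeClass, executorClass.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + executorClass.getName(), e);
        }
    }

    // Returns the executor registered for the node's class, or null if there is none.
    public static OperationExecutor getExecutor(SqlNode validated) {
        return executors.get(validated.getClass());
    }

    public static Operation execute(SqlNode validated) {
        return executors.get(validated.getClass()).execute(validated);
    }
}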
```
This can then be used in SqlToOperationConverter.java:
```java
private static Optional<Operation> convertValidatedSqlNode(
        FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) {
    OperationExecutor executor = OperationExecutors.getExecutor(validated);
    if (executor == null) {
        // No executor is registered for this SqlNode type.
        return Optional.empty();
    }
    // A null result from the executor is also reported as empty.
    return Optional.ofNullable(executor.execute(validated));
}
```
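To make the lookup-and-convert flow concrete, here is a minimal self-contained sketch of the dispatch. All types are local stand-ins rather than the real Flink/Calcite classes, `SqlUnsupported` and the `convert` method are hypothetical names, and registering each executor under its own `supportedSqlNode()` is only one possible variation on the explicit `addExecutor` call proposed above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Stand-ins for the Calcite/Flink types so the sketch compiles on its own.
abstract class SqlNode {}

interface Operation {}

interface OperationExecutor {
    Class<? extends SqlNode> supportedSqlNode();

    Operation execute(SqlNode validated);
}

final class SqlCreateCatalog extends SqlNode {}

// Hypothetical node type with no registered executor.
final class SqlUnsupported extends SqlNode {}

final class CreateCatalogOperation implements Operation {}

final class CreateCatalogExecutor implements OperationExecutor {
    @Override
    public Class<? extends SqlNode> supportedSqlNode() {
        return SqlCreateCatalog.class;
    }

    @Override
    public Operation execute(SqlNode validated) {
        return new CreateCatalogOperation();
    }
}

final class OperationExecutors {
    private static final Map<Class<? extends SqlNode>, OperationExecutor> EXECUTORS =
            new HashMap<>();

    static {
        register(new CreateCatalogExecutor());
    }

    // Assumption: the executor itself reports which SqlNode class it handles.
    private static void register(OperationExecutor executor) {
        EXECUTORS.put(executor.supportedSqlNode(), executor);
    }

    // Returns empty when no executor is registered for the node's exact class.
    static Optional<Operation> convert(SqlNode validated) {
        OperationExecutor executor = EXECUTORS.get(validated.getClass());
        if (executor == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(executor.execute(validated));
    }
}
```

Note that the lookup uses the exact runtime class of the node, so a subclass of a registered SqlNode type would not be matched unless it is registered separately.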