wenlong88 commented on a change in pull request #14729: URL: https://github.com/apache/flink/pull/14729#discussion_r568542445
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java ########## @@ -100,4 +101,27 @@ public TableResult execute() { operations.clear(); } } + + /** + * Get the json plan of the all statements and Tables as a batch. + * + * <p>The json plan is the string json representation of an optimized ExecNode plan for the + * statements and Tables. An ExecNode plan can be serialized to json plan, and a json plan can + * be deserialized to an ExecNode plan. + * + * <p>NOTES: Only the Blink planner supports this method. + * + * <p><b>NOTES:</b>: This is an experimental feature now. + * + * @return the string json representation of an optimized ExecNode plan for the statements and + * Tables. + */ + @Experimental + public String getJsonPlan() { + try { + return tableEnvironment.getJsonPlan(operations); + } finally { + operations.clear(); Review comment: I would like to keep the operations since it is more like explain ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpec.java ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.factories.FactoryUtil; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import java.io.IOException; + +/** + * {@link DynamicTableSourceSpec} describes how to serialize/deserialize dynamic table sink table + * and create {@link DynamicTableSink} from the deserialization result. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class DynamicTableSinkSpec extends CatalogTableSpecBase { + + @JsonIgnore private DynamicTableSink tableSink; + + @JsonCreator Review comment: I think the JsonCreator annotation and DynamicTableSinkSpecJsonDeserializer cannot work together. ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph; +import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor; +import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitorImpl; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; + +import org.reflections.Reflections; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static 
org.apache.flink.util.Preconditions.checkArgument; + +/** + * An utility class that can generate json plan based on given {@link ExecNodeGraph} or generate + * {@link ExecNodeGraph} based on given json plan. + */ +public class ExecNodeGraphJsonPlanGenerator { + + /** Generate json plan based on the given {@link ExecNodeGraph}. */ + public static String generateJsonPlan(ExecNodeGraph execGraph, SerdeContext serdeCtx) + throws IOException { + final ObjectMapper mapper = new ObjectMapper(); + final SimpleModule module = new SimpleModule(); + registerSerializers(module, serdeCtx); + mapper.registerModule(module); + + final StringWriter writer = new StringWriter(1024); + try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) { + TopologicalExecNodeGraph topologyGraph = new TopologicalExecNodeGraph(execGraph); + gen.writeObject(topologyGraph); + } + + return writer.toString(); + } + + /** Generate {@link ExecNodeGraph} based on the given json plan. */ + @SuppressWarnings({"rawtypes"}) + public static ExecNodeGraph generateExecNodeGraph(String jsonPlan, SerdeContext serdeCtx) + throws IOException { + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false); + final SimpleModule module = new SimpleModule(); + final Set<Class<? 
extends ExecNodeBase>> nodeClasses = scanSubClassesOfExecNodeBase(); + nodeClasses.forEach(c -> module.registerSubtypes(new NamedType(c, c.getCanonicalName()))); + final TopologicalExecNodeGraphDeContext graphCtx = new TopologicalExecNodeGraphDeContext(); + registerDeserializers(mapper, module, serdeCtx, graphCtx); + mapper.registerModule(module); + + final TopologicalExecNodeGraph topologicalGraph = + mapper.readValue(jsonPlan, TopologicalExecNodeGraph.class); + + return topologicalGraph.convertToExecNodeGraph(graphCtx); + } + + private static void registerSerializers(SimpleModule module, SerdeContext serdeCtx) { + module.addSerializer(new ChangelogModeJsonSerializer()); + } + + private static void registerDeserializers( + ObjectMapper mapper, + SimpleModule module, + SerdeContext serdeCtx, + TopologicalExecNodeGraphDeContext graphCtx) { + module.addDeserializer(ExecNode.class, new ExecNodeDeserializer(graphCtx, mapper)); + module.addDeserializer( + DynamicTableSourceSpec.class, + new DynamicTableSourceSpec.DynamicTableSourceSpecJsonDeserializer(serdeCtx)); + module.addDeserializer( + DynamicTableSinkSpec.class, + new DynamicTableSinkSpec.DynamicTableSinkSpecJsonDeserializer(serdeCtx)); + module.addDeserializer(ChangelogMode.class, new ChangelogModeJsonDeserializer()); + } + + /** + * The json plan representing pojo class. + * + * <p>This class can be serialize to json plan, or deserialize from json plan. 
+ */ + public static class TopologicalExecNodeGraph { + public static final String FIELD_NAME_FLINK_VERSION = "flinkVersion"; + public static final String FIELD_NAME_NODES = "nodes"; + + @JsonProperty(FIELD_NAME_FLINK_VERSION) + private final String flinkVersion; + + @JsonProperty(FIELD_NAME_NODES) + private final ExecNode<?>[] topologicalOrderingNodes; + + public TopologicalExecNodeGraph(ExecNodeGraph execGraph) { + topologicalOrderingNodes = getAllNodesAsTopologicalOrdering(execGraph); + this.flinkVersion = execGraph.getFlinkVersion(); + } + + @JsonCreator + public TopologicalExecNodeGraph( + @JsonProperty(FIELD_NAME_FLINK_VERSION) String flinkVersion, + @JsonProperty(FIELD_NAME_NODES) ExecNode<?>[] topologicalOrderingNodes) { + this.flinkVersion = flinkVersion; + this.topologicalOrderingNodes = topologicalOrderingNodes; + } + + private ExecNode<?>[] getAllNodesAsTopologicalOrdering(ExecNodeGraph execGraph) { + final List<ExecNode<?>> allNodes = new ArrayList<>(); + final Set<Integer> nodesIds = new HashSet<>(); + // for quick search + final Set<ExecNode<?>> visitedNodes = Sets.newIdentityHashSet(); + + final ExecNodeVisitor visitor = + new ExecNodeVisitorImpl() { + @Override + public void visit(ExecNode<?> node) { + if (visitedNodes.contains(node)) { + return; + } + super.visitInputs(node); + + final int id = node.getId(); + if (nodesIds.contains(id)) { + throw new TableException( + String.format( + "The id: %s is not unique for ExecNode: %s.\nplease check it.", + id, node.getDesc())); + } + + allNodes.add(node); + nodesIds.add(id); + visitedNodes.add(node); + } + }; + + execGraph.getRootNodes().forEach(visitor::visit); + checkArgument(allNodes.size() == nodesIds.size()); + return allNodes.toArray(new ExecNode<?>[0]); + } + + public ExecNodeGraph convertToExecNodeGraph(TopologicalExecNodeGraphDeContext ctx) { + for (ExecNode<?> execNode : topologicalOrderingNodes) { + List<ExecNode<?>> inputNodes = ctx.getInputNodes(execNode.getId()); + ((ExecNodeBase<?>) 
execNode).setInputNodes(inputNodes); + } + List<ExecNode<?>> rootNodes = ctx.getRootNodes(); + return new ExecNodeGraph(rootNodes); + } + } + + /** JSON deserializer for {@link ExecNodeBase}. */ + public static class ExecNodeDeserializer extends StdDeserializer<ExecNodeBase<?>> { Review comment: I think it is better to make such deserializers top-level classes instead of inner classes, because they are so important to the serialization. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org