wenlong88 commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r568542445



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
##########
@@ -100,4 +101,27 @@ public TableResult execute() {
             operations.clear();
         }
     }
+
+    /**
+     * Get the json plan of all statements and Tables as one batch.
+     *
+     * <p>The json plan is the string json representation of an optimized ExecNode plan for the
+     * statements and Tables. An ExecNode plan can be serialized to a json plan, and a json plan
+     * can be deserialized back into an ExecNode plan.
+     *
+     * <p><b>NOTE:</b> Only the Blink planner supports this method.
+     *
+     * <p><b>NOTE:</b> This is an experimental feature for now.
+     *
+     * @return the string json representation of an optimized ExecNode plan for the statements
+     *     and Tables.
+     */
+    @Experimental
+    public String getJsonPlan() {
+        try {
+            return tableEnvironment.getJsonPlan(operations);
+        } finally {
+            operations.clear();

Review comment:
       I would like to keep the operations, since this method behaves more like explain.
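
       As a rough sketch of that suggestion (not the PR's code), the method would simply drop the finally/clear() so the buffered operations survive the call, the same way explain leaves them in place:

           // Hedged sketch: keep the buffered operations so the statement set
           // can still be executed or explained after the json plan was built.
           @Experimental
           public String getJsonPlan() {
               // Generating the json plan only reads the buffered operations.
               return tableEnvironment.getJsonPlan(operations);
           }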

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpec.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * {@link DynamicTableSinkSpec} describes how to serialize/deserialize a dynamic table sink
+ * and create {@link DynamicTableSink} from the deserialization result.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DynamicTableSinkSpec extends CatalogTableSpecBase {
+
+    @JsonIgnore private DynamicTableSink tableSink;
+
+    @JsonCreator

Review comment:
       I think the JsonCreator annotation and DynamicTableSinkSpecJsonDeserializer cannot work together.
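
       For illustration, a minimal sketch of the conflict as I understand plain Jackson behavior (using the names from this PR): once the custom deserializer is registered for the type on a mapper, that mapper routes every read of DynamicTableSinkSpec through it, so the @JsonCreator constructor is never consulted there.

           ObjectMapper mapper = new ObjectMapper();
           SimpleModule module = new SimpleModule();
           module.addDeserializer(
                   DynamicTableSinkSpec.class,
                   new DynamicTableSinkSpec.DynamicTableSinkSpecJsonDeserializer(serdeCtx));
           mapper.registerModule(module);
           // mapper.readValue(json, DynamicTableSinkSpec.class) now always goes
           // through the registered deserializer; the annotated constructor is
           // bypassed for this mapper.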

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitorImpl;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.reflections.Reflections;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A utility class that can generate a json plan based on a given {@link ExecNodeGraph}, or
+ * generate an {@link ExecNodeGraph} based on a given json plan.
+ */
+public class ExecNodeGraphJsonPlanGenerator {
+
+    /** Generate json plan based on the given {@link ExecNodeGraph}. */
+    public static String generateJsonPlan(ExecNodeGraph execGraph, SerdeContext serdeCtx)
+            throws IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        final SimpleModule module = new SimpleModule();
+        registerSerializers(module, serdeCtx);
+        mapper.registerModule(module);
+
+        final StringWriter writer = new StringWriter(1024);
+        try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
+            TopologicalExecNodeGraph topologyGraph = new TopologicalExecNodeGraph(execGraph);
+            gen.writeObject(topologyGraph);
+        }
+
+        return writer.toString();
+    }
+
+    /** Generate {@link ExecNodeGraph} based on the given json plan. */
+    @SuppressWarnings({"rawtypes"})
+    public static ExecNodeGraph generateExecNodeGraph(String jsonPlan, SerdeContext serdeCtx)
+            throws IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
+        final SimpleModule module = new SimpleModule();
+        final Set<Class<? extends ExecNodeBase>> nodeClasses = scanSubClassesOfExecNodeBase();
+        nodeClasses.forEach(c -> module.registerSubtypes(new NamedType(c, c.getCanonicalName())));
+        final TopologicalExecNodeGraphDeContext graphCtx = new TopologicalExecNodeGraphDeContext();
+        registerDeserializers(mapper, module, serdeCtx, graphCtx);
+        mapper.registerModule(module);
+
+        final TopologicalExecNodeGraph topologicalGraph =
+                mapper.readValue(jsonPlan, TopologicalExecNodeGraph.class);
+
+        return topologicalGraph.convertToExecNodeGraph(graphCtx);
+    }
+
+    private static void registerSerializers(SimpleModule module, SerdeContext serdeCtx) {
+        module.addSerializer(new ChangelogModeJsonSerializer());
+    }
+
+    private static void registerDeserializers(
+            ObjectMapper mapper,
+            SimpleModule module,
+            SerdeContext serdeCtx,
+            TopologicalExecNodeGraphDeContext graphCtx) {
+        module.addDeserializer(ExecNode.class, new ExecNodeDeserializer(graphCtx, mapper));
+        module.addDeserializer(
+                DynamicTableSourceSpec.class,
+                new DynamicTableSourceSpec.DynamicTableSourceSpecJsonDeserializer(serdeCtx));
+        module.addDeserializer(
+                DynamicTableSinkSpec.class,
+                new DynamicTableSinkSpec.DynamicTableSinkSpecJsonDeserializer(serdeCtx));
+        module.addDeserializer(ChangelogMode.class, new ChangelogModeJsonDeserializer());
+    }
+
+    /**
+     * The pojo class representing the json plan.
+     *
+     * <p>This class can be serialized to a json plan, or deserialized from a json plan.
+     */
+    public static class TopologicalExecNodeGraph {
+        public static final String FIELD_NAME_FLINK_VERSION = "flinkVersion";
+        public static final String FIELD_NAME_NODES = "nodes";
+
+        @JsonProperty(FIELD_NAME_FLINK_VERSION)
+        private final String flinkVersion;
+
+        @JsonProperty(FIELD_NAME_NODES)
+        private final ExecNode<?>[] topologicalOrderingNodes;
+
+        public TopologicalExecNodeGraph(ExecNodeGraph execGraph) {
+            topologicalOrderingNodes = getAllNodesAsTopologicalOrdering(execGraph);
+            this.flinkVersion = execGraph.getFlinkVersion();
+        }
+
+        @JsonCreator
+        public TopologicalExecNodeGraph(
+                @JsonProperty(FIELD_NAME_FLINK_VERSION) String flinkVersion,
+                @JsonProperty(FIELD_NAME_NODES) ExecNode<?>[] topologicalOrderingNodes) {
+            this.flinkVersion = flinkVersion;
+            this.topologicalOrderingNodes = topologicalOrderingNodes;
+        }
+
+        private ExecNode<?>[] getAllNodesAsTopologicalOrdering(ExecNodeGraph execGraph) {
+            final List<ExecNode<?>> allNodes = new ArrayList<>();
+            final Set<Integer> nodesIds = new HashSet<>();
+            // for quick search
+            final Set<ExecNode<?>> visitedNodes = Sets.newIdentityHashSet();
+
+            final ExecNodeVisitor visitor =
+                    new ExecNodeVisitorImpl() {
+                        @Override
+                        public void visit(ExecNode<?> node) {
+                            if (visitedNodes.contains(node)) {
+                                return;
+                            }
+                            super.visitInputs(node);
+
+                            final int id = node.getId();
+                            if (nodesIds.contains(id)) {
+                                throw new TableException(
+                                        String.format(
+                                                "The id: %s is not unique for ExecNode: %s.\nplease check it.",
+                                                id, node.getDesc()));
+                            }
+
+                            allNodes.add(node);
+                            nodesIds.add(id);
+                            visitedNodes.add(node);
+                        }
+                    };
+
+            execGraph.getRootNodes().forEach(visitor::visit);
+            checkArgument(allNodes.size() == nodesIds.size());
+            return allNodes.toArray(new ExecNode<?>[0]);
+        }
+
+        public ExecNodeGraph convertToExecNodeGraph(TopologicalExecNodeGraphDeContext ctx) {
+            for (ExecNode<?> execNode : topologicalOrderingNodes) {
+                List<ExecNode<?>> inputNodes = ctx.getInputNodes(execNode.getId());
+                ((ExecNodeBase<?>) execNode).setInputNodes(inputNodes);
+            }
+            List<ExecNode<?>> rootNodes = ctx.getRootNodes();
+            return new ExecNodeGraph(rootNodes);
+        }
+    }
+
+    /** JSON deserializer for {@link ExecNodeBase}. */
+    public static class ExecNodeDeserializer extends StdDeserializer<ExecNodeBase<?>> {

Review comment:
       I think it would be better to make such deserializers top-level classes instead of inner classes, because they are so important to the serialization.
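
       As a rough illustration of the suggested layout (the class name and body here are hypothetical), the deserializer would live as its own top-level class in the serde package and be registered exactly as before:

           package org.apache.flink.table.planner.plan.nodes.exec.serde;

           import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;

           import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
           import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
           import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;

           import java.io.IOException;

           /** Top-level JSON deserializer for {@link ExecNodeBase} (sketch only). */
           public class ExecNodeJsonDeserializer extends StdDeserializer<ExecNodeBase<?>> {

               public ExecNodeJsonDeserializer() {
                   super(ExecNodeBase.class);
               }

               @Override
               public ExecNodeBase<?> deserialize(JsonParser parser, DeserializationContext ctx)
                       throws IOException {
                   // Body elided: it would carry over the logic of the nested
                   // ExecNodeDeserializer from this PR unchanged.
                   throw new UnsupportedOperationException("sketch only");
               }
           }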




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

