Efrat19 commented on code in PR #26320:
URL: https://github.com/apache/flink/pull/26320#discussion_r2053924957


##########
docs/layouts/shortcodes/generated/rest_v1_dispatcher.html:
##########
@@ -509,7 +509,72 @@
   "properties" : {
     "plan" : {
       "type" : "object",
-      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson"
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:Plan",

Review Comment:
   Thanks. Addressed in a followup PR https://github.com/apache/flink/pull/26488



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java:
##########
@@ -102,74 +95,56 @@ public static String generatePlan(
                 operatorDescr = StringEscapeUtils.escapeHtml4(operatorDescr);
                 operatorDescr = operatorDescr.replace("\n", "<br/>");
 
-                gen.writeStartObject();
-
-                // write the core properties
                 JobVertexID vertexID = vertex.getID();
-                gen.writeStringField("id", vertexID.toString());
-                gen.writeNumberField(
-                        "parallelism",
+                long parallelism =
                         vertexParallelism
                                 .getParallelismOptional(vertexID)
-                                .orElse(vertex.getParallelism()));
-                gen.writeStringField("operator", operator);
-                gen.writeStringField("operator_strategy", operatorDescr);
-                gen.writeStringField("description", description);
+                                .orElse(vertex.getParallelism());
+                Collection<JobPlanInfo.Plan.Node.Input> inputs = new 
ArrayList<>();
 
                 if (!vertex.isInputVertex()) {
-                    // write the input edge properties
-                    gen.writeArrayFieldStart("inputs");
-
-                    List<JobEdge> inputs = vertex.getInputs();
-                    for (int inputNum = 0; inputNum < inputs.size(); 
inputNum++) {
-                        JobEdge edge = inputs.get(inputNum);
+                    for (int inputNum = 0; inputNum < 
vertex.getInputs().size(); inputNum++) {
+                        JobEdge edge = vertex.getInputs().get(inputNum);
                         if (edge.getSource() == null) {
                             continue;
                         }
-
                         JobVertex predecessor = edge.getSource().getProducer();
+                        if (predecessor == null || predecessor.getID() == 
null) {
+                            continue;
+                        }
+                        String inputId = predecessor.getID().toString();
 
+                        if (edge.getSource().getResultType() == null
+                                || edge.getSource().getResultType().name() == 
null) {
+                            continue;
+                        }
+                        String exchange = 
edge.getSource().getResultType().name().toLowerCase();
                         String shipStrategy = edge.getShipStrategyName();
                         String preProcessingOperation = 
edge.getPreProcessingOperationName();
                         String operatorLevelCaching = 
edge.getOperatorLevelCachingDescription();
 
-                        gen.writeStartObject();
-                        gen.writeNumberField("num", inputNum);
-                        gen.writeStringField("id", 
predecessor.getID().toString());
-
-                        if (shipStrategy != null) {
-                            gen.writeStringField("ship_strategy", 
shipStrategy);
-                        }
-                        if (preProcessingOperation != null) {
-                            gen.writeStringField("local_strategy", 
preProcessingOperation);
-                        }
-                        if (operatorLevelCaching != null) {
-                            gen.writeStringField("caching", 
operatorLevelCaching);
-                        }
-
-                        gen.writeStringField(
-                                "exchange", 
edge.getSource().getResultType().name().toLowerCase());
-
-                        gen.writeEndObject();
+                        inputs.add(
+                                new JobPlanInfo.Plan.Node.Input(
+                                        inputId,
+                                        inputNum,
+                                        exchange,
+                                        shipStrategy,
+                                        preProcessingOperation,
+                                        operatorLevelCaching));
                     }
-
-                    gen.writeEndArray();
                 }
-
-                // write the optimizer properties
-                gen.writeFieldName("optimizer_properties");
-                gen.writeRawValue(optimizerProps);

Review Comment:
   https://github.com/apache/flink/pull/26488



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to