Efrat19 commented on code in PR #26320:
URL: https://github.com/apache/flink/pull/26320#discussion_r2051997572


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java:
##########
@@ -102,74 +95,56 @@ public static String generatePlan(
                 operatorDescr = StringEscapeUtils.escapeHtml4(operatorDescr);
                 operatorDescr = operatorDescr.replace("\n", "<br/>");
 
-                gen.writeStartObject();
-
-                // write the core properties
                 JobVertexID vertexID = vertex.getID();
-                gen.writeStringField("id", vertexID.toString());
-                gen.writeNumberField(
-                        "parallelism",
+                long parallelism =
                         vertexParallelism
                                 .getParallelismOptional(vertexID)
-                                .orElse(vertex.getParallelism()));
-                gen.writeStringField("operator", operator);
-                gen.writeStringField("operator_strategy", operatorDescr);
-                gen.writeStringField("description", description);
+                                .orElse(vertex.getParallelism());
+                Collection<JobPlanInfo.Plan.Node.Input> inputs = new 
ArrayList<>();
 
                 if (!vertex.isInputVertex()) {
-                    // write the input edge properties
-                    gen.writeArrayFieldStart("inputs");
-
-                    List<JobEdge> inputs = vertex.getInputs();
-                    for (int inputNum = 0; inputNum < inputs.size(); 
inputNum++) {
-                        JobEdge edge = inputs.get(inputNum);
+                    for (int inputNum = 0; inputNum < 
vertex.getInputs().size(); inputNum++) {
+                        JobEdge edge = vertex.getInputs().get(inputNum);
                         if (edge.getSource() == null) {
                             continue;
                         }
-
                         JobVertex predecessor = edge.getSource().getProducer();
+                        if (predecessor == null || predecessor.getID() == 
null) {
+                            continue;
+                        }
+                        String inputId = predecessor.getID().toString();
 
+                        if (edge.getSource().getResultType() == null
+                                || edge.getSource().getResultType().name() == 
null) {
+                            continue;
+                        }
+                        String exchange = 
edge.getSource().getResultType().name().toLowerCase();
                         String shipStrategy = edge.getShipStrategyName();
                         String preProcessingOperation = 
edge.getPreProcessingOperationName();
                         String operatorLevelCaching = 
edge.getOperatorLevelCachingDescription();
 
-                        gen.writeStartObject();
-                        gen.writeNumberField("num", inputNum);
-                        gen.writeStringField("id", 
predecessor.getID().toString());
-
-                        if (shipStrategy != null) {
-                            gen.writeStringField("ship_strategy", 
shipStrategy);
-                        }
-                        if (preProcessingOperation != null) {
-                            gen.writeStringField("local_strategy", 
preProcessingOperation);
-                        }
-                        if (operatorLevelCaching != null) {
-                            gen.writeStringField("caching", 
operatorLevelCaching);
-                        }
-
-                        gen.writeStringField(
-                                "exchange", 
edge.getSource().getResultType().name().toLowerCase());
-
-                        gen.writeEndObject();
+                        inputs.add(
+                                new JobPlanInfo.Plan.Node.Input(
+                                        inputId,
+                                        inputNum,
+                                        exchange,
+                                        shipStrategy,
+                                        preProcessingOperation,
+                                        operatorLevelCaching));
                     }
-
-                    gen.writeEndArray();
                 }
-
-                // write the optimizer properties
-                gen.writeFieldName("optimizer_properties");
-                gen.writeRawValue(optimizerProps);

Review Comment:
   Noticed `optimizerProps` is a string but is written as raw json - I should 
probably replicate that behavior
   Will be addressed in a followup pr



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java:
##########
@@ -102,74 +95,56 @@ public static String generatePlan(
                 operatorDescr = StringEscapeUtils.escapeHtml4(operatorDescr);
                 operatorDescr = operatorDescr.replace("\n", "<br/>");
 
-                gen.writeStartObject();
-
-                // write the core properties
                 JobVertexID vertexID = vertex.getID();
-                gen.writeStringField("id", vertexID.toString());
-                gen.writeNumberField(
-                        "parallelism",
+                long parallelism =
                         vertexParallelism
                                 .getParallelismOptional(vertexID)
-                                .orElse(vertex.getParallelism()));
-                gen.writeStringField("operator", operator);
-                gen.writeStringField("operator_strategy", operatorDescr);
-                gen.writeStringField("description", description);
+                                .orElse(vertex.getParallelism());
+                Collection<JobPlanInfo.Plan.Node.Input> inputs = new 
ArrayList<>();
 
                 if (!vertex.isInputVertex()) {
-                    // write the input edge properties
-                    gen.writeArrayFieldStart("inputs");
-
-                    List<JobEdge> inputs = vertex.getInputs();
-                    for (int inputNum = 0; inputNum < inputs.size(); 
inputNum++) {
-                        JobEdge edge = inputs.get(inputNum);
+                    for (int inputNum = 0; inputNum < 
vertex.getInputs().size(); inputNum++) {
+                        JobEdge edge = vertex.getInputs().get(inputNum);
                         if (edge.getSource() == null) {
                             continue;
                         }
-
                         JobVertex predecessor = edge.getSource().getProducer();
+                        if (predecessor == null || predecessor.getID() == 
null) {
+                            continue;
+                        }
+                        String inputId = predecessor.getID().toString();
 
+                        if (edge.getSource().getResultType() == null
+                                || edge.getSource().getResultType().name() == 
null) {
+                            continue;
+                        }
+                        String exchange = 
edge.getSource().getResultType().name().toLowerCase();
                         String shipStrategy = edge.getShipStrategyName();
                         String preProcessingOperation = 
edge.getPreProcessingOperationName();
                         String operatorLevelCaching = 
edge.getOperatorLevelCachingDescription();
 
-                        gen.writeStartObject();
-                        gen.writeNumberField("num", inputNum);
-                        gen.writeStringField("id", 
predecessor.getID().toString());
-
-                        if (shipStrategy != null) {
-                            gen.writeStringField("ship_strategy", 
shipStrategy);
-                        }
-                        if (preProcessingOperation != null) {
-                            gen.writeStringField("local_strategy", 
preProcessingOperation);
-                        }
-                        if (operatorLevelCaching != null) {
-                            gen.writeStringField("caching", 
operatorLevelCaching);
-                        }
-
-                        gen.writeStringField(
-                                "exchange", 
edge.getSource().getResultType().name().toLowerCase());
-
-                        gen.writeEndObject();
+                        inputs.add(
+                                new JobPlanInfo.Plan.Node.Input(
+                                        inputId,
+                                        inputNum,
+                                        exchange,
+                                        shipStrategy,
+                                        preProcessingOperation,
+                                        operatorLevelCaching));
                     }
-
-                    gen.writeEndArray();
                 }
-
-                // write the optimizer properties
-                gen.writeFieldName("optimizer_properties");
-                gen.writeRawValue(optimizerProps);

Review Comment:
   `optimizerProps` is a string but is written as raw json - I should probably 
replicate that behavior
   Will be addressed in a followup pr



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to