Efrat19 commented on code in PR #26320: URL: https://github.com/apache/flink/pull/26320#discussion_r2051997572
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java: ########## @@ -102,74 +95,56 @@ public static String generatePlan( operatorDescr = StringEscapeUtils.escapeHtml4(operatorDescr); operatorDescr = operatorDescr.replace("\n", "<br/>"); - gen.writeStartObject(); - - // write the core properties JobVertexID vertexID = vertex.getID(); - gen.writeStringField("id", vertexID.toString()); - gen.writeNumberField( - "parallelism", + long parallelism = vertexParallelism .getParallelismOptional(vertexID) - .orElse(vertex.getParallelism())); - gen.writeStringField("operator", operator); - gen.writeStringField("operator_strategy", operatorDescr); - gen.writeStringField("description", description); + .orElse(vertex.getParallelism()); + Collection<JobPlanInfo.Plan.Node.Input> inputs = new ArrayList<>(); if (!vertex.isInputVertex()) { - // write the input edge properties - gen.writeArrayFieldStart("inputs"); - - List<JobEdge> inputs = vertex.getInputs(); - for (int inputNum = 0; inputNum < inputs.size(); inputNum++) { - JobEdge edge = inputs.get(inputNum); + for (int inputNum = 0; inputNum < vertex.getInputs().size(); inputNum++) { + JobEdge edge = vertex.getInputs().get(inputNum); if (edge.getSource() == null) { continue; } - JobVertex predecessor = edge.getSource().getProducer(); + if (predecessor == null || predecessor.getID() == null) { + continue; + } + String inputId = predecessor.getID().toString(); + if (edge.getSource().getResultType() == null + || edge.getSource().getResultType().name() == null) { + continue; + } + String exchange = edge.getSource().getResultType().name().toLowerCase(); String shipStrategy = edge.getShipStrategyName(); String preProcessingOperation = edge.getPreProcessingOperationName(); String operatorLevelCaching = edge.getOperatorLevelCachingDescription(); - gen.writeStartObject(); - gen.writeNumberField("num", inputNum); - gen.writeStringField("id", predecessor.getID().toString()); - - if 
(shipStrategy != null) { - gen.writeStringField("ship_strategy", shipStrategy); - } - if (preProcessingOperation != null) { - gen.writeStringField("local_strategy", preProcessingOperation); - } - if (operatorLevelCaching != null) { - gen.writeStringField("caching", operatorLevelCaching); - } - - gen.writeStringField( - "exchange", edge.getSource().getResultType().name().toLowerCase()); - - gen.writeEndObject(); + inputs.add( + new JobPlanInfo.Plan.Node.Input( + inputId, + inputNum, + exchange, + shipStrategy, + preProcessingOperation, + operatorLevelCaching)); } - - gen.writeEndArray(); } - - // write the optimizer properties - gen.writeFieldName("optimizer_properties"); - gen.writeRawValue(optimizerProps); Review Comment: Noticed that `optimizerProps` is a string but is written as raw JSON - I should probably replicate that behavior. Will be addressed in a follow-up PR. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java: ########## @@ -102,74 +95,56 @@ public static String generatePlan( operatorDescr = StringEscapeUtils.escapeHtml4(operatorDescr); operatorDescr = operatorDescr.replace("\n", "<br/>"); - gen.writeStartObject(); - - // write the core properties JobVertexID vertexID = vertex.getID(); - gen.writeStringField("id", vertexID.toString()); - gen.writeNumberField( - "parallelism", + long parallelism = vertexParallelism .getParallelismOptional(vertexID) - .orElse(vertex.getParallelism())); - gen.writeStringField("operator", operator); - gen.writeStringField("operator_strategy", operatorDescr); - gen.writeStringField("description", description); + .orElse(vertex.getParallelism()); + Collection<JobPlanInfo.Plan.Node.Input> inputs = new ArrayList<>(); if (!vertex.isInputVertex()) { - // write the input edge properties - gen.writeArrayFieldStart("inputs"); - - List<JobEdge> inputs = vertex.getInputs(); - for (int inputNum = 0; inputNum < inputs.size(); inputNum++) { - JobEdge edge = inputs.get(inputNum); + for (int 
inputNum = 0; inputNum < vertex.getInputs().size(); inputNum++) { + JobEdge edge = vertex.getInputs().get(inputNum); if (edge.getSource() == null) { continue; } - JobVertex predecessor = edge.getSource().getProducer(); + if (predecessor == null || predecessor.getID() == null) { + continue; + } + String inputId = predecessor.getID().toString(); + if (edge.getSource().getResultType() == null + || edge.getSource().getResultType().name() == null) { + continue; + } + String exchange = edge.getSource().getResultType().name().toLowerCase(); String shipStrategy = edge.getShipStrategyName(); String preProcessingOperation = edge.getPreProcessingOperationName(); String operatorLevelCaching = edge.getOperatorLevelCachingDescription(); - gen.writeStartObject(); - gen.writeNumberField("num", inputNum); - gen.writeStringField("id", predecessor.getID().toString()); - - if (shipStrategy != null) { - gen.writeStringField("ship_strategy", shipStrategy); - } - if (preProcessingOperation != null) { - gen.writeStringField("local_strategy", preProcessingOperation); - } - if (operatorLevelCaching != null) { - gen.writeStringField("caching", operatorLevelCaching); - } - - gen.writeStringField( - "exchange", edge.getSource().getResultType().name().toLowerCase()); - - gen.writeEndObject(); + inputs.add( + new JobPlanInfo.Plan.Node.Input( + inputId, + inputNum, + exchange, + shipStrategy, + preProcessingOperation, + operatorLevelCaching)); } - - gen.writeEndArray(); } - - // write the optimizer properties - gen.writeFieldName("optimizer_properties"); - gen.writeRawValue(optimizerProps); Review Comment: `optimizerProps` is a string but is written as raw JSON - I should probably replicate that behavior. Will be addressed in a follow-up PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org