Efrat19 commented on code in PR #26320: URL: https://github.com/apache/flink/pull/26320#discussion_r2053924957
########## docs/layouts/shortcodes/generated/rest_v1_dispatcher.html: ########## @@ -509,7 +509,72 @@ "properties" : { "plan" : { "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson" + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:Plan", Review Comment: Thanks. Addressed in a followup PR https://github.com/apache/flink/pull/26488 ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java: ########## @@ -102,74 +95,56 @@ public static String generatePlan( operatorDescr = StringEscapeUtils.escapeHtml4(operatorDescr); operatorDescr = operatorDescr.replace("\n", "<br/>"); - gen.writeStartObject(); - - // write the core properties JobVertexID vertexID = vertex.getID(); - gen.writeStringField("id", vertexID.toString()); - gen.writeNumberField( - "parallelism", + long parallelism = vertexParallelism .getParallelismOptional(vertexID) - .orElse(vertex.getParallelism())); - gen.writeStringField("operator", operator); - gen.writeStringField("operator_strategy", operatorDescr); - gen.writeStringField("description", description); + .orElse(vertex.getParallelism()); + Collection<JobPlanInfo.Plan.Node.Input> inputs = new ArrayList<>(); if (!vertex.isInputVertex()) { - // write the input edge properties - gen.writeArrayFieldStart("inputs"); - - List<JobEdge> inputs = vertex.getInputs(); - for (int inputNum = 0; inputNum < inputs.size(); inputNum++) { - JobEdge edge = inputs.get(inputNum); + for (int inputNum = 0; inputNum < vertex.getInputs().size(); inputNum++) { + JobEdge edge = vertex.getInputs().get(inputNum); if (edge.getSource() == null) { continue; } - JobVertex predecessor = edge.getSource().getProducer(); + if (predecessor == null || predecessor.getID() == null) { + continue; + } + String inputId = predecessor.getID().toString(); + if (edge.getSource().getResultType() == null + || edge.getSource().getResultType().name() == null) { + continue; + } + String exchange = edge.getSource().getResultType().name().toLowerCase(); String shipStrategy = edge.getShipStrategyName(); String preProcessingOperation = edge.getPreProcessingOperationName(); String operatorLevelCaching = edge.getOperatorLevelCachingDescription(); - gen.writeStartObject(); - gen.writeNumberField("num", inputNum); - gen.writeStringField("id", predecessor.getID().toString()); - - if (shipStrategy != null) { - gen.writeStringField("ship_strategy", shipStrategy); - } - if (preProcessingOperation != null) { - gen.writeStringField("local_strategy", preProcessingOperation); - } - if (operatorLevelCaching != null) { - gen.writeStringField("caching", operatorLevelCaching); - } - - gen.writeStringField( - "exchange", edge.getSource().getResultType().name().toLowerCase()); - - gen.writeEndObject(); + inputs.add( + new JobPlanInfo.Plan.Node.Input( + inputId, + inputNum, + exchange, + shipStrategy, + preProcessingOperation, + operatorLevelCaching)); } - - gen.writeEndArray(); } - - // write the optimizer properties - gen.writeFieldName("optimizer_properties"); - gen.writeRawValue(optimizerProps); Review Comment: https://github.com/apache/flink/pull/26488 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org