yuchen-ecnu commented on code in PR #25798: URL: https://github.com/apache/flink/pull/25798#discussion_r1893922934
########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java: ########## @@ -47,6 +47,13 @@ public interface AccessExecutionGraph extends JobStatusProvider { */ String getJsonPlan(); + /** + * Returns the stream graph as a JSON string. + * + * @return stream graph as a JSON string, or empty string if the job submitted with JobGraph. Review Comment: The interface was named `getStreamGraphJson` for retrieving the stream graph. And the adaptive execution was based on the stream graph rather than job graph. ########## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java: ########## @@ -199,4 +206,12 @@ public interface AccessExecutionGraph extends JobStatusProvider { * @return The changelog storage name, or an empty Optional in the case of batch jobs */ Optional<String> getChangelogStorageName(); + + /** + * Retrieves the count of pending operators waiting to be transferred to job vertices in the + * adaptive execution of batch jobs. + * + * @return the number of pending operators. + */ + int getPendingOperatorCount(); Review Comment: I think the method name should be general ones in `AccessExecutionGraph`. Since it defines the interface to retrieve the runtime informations of the execution graph and doesn't distinguish whether it's a streaming or a batch job. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java: ########## @@ -170,4 +174,54 @@ public static String generatePlan( throw new RuntimeException("Failed to generate plan", e); } } + + public static String generateStreamGraphJson( + StreamGraph sg, Map<Integer, JobVertexID> jobVertexIdMap) { + try (final StringWriter writer = new StringWriter(1024)) { + try (final JsonGenerator gen = new JsonFactory().createGenerator(writer)) { + // start of everything + gen.writeStartObject(); + + gen.writeArrayFieldStart("nodes"); + + // info per vertex + for (StreamNode node : sg.getStreamNodes()) { + gen.writeStartObject(); + gen.writeStringField("id", String.valueOf(node.getId())); Review Comment: Just like `org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator#generatePlan`, maybe we should rely on as few third-party frameworks as possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org