JunRuiLee commented on code in PR #25798:
URL: https://github.com/apache/flink/pull/25798#discussion_r1890418851

##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java:
##########
@@ -47,6 +47,13 @@ public interface AccessExecutionGraph extends JobStatusProvider {
      */
     String getJsonPlan();

+    /**
+     * Returns the stream graph as a JSON string.
+     *
+     * @return stream graph as a JSON string, or empty string if the job submitted with JobGraph.

Review Comment:
   or an empty string if the job is submitted with a JobGraph or if it's a streaming job.


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java:
##########
@@ -141,6 +142,11 @@ private static JobDetailsInfo createJobDetailsInfo(
                     executionState, jobVerticesPerState[executionState.ordinal()]);
         }

+        JobPlanInfo.RawJson streamGraphJson = null;
+        if (!StringUtils.isNullOrWhitespaceOnly(executionGraph.getStreamGraphJson())) {

Review Comment:
   the return value of getStreamGraphJson will not be null.


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java:
##########
@@ -291,6 +299,8 @@ private List<JobVertex> createJobVerticesAndUpdateGraph(List<StreamNode> streamN

         generateConfigForJobVertices(jobVertexBuildContext);

+        onStreamGraphUpdated(streamGraph);

Review Comment:
   I think calling this method here is strange because what we want is to update the JSON due to a newly created job vertex. I suggest that we call generateStreamGraphJson directly here.


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java:
##########
@@ -62,4 +62,13 @@ public interface StreamGraphContext {
      * @return true if all modifications were successful and applied atomically, false otherwise.
      */
     boolean modifyStreamEdge(List<StreamEdgeUpdateRequestInfo> requestInfos);
+
+    interface StreamGraphUpdateListener {
+        /**
+         * This method is called whenever the StreamGraph is updated.
+         *
+         * @param streamGraph the updated StreamGraph
+         */
+        void onStreamGraphUpdated(StreamGraph streamGraph);

Review Comment:
   I think the listener already holds a stream graph, so we don't need to pass a stream graph.


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java:
##########
@@ -62,4 +62,13 @@ public interface StreamGraphContext {
      * @return true if all modifications were successful and applied atomically, false otherwise.
      */
     boolean modifyStreamEdge(List<StreamEdgeUpdateRequestInfo> requestInfos);
+
+    interface StreamGraphUpdateListener {

Review Comment:
   javadoc


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java:
##########
@@ -79,6 +79,10 @@ public class JobDetailsInfo implements ResponseBody {

     public static final String FIELD_NAME_JSON_PLAN = "plan";

+    public static final String FIELD_NAME_STREAM_GRAPH_JSON_PLAN = "stream-graph-plan";
+
+    public static final String FIELD_NAME_PENDING_OPERATOR_COUNT = "pending-operator-count";

Review Comment:
   pending-operators


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org