xintongsong commented on code in PR #21447:
URL: https://github.com/apache/flink/pull/21447#discussion_r1049319867


##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java:
##########
@@ -151,60 +146,88 @@
     }
 
     @Override
-    public Optional<T> getVertexStats(JobID jobId, AccessExecutionJobVertex vertex) {
+    public Optional<ThreadInfoStats> getVertexStats(JobID jobId, AccessExecutionJobVertex vertex) {
         synchronized (lock) {
-            final Key key = getKey(jobId, vertex);
+            List<AccessExecutionVertex> needRefreshedExecutionVertices = new ArrayList<>();
+            List<ThreadInfoSample> results = new ArrayList<>();
+
+            int requestId = Integer.MAX_VALUE;
+            long startTime = Long.MAX_VALUE;
+            long endTime = Long.MIN_VALUE;
+            for (AccessExecutionVertex executionVertex : vertex.getTaskVertices()) {
+                Key key = getKey(jobId, executionVertex);
+                final ThreadInfoStats stats = executionVertexStatsCache.getIfPresent(key);
+                if (stats != null) {
+                    results.addAll(stats.getSamples());
+                    requestId = Math.min(requestId, stats.getRequestId());
+                    startTime = Math.min(startTime, stats.getStartTime());
+                    endTime = Math.max(endTime, stats.getEndTime());

Review Comment:
   Not sure about these aggregations.
   - For `requestId`, there is no longer a 1-to-1 mapping between pending requests and thread info stats.
   - For `start/endTime`, it does not sound right to treat a group of tasks that were sampled with different start / end times as a single sample.
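
To make the `start/endTime` concern concrete, here is a minimal sketch in plain Java. `Stats` is a hypothetical stand-in for the timing fields of `ThreadInfoStats`, and the numbers are invented. It only mirrors the min/max logic from the diff above and is not the PR's actual code.

```java
import java.util.Arrays;
import java.util.List;

class MergedWindowSketch {

    /** Hypothetical stand-in for the requestId and timing fields of ThreadInfoStats. */
    static final class Stats {
        final int requestId;
        final long startTime;
        final long endTime;

        Stats(int requestId, long startTime, long endTime) {
            this.requestId = requestId;
            this.startTime = startTime;
            this.endTime = endTime;
        }
    }

    /** Mirrors the min/max aggregation from the diff; returns {start, end}. */
    static long[] mergedWindow(List<Stats> cached) {
        long start = Long.MAX_VALUE;
        long end = Long.MIN_VALUE;
        for (Stats s : cached) {
            start = Math.min(start, s.startTime);
            end = Math.max(end, s.endTime);
        }
        return new long[] {start, end};
    }

    /** Longest period during which any single subtask was actually sampled. */
    static long longestSingleSample(List<Stats> cached) {
        long longest = 0;
        for (Stats s : cached) {
            longest = Math.max(longest, s.endTime - s.startTime);
        }
        return longest;
    }

    public static void main(String[] args) {
        // Two subtasks sampled by different requests, several seconds apart.
        List<Stats> cached = Arrays.asList(new Stats(1, 0L, 100L), new Stats(7, 5_000L, 5_100L));
        long[] window = mergedWindow(cached);
        System.out.println("merged window length: " + (window[1] - window[0]) + " ms");
        System.out.println("longest real sample:  " + longestSingleSample(cached) + " ms");
    }
}
```

With these inputs the merged window is far longer than any period in which a subtask was really sampled, and the minimum `requestId` identifies only one of the two requests that produced the samples.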



##########
flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java:
##########
@@ -53,29 +56,32 @@ public static Collection<ThreadInfo> createThreadDump() {
      * Creates a {@link ThreadInfoSample} for a specific thread. Contains thread traces if
      * maxStackTraceDepth > 0.
      *
+     * @param executionId The execution id of this threadInfo.
      * @param threadId The ID of the thread to create the thread dump for.
      * @param maxStackTraceDepth The maximum number of entries in the stack trace to be collected.
      * @return The thread information of a specific thread.
      */
     public static Optional<ThreadInfoSample> createThreadInfoSample(
-            long threadId, int maxStackTraceDepth) {
+            ExecutionAttemptID executionId, long threadId, int maxStackTraceDepth) {

Review Comment:
   A `JvmUtils` class should not be aware of Flink DAG concepts such as `ExecutionAttemptID` and `SampleableTask`.
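
One possible shape for this separation, as a rough sketch only: the JVM-level helper takes just a thread ID, and the caller keeps the association with executions. The class and method names here are hypothetical, `String` stands in for `ExecutionAttemptID`, and the helper returns the raw `java.lang.management.ThreadInfo` rather than Flink's `ThreadInfoSample` so the example stays self-contained.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class JvmOnlySamplingSketch {

    /** JVM-level helper: knows about thread IDs only, nothing about Flink executions. */
    static Optional<ThreadInfo> sampleThread(long threadId, int maxStackTraceDepth) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // getThreadInfo returns null when the thread is not alive or does not exist.
        return Optional.ofNullable(bean.getThreadInfo(threadId, maxStackTraceDepth));
    }

    /**
     * Caller-side code: owns the mapping from execution to thread.
     * The String key is an assumed stand-in for ExecutionAttemptID.
     */
    static Map<String, ThreadInfo> sampleExecutions(
            Map<String, Long> threadIdsByExecution, int maxStackTraceDepth) {
        Map<String, ThreadInfo> samples = new HashMap<>();
        for (Map.Entry<String, Long> entry : threadIdsByExecution.entrySet()) {
            sampleThread(entry.getValue(), maxStackTraceDepth)
                    .ifPresent(info -> samples.put(entry.getKey(), info));
        }
        return samples;
    }

    public static void main(String[] args) {
        Map<String, Long> threads = new HashMap<>();
        threads.put("attempt-1", Thread.currentThread().getId());
        Map<String, ThreadInfo> samples = sampleExecutions(threads, 8);
        System.out.println(samples.get("attempt-1").getThreadName());
    }
}
```

Keeping the execution key in the caller's map also means the sample type itself does not need an `executionId` field.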



##########
flink-runtime/src/main/java/org/apache/flink/runtime/messages/ThreadInfoSample.java:
##########
@@ -32,25 +36,33 @@
  */
 public class ThreadInfoSample implements Serializable {
 
+    private final ExecutionAttemptID executionId;

Review Comment:
   Not sure about including `executionId` in `ThreadInfoSample`. I think it changes `ThreadInfoSample` from a general-purpose class that wraps parts of `java.lang.management.ThreadInfo` into a specialized class that only serves a Flink execution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
