GitHub user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r165835483

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -867,6 +889,25 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
         }
       }

    +  @Override
    +  public CompletableFuture<Optional<OperatorBackPressureStats>> getOperatorBackPressureStats(
    +      final JobID jobId, final JobVertexID jobVertexId) {
    +    final ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexId);
    +    if (jobVertex == null) {
    +      return FutureUtils.completedExceptionally(new FlinkException("JobVertexID not found " +
    +        jobVertexId));
    +    }
    +
    +    final Optional<OperatorBackPressureStats> operatorBackPressureStats =
    +      backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
    +    if (!operatorBackPressureStats.isPresent() ||
    +      backPressureStatsRefreshInterval <= System.currentTimeMillis() - operatorBackPressureStats.get().getEndTimestamp()) {
    +      backPressureStatsTracker.triggerStackTraceSample(jobVertex);
    +      return CompletableFuture.completedFuture(Optional.empty());
    --- End diff --

    done
---