Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5397#discussion_r166226318

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java ---
    @@ -137,11 +151,28 @@ public static JobManagerServices fromConfiguration(
                     Hardware.getNumberCPUCores(),
                     new ExecutorThreadFactory("jobmanager-future"));

    +        final StackTraceSampleCoordinator stackTraceSampleCoordinator =
    +            new StackTraceSampleCoordinator(futureExecutor, timeout.toMillis());
    +        final BackPressureStatsTracker backPressureStatsTracker = new BackPressureStatsTracker(
    +            stackTraceSampleCoordinator,
    +            config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
    +            config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
    +            config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL),
    +            Time.milliseconds(config.getInteger(WebOptions.BACKPRESSURE_DELAY)));
    +
    +        futureExecutor.scheduleWithFixedDelay(
    --- End diff --

    Not sure whether we should schedule the cleanup here because that way we don't have access to the `ScheduledFuture` which we should cancel once we shut the `BackPressureStatsTracker` down. I think this should happen in the `JobMaster`.
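
To illustrate the concern, here is a minimal sketch using only `java.util.concurrent`. It assumes a hypothetical `CleanupOwner` class standing in for whichever component ends up owning the periodic cleanup; the class, its fields, and its method names are invented for illustration and are not part of Flink. The point is only that the component which schedules the task keeps the returned `ScheduledFuture` and cancels it on shutdown.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical owner of a periodic cleanup task (not a Flink class). */
final class CleanupOwner {
    private final ScheduledExecutorService executor;
    private final Runnable cleanup;
    private final long intervalMillis;

    // Handle kept so that shutDown() can cancel the periodic task.
    private ScheduledFuture<?> cleanupFuture;

    CleanupOwner(ScheduledExecutorService executor, Runnable cleanup, long intervalMillis) {
        this.executor = executor;
        this.cleanup = cleanup;
        this.intervalMillis = intervalMillis;
    }

    /** Schedules the cleanup once and remembers the returned future. */
    synchronized void start() {
        if (cleanupFuture == null) {
            cleanupFuture = executor.scheduleWithFixedDelay(
                cleanup, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Cancels the periodic cleanup; a run already in progress may finish. */
    synchronized void shutDown() {
        if (cleanupFuture != null) {
            cleanupFuture.cancel(false);
            cleanupFuture = null;
        }
    }

    /** True while a cleanup task is scheduled and has not been cancelled. */
    synchronized boolean isScheduled() {
        return cleanupFuture != null && !cleanupFuture.isCancelled();
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CleanupOwner owner = new CleanupOwner(executor, runs::incrementAndGet, 10L);

        owner.start();
        Thread.sleep(200);   // let the cleanup run a few times
        owner.shutDown();    // no further runs are scheduled after this
        System.out.println("scheduled after shutDown: " + owner.isScheduled());

        executor.shutdownNow();
    }
}
```

If the call to `scheduleWithFixedDelay` is made in a static factory and its return value is dropped, nothing can later call `cancel` on it, and the task keeps firing until the executor itself is shut down.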
---