xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1036742632


##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java:
##########
@@ -271,4 +271,9 @@ default CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoor
             @RpcTimeout Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /** The client reports the heartbeat to the dispatcher for aliveness. */
+    default CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+        return CompletableFuture.completedFuture(null);

Review Comment:
   Use `FutureUtils.completedVoidFuture()` to reduce object creation.
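
   A minimal sketch of the idea behind this suggestion, assuming the helper hands out one shared, already-completed future instead of allocating a new one per call. The class `CompletedFutures` below is a hypothetical stand-in written for illustration, not Flink's actual `FutureUtils`:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a utility that caches a completed Void future.
final class CompletedFutures {
    // Allocated once; every caller receives the same instance.
    // Callers must not mutate it (e.g. via obtrudeValue).
    private static final CompletableFuture<Void> COMPLETED_VOID =
            CompletableFuture.completedFuture(null);

    private CompletedFutures() {}

    static CompletableFuture<Void> completedVoidFuture() {
        return COMPLETED_VOID;
    }

    public static void main(String[] args) {
        // Each call to completedFuture(null) allocates a new object.
        CompletableFuture<Void> a = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> b = CompletableFuture.completedFuture(null);
        System.out.println("fresh futures share instance:  " + (a == b));

        // The cached variant returns the same object on every call.
        System.out.println("cached futures share instance: "
                + (completedVoidFuture() == completedVoidFuture()));
    }
}
```

   For an RPC that is called on every client heartbeat, avoiding one allocation per call is a small but free saving.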



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1016,6 +1034,47 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                 operatorId, serializedRequest, timeout));
     }
 
+    @Override
+    public CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+        if (!getJobManagerRunner(jobId).isPresent()) {
+            log.warn("Fail to find job {} for client.", jobId);
+        } else {
+            log.debug("Job {} receives client's heartbeat.", jobId);
+            markJobClientAliveness(jobId);
+            resetJobClientAlivenessCheck(jobId);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private void markJobClientAliveness(JobID jobId) {
+        if (jobClientAlivenessFutures.containsKey(jobId)) {
+            jobClientAlivenessFutures.get(jobId).complete(null);
+        }
+    }
+
+    private void resetJobClientAlivenessCheck(JobID jobId) {
+        CompletableFuture<Void> clientAlivenessFuture = new CompletableFuture<>();
+        jobClientAlivenessFutures.put(jobId, clientAlivenessFuture);
+        FutureUtils.orTimeout(
+                clientAlivenessFuture,
+                jobClientHeartbeatTimeout,
+                TimeUnit.MILLISECONDS,
+                getMainThreadExecutor());
+        clientAlivenessFuture.whenComplete(
+                (t, throwable) -> {
+                    if (throwable != null) {
+                        if (throwable instanceof TimeoutException) {
+                            log.warn(
+                                    "Haven't receive aliveness from job client and cancel the job {}.",
+                                    jobId);
+                            cancelJob(jobId, webTimeout);

Review Comment:
   The future-based approach seems like overkill for this feature. It creates a new future for each job on every heartbeat, which seems expensive.
   
   I'd suggest simply recording the last heartbeat timestamp of each job and periodically checking whether any job has timed out. This is similar to how we check for and release idle TMs. It may not trigger the timeout at exactly the configured time, but it would be much cheaper.
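
   The timestamp-based alternative could look roughly like the sketch below. All names here (`ClientHeartbeatTracker`, `reportHeartbeat`, `removeTimedOutJobs`) are hypothetical, job IDs are plain strings rather than `JobID`, and the code assumes all calls happen on a single thread (as they would on the dispatcher's main thread), so no synchronization is shown:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one map entry per job instead of one future per heartbeat.
final class ClientHeartbeatTracker {
    private final Map<String, Long> lastHeartbeatMillis = new HashMap<>();
    private final long timeoutMillis;

    ClientHeartbeatTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called on every client heartbeat; only overwrites a timestamp.
    void reportHeartbeat(String jobId, long nowMillis) {
        lastHeartbeatMillis.put(jobId, nowMillis);
    }

    // Called periodically; returns the jobs whose last heartbeat is older
    // than the timeout and stops tracking them.
    List<String> removeTimedOutJobs(long nowMillis) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeatMillis.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > timeoutMillis) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        ClientHeartbeatTracker tracker = new ClientHeartbeatTracker(1000L);
        tracker.reportHeartbeat("job-a", 0L);
        tracker.reportHeartbeat("job-b", 0L);
        tracker.reportHeartbeat("job-b", 900L); // job-b keeps reporting
        System.out.println(tracker.removeTimedOutJobs(1500L)); // prints [job-a]
    }
}
```

   With a check interval of `p`, a job is detected at most `p` after its timeout actually elapsed, which is the imprecision mentioned above. The periodic check itself would be scheduled once, not per heartbeat.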



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1016,6 +1034,47 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                 operatorId, serializedRequest, timeout));
     }
 
+    @Override
+    public CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+        if (!getJobManagerRunner(jobId).isPresent()) {
+            log.warn("Fail to find job {} for client.", jobId);
+        } else {
+            log.debug("Job {} receives client's heartbeat.", jobId);
+            markJobClientAliveness(jobId);
+            resetJobClientAlivenessCheck(jobId);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private void markJobClientAliveness(JobID jobId) {
+        if (jobClientAlivenessFutures.containsKey(jobId)) {
+            jobClientAlivenessFutures.get(jobId).complete(null);
+        }
+    }
+
+    private void resetJobClientAlivenessCheck(JobID jobId) {
+        CompletableFuture<Void> clientAlivenessFuture = new CompletableFuture<>();
+        jobClientAlivenessFutures.put(jobId, clientAlivenessFuture);
+        FutureUtils.orTimeout(
+                clientAlivenessFuture,
+                jobClientHeartbeatTimeout,
+                TimeUnit.MILLISECONDS,
+                getMainThreadExecutor());
+        clientAlivenessFuture.whenComplete(
+                (t, throwable) -> {
+                    if (throwable != null) {
+                        if (throwable instanceof TimeoutException) {
+                            log.warn(
+                                    "Haven't receive aliveness from job client and cancel the job {}.",
+                                    jobId);
+                            cancelJob(jobId, webTimeout);

Review Comment:
   What happens if canceling the job itself fails, e.g., with a timeout?
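
   To make the concern concrete: in the diff, the future returned by `cancelJob(jobId, webTimeout)` is dropped, so a failed cancellation would go unnoticed. One possible way to surface it is sketched below. This is only an illustration under assumptions, not a proposed fix: `CancelWithFallback` and `cancelAndReport` are hypothetical names, and the cancel call is modeled as a plain `Function` returning a `CompletableFuture<Void>`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical sketch: observe the outcome of the cancel call instead of dropping it.
final class CancelWithFallback {
    private CancelWithFallback() {}

    // Completes with true if the cancellation succeeded, and with false if it
    // failed, so that the caller can decide to retry on a later check.
    static CompletableFuture<Boolean> cancelAndReport(
            String jobId, Function<String, CompletableFuture<Void>> cancelJob) {
        return cancelJob.apply(jobId).handle((ignored, failure) -> {
            if (failure != null) {
                System.err.println("Canceling job " + jobId + " failed: " + failure);
                return false;
            }
            return true;
        });
    }

    public static void main(String[] args) {
        // Simulate a cancel call that times out.
        CompletableFuture<Void> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new TimeoutException("cancel timed out"));

        boolean cancelled = cancelAndReport("job-a", id -> timedOut).join();
        System.out.println("cancelled: " + cancelled); // prints cancelled: false
    }
}
```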



