[
https://issues.apache.org/jira/browse/FLINK-4478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535622#comment-15535622
]
ASF GitHub Bot commented on FLINK-4478:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2435#discussion_r81313363
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
---
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Heartbeat manager implementation. The heartbeat manager maintains a map
of heartbeat monitors
+ * and resource IDs. Each monitor will be updated when a new heartbeat of
the associated machine has
+ * been received. If the monitor detects that a heartbeat has timed out,
it will notify the
+ * {@link HeartbeatListener} about it. A heartbeat times out iff no
heartbeat signal has been
+ * received within a given timeout interval.
+ *
+ * @param <I> Type of the incoming heartbeat payload
+ * @param <O> Type of the outgoing heartbeat payload
+ */
+@ThreadSafe
+public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O>,
HeartbeatTarget<I> {
+
+ /** Heartbeat timeout interval in milli seconds */
+ private final long heartbeatTimeoutIntervalMs;
+
+ /** Resource ID which is used to mark one own's heartbeat signals */
+ private final ResourceID ownResourceID;
+
+ /** Executor service used to run heartbeat timeout notifications */
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ protected final Logger log;
+
+ /** Map containing the heartbeat monitors associated with the
respective resource ID */
+ private final ConcurrentHashMap<ResourceID,
HeartbeatManagerImpl.HeartbeatMonitor<O>> heartbeatTargets;
+
+ /** Execution context used to run future callbacks */
+ private final Executor executor;
+
+ /** Heartbeat listener with which the heartbeat manager has been
associated */
+ private HeartbeatListener<I, O> heartbeatListener;
+
+ /** Running state of the heartbeat manager */
+ protected boolean stopped;
+
+ public HeartbeatManagerImpl(
+ long heartbeatTimeoutIntervalMs,
+ ResourceID ownResourceID,
+ Executor executor,
+ ScheduledExecutorService scheduledExecutorService,
+ Logger log) {
+ Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L,
"The heartbeat timeout has to be larger than 0.");
+
+ this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
+ this.ownResourceID = Preconditions.checkNotNull(ownResourceID);
+ this.scheduledExecutorService =
Preconditions.checkNotNull(scheduledExecutorService);
+ this.log = Preconditions.checkNotNull(log);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.heartbeatTargets = new ConcurrentHashMap<>(16);
+
+ stopped = true;
+ }
+
+
//----------------------------------------------------------------------------------------------
+ // Getters
+
//----------------------------------------------------------------------------------------------
+
+ ResourceID getOwnResourceID() {
+ return ownResourceID;
+ }
+
+ Executor getExecutor() {
+ return executor;
+ }
+
+ HeartbeatListener<I, O> getHeartbeatListener() {
+ return heartbeatListener;
+ }
+
+ Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>>
getHeartbeatTargets() {
+ return heartbeatTargets.values();
+ }
+
+
//----------------------------------------------------------------------------------------------
+ // HeartbeatManager methods
+
//----------------------------------------------------------------------------------------------
+
+ @Override
+ public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O>
heartbeatTarget) {
+ if (!stopped) {
+ if (heartbeatTargets.containsKey(resourceID)) {
+ log.info("The target with resource ID {} is
already been monitored.", resourceID);
+ } else {
+ HeartbeatManagerImpl.HeartbeatMonitor<O>
heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
+ resourceID,
+ heartbeatTarget,
+ scheduledExecutorService,
+ heartbeatListener,
+ heartbeatTimeoutIntervalMs);
+
+ heartbeatTargets.put(
+ resourceID,
+ heartbeatMonitor);
+
+ // check if we have stopped in the meantime
(concurrent stop operation)
+ if (stopped) {
+ heartbeatMonitor.cancel();
+
+ heartbeatTargets.remove(resourceID);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void unmonitorTarget(ResourceID resourceID) {
+ if (!stopped) {
+ HeartbeatManagerImpl.HeartbeatMonitor<O>
heartbeatMonitor = heartbeatTargets.remove(resourceID);
+
+ if (heartbeatMonitor != null) {
+ heartbeatMonitor.cancel();
+ }
+ }
+ }
+
+ @Override
+ public void start(HeartbeatListener<I, O> heartbeatListener) {
+ Preconditions.checkState(stopped, "Cannot start an already
started heartbeat manager.");
+
+ stopped = false;
+
+ this.heartbeatListener =
Preconditions.checkNotNull(heartbeatListener);
+ }
+
+ @Override
+ public void stop() {
+ stopped = true;
+
+ for (HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor
: heartbeatTargets.values()) {
+ heartbeatMonitor.cancel();
+ }
+
+ heartbeatTargets.clear();
+ }
+
+
//----------------------------------------------------------------------------------------------
+ // HeartbeatTarget methods
+
//----------------------------------------------------------------------------------------------
+
+ @Override
+ public void sendHeartbeat(ResourceID resourceID, I payload) {
+ if (!stopped) {
+ log.debug("Received heartbeat from {}.", resourceID);
+ reportHeartbeat(resourceID);
+
+ if (payload != null) {
+ heartbeatListener.reportPayload(resourceID,
payload);
+ }
+ }
+ }
+
+ @Override
+ public void requestHeartbeat(ResourceID resourceID, I payload) {
+ if (!stopped) {
+ log.debug("Received heartbeat request from {}.",
resourceID);
+
+ final HeartbeatTarget<O> heartbeatTarget =
reportHeartbeat(resourceID);
+
+ if (heartbeatTarget != null) {
+ if (payload != null) {
+
heartbeatListener.reportPayload(resourceID, payload);
+ }
+
+ Future<O> futurePayload =
heartbeatListener.retrievePayload();
+
+ if (futurePayload != null) {
+ futurePayload.thenAcceptAsync(new
AcceptFunction<O>() {
+ @Override
+ public void accept(O
retrievedPayload) {
+
heartbeatTarget.sendHeartbeat(getOwnResourceID(), retrievedPayload);
+ }
+ }, executor);
+ } else {
+
heartbeatTarget.sendHeartbeat(ownResourceID, null);
+ }
+ }
+ }
+ }
+
+ HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) {
+ if (heartbeatTargets.containsKey(resourceID)) {
+ HeartbeatManagerImpl.HeartbeatMonitor<O>
heartbeatMonitor = heartbeatTargets.get(resourceID);
+ heartbeatMonitor.reportHeartbeat();
+
+ return heartbeatMonitor.getHeartbeatTarget();
+ } else {
+ return null;
+ }
+ }
+
+
//----------------------------------------------------------------------------------------------
+ // Utility classes
+
//----------------------------------------------------------------------------------------------
+
+ /**
+ * Heartbeat monitor which manages the heartbeat state of the
associated heartbeat target. The
+ * monitor notifies the {@link HeartbeatListener} whenever it has not
seen a heartbeat signal
+ * in the specified heartbeat timeout interval. Each heartbeat signal
resets this timer.
+ *
+ * @param <O> Type of the payload being sent to the associated
heartbeat target
+ */
+ static class HeartbeatMonitor<O> implements Runnable {
+
+ /** Resource ID of the monitored heartbeat target */
+ private final ResourceID resourceID;
+
+ /** Associated heartbeat target */
+ private final HeartbeatTarget<O> heartbeatTarget;
+
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ /** Listener which is notified about heartbeat timeouts */
+ private final HeartbeatListener<?, ?> heartbeatListener;
+
+ /** Maximum heartbeat timeout interval */
+ private final long heartbeatTimeoutIntervalMs;
+
+ private volatile ScheduledFuture<?> futureTimeout;
+
+ private final AtomicReference<State> state = new
AtomicReference<>(State.RUNNING);
+
+ HeartbeatMonitor(
+ ResourceID resourceID,
+ HeartbeatTarget<O> heartbeatTarget,
+ ScheduledExecutorService scheduledExecutorService,
+ HeartbeatListener<?, O> heartbeatListener,
+ long heartbeatTimeoutIntervalMs) {
+
+ this.resourceID =
Preconditions.checkNotNull(resourceID);
+ this.heartbeatTarget =
Preconditions.checkNotNull(heartbeatTarget);
+ this.scheduledExecutorService =
Preconditions.checkNotNull(scheduledExecutorService);
+ this.heartbeatListener =
Preconditions.checkNotNull(heartbeatListener);
+
+ Preconditions.checkArgument(heartbeatTimeoutIntervalMs
>= 0L, "The heartbeat timeout interval has to be larger than 0.");
+ this.heartbeatTimeoutIntervalMs =
heartbeatTimeoutIntervalMs;
+
+ resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
+ }
+
+ HeartbeatTarget<O> getHeartbeatTarget() {
+ return heartbeatTarget;
+ }
+
+ void reportHeartbeat() {
+ resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
+ }
+
+ void resetHeartbeatTimeout(long heartbeatTimeout) {
+ if (state.get() == State.RUNNING) {
+ cancelTimeout();
+
+ futureTimeout =
scheduledExecutorService.schedule(this, heartbeatTimeout,
TimeUnit.MILLISECONDS);
+
+ // Double check for concurrent accesses (e.g. a
firing of the scheduled future)
+ if (state.get() != State.RUNNING) {
+ cancelTimeout();
+ }
--- End diff --
This looks like an optimization. Is is really necessary? We can still
change state after we have checked.
> Implement heartbeat logic
> -------------------------
>
> Key: FLINK-4478
> URL: https://issues.apache.org/jira/browse/FLINK-4478
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> With the Flip-6 refactoring, we'll have the need for a dedicated heartbeat
> component. The heartbeat component is used to check the liveliness of the
> distributed components among each other. Furthermore, heartbeats are used to
> regularly transmit status updates to another component. For example, the
> TaskManager informs the ResourceManager with each heartbeat about the current
> slot allocation.
> The heartbeat is initiated from one component. This component sends a
> heartbeat request to another component which answers with an heartbeat
> response. Thus, one can differentiate between a sending and a receiving side.
> Apart from the triggering of the heartbeat request, the logic of treating
> heartbeats, marking components dead and payload delivery are the same and
> should be reusable by different distributed components (JM, TM, RM).
> Different models for the heartbeat reporting are conceivable. First of all,
> the heartbeat request could be sent as an ask operation where the heartbeat
> response is returned as a future on the sending side. Alternatively, the
> sending side could request a heartbeat response by sending a tell message.
> The heartbeat response is then delivered by an RPC back to the heartbeat
> sender. The latter model has the advantage that a heartbeat response is not
> tightly coupled to a heartbeat request. Such a tight coupling could cause
> that heartbeat response are ignored after the future has timed out even
> though they might still contain valuable information (receiver is still
> alive).
> Furthermore, different strategies for the heartbeat triggering and marking
> heartbeat targets as dead are conceivable. For example, we could periodically
> (with a fixed period) trigger a heartbeat request and mark all targets as
> dead if we didn't receive a heartbeat response in a given time period.
> Furthermore, we could adapt the heartbeat interval and heartbeat timeouts
> with respect to the latency of previous heartbeat responses. This would
> reflect the current load and network conditions better.
> For the first version, I would propose to use a fixed period heartbeat with a
> maximum heartbeat timeout before a target is marked dead. Furthermore, I
> would propose to use tell messages (fire and forget) to request and report
> heartbeats because they are the more flexible model imho.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)