[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424489#comment-15424489 ]
ASF GitHub Bot commented on FLINK-1984: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75119617 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.SharedValue; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.mesos.Protos; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A ZooKeeper-backed Mesos worker store. + */ +public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class); + + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** Flag indicating whether this instance is running. 
*/ + private boolean isRunning; + + /** A persistent value of the assigned framework ID */ + private final SharedValue frameworkIdInZooKeeper; + + /** A persistent count of all tasks created, for generating unique IDs */ + private final SharedCount totalTaskCountInZooKeeper; + + /** A persistent store of serialized workers */ + private final ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper; + + @SuppressWarnings("unchecked") + ZooKeeperMesosWorkerStore( + CuratorFramework client, + String storePath, + StateStorageHelper<MesosWorkerStore.Worker> stateStorage + ) throws Exception { + checkNotNull(storePath, "storePath"); + checkNotNull(stateStorage, "stateStorage"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = checkNotNull(client, "client"); + + // All operations will have the given path as root + client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient()); + CuratorFramework facade = client.usingNamespace(client.getNamespace() + storePath); + + // Track the assigned framework ID. + frameworkIdInZooKeeper = new SharedValue(facade, "/frameworkId", new byte[0]); + + // Keep a count of all tasks created ever, as the basis for a unique ID. + totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 0); + + // Keep track of the workers in state handle storage. 
+ facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient()); + CuratorFramework storeFacade = client.usingNamespace(facade.getNamespace() + "/workers"); + + this.workersInZooKeeper = ZooKeeperStateHandleStore.class + .getConstructor(CuratorFramework.class, StateStorageHelper.class) + .newInstance(storeFacade, stateStorage); + } + + @Override + public void start() throws Exception { + synchronized (cacheLock) { + if (!isRunning) { + isRunning = true; + frameworkIdInZooKeeper.start(); + totalTaskCountInZooKeeper.start(); + } + } + } + + public void stop() throws Exception { + synchronized (cacheLock) { + if (isRunning) { + frameworkIdInZooKeeper.close(); + totalTaskCountInZooKeeper.close(); + client.close(); + isRunning = false; + } + } + } + + /** + * Verifies that the state is running. + */ + private void verifyIsRunning() { + checkState(isRunning, "Not running. Forgot to call start()?"); + } + + /** + * Get the persisted framework ID. + * @return the current ID or empty if none is yet persisted. + * @throws Exception on ZK failures, interruptions. + */ + @Override + public Option<Protos.FrameworkID> getFrameworkID() throws Exception { + synchronized (cacheLock) { + verifyIsRunning(); + + Option<Protos.FrameworkID> frameworkID; + byte[] value = frameworkIdInZooKeeper.getValue(); + if (value.length == 0) { + frameworkID = Option.empty(); + } else { + frameworkID = Option.apply(Protos.FrameworkID.newBuilder().setValue(new String(value)).build()); + } + + return frameworkID; + } + } + + /** + * Update the persisted framework ID. + * @param frameworkID the new ID or empty to remove the persisted ID. + * @throws Exception on ZK failures, interruptions. + */ + @Override + public void setFrameworkID(Option<Protos.FrameworkID> frameworkID) throws Exception { + synchronized (cacheLock) { + verifyIsRunning(); + + byte[] value = frameworkID.isDefined() ? 
frameworkID.get().getValue().getBytes() : new byte[0]; + frameworkIdInZooKeeper.setValue(value); + } + } + + /** + * Generates a new task ID. + */ + @Override + public Protos.TaskID newTaskID() throws Exception { + synchronized (cacheLock) { + verifyIsRunning(); + + int nextCount; + boolean success; + do { + VersionedValue<Integer> count = totalTaskCountInZooKeeper.getVersionedValue(); + nextCount = count.getValue() + 1; + success = totalTaskCountInZooKeeper.trySetCount(count, nextCount); + } + while (!success); + + Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(TASKID_FORMAT.format(nextCount)).build(); + return taskID; + } + } + + @Override + public List<MesosWorkerStore.Worker> recoverWorkers() throws Exception { + synchronized (cacheLock) { + verifyIsRunning(); + + List<Tuple2<StateHandle<MesosWorkerStore.Worker>, String>> handles = workersInZooKeeper.getAll(); + + if(handles.size() != 0) { + List<MesosWorkerStore.Worker> workers = new ArrayList<>(handles.size()); + for (Tuple2<StateHandle<MesosWorkerStore.Worker>, String> handle : handles) { + Worker worker = handle.f0.getState(ClassLoader.getSystemClassLoader()); + + workers.add(worker); + } + + return workers; + } + else { + return Collections.emptyList(); + } + } + } + + @Override + public void putWorker(MesosWorkerStore.Worker worker) throws Exception { + + checkNotNull(worker, "worker"); + String path = getPathForWorker(worker.taskID()); + + synchronized (cacheLock) { + verifyIsRunning(); + + int currentVersion = workersInZooKeeper.exists(path); + if (currentVersion == -1) { + try { + workersInZooKeeper.add(path, worker); + LOG.debug("Added {} in ZooKeeper.", worker); + } catch (KeeperException.NodeExistsException ex) { + throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex); + } + } else { + try { + workersInZooKeeper.replace(path, currentVersion, worker); + LOG.debug("Updated {} in ZooKeeper.", worker); + } catch (KeeperException.NoNodeException ex) { + throw new 
ConcurrentModificationException("ZooKeeper unexpectedly modified", ex); + } + } + } + } + + @Override + public void removeWorker(Protos.TaskID taskID) throws Exception { + checkNotNull(taskID, "taskID"); + String path = getPathForWorker(taskID); + synchronized (cacheLock) { + verifyIsRunning(); + + workersInZooKeeper.remove(path); + LOG.debug("Removed worker {} from ZooKeeper.", taskID); + } + } + + @Override + public void cleanup() throws Exception { + // TODO --- End diff -- Can we resolve this TODO? > Integrate Flink with Apache Mesos > --------------------------------- > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management > Reporter: Robert Metzger > Assignee: Eron Wright > Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)