[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424744#comment-15424744 ]
ASF GitHub Bot commented on FLINK-1984: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75149129 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.SharedValue; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.mesos.Protos; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A ZooKeeper-backed Mesos worker store. + */ +public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class); + + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** Flag indicating whether this instance is running. */ + private boolean isRunning; + + /** A persistent value of the assigned framework ID */ + private final SharedValue frameworkIdInZooKeeper; + + /** A persistent count of all tasks created, for generating unique IDs */ + private final SharedCount totalTaskCountInZooKeeper; + + /** A persistent store of serialized workers */ + private final ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper; + + @SuppressWarnings("unchecked") + ZooKeeperMesosWorkerStore( + CuratorFramework client, + String storePath, + StateStorageHelper<MesosWorkerStore.Worker> stateStorage + ) throws Exception { + checkNotNull(storePath, "storePath"); + checkNotNull(stateStorage, "stateStorage"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = checkNotNull(client, "client"); + + // All operations will have the given path as root + client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient()); + CuratorFramework facade = client.usingNamespace(client.getNamespace() + storePath); + + // Track the assignd framework ID. + frameworkIdInZooKeeper = new SharedValue(facade, "/frameworkId", new byte[0]); + + // Keep a count of all tasks created ever, as the basis for a unique ID. + totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 0); + + // Keep track of the workers in state handle storage. + facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient()); + CuratorFramework storeFacade = client.usingNamespace(facade.getNamespace() + "/workers"); + + this.workersInZooKeeper = ZooKeeperStateHandleStore.class + .getConstructor(CuratorFramework.class, StateStorageHelper.class) + .newInstance(storeFacade, stateStorage); --- End diff -- I think we can resolve this by configuring the shading properly. I could offer to look into this after the merge... > Integrate Flink with Apache Mesos > --------------------------------- > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management > Reporter: Robert Metzger > Assignee: Eron Wright > Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)