[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2365#discussion_r74737756
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -25,33 +25,42 @@
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
+import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
+import org.apache.log4j.Logger;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.util.BitSet;
 import java.util.concurrent.Callable;
 
 /**
- * Invocation handler to be used with a {@link AkkaRpcActor}. The 
invocation handler wraps the
- * rpc in a {@link RpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
+ * Invocation handler to be used with an {@link AkkaRpcActor}. The 
invocation handler wraps the
+ * rpc in a {@link LocalRpcInvocation} message and then sends it to the 
{@link AkkaRpcActor} where it is
  * executed.
  */
 class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutor {
+   private static final Logger LOG = 
Logger.getLogger(AkkaInvocationHandler.class);
+
private final ActorRef rpcServer;
--- End diff --

Good point, will change it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2365#discussion_r74737803
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RemoteRpcInvocation.java
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Remote rpc invocation message which is used when the actor 
communication is remote and, thus, the
+ * message has to be serialized.
+ * 
+ * In order to fail fast and report an appropriate error message to the 
user, the method name, the
+ * parameter types and the arguments are eagerly serialized. In case the 
the invocation call
+ * contains a non-serializable object, then an {@link IOException} is 
thrown.
+ */
+public class RemoteRpcInvocation implements RpcInvocation, Serializable {
+   private static final long serialVersionUID = 6179354390913843809L;
+
+   // Serialized invocation data
+   private final SerializedValue 
serializedMethodInvocation;
+
+   // Transient field which is lazily initialized upon first access to the 
invocation data
+   private transient RemoteRpcInvocation.MethodInvocation methodInvocation;
+
+   public  RemoteRpcInvocation(
+   final String methodName,
+   final Class[] parameterTypes,
+   final Object[] args) throws IOException {
+
+   serializedMethodInvocation = new SerializedValue<>(new 
RemoteRpcInvocation.MethodInvocation(methodName, parameterTypes, args));
--- End diff --

You're right. Will add it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2365#discussion_r74737963
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -25,33 +25,42 @@
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
+import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
+import org.apache.log4j.Logger;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.util.BitSet;
 import java.util.concurrent.Callable;
 
 /**
- * Invocation handler to be used with a {@link AkkaRpcActor}. The 
invocation handler wraps the
- * rpc in a {@link RpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
+ * Invocation handler to be used with an {@link AkkaRpcActor}. The 
invocation handler wraps the
+ * rpc in a {@link LocalRpcInvocation} message and then sends it to the 
{@link AkkaRpcActor} where it is
  * executed.
  */
 class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutor {
+   private static final Logger LOG = 
Logger.getLogger(AkkaInvocationHandler.class);
+
private final ActorRef rpcServer;
 
// default timeout for asks
private final Timeout timeout;
 
-   AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout) {
+   private final long maximumFramesize;
+
+   AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout, long 
maximumFramesize) {
--- End diff --

`isLocalActorRef` should actually be quite cheap since it simply checks 
whether the host option of the actor's address is defined or not. But, anyway, 
it makes sense to check it once and store the result. Will add the change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc invocatio...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2365
  
Thanks for the review @StephanEwen. I've addressed your comments. Once 
Travis gives green light, I'll merge it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74768392
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+public class ResourceProfile implements Serializable {
--- End diff --

Java docs are missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74768700
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+public class ResourceProfile implements Serializable {
+
+   private static final long serialVersionUID = -784900073893060124L;
+
+   /** How many cpu cores are needed, use double so we can specify cpu 
like 0.1 */
+   private final double cpuCores;
+
+   /** How many memory in mb are needed */
+   private final long memoryInMB;
--- End diff --

It might be helpful to have getter methods for the fields. The RM can then 
use the information from the ResourceProfile to create container requests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74768989
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class SlotID implements ResourceIDRetrievable, Serializable {
--- End diff --

Java docs missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74769608
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+public class ResourceProfile implements Serializable {
+
+   private static final long serialVersionUID = -784900073893060124L;
+
+   /** How many cpu cores are needed, use double so we can specify cpu 
like 0.1 */
+   private final double cpuCores;
+
+   /** How many memory in mb are needed */
+   private final long memoryInMB;
+
+   public ResourceProfile(double cpuCores, long memoryInMB) {
+   this.cpuCores = cpuCores;
+   this.memoryInMB = memoryInMB;
+   }
+
+   /**
+* Check whether required resource profile can be matched
+*
+* @param required the required resource profile
+* @return true if the requirement is matched, otherwise false
+*/
+   public boolean matchRequirement(ResourceProfile required) {
--- End diff --

Maybe we could rename this method to `isMatching` or 
`isMatchingRequirement`. Starting these kind of methods with a question 
indicates that it does not change the internal state but checks for a certain 
condition.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74769810
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+public class ResourceProfile implements Serializable {
+
+   private static final long serialVersionUID = -784900073893060124L;
+
+   /** How many cpu cores are needed, use double so we can specify cpu 
like 0.1 */
+   private final double cpuCores;
+
+   /** How many memory in mb are needed */
+   private final long memoryInMB;
+
+   public ResourceProfile(double cpuCores, long memoryInMB) {
+   this.cpuCores = cpuCores;
+   this.memoryInMB = memoryInMB;
+   }
+
+   /**
+* Check whether required resource profile can be matched
+*
+* @param required the required resource profile
+* @return true if the requirement is matched, otherwise false
+*/
+   public boolean matchRequirement(ResourceProfile required) {
+   return Double.compare(cpuCores, required.cpuCores) >= 0 && 
memoryInMB >= required.memoryInMB;
--- End diff --

Maybe we could create a getter method for `memoryInMB`. Then we don't have 
to access the private fields of another instance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74769869
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+public class ResourceProfile implements Serializable {
+
+   private static final long serialVersionUID = -784900073893060124L;
+
+   /** How many cpu cores are needed, use double so we can specify cpu 
like 0.1 */
+   private final double cpuCores;
+
+   /** How many memory in mb are needed */
+   private final long memoryInMB;
+
+   public ResourceProfile(double cpuCores, long memoryInMB) {
+   this.cpuCores = cpuCores;
+   this.memoryInMB = memoryInMB;
+   }
+
+   /**
+* Check whether required resource profile can be matched
+*
+* @param required the required resource profile
+* @return true if the requirement is matched, otherwise false
+*/
+   public boolean matchRequirement(ResourceProfile required) {
+   return Double.compare(cpuCores, required.cpuCores) >= 0 && 
memoryInMB >= required.memoryInMB;
--- End diff --

The same would then apply to `cpuCores` as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74770025
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class SlotID implements ResourceIDRetrievable, Serializable {
+
+   private static final long serialVersionUID = -6399206032549807771L;
+
+   /** The resource id which this slot located */
+   private final ResourceID resourceId;
+
+   /** The numeric id for single slot */
+   private final int slotId;
+
+   public SlotID(ResourceID resourceId, int slotId) {
+   checkNotNull(resourceId, "ResourceID must not be null");
+   this.resourceId = resourceId;
--- End diff --

You can write: `this.resourceId = checkNotNull(resourceId, "")`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2370: [FLINK-4373] [cluster management] Introduce SlotID, Alloc...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2370
  
Thanks for you contribution @KurtYoung. Good work! 

I only had some minor comments. Once they are addressed it's good to be 
merged :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc invocatio...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2365
  
Somehow this PR was not closed automatically.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2365


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2358: [FLINK-4382] Buffer rpc calls until the RpcEndpoint has b...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2358
  
The stashing is a feature which comes for free with Akka. Of course the 
sending side should not rely on this behaviour when implementing its failure 
treatment. But in cases where we have a race condition between the start-up of 
the `RpcEndpoint` and a `RpcGateway` it should result into a slightly more 
forgiving receiver behaviour.

I will rebase the PR and if Travis gives green light I would like to merge 
this PR to the flip-6 feature branch if nobody objects.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2370#discussion_r74785154
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
 ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Unique identifier for the attempt to allocate a slot, normally created 
by JobManager when requesting a slot,
+ * constant across re-tries. This can also be used to identify responses 
by the ResourceManager and to identify
+ * deployment calls towards the TaskManager that was allocated from.
+ */
+public class AllocationID extends AbstractID {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+* Creates an new random allocation ID.
+*/
+   public AllocationID() {
+   super();
+   }
--- End diff --

This constructor is the standard default constructor, right? I guess we 
don't need to specify this constructor then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2353#discussion_r74800246
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/TaskExecutorRegistrationResponse.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+
+/**
+ * Base class for responses from the ResourceManager to a registration 
attempt by a
+ * TaskExecutor.
+ */
+public abstract class TaskExecutorRegistrationResponse {
+
+   /**
+* Successful registration.
+*/
+   public static final class Success extends 
TaskExecutorRegistrationResponse {
+
+   private final InstanceID registrationId;
+
+   private final long heartbeatInterval;
+
+   public Success(InstanceID registrationId, long 
heartbeatInterval) {
+   this.registrationId = registrationId;
+   this.heartbeatInterval = heartbeatInterval;
+   }
+
+   public InstanceID getRegistrationId() {
+   return registrationId;
+   }
+
+   public long getHeartbeatInterval() {
+   return heartbeatInterval;
+   }
+
+   @Override
+   public String toString() {
+   return "TaskExecutorRegistrationSuccess (" + 
registrationId + " / " + heartbeatInterval + ')';
+   }
+   }
+
+   // 

+
+   /**
+* A rejected (declined) registration.
+*/
+   public static final class Decline extends 
TaskExecutorRegistrationResponse {
+
+   private final String reason;
+
+   public Decline(String reason) {
+   this.reason = reason;
+   }
+
+   public String getReason() {
+   return reason;
+   }
+
+   @Override
+   public String toString() {
+   return "TaskExecutorRegistrationDecline (" + reason + 
')';
+   }
+   }
+}
+
+
+
+
+
+
+
--- End diff --

Many line breaks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2353#discussion_r74802315
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/ResourceManagerRegistration.java
 ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.rpc.resourcemanager.TaskExecutorRegistrationResponse;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * This utility class handles the registration of the TaskExecutor at the 
ResourceManager.
+ * It implements the initial address resolution and the 
retries-with-backoff strategy.
+ * 
+ * The registration process notifies its {@link 
TaskExecutorToResourceManagerConnection}
+ * upon successful registration. The registration can be canceled, for 
example when the
+ * ResourceManager that it tries to register at looses leader status.
+ * 
+ * Implementation note: This class does not act like a single-threaded 
actor.
+ * It holds only constant state and passes all variable state via stack 
and closures.
+ */
+public class ResourceManagerRegistration {
+
+   // 

+   //  configuration constants
+   // 

+
+   private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
+
+   private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 3;
+
+   private static final long ERROR_REGISTRATION_DELAY_MILLIS = 1;
+
+   private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 3;
+
+   // 

+
+   private final Logger log;
+
+   private final TaskExecutorToResourceManagerConnection connection;
+
+   private final TaskExecutor taskExecutor;
+
+   private final String resourceManagerAddress;
+
+   private final UUID resourceManagerLeaderId;
+
+   private volatile boolean canceled;
+
+   public ResourceManagerRegistration(
+   Logger log,
+   TaskExecutorToResourceManagerConnection connection,
+   TaskExecutor taskExecutor,
+   String resourceManagerAddress,
+   UUID resourceManagerLeaderId) {
+
+   this.log = checkNotNull(log);
+   this.connection = checkNotNull(connection);
+   this.taskExecutor = checkNotNull(taskExecutor);
+   this.resourceManagerAddress = 
checkNotNull(resourceManagerAddress);
+   this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
+   }
+
+   // 

+   //  cancellation
+   // 

+
+   /**
+* Cancels the registration procedure.
+*/
+   public void cancel() {
+   canceled = true;
+   }
+
+   /**
+* Checks if the registration was canceled.
+* @return True if the registration was canceled, false otherwise.
+*/
+   public boolean isCanceled() {
+   return canceled;
+   }
+
+   // 


[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2353#discussion_r74803435
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.slf4j.Logger;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class TaskExecutorToResourceManagerConnection {
+
+   private final Logger log;
+
+   private final TaskExecutor taskExecutor;
+
+   private final UUID resourceManagerLeaderId;
+
+   private final String resourceManagerAddress;
+
+   private ResourceManagerRegistration pendingRegistration;
+
+   private ResourceManagerGateway registeredResourceManager;
+
+   private InstanceID registrationId;
+
+   private volatile boolean closed;
+
+
+   public TaskExecutorToResourceManagerConnection(
+   Logger log,
--- End diff --

I'm not so sure whether passing a logger from a different context makes the 
logging statements easier or harder to understand. Usually I have troubles to 
locate the logging statements (and thus the state of the computation) when I 
see logging statements logged from a different (wider) scope than they actually 
happened.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2353: [FLINK-4355] [cluster management] Implement TaskManager s...

2016-08-15 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2353
  
Really good code @StephanEwen :-) I like it a lot that you've pulled out 
the registration logic from the `TaskExecutor`. This makes it much cleaner and 
easier to test.

I'm actually wondering whether we can actually take it a step further. What 
I've noticed is that we have to pass the `TaskExecutor` to different places to 
have access to the its state and to run things in the main thread context. I'm 
not sure whether this is really necessary. 

The registration information should be more or less constant and the main 
thing which has to happen in the main thread context is the setting of the 
connected gateway. So wouldn't it be possible that the 
`ResourceManagerRegistration` is something like a cancelable future which is 
instantiated with all necessary information. It performs the registration logic 
completely in the background and once it is completed we can create a 
`TaskExecutorToResourceManagerConnection` in the main thread context. Whenever 
we have to start a new registration, we cancel the future (if there is still 
another registration running).

Pulling out the `TaskExecutor` from the registration process has the 
advantage that the components are even further separated and, thus, easier to 
test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2370: [FLINK-4373] [cluster management] Introduce SlotID, Alloc...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2370
  
Failing test cases are unrelated. Will merge this PR. Thanks for your 
contribution @KurtYoung.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2371: [FLINK-4309] Potential null pointer dereference in...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2371#discussion_r74893388
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
 ---
@@ -178,14 +178,19 @@ public String toString() {
 
@Override
public Set keySet() {
+
final HashSet set = new HashSet();
-   final int prefixLen = this.prefix == null ? 0 : 
this.prefix.length();
 
for (String key : this.backingConfig.keySet()) {
-   if (key.startsWith(this.prefix)) {
+
+   if (this.prefix == null) {
--- End diff --

Can we move the condition out of the loop? The condition does not change 
over different iteration so we don't have to perform it every time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2371: [FLINK-4309] Potential null pointer dereference in Delega...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2371
  
Thanks for your contribution @tsunny. Your changes look good. I only had a 
minor comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2370: [FLINK-4373] [cluster management] Introduce SlotID, Alloc...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2370
  
Not automatically closed by the asfgit bot. @KurtYoung could you close this 
PR manually?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74937306
  
--- Diff: flink-dist/pom.xml ---
@@ -113,8 +113,13 @@ under the License.
flink-metrics-jmx
${project.version}

+
+   
+   org.apache.flink
+   flink-mesos_2.10
+   ${project.version}
+   
--- End diff --

We might introduce an "include-mesos" profile where we add this dependency 
to flink-dist.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74937890
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
+   org.apache.hadoop
+   
+   
+   
+   
+   
+   org.apache.flink
+   flink-clients_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   ${shading-artifact.name}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+
+   
+   com.typesafe.akka
+   
akka-actor_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-remote_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-slf4j_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-testkit_${scala.binary.version}
+   
+
+   
--- End diff --

Can we remove this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74938223
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
+   org.apache.hadoop
+   
+   
+   
+   
+   
+   org.apache.flink
+   flink-clients_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   ${shading-artifact.name}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+
+   
+   com.typesafe.akka
+   
akka-actor_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-remote_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-slf4j_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-testkit_${scala.binary.version}
+   
+
+   
+
+   
+   com.google.guava
+   guava
+   ${guava.version}
+   
--- End diff --

Are you using Guava or is this a dependency for Mesos?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74938631
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
+   org.apache.hadoop
+   
+   
+   
+   
+   
+   org.apache.flink
+   flink-clients_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   ${shading-artifact.name}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+
+   
+   com.typesafe.akka
+   
akka-actor_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-remote_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-slf4j_${scala.binary.version}
+   
+
+   
+   com.typesafe.akka
+   
akka-testkit_${scala.binary.version}
+   
+
+   
+
+   
+   com.google.guava
+   guava
+   ${guava.version}
+   
+
+   
+   org.apache.curator
+   curator-test
+   ${curator.version}
+   test
--- End diff --

It might make sense to add this dependency to the dependency management 
section of the parent pom since also other modules use it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74940821
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.cli;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class FlinkMesosSessionCli {
--- End diff --

Java docs are missing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74941466
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at 
launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference 
assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return 
params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
+   }
+
+   @Override
+   public int getPorts() {
+   return TM_PORT_KEYS.length;
+   }
+
+   @Override
+   public Map 
getCustomNamedResources() {
+   return Collections.emptyMap();
+   }
+
+   @Override
+   public List getHardConstraints() 
{
+   return null;
+   }
+
+   @

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74941614
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at 
launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference 
assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return 
params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
+   }
+
+   @Override
+   public int getPorts() {
+   return TM_PORT_KEYS.length;
+   }
+
+   @Override
+   public Map 
getCustomNamedResources() {
+   return Collections.emptyMap();
+   }
+
+   @Override
+   public List getHardConstraints() 
{
+   return null;
+   }
+
+   @

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74941852
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at 
launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference 
assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return 
params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
+   }
+
+   @Override
+   public int getPorts() {
+   return TM_PORT_KEYS.length;
+   }
+
+   @Override
+   public Map 
getCustomNamedResources() {
+   return Collections.emptyMap();
+   }
+
+   @Override
+   public List getHardConstraints() 
{
+   return null;
+   }
+
+   @

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74938853
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
+   org.apache.hadoop
+   
+   
+   
+   
+   
+   org.apache.flink
+   flink-clients_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   ${shading-artifact.name}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test-jar
+   test
+   
--- End diff --

Maybe group all test dependencies under the test structural comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74947651
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74948586
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74948792
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74949114
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74947914
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74949669
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74948145
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74950002
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74950095
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74950716
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2358: [FLINK-4382] Buffer rpc calls until the RpcEndpoin...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2358


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74963589
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNe

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74963941
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNe

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74964722
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNe

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74965913
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNe

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74971461
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNe

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74972631
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNe

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74972977
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+
+import static java.util.Objects.requireNonNull;
+
+public class MesosTaskManagerParameters {
--- End diff --

Java docs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75080549
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNe

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75080685
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNe

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75080823
  
--- Diff: flink-dist/pom.xml ---
@@ -113,8 +113,13 @@ under the License.
flink-metrics-jmx
${project.version}

+
+   
+   org.apache.flink
+   flink-mesos_2.10
+   ${project.version}
+   
--- End diff --

We do the same for YARN. See the `include-yarn` profile.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #:

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:


https://github.com/apache/flink/commit/68addf39e0e5f9e1656818f923be362680ed93b0#commitcomment-18669730
  
Really nice abstraction @StephanEwen 👍 . I like it a lot :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r7518
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75113276
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ */
+public class MesosTaskManagerRunner {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MesosTaskManagerRunner.class);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   public static void runTaskManager(String[] args, final Class taskManager) throws IOException {
+   EnvironmentInformation.logEnvironmentInfo(LOG, 
taskManager.getSimpleName(), args);
+   org.apache.flink.runtime.util.SignalHandler.register(LOG);
+
+   // try to parse the command line arguments
+   final Configuration configuration;
+   try {
+   configuration = 
TaskManager.parseArgsAndLoadConfig(args);
+
+   // add dynamic properties to TaskManager configuration.
+   final Configuration dynamicProperties =
+   
FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+   LOG.debug("Mesos dynamic properties: {}", 
dynamicProperties);
+   configuration.addAll(dynamicProperties);
+   }
+   catch (Throwable t) {
+   LOG.error(t.getMessage(), t);
+   System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+   return;
+   }
+
+   // read the environment variables
+   final Map envs = System.getenv();
+   final String effectiveUsername = 
envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
+   final String tmpDirs = 
envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
+
+   // configure local directory
+   String flinkTempDirs = 
configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+   if (flinkTempDirs != null) {
+   LOG.info("Overriding Mesos temporary file directories 
with those " +
+   "specified in the Flink config: " + 
flinkTempDirs);
+   }
+   else if (tmpDirs != null) {
+   LOG.info("Setting directories for temporary files to: " 
+ tmpDirs);
--- End diff --

`{}`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75113239
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ */
+public class MesosTaskManagerRunner {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MesosTaskManagerRunner.class);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   public static void runTaskManager(String[] args, final Class taskManager) throws IOException {
+   EnvironmentInformation.logEnvironmentInfo(LOG, 
taskManager.getSimpleName(), args);
+   org.apache.flink.runtime.util.SignalHandler.register(LOG);
+
+   // try to parse the command line arguments
+   final Configuration configuration;
+   try {
+   configuration = 
TaskManager.parseArgsAndLoadConfig(args);
+
+   // add dynamic properties to TaskManager configuration.
+   final Configuration dynamicProperties =
+   
FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+   LOG.debug("Mesos dynamic properties: {}", 
dynamicProperties);
+   configuration.addAll(dynamicProperties);
+   }
+   catch (Throwable t) {
+   LOG.error(t.getMessage(), t);
--- End diff --

Maybe we could output a different error message because the throwable's 
message will be output anyway. Something like "failed loading the 
configuration" or so.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75113310
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ */
+public class MesosTaskManagerRunner {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MesosTaskManagerRunner.class);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   public static void runTaskManager(String[] args, final Class taskManager) throws IOException {
+   EnvironmentInformation.logEnvironmentInfo(LOG, 
taskManager.getSimpleName(), args);
+   org.apache.flink.runtime.util.SignalHandler.register(LOG);
+
+   // try to parse the command line arguments
+   final Configuration configuration;
+   try {
+   configuration = 
TaskManager.parseArgsAndLoadConfig(args);
+
+   // add dynamic properties to TaskManager configuration.
+   final Configuration dynamicProperties =
+   
FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+   LOG.debug("Mesos dynamic properties: {}", 
dynamicProperties);
+   configuration.addAll(dynamicProperties);
+   }
+   catch (Throwable t) {
+   LOG.error(t.getMessage(), t);
+   System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+   return;
+   }
+
+   // read the environment variables
+   final Map envs = System.getenv();
+   final String effectiveUsername = 
envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
+   final String tmpDirs = 
envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
+
+   // configure local directory
+   String flinkTempDirs = 
configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+   if (flinkTempDirs != null) {
+   LOG.info("Overriding Mesos temporary file directories 
with those " +
+   "specified in the Flink config: " + 
flinkTempDirs);
+   }
+   else if (tmpDirs != null) {
+   LOG.info("Setting directories for temporary files to: " 
+ tmpDirs);
+   
configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
+   }
+
+   LOG.info("Mesos task runs as '" + 
UserGroupInformation.getCurrentUser().getShortUserName() +
+   "', setting user to execute Flink TaskManager to '" + 
effectiveUsername + "'");
--- End diff --

`{}`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. 

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75113264
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ */
+public class MesosTaskManagerRunner {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MesosTaskManagerRunner.class);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   public static void runTaskManager(String[] args, final Class taskManager) throws IOException {
+   EnvironmentInformation.logEnvironmentInfo(LOG, 
taskManager.getSimpleName(), args);
+   org.apache.flink.runtime.util.SignalHandler.register(LOG);
+
+   // try to parse the command line arguments
+   final Configuration configuration;
+   try {
+   configuration = 
TaskManager.parseArgsAndLoadConfig(args);
+
+   // add dynamic properties to TaskManager configuration.
+   final Configuration dynamicProperties =
+   
FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+   LOG.debug("Mesos dynamic properties: {}", 
dynamicProperties);
+   configuration.addAll(dynamicProperties);
+   }
+   catch (Throwable t) {
+   LOG.error(t.getMessage(), t);
+   System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+   return;
+   }
+
+   // read the environment variables
+   final Map envs = System.getenv();
+   final String effectiveUsername = 
envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME);
+   final String tmpDirs = 
envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
+
+   // configure local directory
+   String flinkTempDirs = 
configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+   if (flinkTempDirs != null) {
+   LOG.info("Overriding Mesos temporary file directories 
with those " +
+   "specified in the Flink config: " + 
flinkTempDirs);
--- End diff --

`{}`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75114404
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A store of Mesos workers and associated framework information.
+ *
+ * Generates a framework ID as necessary.
+ */
+public interface MesosWorkerStore {
+
+   static final DecimalFormat TASKID_FORMAT = new 
DecimalFormat("taskmanager-0");
+
+   void start() throws Exception;
+
+   void stop() throws Exception;
+
+   Option getFrameworkID() throws Exception;
+
+   void setFrameworkID(Option frameworkID) throws 
Exception;
+
+   List recoverWorkers() throws Exception;
+
+   Protos.TaskID newTaskID() throws Exception;
+
+   void putWorker(Worker worker) throws Exception;
+
+   void removeWorker(Protos.TaskID taskID) throws Exception;
+
+   void cleanup() throws Exception;
+
+   /**
+* A stored task.
+*
+* The assigned slaveid/hostname is valid in Launched and Released 
states.  The hostname is needed
+* by Fenzo for optimization purposes.
+*/
+   class Worker implements Serializable {
--- End diff --

`serialVersionUID` missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75114471
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A store of Mesos workers and associated framework information.
+ *
+ * Generates a framework ID as necessary.
+ */
+public interface MesosWorkerStore {
+
+   static final DecimalFormat TASKID_FORMAT = new 
DecimalFormat("taskmanager-0");
+
+   void start() throws Exception;
+
+   void stop() throws Exception;
+
+   Option getFrameworkID() throws Exception;
+
+   void setFrameworkID(Option frameworkID) throws 
Exception;
+
+   List recoverWorkers() throws Exception;
+
+   Protos.TaskID newTaskID() throws Exception;
+
+   void putWorker(Worker worker) throws Exception;
+
+   void removeWorker(Protos.TaskID taskID) throws Exception;
+
+   void cleanup() throws Exception;
+
+   /**
+* A stored task.
+*
+* The assigned slaveid/hostname is valid in Launched and Released 
states.  The hostname is needed
+* by Fenzo for optimization purposes.
+*/
+   class Worker implements Serializable {
+   private Protos.TaskID taskID;
+
+   private Option slaveID;
+
+   private Option hostname;
+
+   private TaskState state;
--- End diff --

Can these fields be final?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75114739
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A store of Mesos workers and associated framework information.
+ *
+ * Generates a framework ID as necessary.
+ */
+public interface MesosWorkerStore {
+
+   static final DecimalFormat TASKID_FORMAT = new 
DecimalFormat("taskmanager-0");
+
+   void start() throws Exception;
+
+   void stop() throws Exception;
+
+   Option getFrameworkID() throws Exception;
+
+   void setFrameworkID(Option frameworkID) throws 
Exception;
+
+   List recoverWorkers() throws Exception;
+
+   Protos.TaskID newTaskID() throws Exception;
+
+   void putWorker(Worker worker) throws Exception;
+
+   void removeWorker(Protos.TaskID taskID) throws Exception;
+
+   void cleanup() throws Exception;
+
+   /**
+* A stored task.
+*
+* The assigned slaveid/hostname is valid in Launched and Released 
states.  The hostname is needed
+* by Fenzo for optimization purposes.
+*/
+   class Worker implements Serializable {
+   private Protos.TaskID taskID;
+
+   private Option slaveID;
+
+   private Option hostname;
+
+   private TaskState state;
+
+   public Worker(Protos.TaskID taskID, Option 
slaveID, Option hostname, TaskState state) {
+   requireNonNull(taskID, "taskID");
+   requireNonNull(slaveID, "slaveID");
+   requireNonNull(hostname, "hostname");
+   requireNonNull(state, "state");
+
+   this.taskID = taskID;
+   this.slaveID = slaveID;
+   this.hostname = hostname;
+   this.state = state;
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   public Option slaveID() {
+   return slaveID;
+   }
+
+   public Option hostname() {
+   return hostname;
+   }
+
+   public TaskState state() {
+   return state;
+   }
--- End diff --

Getters in Java classes should follow the Java getter style and no the 
Scala style.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75115154
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A store of Mesos workers and associated framework information.
+ *
+ * Generates a framework ID as necessary.
+ */
+public interface MesosWorkerStore {
+
+   static final DecimalFormat TASKID_FORMAT = new 
DecimalFormat("taskmanager-0");
+
+   void start() throws Exception;
+
+   void stop() throws Exception;
+
+   Option getFrameworkID() throws Exception;
+
+   void setFrameworkID(Option frameworkID) throws 
Exception;
+
+   List recoverWorkers() throws Exception;
+
+   Protos.TaskID newTaskID() throws Exception;
+
+   void putWorker(Worker worker) throws Exception;
+
+   void removeWorker(Protos.TaskID taskID) throws Exception;
+
+   void cleanup() throws Exception;
+
+   /**
+* A stored task.
+*
+* The assigned slaveid/hostname is valid in Launched and Released 
states.  The hostname is needed
+* by Fenzo for optimization purposes.
+*/
+   class Worker implements Serializable {
+   private Protos.TaskID taskID;
+
+   private Option slaveID;
+
+   private Option hostname;
+
+   private TaskState state;
+
+   public Worker(Protos.TaskID taskID, Option 
slaveID, Option hostname, TaskState state) {
--- End diff --

Is this class intended to be created somewhere else than in the static 
methods of this class itself? If not, then we could narrow down the visibility 
of the constructor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75115442
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A store of Mesos workers and associated framework information.
+ *
+ * Generates a framework ID as necessary.
+ */
+public interface MesosWorkerStore {
+
+   static final DecimalFormat TASKID_FORMAT = new 
DecimalFormat("taskmanager-0");
+
+   void start() throws Exception;
+
+   void stop() throws Exception;
+
+   Option getFrameworkID() throws Exception;
+
+   void setFrameworkID(Option frameworkID) throws 
Exception;
+
+   List recoverWorkers() throws Exception;
+
+   Protos.TaskID newTaskID() throws Exception;
+
+   void putWorker(Worker worker) throws Exception;
+
+   void removeWorker(Protos.TaskID taskID) throws Exception;
+
+   void cleanup() throws Exception;
+
+   /**
+* A stored task.
+*
+* The assigned slaveid/hostname is valid in Launched and Released 
states.  The hostname is needed
+* by Fenzo for optimization purposes.
+*/
+   class Worker implements Serializable {
+   private Protos.TaskID taskID;
+
+   private Option slaveID;
+
+   private Option hostname;
+
+   private TaskState state;
+
+   public Worker(Protos.TaskID taskID, Option 
slaveID, Option hostname, TaskState state) {
+   requireNonNull(taskID, "taskID");
+   requireNonNull(slaveID, "slaveID");
+   requireNonNull(hostname, "hostname");
+   requireNonNull(state, "state");
+
+   this.taskID = taskID;
+   this.slaveID = slaveID;
+   this.hostname = hostname;
+   this.state = state;
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   public Option slaveID() {
+   return slaveID;
+   }
+
+   public Option hostname() {
+   return hostname;
+   }
+
+   public TaskState state() {
+   return state;
+   }
+
+   // valid transition methods
+
+   public static Worker newTask(Protos.TaskID taskID) {
+   return new Worker(
+   taskID,
+   Option.empty(), 
Option.empty(),
+   TaskState.New);
+   }
+
+   public Worker launchTask(Protos.SlaveID slaveID, String 
hostname) {
+   return new Worker(taskID, Option.apply(slaveID), 
Option.apply(hostname), TaskState.Launched);
+   }
+
+   public Worker releaseTask() {
+   return new Worker(taskID, slaveID, hostname, 
TaskState.Released);
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   Worker worker = (Worker) o;
+   return Objects.equals(ta

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75115525
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A store of Mesos workers and associated framework information.
+ *
+ * Generates a framework ID as necessary.
+ */
+public interface MesosWorkerStore {
+
+   static final DecimalFormat TASKID_FORMAT = new 
DecimalFormat("taskmanager-0");
+
+   void start() throws Exception;
+
+   void stop() throws Exception;
+
+   Option getFrameworkID() throws Exception;
+
+   void setFrameworkID(Option frameworkID) throws 
Exception;
+
+   List recoverWorkers() throws Exception;
+
+   Protos.TaskID newTaskID() throws Exception;
+
+   void putWorker(Worker worker) throws Exception;
+
+   void removeWorker(Protos.TaskID taskID) throws Exception;
+
+   void cleanup() throws Exception;
+
+   /**
+* A stored task.
+*
+* The assigned slaveid/hostname is valid in Launched and Released 
states.  The hostname is needed
+* by Fenzo for optimization purposes.
+*/
+   class Worker implements Serializable {
+   private Protos.TaskID taskID;
+
+   private Option slaveID;
+
+   private Option hostname;
+
+   private TaskState state;
+
+   public Worker(Protos.TaskID taskID, Option 
slaveID, Option hostname, TaskState state) {
+   requireNonNull(taskID, "taskID");
+   requireNonNull(slaveID, "slaveID");
+   requireNonNull(hostname, "hostname");
+   requireNonNull(state, "state");
+
+   this.taskID = taskID;
+   this.slaveID = slaveID;
+   this.hostname = hostname;
+   this.state = state;
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   public Option slaveID() {
+   return slaveID;
+   }
+
+   public Option hostname() {
+   return hostname;
+   }
+
+   public TaskState state() {
+   return state;
+   }
+
+   // valid transition methods
+
+   public static Worker newTask(Protos.TaskID taskID) {
+   return new Worker(
+   taskID,
+   Option.empty(), 
Option.empty(),
+   TaskState.New);
+   }
+
+   public Worker launchTask(Protos.SlaveID slaveID, String 
hostname) {
+   return new Worker(taskID, Option.apply(slaveID), 
Option.apply(hostname), TaskState.Launched);
+   }
+
+   public Worker releaseTask() {
+   return new Worker(taskID, slaveID, hostname, 
TaskState.Released);
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   Worker worker = (Worker) o;
+   return Objects.equals(ta

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75117043
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
 ---
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.mesos.Protos;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A ZooKeeper-backed Mesos worker store.
+ */
+public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class);
+
+   private final Object cacheLock = new Object();
+
+   /** Client (not a namespace facade) */
+   private final CuratorFramework client;
+
+   /** Flag indicating whether this instance is running. */
+   private boolean isRunning;
+
+   /** A persistent value of the assigned framework ID */
+   private final SharedValue frameworkIdInZooKeeper;
+
+   /** A persistent count of all tasks created, for generating unique IDs 
*/
+   private final SharedCount totalTaskCountInZooKeeper;
+
+   /** A persistent store of serialized workers */
+   private final ZooKeeperStateHandleStore 
workersInZooKeeper;
+
+   @SuppressWarnings("unchecked")
+   ZooKeeperMesosWorkerStore(
+   CuratorFramework client,
+   String storePath,
+   StateStorageHelper stateStorage
+   ) throws Exception {
+   checkNotNull(storePath, "storePath");
+   checkNotNull(stateStorage, "stateStorage");
+
+   // Keep a reference to the original client and not the 
namespace facade. The namespace
+   // facade cannot be closed.
+   this.client = checkNotNull(client, "client");
+
+   // All operations will have the given path as root
+   
client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient());
+   CuratorFramework facade = 
client.usingNamespace(client.getNamespace() + storePath);
+
+   // Track the assignd framework ID.
+   frameworkIdInZooKeeper = new SharedValue(facade, 
"/frameworkId", new byte[0]);
+
+   // Keep a count of all tasks created ever, as the basis for a 
unique ID.
+   totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 
0);
+
+   // Keep track of the workers in state handle storage.
+   
facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient());
+   CuratorFramework storeFacade = 
client.usingNamespace(facade.getNamespace() + "/workers");
+
+   this.workersInZooKeeper = ZooKeeperStateHandleS

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75119617
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
 ---
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.mesos.Protos;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A ZooKeeper-backed Mesos worker store.
+ */
+public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class);
+
+   private final Object cacheLock = new Object();
+
+   /** Client (not a namespace facade) */
+   private final CuratorFramework client;
+
+   /** Flag indicating whether this instance is running. */
+   private boolean isRunning;
+
+   /** A persistent value of the assigned framework ID */
+   private final SharedValue frameworkIdInZooKeeper;
+
+   /** A persistent count of all tasks created, for generating unique IDs 
*/
+   private final SharedCount totalTaskCountInZooKeeper;
+
+   /** A persistent store of serialized workers */
+   private final ZooKeeperStateHandleStore 
workersInZooKeeper;
+
+   @SuppressWarnings("unchecked")
+   ZooKeeperMesosWorkerStore(
+   CuratorFramework client,
+   String storePath,
+   StateStorageHelper stateStorage
+   ) throws Exception {
+   checkNotNull(storePath, "storePath");
+   checkNotNull(stateStorage, "stateStorage");
+
+   // Keep a reference to the original client and not the 
namespace facade. The namespace
+   // facade cannot be closed.
+   this.client = checkNotNull(client, "client");
+
+   // All operations will have the given path as root
+   
client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient());
+   CuratorFramework facade = 
client.usingNamespace(client.getNamespace() + storePath);
+
+   // Track the assignd framework ID.
+   frameworkIdInZooKeeper = new SharedValue(facade, 
"/frameworkId", new byte[0]);
+
+   // Keep a count of all tasks created ever, as the basis for a 
unique ID.
+   totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 
0);
+
+   // Keep track of the workers in state handle storage.
+   
facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient());
+   CuratorFramework storeFacade = 
client.usingNamespace(facade.getNamespace() + "/workers");
+
+   this.workersInZooKeeper = ZooKeeperStateHandleS

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75120223
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
 ---
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.mesos.Protos;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A ZooKeeper-backed Mesos worker store.
+ */
+public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class);
+
+   private final Object cacheLock = new Object();
--- End diff --

Why do we have to lock? Where does the concurrent access come from?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75120933
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.SlaveLost;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import java.util.List;
+
+/**
+ * This class reacts to callbacks from the Mesos scheduler driver.
+ *
+ * In order to preserve actor concurrency safety, this class simply sends
+ * corresponding messages to the Mesos resource master actor.
+ *
+ * See 
https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
+ */
+public class SchedulerProxy implements Scheduler {
+
+   /** The actor to which we report the callbacks */
+   private ActorRef mesosActor;
--- End diff --

final?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75121105
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.Error;
--- End diff --

Twice the same import.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75121439
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler.messages;
+
+import org.apache.mesos.Protos;
+
+import java.util.Collection;
+
+/**
+ * Local message sent by the launch coordinator to the scheduler to accept 
offers.
+ */
+public class AcceptOffers {
--- End diff --

Would be good to make this class immutable by using final fields.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75121525
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler.messages;
+
+import org.apache.mesos.Protos;
+
+import java.util.Collection;
+
+/**
+ * Local message sent by the launch coordinator to the scheduler to accept 
offers.
+ */
+public class AcceptOffers {
+
+   private String hostname;
+   private Collection offerIds;
+   private Collection operations;
+   private Protos.Filters filters;
+
+   public AcceptOffers(String hostname, Collection 
offerIds, Collection operations) {
+   this.hostname = hostname;
+   this.offerIds = offerIds;
+   this.operations = operations;
+   this.filters = Protos.Filters.newBuilder().build();
+   }
+
+   public AcceptOffers(String hostname, Collection 
offerIds, Collection operations, Protos.Filters 
filters) {
+   this.hostname = hostname;
+   this.offerIds = offerIds;
+   this.operations = operations;
+   this.filters = filters;
+   }
+
+   public String hostname() {
--- End diff --

Scala getter style. Would be good to follow the Java style.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75121852
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler.messages;
+
+import org.apache.mesos.Protos;
+
+import java.util.Collection;
+
+/**
+ * Local message sent by the launch coordinator to the scheduler to accept 
offers.
+ */
+public class AcceptOffers {
--- End diff --

I think it makes sense to make all message classes immutable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75123394
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler.messages;
+
+import org.apache.mesos.Protos;
+
+import java.util.Collection;
+
+/**
+ * Local message sent by the launch coordinator to the scheduler to accept 
offers.
+ */
+public class AcceptOffers {
--- End diff --

Can these messages also be serializable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75124552
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java 
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Handler;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.codec.http.router.Router;
+import io.netty.util.CharsetUtil;
+import org.jets3t.service.utils.Mimetypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpMethod.HEAD;
+import static io.netty.handler.codec.http.HttpResponseStatus.GONE;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+
+/**
+ * A generic Mesos artifact server, designed specifically for use by the 
Mesos Fetcher.
+ *
+ * More information:
+ * http://mesos.apache.org/documentation/latest/fetcher/
+ * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
+ */
+public class MesosArtifactServer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MesosArtifactServer.class);
+
+   private final Router router;
+
+   private ServerBootstrap bootstrap;
+
+   private Channel serverChannel;
+
+   private URL baseURL;
+
+   public MesosArtifactServer(String sessionID, String serverHostname, int 
configuredPort) throws Exception {
+   if (configuredPort < 0 || configuredPort > 0x) {
+   throw new IllegalArgumentException("File server port is 
invalid: " + configuredPort);
+   }
+
+   router = new Router();
+
+   ChannelInitializer initializer = new 
ChannelInitializer() {
+
+   @Override
+   protected void initChannel(SocketChannel ch) {
+   

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75124909
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java 
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Handler;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.codec.http.router.Router;
+import io.netty.util.CharsetUtil;
+import org.jets3t.service.utils.Mimetypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpMethod.HEAD;
+import static io.netty.handler.codec.http.HttpResponseStatus.GONE;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+
+/**
+ * A generic Mesos artifact server, designed specifically for use by the 
Mesos Fetcher.
+ *
+ * More information:
+ * http://mesos.apache.org/documentation/latest/fetcher/
+ * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
+ */
+public class MesosArtifactServer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MesosArtifactServer.class);
+
+   private final Router router;
+
+   private ServerBootstrap bootstrap;
+
+   private Channel serverChannel;
+
+   private URL baseURL;
--- End diff --

Can these fields be final?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75126761
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java 
---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.Map;
+
+/**
+ * The typed configuration settings associated with a Mesos scheduler.
+ */
+public class MesosConfiguration {
+
+   private String masterUrl;
+
+   private Protos.FrameworkInfo.Builder frameworkInfo;
+
+   private Option credential = Option.empty();
+
+   public MesosConfiguration(
+   String masterUrl,
+   Protos.FrameworkInfo.Builder frameworkInfo,
+   Option credential) {
+
+   this.masterUrl = masterUrl;
+   this.frameworkInfo = frameworkInfo;
+   this.credential = credential;
--- End diff --

Maybe check not null?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75126311
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java 
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Handler;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.codec.http.router.Router;
+import io.netty.util.CharsetUtil;
+import org.jets3t.service.utils.Mimetypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpMethod.HEAD;
+import static io.netty.handler.codec.http.HttpResponseStatus.GONE;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+
+/**
+ * A generic Mesos artifact server, designed specifically for use by the 
Mesos Fetcher.
+ *
+ * More information:
+ * http://mesos.apache.org/documentation/latest/fetcher/
+ * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
+ */
+public class MesosArtifactServer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MesosArtifactServer.class);
+
+   private final Router router;
+
+   private ServerBootstrap bootstrap;
+
+   private Channel serverChannel;
+
+   private URL baseURL;
+
+   public MesosArtifactServer(String sessionID, String serverHostname, int 
configuredPort) throws Exception {
+   if (configuredPort < 0 || configuredPort > 0x) {
+   throw new IllegalArgumentException("File server port is 
invalid: " + configuredPort);
+   }
+
+   router = new Router();
+
+   ChannelInitializer initializer = new 
ChannelInitializer() {
+
+   @Override
+   protected void initChannel(SocketChannel ch) {
+   

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75128222
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.Configuration;
+
+public class ZooKeeperUtils {
+
+   /**
+* Starts a {@link CuratorFramework} instance and connects it to the 
given ZooKeeper
+* quorum.
+*
+* @param configuration {@link Configuration} object containing the 
configuration values
+* @return {@link CuratorFramework} instance
+*/
+   @SuppressWarnings("unchecked")
+   public static CuratorFramework startCuratorFramework(Configuration 
configuration) {
+
+   // workaround for shaded curator dependency of flink-runtime
--- End diff --

What is exactly the problem here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75128299
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.Configuration;
+
+public class ZooKeeperUtils {
+
+   /**
+* Starts a {@link CuratorFramework} instance and connects it to the 
given ZooKeeper
+* quorum.
+*
+* @param configuration {@link Configuration} object containing the 
configuration values
+* @return {@link CuratorFramework} instance
+*/
+   @SuppressWarnings("unchecked")
+   public static CuratorFramework startCuratorFramework(Configuration 
configuration) {
+
+   // workaround for shaded curator dependency of flink-runtime
+   Object client = 
org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(configuration);
--- End diff --

Why storing the client in an `Object` and then casting it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75130862
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.instance.InstanceConnectionInfo
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.taskmanager.{TaskManager, 
TaskManagerConfiguration}
+
+/** An extension of the TaskManager that listens for additional 
Mesos-related
+  * messages.
+  */
+class MesosTaskManager(
+   config: TaskManagerConfiguration,
--- End diff --

Formatting inconsistent with `MesosJobManager`. I usually try to indent 
Scala class parameter twice and extended classes once.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75131308
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.instance.InstanceConnectionInfo
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.taskmanager.{TaskManager, 
TaskManagerConfiguration}
+
+/** An extension of the TaskManager that listens for additional 
Mesos-related
+  * messages.
--- End diff --

Which additional messages is the `MesosTaskManager` listening to?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75132701
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/ConnectionMonitor.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import akka.actor.{Actor, FSM, Props}
+import grizzled.slf4j.Logger
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.ConnectionMonitor._
+import org.apache.flink.mesos.scheduler.messages._
+
+import scala.concurrent.duration._
+
+/**
+  * Actively monitors the Mesos connection.
+  */
+class ConnectionMonitor() extends Actor with FSM[FsmState, Unit] {
--- End diff --

Does this class do anything else than logging the state transitions? 
Couldn't that also happen in the callbacks defined in 
`MesosFlinkResourceManager`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75133244
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import akka.actor.{Actor, ActorRef, FSM, Props}
+import com.netflix.fenzo._
+import com.netflix.fenzo.functions.Action1
+import com.netflix.fenzo.plugins.VMLeaseObject
+import grizzled.slf4j.Logger
+import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.LaunchCoordinator._
+import org.apache.flink.mesos.scheduler.messages._
+import org.apache.mesos.Protos.TaskInfo
+import org.apache.mesos.{SchedulerDriver, Protos}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{Map => MutableMap}
+import scala.concurrent.duration._
+
+/**
+  * The launch coordinator handles offer processing, including
+  * matching offers to tasks and making reservations.
+  *
+  * The coordinator uses Netflix Fenzo to optimize task placement.   
During the GatheringOffers phase,
--- End diff --

Any particular reason why you've chosen Fenzo as scheduler?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2381: [FLINK-4414][cluster management] Remove restriction on Rp...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2381
  
Thanks for your contribution @wenlong88. The PR mixes multiple things 
together. It would be better if you could separate these changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2381: [FLINK-4414][cluster management] Remove restrictio...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2381#discussion_r75140948
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 ---
@@ -197,11 +201,11 @@ public void stopService() {
}
 
@Override
-   public String getAddress(RpcGateway selfGateway) {
+   public String getAddress(RpcGateway gateway) {
checkState(!stopped, "RpcService is stopped");
 
-   if (selfGateway instanceof AkkaGateway) {
-   ActorRef actorRef = ((AkkaGateway) 
selfGateway).getRpcEndpoint();
+   if (gateway instanceof AkkaGateway) {
+   ActorRef actorRef = ((AkkaGateway) 
gateway).getActorRef();
return AkkaUtils.getAkkaURL(actorSystem, actorRef);
--- End diff --

This will be problematic if you pass in a local actor ref which has been 
created by a different actor system. Because then you will return a wrong 
remote address.

So for example, the following test would fail:
```
@Test
public void test() {
LinkedBlockingQueue linkedBlockingQueue = new 
LinkedBlockingQueue<>();
TestEndpoint testEndpoint = new 
TestEndpoint(akkaRpcService1,linkedBlockingQueue);
testEndpoint.start();

TestEndpoint testEndpoint2 = new TestEndpoint(akkaRpcService2, 
linkedBlockingQueue);
testEndpoint2.start();

assertEquals(testEndpoint2.getAddress(), 
akkaRpcService1.getAddress(testEndpoint2.getSelf()));
assertEquals(testEndpoint.getAddress(), 
akkaRpcService2.getAddress(testEndpoint.getSelf()));
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2381: [FLINK-4414][cluster management] Remove restriction on Rp...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2381
  
Maybe it makes more sense to extends `RpcGateway` interface and offer a 
getAddress method which returns the remote address of the gateway. We should 
also add tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75147221
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import akka.actor.{Actor, ActorRef, FSM, Props}
+import com.netflix.fenzo._
+import com.netflix.fenzo.functions.Action1
+import com.netflix.fenzo.plugins.VMLeaseObject
+import grizzled.slf4j.Logger
+import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.LaunchCoordinator._
+import org.apache.flink.mesos.scheduler.messages._
+import org.apache.mesos.Protos.TaskInfo
+import org.apache.mesos.{SchedulerDriver, Protos}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{Map => MutableMap}
+import scala.concurrent.duration._
+
+/**
+  * The launch coordinator handles offer processing, including
+  * matching offers to tasks and making reservations.
+  *
+  * The coordinator uses Netflix Fenzo to optimize task placement.   
During the GatheringOffers phase,
+  * offers are evaluated by Fenzo for suitability to the planned tasks.   
Reservations are then placed
+  * against the best offers, leading to revised offers containing reserved 
resources with which to launch task(s).
+  */
+class LaunchCoordinator(
+manager: ActorRef,
+config: Configuration,
+schedulerDriver: SchedulerDriver,
+optimizerBuilder: TaskSchedulerBuilder
+  ) extends Actor with FSM[TaskState, GatherData] {
+
+  val LOG = Logger(getClass)
+
+  /**
+* The task placement optimizer.
+*
+* The optimizer contains the following state:
+*  - unused offers
+*  - existing task placement (for fitness calculation involving task 
colocation)
+*/
+  private[mesos] val optimizer: TaskScheduler = {
+optimizerBuilder
+  .withLeaseRejectAction(new Action1[VirtualMachineLease]() {
+def call(lease: VirtualMachineLease) {
+  LOG.info(s"Declined offer ${lease.getId} from 
${lease.hostname()} of ${lease.memoryMB()} MB, ${lease.cpuCores()} cpus.")
+  schedulerDriver.declineOffer(lease.getOffer.getId)
+}
+  }).build
+  }
+
+  override def postStop(): Unit = {
+optimizer.shutdown()
+super.postStop()
+  }
+
+  /**
+* Initial state
+*/
+  startWith(Suspended, GatherData(tasks = Nil, newLeases = Nil))
+
+  /**
+* State: Suspended
+*
+* Wait for (re-)connection to Mesos.   No offers exist in this state, 
but outstanding tasks might.
+*/
+  when(Suspended) {
+case Event(msg: Connected, data: GatherData) =>
+  if(data.tasks.nonEmpty) goto(GatheringOffers)
+  else goto(Idle)
+  }
+
+  /**
+* State: Idle
+*
+* Wait for a task request to arrive, then transition into gathering 
offers.
+*/
+  onTransition {
+case _ -> Idle => assert(nextStateData.tasks.isEmpty)
+  }
+
+  when(Idle) {
+case Event(msg: Disconnected, data: GatherData) =>
+  goto(Suspended)
+
+case Event(offers: ResourceOffers, data: GatherData) =>
+  // decline any offers that come in
+  schedulerDriver.suppressOffers()
+  for(offer <- offers.offers().asScala) { 
schedulerDriver.declineOffer(offer.getId) }
+  stay()
+
+case Event(msg: Launch, data: GatherData) =>
+  goto(GatheringOffers) using data.copy(tasks = data.tasks ++ 
msg.tasks.asScala)
+  }
+
+  /**
+* Transition logic to control the flow of offers.
+*/
+  onTransition {
+case _ -> GatheringOffer

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75149938
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import akka.actor.{Actor, ActorRef, Props}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile
+import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, 
TaskGoalStateUpdated, TaskTerminated}
+import org.apache.flink.mesos.scheduler.Tasks._
+import org.apache.flink.mesos.scheduler.messages._
+import org.apache.mesos.{SchedulerDriver, Protos}
+
+import scala.collection.mutable.{Map => MutableMap}
+
+/**
+  * Aggregate of monitored tasks.
+  *
+  * Routes messages between the scheduler and individual task monitor 
actors.
+  */
+class Tasks[M <: TaskMonitor](
+ flinkConfig: Configuration,
+ schedulerDriver: SchedulerDriver,
+ taskMonitorClass: Class[M]) extends Actor {
+
+  /**
+* A map of task monitors by task ID.
+*/
+  private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap()
+
+  /**
+* Cache of current connection state.
+*/
+  private var registered: Option[Any] = None
+
+  override def preStart(): Unit = {
+// TODO subscribe to context.system.deadLetters for messages to 
nonexistent tasks
--- End diff --

Can we resolve this TODO?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75150474
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import akka.actor.{Actor, ActorRef, Props}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile
+import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, 
TaskGoalStateUpdated, TaskTerminated}
+import org.apache.flink.mesos.scheduler.Tasks._
+import org.apache.flink.mesos.scheduler.messages._
+import org.apache.mesos.{SchedulerDriver, Protos}
+
+import scala.collection.mutable.{Map => MutableMap}
+
+/**
+  * Aggregate of monitored tasks.
+  *
+  * Routes messages between the scheduler and individual task monitor 
actors.
+  */
+class Tasks[M <: TaskMonitor](
+ flinkConfig: Configuration,
+ schedulerDriver: SchedulerDriver,
+ taskMonitorClass: Class[M]) extends Actor {
+
+  /**
+* A map of task monitors by task ID.
+*/
+  private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap()
+
+  /**
+* Cache of current connection state.
+*/
+  private var registered: Option[Any] = None
+
+  override def preStart(): Unit = {
+// TODO subscribe to context.system.deadLetters for messages to 
nonexistent tasks
+  }
+
+  override def receive: Receive = {
+
+case msg: Disconnected =>
+  registered = None
+  context.actorSelection("*").tell(msg, self)
+
+case msg : Connected =>
+  registered = Some(msg)
+  context.actorSelection("*").tell(msg, self)
+
+case msg: TaskGoalStateUpdated =>
+  val taskID = msg.state.taskID
+
+  // ensure task monitor exists
+  if(!taskMap.contains(taskID)) {
+val actorRef = createTask(msg.state)
+registered.foreach(actorRef ! _)
+  }
+
+  taskMap(taskID) ! msg
+
+case msg: StatusUpdate =>
+  taskMap(msg.status().getTaskId) ! msg
+
+case msg: Reconcile =>
+  context.parent.forward(msg)
+
+case msg: TaskTerminated =>
+  context.parent.forward(msg)
+  }
+
+  private def createTask(task: TaskGoalState): ActorRef = {
+val actorProps = TaskMonitor.createActorProps(taskMonitorClass, 
flinkConfig, schedulerDriver, task)
--- End diff --

line is longer than 100 characters. This should cause a checkstyle 
violation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75151735
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework
+
+import java.util.concurrent.{TimeUnit, ExecutorService}
+
+import akka.actor.ActorRef
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.configuration.{Configuration => 
FlinkConfiguration, ConfigConstants}
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.clusterframework.ApplicationStatus
+import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.clusterframework.messages._
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, 
JobManager}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import 
org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, 
CurrentJobStatus, JobNotFound}
+import org.apache.flink.runtime.messages.Messages.Acknowledge
+import org.apache.flink.runtime.metrics.{MetricRegistry => 
FlinkMetricRegistry}
+import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+
+/** JobManager actor for execution on Yarn or Mesos. It enriches the 
[[JobManager]] with additional messages
+  * to start/administer/stop the session.
+  *
+  * @param flinkConfiguration Configuration object for the actor
+  * @param executorService Execution context which is used to execute 
concurrent tasks in the
+  * 
[[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param instanceManager Instance manager to manage the registered
+  *
[[org.apache.flink.runtime.taskmanager.TaskManager]]
+  * @param scheduler Scheduler to schedule Flink jobs
+  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param archive Archive for finished Flink jobs
+  * @param restartStrategyFactory Restart strategy to be used in case of a 
job recovery
+  * @param timeout Timeout for futures
+  * @param leaderElectionService LeaderElectionService to participate in 
the leader election
+  */
+abstract class ContaineredJobManager(
+  flinkConfiguration: FlinkConfiguration,
+  executorService: ExecutorService,
+  instanceManager: InstanceManager,
+  scheduler: FlinkScheduler,
+  libraryCacheManager: BlobLibraryCacheManager,
+  archive: ActorRef,
+  restartStrategyFactory: RestartStrategyFactory,
+  timeout: FiniteDuration,
+  leaderElectionService: LeaderElectionService,
+  submittedJobGraphs : SubmittedJobGraphStore,
+  checkpointRecoveryFactory : 
CheckpointRecoveryFactory,
+  savepointStore: SavepointStore,
+  jobRecoveryTimeout: FiniteDuration,
+  metricsRegistry: Option[FlinkMetricRegistry])
+  extends JobManager(
+flinkConfiguration,
+executorService,
+instanceManager,
+scheduler,
+libraryCacheManager,
+archive,
+restartStrategyFactory,
+timeout,
+leaderElectionService,
+submittedJobGraphs,
+

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75156694
  
--- Diff: flink-mesos/src/test/resources/log4j-test.properties ---
@@ -0,0 +1,32 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+log4j.rootLogger=INFO, console
--- End diff --

log level should be `OFF`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1)

2016-08-17 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2315
  
Great work @EronWright. Code quality, testing and extent of this PR is 
really impressive :-) I think you've nicely modularized the Mesos resource 
manager with the additional actors. This makes testing much nicer :-)

I only had some minor comments and questions for my own understanding. 
Tomorrow I want to try it out on a Mesos cluster to see how it works.

There is only one thing I'm wondering. Since we're also currently working 
on Flip-6 where we try to put a new RPC abstraction in place in order to 
eventually remove Akka and Scala from flink-runtime, I wanted to ask whether 
you think that the FSM actors could also be replaced (sometime in the future) 
by something else? I totally agree that they are a nice abstraction for Mesos 
and allow to express the logic succinctly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2305: [FLINK-4271] [DataStreamAPI] Enable CoGroupedStreams and ...

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2305
  
+1 for this approach.

On Thu, Aug 18, 2016 at 6:59 AM, Jark  wrote:

> I'm ok with the with(...) approach. In addition, we should create a JIRA
> under [FLINK-3957]
> "Breaking API changes for Flink 2.0", to keep track of deprecating and
> changing the apply method return type.
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/flink/pull/2305#issuecomment-240624274>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AFfXul4OQ2wEhcdMXC3_gqtZ-zqEIV6Kks5qg-bNgaJpZM4JW9go>
> .
>



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75301960
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobGraph.getJobID(),
+   submissionFuture,
+   jobClientActor,
+   classLoader);
+   }
+
+
+   /**
+* Attaches to a running Job using the JobID.
+* Reconstructs the user class loader by downloading the jars from the 
JobManager.
+* @throws JobRetrievalException if anything goes wrong while 
retrieving the job
+*/
+   public static JobListeningContext attachToRunningJob(
+   JobID jobID,
+   ActorGateway jobManagerGateWay,
+   Configuration configuration,
+   ActorSystem actorSystem,
+   LeaderRetrievalService leaderRetrievalService,
+   FiniteDuration timeout,
+   boolean sysoutLogUpdates) throws JobRetrievalException {
+
+   checkNotNull(jobID, "The jobID must not be null.");
+   checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
+   checkNotNull(configuration, "The configuration must not be 
null.");
+   checkNotNull(actorSystem, "The actorSystem must not be null.");
+   checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
+   checkNotNull(timeout, "The timeout must not be null.");
+
+   // retrieve classloader first before doing anything
+   ClassLoader classloader;
+   try {
+   classloader = retrieveClassLoader(jobID, 
jobManagerGateWay, configuration, timeout);
--- End diff --

What if the `JobManager` has already changed at this point? We would no 
longer be able to retrieve the ClassLoader, wouldn't we?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75302372
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobGraph.getJobID(),
+   submissionFuture,
+   jobClientActor,
+   classLoader);
+   }
+
+
+   /**
+* Attaches to a running Job using the JobID.
+* Reconstructs the user class loader by downloading the jars from the 
JobManager.
+* @throws JobRetrievalException if anything goes wrong while 
retrieving the job
+*/
+   public static JobListeningContext attachToRunningJob(
+   JobID jobID,
+   ActorGateway jobManagerGateWay,
+   Configuration configuration,
+   ActorSystem actorSystem,
+   LeaderRetrievalService leaderRetrievalService,
+   FiniteDuration timeout,
+   boolean sysoutLogUpdates) throws JobRetrievalException {
+
+   checkNotNull(jobID, "The jobID must not be null.");
+   checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
+   checkNotNull(configuration, "The configuration must not be 
null.");
+   checkNotNull(actorSystem, "The actorSystem must not be null.");
+   checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
+   checkNotNull(timeout, "The timeout must not be null.");
+
+   // retrieve classloader first before doing anything
+   ClassLoader classloader;
+   try {
+   classloader = retrieveClassLoader(jobID, 
jobManagerGateWay, configuration, timeout);
+   LOG.info("Reconstructed class loader for Job {}" , 
jobID);
+   } catch (Exception e) {
+   LOG.warn("Couldn't retrieve classloader for {}. Using 
system class loader", jobID, e);
+   classloader = JobClient.class.getClassLoader();
+   }
+
+   // we create a proxy JobClientActor that deals with all 
communication with
+   // the JobManager. It forwards the job submission, checks the 
success/failure responses, logs
+   // update messages, watches for disconnect between client and 
JobManager, ...
+   Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+   leaderRetrievalService,
+   timeout,
+   sysoutLogUpdates);
+
+   ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
+
+   Future attachmentFuture = Patterns.ask(
+   jobClientActor,
+   new JobClientMessages.AttachToJobAndWait(jobID),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobID,
+   attachmentFuture,
+   jobClientActor,
+   classloader);
+   }
+
+   /**
+* Reconstructs the class loader by first requesting information about 
it at the JobManager
+* and then downloading missing jar files.
+* @param jobID id of job
+* @param jobManager gateway to the JobManager
+* @param config the flink configuration
+* @param timeout timeout for querying the jobmanager
+* @return A classloader that should behave like the original 
classloader
+* @throws JobRetrievalException if anything goes wrong
+*/
+   public static ClassLoader retrieveClassLoader(
+   JobID jobID,
+   ActorGateway jobManager,
+   Configuration config,
+   FiniteDuration timeout)
+   throws JobRetrievalException {
+
+   final Object jmAnswer;
+   try {
+   jmAnswer = Await.result(
+   jobManager.ask(
+   new 
JobManagerMessages.Re

[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75305261
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java 
---
@@ -198,10 +211,47 @@ else if (message instanceof SubmitJobAndWait) {
decorateMessage(new Status.Failure(new 
Exception(msg))), ActorRef.noSender());
}
}
+   else if (message instanceof AttachToJobAndWait) {
--- End diff --

At the moment the `JobClientActor` can be used to attach to a job and 
submit a new job at the same time. Then depending on the order in which the 
jobs terminate either the submitted job's or the attached job's result is 
reported back but the other future will never be completed. It might make sense 
to guard against this or to have two different JobClientActors based on the 
same base class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75305767
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala 
---
@@ -58,10 +62,63 @@ class JobInfo(
 }
   }
 
-  override def toString = s"JobInfo(client: $client ($listeningBehaviour), 
start: $start)"
+
+  /**
+* Notifies all clients by sending a message
+* @param message the message to send
+*/
+  def notifyClients(message: Any) = {
+clients foreach {
+  case (clientActor, _) =>
+clientActor ! message
+}
+  }
+
+  /**
+* Notifies all clients which are not of type detached
+* @param message the message to sent to non-detached clients
+*/
+  def notifyNonDetachedClients(message: Any) = {
+clients foreach {
+  case (clientActor, ListeningBehaviour.DETACHED) =>
+// do nothing
+  case (clientActor, _) =>
+clientActor ! message
+}
+  }
+
+  /**
+* Sends a message to job clients that match the listening behavior
+* @param message the message to send to all clients
+* @param listeningBehaviour the desired listening behaviour
+*/
+  def notifyClients(message: Any, listeningBehaviour: ListeningBehaviour) 
= {
+clients foreach {
+  case (clientActor, `listeningBehaviour`) =>
+clientActor ! message
+  case _ =>
+}
+  }
 
   def setLastActive() =
 lastActive = System.currentTimeMillis()
+
+
+  override def toString = s"JobInfo(clients: ${clients.toString()}, start: 
$start)"
+
+  override def equals(other: Any): Boolean = other match {
+case that: JobInfo =>
+  this.isInstanceOf[JobInfo] &&
--- End diff --

Why do we need this check here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75307091
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
 ---
@@ -100,13 +101,51 @@ public void testSubmissionTimeout() throws Exception {
Await.result(jobExecutionResult, timeout);
}
 
+
+   /** Tests that a {@link JobClientActorRegistrationTimeoutException} is 
thrown when the registration
+* cannot be performd at the JobManager by the JobClientActor. This is 
here the case, because the
+* started JobManager never replies to a {@link RegisterJobClient} 
message.
+*/
+   @Test(expected=JobClientActorRegistrationTimeoutException.class)
+   public void testRegistrationTimeout() throws Exception {
+   FiniteDuration jobClientActorTimeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+   FiniteDuration timeout = jobClientActorTimeout.$times(2);
+
+   UUID leaderSessionID = UUID.randomUUID();
+
+   ActorRef jobManager = system.actorOf(
+   Props.create(
+   PlainActor.class,
+   leaderSessionID));
+
+   TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+   jobManager.path().toString(),
+   leaderSessionID
+   );
+
+   Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+   testingLeaderRetrievalService,
+   jobClientActorTimeout,
+   false);
+
+   ActorRef jobClientActor = system.actorOf(jobClientActorProps);
+
+
--- End diff --

Two line breaks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


  1   2   3   4   5   6   7   8   9   10   >