[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2365#discussion_r74737756 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java --- @@ -25,33 +25,42 @@ import org.apache.flink.runtime.rpc.MainThreadExecutor; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.akka.messages.CallAsync; +import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation; +import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RunAsync; import org.apache.flink.util.Preconditions; +import org.apache.log4j.Logger; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.util.BitSet; import java.util.concurrent.Callable; /** - * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation handler wraps the - * rpc in a {@link RpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is + * Invocation handler to be used with an {@link AkkaRpcActor}. The invocation handler wraps the + * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is * executed. */ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor { + private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class); + private final ActorRef rpcServer; --- End diff -- Good point, will change it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2365#discussion_r74737803 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RemoteRpcInvocation.java --- @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +/** + * Remote rpc invocation message which is used when the actor communication is remote and, thus, the + * message has to be serialized. + * + * In order to fail fast and report an appropriate error message to the user, the method name, the + * parameter types and the arguments are eagerly serialized. In case the the invocation call + * contains a non-serializable object, then an {@link IOException} is thrown. 
+ */ +public class RemoteRpcInvocation implements RpcInvocation, Serializable { + private static final long serialVersionUID = 6179354390913843809L; + + // Serialized invocation data + private final SerializedValue serializedMethodInvocation; + + // Transient field which is lazily initialized upon first access to the invocation data + private transient RemoteRpcInvocation.MethodInvocation methodInvocation; + + public RemoteRpcInvocation( + final String methodName, + final Class[] parameterTypes, + final Object[] args) throws IOException { + + serializedMethodInvocation = new SerializedValue<>(new RemoteRpcInvocation.MethodInvocation(methodName, parameterTypes, args)); --- End diff -- You're right. Will add it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2365#discussion_r74737963 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java --- @@ -25,33 +25,42 @@ import org.apache.flink.runtime.rpc.MainThreadExecutor; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.akka.messages.CallAsync; +import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation; +import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RunAsync; import org.apache.flink.util.Preconditions; +import org.apache.log4j.Logger; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.util.BitSet; import java.util.concurrent.Callable; /** - * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation handler wraps the - * rpc in a {@link RpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is + * Invocation handler to be used with an {@link AkkaRpcActor}. The invocation handler wraps the + * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is * executed. 
*/ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor { + private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class); + private final ActorRef rpcServer; // default timeout for asks private final Timeout timeout; - AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout) { + private final long maximumFramesize; + + AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout, long maximumFramesize) { --- End diff -- `isLocalActorRef` should actually be quite cheap since it simply checks whether the host option of the actor's address is defined or not. But, anyway, it makes sense to check it once and store the result. Will add the change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc invocatio...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2365 Thanks for the review @StephanEwen. I've addressed your comments. Once Travis gives green light, I'll merge it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2370#discussion_r74768392 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import java.io.Serializable; + +public class ResourceProfile implements Serializable { --- End diff -- Java docs are missing --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2370#discussion_r74768700 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import java.io.Serializable; + +public class ResourceProfile implements Serializable { + + private static final long serialVersionUID = -784900073893060124L; + + /** How many cpu cores are needed, use double so we can specify cpu like 0.1 */ + private final double cpuCores; + + /** How many memory in mb are needed */ + private final long memoryInMB; --- End diff -- It might be helpful to have getter methods for the fields. The RM can then use the information from the ResourceProfile to create container requests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2370#discussion_r74768989 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class SlotID implements ResourceIDRetrievable, Serializable { --- End diff -- Java docs missing --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2370#discussion_r74769608 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import java.io.Serializable; + +public class ResourceProfile implements Serializable { + + private static final long serialVersionUID = -784900073893060124L; + + /** How many cpu cores are needed, use double so we can specify cpu like 0.1 */ + private final double cpuCores; + + /** How many memory in mb are needed */ + private final long memoryInMB; + + public ResourceProfile(double cpuCores, long memoryInMB) { + this.cpuCores = cpuCores; + this.memoryInMB = memoryInMB; + } + + /** +* Check whether required resource profile can be matched +* +* @param required the required resource profile +* @return true if the requirement is matched, otherwise false +*/ + public boolean matchRequirement(ResourceProfile required) { --- End diff -- Maybe we could rename this method to `isMatching` or `isMatchingRequirement`. 
Starting these kinds of methods with a question word (such as `is`) indicates that the method does not change the internal state but only checks for a certain condition. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2370#discussion_r74769810 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.types; + +import java.io.Serializable; + +public class ResourceProfile implements Serializable { + + private static final long serialVersionUID = -784900073893060124L; + + /** How many cpu cores are needed, use double so we can specify cpu like 0.1 */ + private final double cpuCores; + + /** How many memory in mb are needed */ + private final long memoryInMB; + + public ResourceProfile(double cpuCores, long memoryInMB) { + this.cpuCores = cpuCores; + this.memoryInMB = memoryInMB; + } + + /** +* Check whether required resource profile can be matched +* +* @param required the required resource profile +* @return true if the requirement is matched, otherwise false +*/ + public boolean matchRequirement(ResourceProfile required) { + return Double.compare(cpuCores, required.cpuCores) >= 0 && memoryInMB >= required.memoryInMB; --- End diff -- Maybe we could create a getter method for `memoryInMB`. Then we don't have to access the private fields of another instance. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2370#discussion_r74769869 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.types; + +import java.io.Serializable; + +public class ResourceProfile implements Serializable { + + private static final long serialVersionUID = -784900073893060124L; + + /** How many cpu cores are needed, use double so we can specify cpu like 0.1 */ + private final double cpuCores; + + /** How many memory in mb are needed */ + private final long memoryInMB; + + public ResourceProfile(double cpuCores, long memoryInMB) { + this.cpuCores = cpuCores; + this.memoryInMB = memoryInMB; + } + + /** +* Check whether required resource profile can be matched +* +* @param required the required resource profile +* @return true if the requirement is matched, otherwise false +*/ + public boolean matchRequirement(ResourceProfile required) { + return Double.compare(cpuCores, required.cpuCores) >= 0 && memoryInMB >= required.memoryInMB; --- End diff -- The same would then apply to `cpuCores` as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2370#discussion_r74770025 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class SlotID implements ResourceIDRetrievable, Serializable { + + private static final long serialVersionUID = -6399206032549807771L; + + /** The resource id which this slot located */ + private final ResourceID resourceId; + + /** The numeric id for single slot */ + private final int slotId; + + public SlotID(ResourceID resourceId, int slotId) { + checkNotNull(resourceId, "ResourceID must not be null"); + this.resourceId = resourceId; --- End diff -- You can write: `this.resourceId = checkNotNull(resourceId, "")` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2370: [FLINK-4373] [cluster management] Introduce SlotID, Alloc...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2370 Thanks for your contribution @KurtYoung. Good work! I only had some minor comments. Once they are addressed it's good to be merged :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc invocatio...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2365 Somehow this PR was not closed automatically. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...
Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/2365 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2358: [FLINK-4382] Buffer rpc calls until the RpcEndpoint has b...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2358 The stashing is a feature which comes for free with Akka. Of course the sending side should not rely on this behaviour when implementing its failure treatment. But in cases where we have a race condition between the start-up of the `RpcEndpoint` and an `RpcGateway` it should result in a slightly more forgiving receiver behaviour. I will rebase the PR and if Travis gives green light I would like to merge this PR to the flip-6 feature branch if nobody objects. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2370: [FLINK-4373] [cluster management] Introduce SlotID...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2370#discussion_r74785154 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java --- @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import org.apache.flink.util.AbstractID; + +/** + * Unique identifier for the attempt to allocate a slot, normally created by JobManager when requesting a slot, + * constant across re-tries. This can also be used to identify responses by the ResourceManager and to identify + * deployment calls towards the TaskManager that was allocated from. + */ +public class AllocationID extends AbstractID { + + private static final long serialVersionUID = 1L; + + /** +* Creates an new random allocation ID. +*/ + public AllocationID() { + super(); + } --- End diff -- This constructor is the standard default constructor, right? I guess we don't need to specify this constructor then. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2353#discussion_r74800246 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/TaskExecutorRegistrationResponse.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.resourcemanager; + +import org.apache.flink.runtime.instance.InstanceID; + +/** + * Base class for responses from the ResourceManager to a registration attempt by a + * TaskExecutor. + */ +public abstract class TaskExecutorRegistrationResponse { + + /** +* Successful registration. 
+*/ + public static final class Success extends TaskExecutorRegistrationResponse { + + private final InstanceID registrationId; + + private final long heartbeatInterval; + + public Success(InstanceID registrationId, long heartbeatInterval) { + this.registrationId = registrationId; + this.heartbeatInterval = heartbeatInterval; + } + + public InstanceID getRegistrationId() { + return registrationId; + } + + public long getHeartbeatInterval() { + return heartbeatInterval; + } + + @Override + public String toString() { + return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + heartbeatInterval + ')'; + } + } + + // + + /** +* A rejected (declined) registration. +*/ + public static final class Decline extends TaskExecutorRegistrationResponse { + + private final String reason; + + public Decline(String reason) { + this.reason = reason; + } + + public String getReason() { + return reason; + } + + @Override + public String toString() { + return "TaskExecutorRegistrationDecline (" + reason + ')'; + } + } +} + + + + + + + --- End diff -- Many line breaks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2353#discussion_r74802315 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/ResourceManagerRegistration.java --- @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.taskexecutor; + +import akka.dispatch.OnFailure; +import akka.dispatch.OnSuccess; + +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.resourcemanager.TaskExecutorRegistrationResponse; + +import org.slf4j.Logger; + +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + + +/** + * This utility class handles the registration of the TaskExecutor at the ResourceManager. + * It implements the initial address resolution and the retries-with-backoff strategy. 
+ * + * The registration process notifies its {@link TaskExecutorToResourceManagerConnection} + * upon successful registration. The registration can be canceled, for example when the + * ResourceManager that it tries to register at looses leader status. + * + * Implementation note: This class does not act like a single-threaded actor. + * It holds only constant state and passes all variable state via stack and closures. + */ +public class ResourceManagerRegistration { + + // + // configuration constants + // + + private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100; + + private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 3; + + private static final long ERROR_REGISTRATION_DELAY_MILLIS = 1; + + private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 3; + + // + + private final Logger log; + + private final TaskExecutorToResourceManagerConnection connection; + + private final TaskExecutor taskExecutor; + + private final String resourceManagerAddress; + + private final UUID resourceManagerLeaderId; + + private volatile boolean canceled; + + public ResourceManagerRegistration( + Logger log, + TaskExecutorToResourceManagerConnection connection, + TaskExecutor taskExecutor, + String resourceManagerAddress, + UUID resourceManagerLeaderId) { + + this.log = checkNotNull(log); + this.connection = checkNotNull(connection); + this.taskExecutor = checkNotNull(taskExecutor); + this.resourceManagerAddress = checkNotNull(resourceManagerAddress); + this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); + } + + // + // cancellation + // + + /** +* Cancels the registration procedure. +*/ + public void cancel() { + canceled = true; + } + + /** +* Checks if the registration was canceled. +* @return True if the registration was canceled, false otherwise. +*/ + public boolean isCanceled() { + return canceled; + } + + //
[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2353#discussion_r74803435 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.taskexecutor; + +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; +import org.slf4j.Logger; + +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +public class TaskExecutorToResourceManagerConnection { + + private final Logger log; + + private final TaskExecutor taskExecutor; + + private final UUID resourceManagerLeaderId; + + private final String resourceManagerAddress; + + private ResourceManagerRegistration pendingRegistration; + + private ResourceManagerGateway registeredResourceManager; + + private InstanceID registrationId; + + private volatile boolean closed; + + + public TaskExecutorToResourceManagerConnection( + Logger log, --- End diff -- I'm not so sure whether passing a logger from a different context makes the logging statements easier or harder to understand. I usually have trouble locating the logging statements (and thus the state of the computation) when they are logged from a different (wider) scope than the one in which they actually happened. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2353: [FLINK-4355] [cluster management] Implement TaskManager s...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2353 Really good code @StephanEwen :-) I like it a lot that you've pulled out the registration logic from the `TaskExecutor`. This makes it much cleaner and easier to test. I'm wondering whether we can actually take it a step further. What I've noticed is that we have to pass the `TaskExecutor` to different places to have access to its state and to run things in the main thread context. I'm not sure whether this is really necessary. The registration information should be more or less constant and the main thing which has to happen in the main thread context is the setting of the connected gateway. So wouldn't it be possible that the `ResourceManagerRegistration` is something like a cancelable future which is instantiated with all necessary information? It performs the registration logic completely in the background and once it is completed we can create a `TaskExecutorToResourceManagerConnection` in the main thread context. Whenever we have to start a new registration, we cancel the future (if there is still another registration running). Pulling out the `TaskExecutor` from the registration process has the advantage that the components are even further separated and, thus, easier to test. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2370: [FLINK-4373] [cluster management] Introduce SlotID, Alloc...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2370 Failing test cases are unrelated. Will merge this PR. Thanks for your contribution @KurtYoung. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2371: [FLINK-4309] Potential null pointer dereference in...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2371#discussion_r74893388 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java --- @@ -178,14 +178,19 @@ public String toString() { @Override public Set keySet() { + final HashSet set = new HashSet(); - final int prefixLen = this.prefix == null ? 0 : this.prefix.length(); for (String key : this.backingConfig.keySet()) { - if (key.startsWith(this.prefix)) { + + if (this.prefix == null) { --- End diff -- Can we move the condition out of the loop? The condition does not change across iterations, so we don't have to evaluate it every time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2371: [FLINK-4309] Potential null pointer dereference in Delega...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2371 Thanks for your contribution @tsunny. Your changes look good. I only had a minor comment. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2370: [FLINK-4373] [cluster management] Introduce SlotID, Alloc...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2370 Not automatically closed by the asfgit bot. @KurtYoung could you close this PR manually? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74937306 --- Diff: flink-dist/pom.xml --- @@ -113,8 +113,13 @@ under the License. flink-metrics-jmx ${project.version} + + + org.apache.flink + flink-mesos_2.10 + ${project.version} + --- End diff -- We might introduce an "include-mesos" profile where we add this dependency to flink-dist. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74937890 --- Diff: flink-mesos/pom.xml --- @@ -0,0 +1,294 @@ + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + 4.0.0 + + + org.apache.flink + flink-parent + 1.1-SNAPSHOT + .. + + + flink-mesos_2.10 + flink-mesos + jar + + +0.27.1 + + + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + + + hadoop-core + org.apache.hadoop + + + + + + org.apache.flink + flink-clients_2.10 + ${project.version} + + + + org.apache.flink + ${shading-artifact.name} + ${project.version} + + + + org.apache.flink + flink-test-utils_2.10 + ${project.version} + test-jar + test + + + + + com.typesafe.akka + akka-actor_${scala.binary.version} + + + + com.typesafe.akka + akka-remote_${scala.binary.version} + + + + com.typesafe.akka + akka-slf4j_${scala.binary.version} + + + + com.typesafe.akka + akka-testkit_${scala.binary.version} + + + --- End diff -- Can we remove this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74938223 --- Diff: flink-mesos/pom.xml --- @@ -0,0 +1,294 @@ + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + 4.0.0 + + + org.apache.flink + flink-parent + 1.1-SNAPSHOT + .. + + + flink-mesos_2.10 + flink-mesos + jar + + +0.27.1 + + + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + + + hadoop-core + org.apache.hadoop + + + + + + org.apache.flink + flink-clients_2.10 + ${project.version} + + + + org.apache.flink + ${shading-artifact.name} + ${project.version} + + + + org.apache.flink + flink-test-utils_2.10 + ${project.version} + test-jar + test + + + + + com.typesafe.akka + akka-actor_${scala.binary.version} + + + + com.typesafe.akka + akka-remote_${scala.binary.version} + + + + com.typesafe.akka + akka-slf4j_${scala.binary.version} + + + + com.typesafe.akka + akka-testkit_${scala.binary.version} + + + + + + com.google.guava + guava + ${guava.version} + --- End diff -- Are you using Guava or is this a dependency for Mesos? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74938631 --- Diff: flink-mesos/pom.xml --- @@ -0,0 +1,294 @@ + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + 4.0.0 + + + org.apache.flink + flink-parent + 1.1-SNAPSHOT + .. + + + flink-mesos_2.10 + flink-mesos + jar + + +0.27.1 + + + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + + + hadoop-core + org.apache.hadoop + + + + + + org.apache.flink + flink-clients_2.10 + ${project.version} + + + + org.apache.flink + ${shading-artifact.name} + ${project.version} + + + + org.apache.flink + flink-test-utils_2.10 + ${project.version} + test-jar + test + + + + + com.typesafe.akka + akka-actor_${scala.binary.version} + + + + com.typesafe.akka + akka-remote_${scala.binary.version} + + + + com.typesafe.akka + akka-slf4j_${scala.binary.version} + + + + com.typesafe.akka + akka-testkit_${scala.binary.version} + + + + + + com.google.guava + guava + ${guava.version} + + + + org.apache.curator + curator-test + ${curator.version} + test --- End diff -- It might make sense to add this dependency to the dependency management section of the parent pom since also other modules use it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74940821 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.cli; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.Configuration; + +import java.io.IOException; +import java.util.Map; + +public class FlinkMesosSessionCli { --- End diff -- Java docs are missing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74941466 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import com.netflix.fenzo.ConstraintEvaluator; +import com.netflix.fenzo.TaskAssignmentResult; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.VMTaskFitnessCalculator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.mesos.Protos; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.mesos.Utils.variable; +import static org.apache.flink.mesos.Utils.range; +import static org.apache.flink.mesos.Utils.ranges; +import static org.apache.flink.mesos.Utils.scalar; + +/** + * Specifies how to launch a Mesos worker. 
+ */ +public class LaunchableMesosWorker implements LaunchableTask { + + /** +* The set of configuration keys to be dynamically configured with a port allocated from Mesos. +*/ + private static String[] TM_PORT_KEYS = { + "taskmanager.rpc.port", + "taskmanager.data.port" }; + + private final MesosTaskManagerParameters params; + private final Protos.TaskInfo.Builder template; + private final Protos.TaskID taskID; + private final Request taskRequest; + + /** +* Construct a launchable Mesos worker. +* @param params the TM parameters such as memory, cpu to acquire. +* @param template a template for the TaskInfo to be constructed at launch time. +* @param taskID the taskID for this worker. + */ + public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + this.params = params; + this.template = template; + this.taskID = taskID; + this.taskRequest = new Request(); + } + + public Protos.TaskID taskID() { + return taskID; + } + + @Override + public TaskRequest taskRequest() { + return taskRequest; + } + + class Request implements TaskRequest { + private final AtomicReference assignedResources = new AtomicReference<>(); + + @Override + public String getId() { + return taskID.getValue(); + } + + @Override + public String taskGroupName() { + return ""; + } + + @Override + public double getCPUs() { + return params.cpus(); + } + + @Override + public double getMemory() { + return params.containeredParameters().taskManagerTotalMemoryMB(); + } + + @Override + public double getNetworkMbps() { + return 0.0; + } + + @Override + public double getDisk() { + return 0.0; + } + + @Override + public int getPorts() { + return TM_PORT_KEYS.length; + } + + @Override + public Map getCustomNamedResources() { + return Collections.emptyMap(); + } + + @Override + public List getHardConstraints() { + return null; + } + + @
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74941614 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import com.netflix.fenzo.ConstraintEvaluator; +import com.netflix.fenzo.TaskAssignmentResult; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.VMTaskFitnessCalculator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.mesos.Protos; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.mesos.Utils.variable; +import static org.apache.flink.mesos.Utils.range; +import static org.apache.flink.mesos.Utils.ranges; +import static org.apache.flink.mesos.Utils.scalar; + +/** + * Specifies how to launch a Mesos worker. 
+ */ +public class LaunchableMesosWorker implements LaunchableTask { + + /** +* The set of configuration keys to be dynamically configured with a port allocated from Mesos. +*/ + private static String[] TM_PORT_KEYS = { + "taskmanager.rpc.port", + "taskmanager.data.port" }; + + private final MesosTaskManagerParameters params; + private final Protos.TaskInfo.Builder template; + private final Protos.TaskID taskID; + private final Request taskRequest; + + /** +* Construct a launchable Mesos worker. +* @param params the TM parameters such as memory, cpu to acquire. +* @param template a template for the TaskInfo to be constructed at launch time. +* @param taskID the taskID for this worker. + */ + public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + this.params = params; + this.template = template; + this.taskID = taskID; + this.taskRequest = new Request(); + } + + public Protos.TaskID taskID() { + return taskID; + } + + @Override + public TaskRequest taskRequest() { + return taskRequest; + } + + class Request implements TaskRequest { + private final AtomicReference assignedResources = new AtomicReference<>(); + + @Override + public String getId() { + return taskID.getValue(); + } + + @Override + public String taskGroupName() { + return ""; + } + + @Override + public double getCPUs() { + return params.cpus(); + } + + @Override + public double getMemory() { + return params.containeredParameters().taskManagerTotalMemoryMB(); + } + + @Override + public double getNetworkMbps() { + return 0.0; + } + + @Override + public double getDisk() { + return 0.0; + } + + @Override + public int getPorts() { + return TM_PORT_KEYS.length; + } + + @Override + public Map getCustomNamedResources() { + return Collections.emptyMap(); + } + + @Override + public List getHardConstraints() { + return null; + } + + @
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74941852 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import com.netflix.fenzo.ConstraintEvaluator; +import com.netflix.fenzo.TaskAssignmentResult; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.VMTaskFitnessCalculator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.mesos.Protos; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.mesos.Utils.variable; +import static org.apache.flink.mesos.Utils.range; +import static org.apache.flink.mesos.Utils.ranges; +import static org.apache.flink.mesos.Utils.scalar; + +/** + * Specifies how to launch a Mesos worker. 
+ */ +public class LaunchableMesosWorker implements LaunchableTask { + + /** +* The set of configuration keys to be dynamically configured with a port allocated from Mesos. +*/ + private static String[] TM_PORT_KEYS = { + "taskmanager.rpc.port", + "taskmanager.data.port" }; + + private final MesosTaskManagerParameters params; + private final Protos.TaskInfo.Builder template; + private final Protos.TaskID taskID; + private final Request taskRequest; + + /** +* Construct a launchable Mesos worker. +* @param params the TM parameters such as memory, cpu to acquire. +* @param template a template for the TaskInfo to be constructed at launch time. +* @param taskID the taskID for this worker. + */ + public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + this.params = params; + this.template = template; + this.taskID = taskID; + this.taskRequest = new Request(); + } + + public Protos.TaskID taskID() { + return taskID; + } + + @Override + public TaskRequest taskRequest() { + return taskRequest; + } + + class Request implements TaskRequest { + private final AtomicReference assignedResources = new AtomicReference<>(); + + @Override + public String getId() { + return taskID.getValue(); + } + + @Override + public String taskGroupName() { + return ""; + } + + @Override + public double getCPUs() { + return params.cpus(); + } + + @Override + public double getMemory() { + return params.containeredParameters().taskManagerTotalMemoryMB(); + } + + @Override + public double getNetworkMbps() { + return 0.0; + } + + @Override + public double getDisk() { + return 0.0; + } + + @Override + public int getPorts() { + return TM_PORT_KEYS.length; + } + + @Override + public Map getCustomNamedResources() { + return Collections.emptyMap(); + } + + @Override + public List getHardConstraints() { + return null; + } + + @
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74938853 --- Diff: flink-mesos/pom.xml --- @@ -0,0 +1,294 @@ + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + 4.0.0 + + + org.apache.flink + flink-parent + 1.1-SNAPSHOT + .. + + + flink-mesos_2.10 + flink-mesos + jar + + +0.27.1 + + + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + + + hadoop-core + org.apache.hadoop + + + + + + org.apache.flink + flink-clients_2.10 + ${project.version} + + + + org.apache.flink + ${shading-artifact.name} + ${project.version} + + + + org.apache.flink + flink-test-utils_2.10 + ${project.version} + test-jar + test + --- End diff -- Maybe group all test dependencies under the test structural comment. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74947651 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74948586 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74948792 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74949114 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74947914 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74949669 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74948145 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74950002 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74950095 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74950716 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2358: [FLINK-4382] Buffer rpc calls until the RpcEndpoin...
Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/2358 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74963589 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNe
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74963941 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNe
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74964722 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNe
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74965913 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNe
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74971461 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNe
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74972631 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNe
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74972977 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; + +import static java.util.Objects.requireNonNull; + +public class MesosTaskManagerParameters { --- End diff -- Java docs --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75080549 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNe
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75080685 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNe
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75080823 --- Diff: flink-dist/pom.xml --- @@ -113,8 +113,13 @@ under the License. <artifactId>flink-metrics-jmx</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-mesos_2.10</artifactId> + <version>${project.version}</version> + </dependency> + --- End diff -- We do the same for YARN. See the `include-yarn` profile. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #:
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/commit/68addf39e0e5f9e1656818f923be362680ed93b0#commitcomment-18669730 Really nice abstraction @StephanEwen 👍. I like it a lot :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r7518 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75113276 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.Map; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; + +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The entry point for running a TaskManager in a Mesos container. 
+ */ +public class MesosTaskManagerRunner { + + private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + public static void runTaskManager(String[] args, final Class taskManager) throws IOException { + EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args); + org.apache.flink.runtime.util.SignalHandler.register(LOG); + + // try to parse the command line arguments + final Configuration configuration; + try { + configuration = TaskManager.parseArgsAndLoadConfig(args); + + // add dynamic properties to TaskManager configuration. + final Configuration dynamicProperties = + FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + configuration.addAll(dynamicProperties); + } + catch (Throwable t) { + LOG.error(t.getMessage(), t); + System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); + return; + } + + // read the environment variables + final Map envs = System.getenv(); + final String effectiveUsername = envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME); + final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR); + + // configure local directory + String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null); + if (flinkTempDirs != null) { + LOG.info("Overriding Mesos temporary file directories with those " + + "specified in the Flink config: " + flinkTempDirs); + } + else if (tmpDirs != null) { + LOG.info("Setting directories for temporary files to: " + tmpDirs); --- End diff -- `{}`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75113239 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.Map; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; + +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The entry point for running a TaskManager in a Mesos container. 
+ */ +public class MesosTaskManagerRunner { + + private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + public static void runTaskManager(String[] args, final Class taskManager) throws IOException { + EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args); + org.apache.flink.runtime.util.SignalHandler.register(LOG); + + // try to parse the command line arguments + final Configuration configuration; + try { + configuration = TaskManager.parseArgsAndLoadConfig(args); + + // add dynamic properties to TaskManager configuration. + final Configuration dynamicProperties = + FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + configuration.addAll(dynamicProperties); + } + catch (Throwable t) { + LOG.error(t.getMessage(), t); --- End diff -- Maybe we could output a different error message because the throwable's message will be output anyway. Something like "failed loading the configuration" or so. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75113310 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.Map; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; + +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The entry point for running a TaskManager in a Mesos container. 
+ */ +public class MesosTaskManagerRunner { + + private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + public static void runTaskManager(String[] args, final Class taskManager) throws IOException { + EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args); + org.apache.flink.runtime.util.SignalHandler.register(LOG); + + // try to parse the command line arguments + final Configuration configuration; + try { + configuration = TaskManager.parseArgsAndLoadConfig(args); + + // add dynamic properties to TaskManager configuration. + final Configuration dynamicProperties = + FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + configuration.addAll(dynamicProperties); + } + catch (Throwable t) { + LOG.error(t.getMessage(), t); + System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); + return; + } + + // read the environment variables + final Map envs = System.getenv(); + final String effectiveUsername = envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME); + final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR); + + // configure local directory + String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null); + if (flinkTempDirs != null) { + LOG.info("Overriding Mesos temporary file directories with those " + + "specified in the Flink config: " + flinkTempDirs); + } + else if (tmpDirs != null) { + LOG.info("Setting directories for temporary files to: " + tmpDirs); + configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs); + } + + LOG.info("Mesos task runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() + + "', setting user to execute Flink TaskManager to '" + effectiveUsername + "'"); --- End diff -- `{}` --- If your project is set up for 
it, you can reply to this email and have your reply appear on GitHub as well.
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75113264 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.Map; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; + +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The entry point for running a TaskManager in a Mesos container. 
+ */ +public class MesosTaskManagerRunner { + + private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + public static void runTaskManager(String[] args, final Class taskManager) throws IOException { + EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args); + org.apache.flink.runtime.util.SignalHandler.register(LOG); + + // try to parse the command line arguments + final Configuration configuration; + try { + configuration = TaskManager.parseArgsAndLoadConfig(args); + + // add dynamic properties to TaskManager configuration. + final Configuration dynamicProperties = + FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + configuration.addAll(dynamicProperties); + } + catch (Throwable t) { + LOG.error(t.getMessage(), t); + System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); + return; + } + + // read the environment variables + final Map envs = System.getenv(); + final String effectiveUsername = envs.get(MesosConfigKeys.ENV_CLIENT_USERNAME); + final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR); + + // configure local directory + String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null); + if (flinkTempDirs != null) { + LOG.info("Overriding Mesos temporary file directories with those " + + "specified in the Flink config: " + flinkTempDirs); --- End diff -- `{}` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75114404 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.mesos.Protos; +import scala.Option; + +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A store of Mesos workers and associated framework information. + * + * Generates a framework ID as necessary. 
+ */ +public interface MesosWorkerStore { + + static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0"); + + void start() throws Exception; + + void stop() throws Exception; + + Option getFrameworkID() throws Exception; + + void setFrameworkID(Option frameworkID) throws Exception; + + List recoverWorkers() throws Exception; + + Protos.TaskID newTaskID() throws Exception; + + void putWorker(Worker worker) throws Exception; + + void removeWorker(Protos.TaskID taskID) throws Exception; + + void cleanup() throws Exception; + + /** +* A stored task. +* +* The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed +* by Fenzo for optimization purposes. +*/ + class Worker implements Serializable { --- End diff -- `serialVersionUID` missing --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75114471 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.mesos.Protos; +import scala.Option; + +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A store of Mesos workers and associated framework information. + * + * Generates a framework ID as necessary. 
+ */ +public interface MesosWorkerStore { + + static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0"); + + void start() throws Exception; + + void stop() throws Exception; + + Option getFrameworkID() throws Exception; + + void setFrameworkID(Option frameworkID) throws Exception; + + List recoverWorkers() throws Exception; + + Protos.TaskID newTaskID() throws Exception; + + void putWorker(Worker worker) throws Exception; + + void removeWorker(Protos.TaskID taskID) throws Exception; + + void cleanup() throws Exception; + + /** +* A stored task. +* +* The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed +* by Fenzo for optimization purposes. +*/ + class Worker implements Serializable { + private Protos.TaskID taskID; + + private Option slaveID; + + private Option hostname; + + private TaskState state; --- End diff -- Can these fields be final? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75114739 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.mesos.Protos; +import scala.Option; + +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A store of Mesos workers and associated framework information. + * + * Generates a framework ID as necessary. 
+ */ +public interface MesosWorkerStore { + + static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0"); + + void start() throws Exception; + + void stop() throws Exception; + + Option getFrameworkID() throws Exception; + + void setFrameworkID(Option frameworkID) throws Exception; + + List recoverWorkers() throws Exception; + + Protos.TaskID newTaskID() throws Exception; + + void putWorker(Worker worker) throws Exception; + + void removeWorker(Protos.TaskID taskID) throws Exception; + + void cleanup() throws Exception; + + /** +* A stored task. +* +* The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed +* by Fenzo for optimization purposes. +*/ + class Worker implements Serializable { + private Protos.TaskID taskID; + + private Option slaveID; + + private Option hostname; + + private TaskState state; + + public Worker(Protos.TaskID taskID, Option slaveID, Option hostname, TaskState state) { + requireNonNull(taskID, "taskID"); + requireNonNull(slaveID, "slaveID"); + requireNonNull(hostname, "hostname"); + requireNonNull(state, "state"); + + this.taskID = taskID; + this.slaveID = slaveID; + this.hostname = hostname; + this.state = state; + } + + public Protos.TaskID taskID() { + return taskID; + } + + public Option slaveID() { + return slaveID; + } + + public Option hostname() { + return hostname; + } + + public TaskState state() { + return state; + } --- End diff -- Getters in Java classes should follow the Java getter style and not the Scala style. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75115154 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.mesos.Protos; +import scala.Option; + +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A store of Mesos workers and associated framework information. + * + * Generates a framework ID as necessary. 
+ */ +public interface MesosWorkerStore { + + static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0"); + + void start() throws Exception; + + void stop() throws Exception; + + Option getFrameworkID() throws Exception; + + void setFrameworkID(Option frameworkID) throws Exception; + + List recoverWorkers() throws Exception; + + Protos.TaskID newTaskID() throws Exception; + + void putWorker(Worker worker) throws Exception; + + void removeWorker(Protos.TaskID taskID) throws Exception; + + void cleanup() throws Exception; + + /** +* A stored task. +* +* The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed +* by Fenzo for optimization purposes. +*/ + class Worker implements Serializable { + private Protos.TaskID taskID; + + private Option slaveID; + + private Option hostname; + + private TaskState state; + + public Worker(Protos.TaskID taskID, Option slaveID, Option hostname, TaskState state) { --- End diff -- Is this class intended to be created somewhere else than in the static methods of this class itself? If not, then we could narrow down the visibility of the constructor. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75115442 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.mesos.Protos; +import scala.Option; + +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A store of Mesos workers and associated framework information. + * + * Generates a framework ID as necessary. 
+ */ +public interface MesosWorkerStore { + + static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0"); + + void start() throws Exception; + + void stop() throws Exception; + + Option getFrameworkID() throws Exception; + + void setFrameworkID(Option frameworkID) throws Exception; + + List recoverWorkers() throws Exception; + + Protos.TaskID newTaskID() throws Exception; + + void putWorker(Worker worker) throws Exception; + + void removeWorker(Protos.TaskID taskID) throws Exception; + + void cleanup() throws Exception; + + /** +* A stored task. +* +* The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed +* by Fenzo for optimization purposes. +*/ + class Worker implements Serializable { + private Protos.TaskID taskID; + + private Option slaveID; + + private Option hostname; + + private TaskState state; + + public Worker(Protos.TaskID taskID, Option slaveID, Option hostname, TaskState state) { + requireNonNull(taskID, "taskID"); + requireNonNull(slaveID, "slaveID"); + requireNonNull(hostname, "hostname"); + requireNonNull(state, "state"); + + this.taskID = taskID; + this.slaveID = slaveID; + this.hostname = hostname; + this.state = state; + } + + public Protos.TaskID taskID() { + return taskID; + } + + public Option slaveID() { + return slaveID; + } + + public Option hostname() { + return hostname; + } + + public TaskState state() { + return state; + } + + // valid transition methods + + public static Worker newTask(Protos.TaskID taskID) { + return new Worker( + taskID, + Option.empty(), Option.empty(), + TaskState.New); + } + + public Worker launchTask(Protos.SlaveID slaveID, String hostname) { + return new Worker(taskID, Option.apply(slaveID), Option.apply(hostname), TaskState.Launched); + } + + public Worker releaseTask() { + return new Worker(taskID, slaveID, hostname, TaskState.Released); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || 
getClass() != o.getClass()) { + return false; + } + Worker worker = (Worker) o; + return Objects.equals(ta
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75115525 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.mesos.Protos; +import scala.Option; + +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A store of Mesos workers and associated framework information. + * + * Generates a framework ID as necessary. 
+ */ +public interface MesosWorkerStore { + + static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0"); + + void start() throws Exception; + + void stop() throws Exception; + + Option getFrameworkID() throws Exception; + + void setFrameworkID(Option frameworkID) throws Exception; + + List recoverWorkers() throws Exception; + + Protos.TaskID newTaskID() throws Exception; + + void putWorker(Worker worker) throws Exception; + + void removeWorker(Protos.TaskID taskID) throws Exception; + + void cleanup() throws Exception; + + /** +* A stored task. +* +* The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed +* by Fenzo for optimization purposes. +*/ + class Worker implements Serializable { + private Protos.TaskID taskID; + + private Option slaveID; + + private Option hostname; + + private TaskState state; + + public Worker(Protos.TaskID taskID, Option slaveID, Option hostname, TaskState state) { + requireNonNull(taskID, "taskID"); + requireNonNull(slaveID, "slaveID"); + requireNonNull(hostname, "hostname"); + requireNonNull(state, "state"); + + this.taskID = taskID; + this.slaveID = slaveID; + this.hostname = hostname; + this.state = state; + } + + public Protos.TaskID taskID() { + return taskID; + } + + public Option slaveID() { + return slaveID; + } + + public Option hostname() { + return hostname; + } + + public TaskState state() { + return state; + } + + // valid transition methods + + public static Worker newTask(Protos.TaskID taskID) { + return new Worker( + taskID, + Option.empty(), Option.empty(), + TaskState.New); + } + + public Worker launchTask(Protos.SlaveID slaveID, String hostname) { + return new Worker(taskID, Option.apply(slaveID), Option.apply(hostname), TaskState.Launched); + } + + public Worker releaseTask() { + return new Worker(taskID, slaveID, hostname, TaskState.Released); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || 
getClass() != o.getClass()) { + return false; + } + Worker worker = (Worker) o; + return Objects.equals(ta
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75117043 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.SharedValue; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.mesos.Protos; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A ZooKeeper-backed Mesos worker store. + */ +public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class); + + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** Flag indicating whether this instance is running. 
*/ + private boolean isRunning; + + /** A persistent value of the assigned framework ID */ + private final SharedValue frameworkIdInZooKeeper; + + /** A persistent count of all tasks created, for generating unique IDs */ + private final SharedCount totalTaskCountInZooKeeper; + + /** A persistent store of serialized workers */ + private final ZooKeeperStateHandleStore workersInZooKeeper; + + @SuppressWarnings("unchecked") + ZooKeeperMesosWorkerStore( + CuratorFramework client, + String storePath, + StateStorageHelper stateStorage + ) throws Exception { + checkNotNull(storePath, "storePath"); + checkNotNull(stateStorage, "stateStorage"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = checkNotNull(client, "client"); + + // All operations will have the given path as root + client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient()); + CuratorFramework facade = client.usingNamespace(client.getNamespace() + storePath); + + // Track the assignd framework ID. + frameworkIdInZooKeeper = new SharedValue(facade, "/frameworkId", new byte[0]); + + // Keep a count of all tasks created ever, as the basis for a unique ID. + totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 0); + + // Keep track of the workers in state handle storage. + facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient()); + CuratorFramework storeFacade = client.usingNamespace(facade.getNamespace() + "/workers"); + + this.workersInZooKeeper = ZooKeeperStateHandleS
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75119617 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.SharedValue; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.mesos.Protos; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A ZooKeeper-backed Mesos worker store. + */ +public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class); + + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** Flag indicating whether this instance is running. 
*/ + private boolean isRunning; + + /** A persistent value of the assigned framework ID */ + private final SharedValue frameworkIdInZooKeeper; + + /** A persistent count of all tasks created, for generating unique IDs */ + private final SharedCount totalTaskCountInZooKeeper; + + /** A persistent store of serialized workers */ + private final ZooKeeperStateHandleStore workersInZooKeeper; + + @SuppressWarnings("unchecked") + ZooKeeperMesosWorkerStore( + CuratorFramework client, + String storePath, + StateStorageHelper stateStorage + ) throws Exception { + checkNotNull(storePath, "storePath"); + checkNotNull(stateStorage, "stateStorage"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = checkNotNull(client, "client"); + + // All operations will have the given path as root + client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient()); + CuratorFramework facade = client.usingNamespace(client.getNamespace() + storePath); + + // Track the assignd framework ID. + frameworkIdInZooKeeper = new SharedValue(facade, "/frameworkId", new byte[0]); + + // Keep a count of all tasks created ever, as the basis for a unique ID. + totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 0); + + // Keep track of the workers in state handle storage. + facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient()); + CuratorFramework storeFacade = client.usingNamespace(facade.getNamespace() + "/workers"); + + this.workersInZooKeeper = ZooKeeperStateHandleS
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75120223 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.SharedValue; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.mesos.Protos; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A ZooKeeper-backed Mesos worker store. + */ +public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class); + + private final Object cacheLock = new Object(); --- End diff -- Why do we have to lock? Where does the concurrent access come from? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75120933 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler; + +import akka.actor.ActorRef; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.SlaveLost; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +import java.util.List; + +/** + * This class reacts to callbacks from the Mesos scheduler driver. 
+ * + * In order to preserve actor concurrency safety, this class simply sends + * corresponding messages to the Mesos resource master actor. + * + * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html + */ +public class SchedulerProxy implements Scheduler { + + /** The actor to which we report the callbacks */ + private ActorRef mesosActor; --- End diff -- final? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75121105 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler; + +import akka.actor.ActorRef; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.Error; --- End diff -- Twice the same import. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75121439 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler.messages; + +import org.apache.mesos.Protos; + +import java.util.Collection; + +/** + * Local message sent by the launch coordinator to the scheduler to accept offers. + */ +public class AcceptOffers { --- End diff -- Would be good to make this class immutable by using final fields. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75121525 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler.messages; + +import org.apache.mesos.Protos; + +import java.util.Collection; + +/** + * Local message sent by the launch coordinator to the scheduler to accept offers. + */ +public class AcceptOffers { + + private String hostname; + private Collection offerIds; + private Collection operations; + private Protos.Filters filters; + + public AcceptOffers(String hostname, Collection offerIds, Collection operations) { + this.hostname = hostname; + this.offerIds = offerIds; + this.operations = operations; + this.filters = Protos.Filters.newBuilder().build(); + } + + public AcceptOffers(String hostname, Collection offerIds, Collection operations, Protos.Filters filters) { + this.hostname = hostname; + this.offerIds = offerIds; + this.operations = operations; + this.filters = filters; + } + + public String hostname() { --- End diff -- Scala getter style. 
Would be good to follow the Java style. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75121852 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler.messages; + +import org.apache.mesos.Protos; + +import java.util.Collection; + +/** + * Local message sent by the launch coordinator to the scheduler to accept offers. + */ +public class AcceptOffers { --- End diff -- I think it makes sense to make all message classes immutable. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75123394 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler.messages; + +import org.apache.mesos.Protos; + +import java.util.Collection; + +/** + * Local message sent by the launch coordinator to the scheduler to accept offers. + */ +public class AcceptOffers { --- End diff -- Can these messages also be serializable? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75124552 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; + + public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception { + if (configuredPort < 0 || configuredPort > 0xFFFF) { + throw new IllegalArgumentException("File server port is invalid: " + configuredPort); + } + + router = new Router(); + + ChannelInitializer initializer = new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) { +
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75124909 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; --- End diff -- Can these fields be final? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75126761 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.util; + +import org.apache.mesos.MesosSchedulerDriver; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.Map; + +/** + * The typed configuration settings associated with a Mesos scheduler. + */ +public class MesosConfiguration { + + private String masterUrl; + + private Protos.FrameworkInfo.Builder frameworkInfo; + + private Option credential = Option.empty(); + + public MesosConfiguration( + String masterUrl, + Protos.FrameworkInfo.Builder frameworkInfo, + Option credential) { + + this.masterUrl = masterUrl; + this.frameworkInfo = frameworkInfo; + this.credential = credential; --- End diff -- Maybe check not null? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75126311 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; + + public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception { + if (configuredPort < 0 || configuredPort > 0xFFFF) { + throw new IllegalArgumentException("File server port is invalid: " + configuredPort); + } + + router = new Router(); + + ChannelInitializer initializer = new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) { +
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75128222 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.util; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.Configuration; + +public class ZooKeeperUtils { + + /** +* Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper +* quorum. +* +* @param configuration {@link Configuration} object containing the configuration values +* @return {@link CuratorFramework} instance +*/ + @SuppressWarnings("unchecked") + public static CuratorFramework startCuratorFramework(Configuration configuration) { + + // workaround for shaded curator dependency of flink-runtime --- End diff -- What is exactly the problem here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75128299 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.util; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.Configuration; + +public class ZooKeeperUtils { + + /** +* Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper +* quorum. +* +* @param configuration {@link Configuration} object containing the configuration values +* @return {@link CuratorFramework} instance +*/ + @SuppressWarnings("unchecked") + public static CuratorFramework startCuratorFramework(Configuration configuration) { + + // workaround for shaded curator dependency of flink-runtime + Object client = org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(configuration); --- End diff -- Why storing the client in an `Object` and then casting it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75130862 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework + +import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.instance.InstanceConnectionInfo +import org.apache.flink.runtime.io.disk.iomanager.IOManager +import org.apache.flink.runtime.io.network.NetworkEnvironment +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService +import org.apache.flink.runtime.memory.MemoryManager +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration} + +/** An extension of the TaskManager that listens for additional Mesos-related + * messages. + */ +class MesosTaskManager( + config: TaskManagerConfiguration, --- End diff -- Formatting inconsistent with `MesosJobManager`. I usually try to indent Scala class parameter twice and extended classes once. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75131308 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework + +import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.instance.InstanceConnectionInfo +import org.apache.flink.runtime.io.disk.iomanager.IOManager +import org.apache.flink.runtime.io.network.NetworkEnvironment +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService +import org.apache.flink.runtime.memory.MemoryManager +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration} + +/** An extension of the TaskManager that listens for additional Mesos-related + * messages. --- End diff -- Which additional messages is the `MesosTaskManager` listening to? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75132701 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/ConnectionMonitor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, FSM, Props} +import grizzled.slf4j.Logger +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ConnectionMonitor._ +import org.apache.flink.mesos.scheduler.messages._ + +import scala.concurrent.duration._ + +/** + * Actively monitors the Mesos connection. + */ +class ConnectionMonitor() extends Actor with FSM[FsmState, Unit] { --- End diff -- Does this class do anything else than logging the state transitions? Couldn't that also happen in the callbacks defined in `MesosFlinkResourceManager`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75133244 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala --- @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, FSM, Props} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.Action1 +import com.netflix.fenzo.plugins.VMLeaseObject +import grizzled.slf4j.Logger +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.Protos.TaskInfo +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{Map => MutableMap} +import scala.concurrent.duration._ + +/** + * The launch coordinator handles offer processing, including + * matching offers to tasks and making reservations. + * + * The coordinator uses Netflix Fenzo to optimize task placement. 
During the GatheringOffers phase, --- End diff -- Any particular reason why you've chosen Fenzo as scheduler? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2381: [FLINK-4414][cluster management] Remove restriction on Rp...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2381 Thanks for your contribution @wenlong88. The PR mixes multiple things together. It would be better if you could separate these changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2381: [FLINK-4414][cluster management] Remove restrictio...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2381#discussion_r75140948 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java --- @@ -197,11 +201,11 @@ public void stopService() { } @Override - public String getAddress(RpcGateway selfGateway) { + public String getAddress(RpcGateway gateway) { checkState(!stopped, "RpcService is stopped"); - if (selfGateway instanceof AkkaGateway) { - ActorRef actorRef = ((AkkaGateway) selfGateway).getRpcEndpoint(); + if (gateway instanceof AkkaGateway) { + ActorRef actorRef = ((AkkaGateway) gateway).getActorRef(); return AkkaUtils.getAkkaURL(actorSystem, actorRef); --- End diff -- This will be problematic if you pass in a local actor ref which has been created by a different actor system. Because then you will return a wrong remote address. So for example, the following test would fail: ``` @Test public void test() { LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue<>(); TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1,linkedBlockingQueue); testEndpoint.start(); TestEndpoint testEndpoint2 = new TestEndpoint(akkaRpcService2, linkedBlockingQueue); testEndpoint2.start(); assertEquals(testEndpoint2.getAddress(), akkaRpcService1.getAddress(testEndpoint2.getSelf())); assertEquals(testEndpoint.getAddress(), akkaRpcService2.getAddress(testEndpoint.getSelf())); } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2381: [FLINK-4414][cluster management] Remove restriction on Rp...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2381 Maybe it makes more sense to extend the `RpcGateway` interface and offer a `getAddress` method which returns the remote address of the gateway. We should also add tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75147221 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala --- @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, FSM, Props} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.Action1 +import com.netflix.fenzo.plugins.VMLeaseObject +import grizzled.slf4j.Logger +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.Protos.TaskInfo +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{Map => MutableMap} +import scala.concurrent.duration._ + +/** + * The launch coordinator handles offer processing, including + * matching offers to tasks and making reservations. + * + * The coordinator uses Netflix Fenzo to optimize task placement. 
During the GatheringOffers phase, + * offers are evaluated by Fenzo for suitability to the planned tasks. Reservations are then placed + * against the best offers, leading to revised offers containing reserved resources with which to launch task(s). + */ +class LaunchCoordinator( +manager: ActorRef, +config: Configuration, +schedulerDriver: SchedulerDriver, +optimizerBuilder: TaskSchedulerBuilder + ) extends Actor with FSM[TaskState, GatherData] { + + val LOG = Logger(getClass) + + /** +* The task placement optimizer. +* +* The optimizer contains the following state: +* - unused offers +* - existing task placement (for fitness calculation involving task colocation) +*/ + private[mesos] val optimizer: TaskScheduler = { +optimizerBuilder + .withLeaseRejectAction(new Action1[VirtualMachineLease]() { +def call(lease: VirtualMachineLease) { + LOG.info(s"Declined offer ${lease.getId} from ${lease.hostname()} of ${lease.memoryMB()} MB, ${lease.cpuCores()} cpus.") + schedulerDriver.declineOffer(lease.getOffer.getId) +} + }).build + } + + override def postStop(): Unit = { +optimizer.shutdown() +super.postStop() + } + + /** +* Initial state +*/ + startWith(Suspended, GatherData(tasks = Nil, newLeases = Nil)) + + /** +* State: Suspended +* +* Wait for (re-)connection to Mesos. No offers exist in this state, but outstanding tasks might. +*/ + when(Suspended) { +case Event(msg: Connected, data: GatherData) => + if(data.tasks.nonEmpty) goto(GatheringOffers) + else goto(Idle) + } + + /** +* State: Idle +* +* Wait for a task request to arrive, then transition into gathering offers. 
+*/ + onTransition { +case _ -> Idle => assert(nextStateData.tasks.isEmpty) + } + + when(Idle) { +case Event(msg: Disconnected, data: GatherData) => + goto(Suspended) + +case Event(offers: ResourceOffers, data: GatherData) => + // decline any offers that come in + schedulerDriver.suppressOffers() + for(offer <- offers.offers().asScala) { schedulerDriver.declineOffer(offer.getId) } + stay() + +case Event(msg: Launch, data: GatherData) => + goto(GatheringOffers) using data.copy(tasks = data.tasks ++ msg.tasks.asScala) + } + + /** +* Transition logic to control the flow of offers. +*/ + onTransition { +case _ -> GatheringOffer
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75149938 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, TaskGoalStateUpdated, TaskTerminated} +import org.apache.flink.mesos.scheduler.Tasks._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.mutable.{Map => MutableMap} + +/** + * Aggregate of monitored tasks. + * + * Routes messages between the scheduler and individual task monitor actors. + */ +class Tasks[M <: TaskMonitor]( + flinkConfig: Configuration, + schedulerDriver: SchedulerDriver, + taskMonitorClass: Class[M]) extends Actor { + + /** +* A map of task monitors by task ID. 
+*/ + private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap() + + /** +* Cache of current connection state. +*/ + private var registered: Option[Any] = None + + override def preStart(): Unit = { +// TODO subscribe to context.system.deadLetters for messages to nonexistent tasks --- End diff -- Can we resolve this TODO? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75150474 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, TaskGoalStateUpdated, TaskTerminated} +import org.apache.flink.mesos.scheduler.Tasks._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.mutable.{Map => MutableMap} + +/** + * Aggregate of monitored tasks. + * + * Routes messages between the scheduler and individual task monitor actors. + */ +class Tasks[M <: TaskMonitor]( + flinkConfig: Configuration, + schedulerDriver: SchedulerDriver, + taskMonitorClass: Class[M]) extends Actor { + + /** +* A map of task monitors by task ID. 
+*/ + private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap() + + /** +* Cache of current connection state. +*/ + private var registered: Option[Any] = None + + override def preStart(): Unit = { +// TODO subscribe to context.system.deadLetters for messages to nonexistent tasks + } + + override def receive: Receive = { + +case msg: Disconnected => + registered = None + context.actorSelection("*").tell(msg, self) + +case msg : Connected => + registered = Some(msg) + context.actorSelection("*").tell(msg, self) + +case msg: TaskGoalStateUpdated => + val taskID = msg.state.taskID + + // ensure task monitor exists + if(!taskMap.contains(taskID)) { +val actorRef = createTask(msg.state) +registered.foreach(actorRef ! _) + } + + taskMap(taskID) ! msg + +case msg: StatusUpdate => + taskMap(msg.status().getTaskId) ! msg + +case msg: Reconcile => + context.parent.forward(msg) + +case msg: TaskTerminated => + context.parent.forward(msg) + } + + private def createTask(task: TaskGoalState): ActorRef = { +val actorProps = TaskMonitor.createActorProps(taskMonitorClass, flinkConfig, schedulerDriver, task) --- End diff -- line is longer than 100 characters. This should cause a checkstyle violation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75151735 --- Diff: flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework + +import java.util.concurrent.{TimeUnit, ExecutorService} + +import akka.actor.ActorRef + +import org.apache.flink.api.common.JobID +import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.clusterframework.ApplicationStatus +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.clusterframework.messages._ +import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} +import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound} +import org.apache.flink.runtime.messages.Messages.Acknowledge +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} + +import scala.concurrent.duration._ +import scala.language.postfixOps + + +/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages + * to start/administer/stop the session. 
+ * + * @param flinkConfiguration Configuration object for the actor + * @param executorService Execution context which is used to execute concurrent tasks in the + * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] + * @param instanceManager Instance manager to manage the registered + * [[org.apache.flink.runtime.taskmanager.TaskManager]] + * @param scheduler Scheduler to schedule Flink jobs + * @param libraryCacheManager Manager to manage uploaded jar files + * @param archive Archive for finished Flink jobs + * @param restartStrategyFactory Restart strategy to be used in case of a job recovery + * @param timeout Timeout for futures + * @param leaderElectionService LeaderElectionService to participate in the leader election + */ +abstract class ContaineredJobManager( + flinkConfiguration: FlinkConfiguration, + executorService: ExecutorService, + instanceManager: InstanceManager, + scheduler: FlinkScheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory, + savepointStore: SavepointStore, + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[FlinkMetricRegistry]) + extends JobManager( +flinkConfiguration, +executorService, +instanceManager, +scheduler, +libraryCacheManager, +archive, +restartStrategyFactory, +timeout, +leaderElectionService, +submittedJobGraphs, +
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75156694 --- Diff: flink-mesos/src/test/resources/log4j-test.properties --- @@ -0,0 +1,32 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +log4j.rootLogger=INFO, console --- End diff -- log level should be `OFF`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1)
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2315 Great work @EronWright. Code quality, testing and extent of this PR are really impressive :-) I think you've nicely modularized the Mesos resource manager with the additional actors. This makes testing much nicer :-) I only had some minor comments and questions for my own understanding. Tomorrow I want to try it out on a Mesos cluster to see how it works. There is only one thing I'm wondering. Since we're also currently working on Flip-6 where we try to put a new RPC abstraction in place in order to eventually remove Akka and Scala from flink-runtime, I wanted to ask whether you think that the FSM actors could also be replaced (sometime in the future) by something else? I totally agree that they are a nice abstraction for Mesos and allow the logic to be expressed succinctly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2305: [FLINK-4271] [DataStreamAPI] Enable CoGroupedStreams and ...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2305 +1 for this approach. On Thu, Aug 18, 2016 at 6:59 AM, Jark wrote: > I'm ok with the with(...) approach. In addition, we should create a JIRA > under [FLINK-3957] > "Breaking API changes for Flink 2.0", to keep track of deprecating and > changing the apply method return type. > > — > You are receiving this because you were mentioned. > Reply to this email directly, view it on GitHub > <https://github.com/apache/flink/pull/2305#issuecomment-240624274>, or mute > the thread > <https://github.com/notifications/unsubscribe-auth/AFfXul4OQ2wEhcdMXC3_gqtZ-zqEIV6Kks5qg-bNgaJpZM4JW9go> > . > --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75301960 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait( sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - + + Future submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + classLoader); + } + + + /** +* Attaches to a running Job using the JobID. +* Reconstructs the user class loader by downloading the jars from the JobManager. +* @throws JobRetrievalException if anything goes wrong while retrieving the job +*/ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) throws JobRetrievalException { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // retrieve classloader first before doing anything + ClassLoader classloader; + try { + classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout); --- End diff -- What if the `JobManager` has already changed at this point? We would no longer be able to retrieve the ClassLoader, wouldn't we? 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75302372 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait( sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - + + Future submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + classLoader); + } + + + /** +* Attaches to a running Job using the JobID. +* Reconstructs the user class loader by downloading the jars from the JobManager. +* @throws JobRetrievalException if anything goes wrong while retrieving the job +*/ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) throws JobRetrievalException { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // retrieve classloader first before doing anything + ClassLoader classloader; + try { + classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout); + LOG.info("Reconstructed class loader for Job {}" , jobID); + } catch (Exception e) { + LOG.warn("Couldn't retrieve classloader for {}. 
Using system class loader", jobID, e); + classloader = JobClient.class.getClassLoader(); + } + + // we create a proxy JobClientActor that deals with all communication with + // the JobManager. It forwards the job submission, checks the success/failure responses, logs + // update messages, watches for disconnect between client and JobManager, ... + Props jobClientActorProps = JobClientActor.createJobClientActorProps( + leaderRetrievalService, + timeout, + sysoutLogUpdates); + + ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); + + Future attachmentFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(jobID), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobID, + attachmentFuture, + jobClientActor, + classloader); + } + + /** +* Reconstructs the class loader by first requesting information about it at the JobManager +* and then downloading missing jar files. +* @param jobID id of job +* @param jobManager gateway to the JobManager +* @param config the flink configuration +* @param timeout timeout for querying the jobmanager +* @return A classloader that should behave like the original classloader +* @throws JobRetrievalException if anything goes wrong +*/ + public static ClassLoader retrieveClassLoader( + JobID jobID, + ActorGateway jobManager, + Configuration config, + FiniteDuration timeout) + throws JobRetrievalException { + + final Object jmAnswer; + try { + jmAnswer = Await.result( + jobManager.ask( + new JobManagerMessages.Re
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75305261 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java --- @@ -198,10 +211,47 @@ else if (message instanceof SubmitJobAndWait) { decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); } } + else if (message instanceof AttachToJobAndWait) { --- End diff -- At the moment the `JobClientActor` can be used to attach to a job and submit a new job at the same time. Then depending on the order in which the jobs terminate either the submitted job's or the attached job's result is reported back but the other future will never be completed. It might make sense to guard against this or to have two different JobClientActors based on the same base class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75305767 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala --- @@ -58,10 +62,63 @@ class JobInfo( } } - override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)" + + /** +* Notifies all clients by sending a message +* @param message the message to send +*/ + def notifyClients(message: Any) = { +clients foreach { + case (clientActor, _) => +clientActor ! message +} + } + + /** +* Notifies all clients which are not of type detached +* @param message the message to sent to non-detached clients +*/ + def notifyNonDetachedClients(message: Any) = { +clients foreach { + case (clientActor, ListeningBehaviour.DETACHED) => +// do nothing + case (clientActor, _) => +clientActor ! message +} + } + + /** +* Sends a message to job clients that match the listening behavior +* @param message the message to send to all clients +* @param listeningBehaviour the desired listening behaviour +*/ + def notifyClients(message: Any, listeningBehaviour: ListeningBehaviour) = { +clients foreach { + case (clientActor, `listeningBehaviour`) => +clientActor ! message + case _ => +} + } def setLastActive() = lastActive = System.currentTimeMillis() + + + override def toString = s"JobInfo(clients: ${clients.toString()}, start: $start)" + + override def equals(other: Any): Boolean = other match { +case that: JobInfo => + this.isInstanceOf[JobInfo] && --- End diff -- Why do we need this check here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75307091 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java --- @@ -100,13 +101,51 @@ public void testSubmissionTimeout() throws Exception { Await.result(jobExecutionResult, timeout); } + + /** Tests that a {@link JobClientActorRegistrationTimeoutException} is thrown when the registration +* cannot be performd at the JobManager by the JobClientActor. This is here the case, because the +* started JobManager never replies to a {@link RegisterJobClient} message. +*/ + @Test(expected=JobClientActorRegistrationTimeoutException.class) + public void testRegistrationTimeout() throws Exception { + FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration timeout = jobClientActorTimeout.$times(2); + + UUID leaderSessionID = UUID.randomUUID(); + + ActorRef jobManager = system.actorOf( + Props.create( + PlainActor.class, + leaderSessionID)); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + jobManager.path().toString(), + leaderSessionID + ); + + Props jobClientActorProps = JobClientActor.createJobClientActorProps( + testingLeaderRetrievalService, + jobClientActorTimeout, + false); + + ActorRef jobClientActor = system.actorOf(jobClientActorProps); + + --- End diff -- Two line breaks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---