Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/318#discussion_r23163714 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.runtime.AbstractID; +import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment; + +import java.util.HashSet; +import java.util.Set; + +/** + * This class represents a shared slot. A shared slot can have multiple + * {@link org.apache.flink.runtime.instance.SimpleSlot} instances within itself. This allows to + * schedule multiple tasks simultaneously, enabling Flink's streaming capabilities. + * + * IMPORTANT: This class contains no synchronization. Thus, the caller has to guarantee proper + * synchronization. In the current implementation, all concurrently modifying operations are + * passed through a {@link SlotSharingGroupAssignment} object which is responsible for + * synchronization. + * + */ +public class SharedSlot extends Slot { + + private final SlotSharingGroupAssignment assignmentGroup; + + private final Set<Slot> subSlots; + + public SharedSlot(JobID jobID, Instance instance, int slotNumber, + SlotSharingGroupAssignment assignmentGroup, SharedSlot parent, + AbstractID groupID) { + super(jobID, instance, slotNumber, parent, groupID); + + this.assignmentGroup = assignmentGroup; + this.subSlots = new HashSet<Slot>(); + } + + public Set<Slot> getSubSlots() { + return subSlots; + } + + /** + * Removes the simple slot from the {@link org.apache.flink.runtime.instance.SharedSlot}. Should + * only be called through the + * {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment} attribute + * assignmnetGroup. + * + * @param slot slot to be removed from the set of sub slots. + * @return Number of remaining sub slots + */ + public int freeSubSlot(Slot slot){ + if(!subSlots.remove(slot)){ + throw new IllegalArgumentException("Wrong shared slot for sub slot."); + } + + return subSlots.size(); + } + + @Override + public int getNumberLeaves() { + int result = 0; + + for(Slot slot: subSlots){ + result += slot.getNumberLeaves(); + } + + return result; + } + + @Override + public void cancel() { + // Guarantee that the operation is only executed once + if (markCancelled()) { + assignmentGroup.releaseSharedSlot(this); + } + } + + /** + * Release this shared slot. In order to do this: + * + * 1. Cancel and release all sub slots atomically with respect to the assigned assignment group. + * 2. Set the state of the shared slot to be cancelled. + * 3. Dispose the shared slot (returning the slot to the instance). + * + * After cancelAndReleaseSubSlots, the shared slot is marked to be dead. This prevents further + * sub slot creation by the scheduler. + */ + @Override + public void releaseSlot() { + assignmentGroup.releaseSharedSlot(this); + } + + /** + * Creates a new sub slot if the slot is not dead, yet. This method should only be called from + * the assignment group instance to guarantee synchronization. + * + * @param jID id to identify tasks which can be deployed in this sub slot + * @return new sub slot if the shared slot is still alive, otherwise null + */ + public SimpleSlot allocateSubSlot(AbstractID jID){ + if(isDead()){ + return null; + } else { + SimpleSlot slot = new SimpleSlot(jobID, instance, subSlots.size(), this, jID); + subSlots.add(slot); + + return slot; + } + } + + public SharedSlot allocateSharedSlot(AbstractID jID){ + if(isDead()){ + return null; + } else { + SharedSlot slot = new SharedSlot(jobID, instance, subSlots.size(), assignmentGroup, this, jID); + subSlots.add(slot); + + return slot; + } + } + + /** + * Disposes the given sub slot. This + * is done by the means of the assignmentGroup in order to synchronize the method. If the + * disposed slot was the last sub slot, then the shared slot is marked to be cancelled and is + * disposed/returned to the owning instance. + * + * @param slot sub slot which shall be removed from the shared slot + */ + public void disposeChild(SimpleSlot slot){ --- End diff -- For a second I was wondering what a child is? Maybe we should just keep it as `disposeSubSlot` in order to be in sync with the javadocs?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---