
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.EvenScheduler;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.SchedulerAssignment;
import backtype.storm.scheduler.SupervisorDetails;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;

/**
 * This demo scheduler makes sure the <code>data-generator</code> and
 * <code>measure-bolt</code> components of the topology <code>network</code>
 * run on a supervisor named <code>special-supervisor</code>. Supervisors do
 * not have names by default; you can configure one through the config item
 * <code>supervisor.scheduler.meta</code> -- actually you can put any config
 * you like in this config item.
 * 
 * In our example, we need to put the following config in supervisor's
 * <code>storm.yaml</code>:
 * 
 * <pre>
 *     # give our supervisor a name: "special-supervisor"
 *     supervisor.scheduler.meta:
 *       name: "special-supervisor"
 * </pre>
 * 
 * Put the following config in <code>nimbus</code>'s <code>storm.yaml</code>:
 * 
 * <pre>
 *     # tell nimbus to use this custom scheduler
 *     storm.scheduler: "NetworkScheduler"
 * </pre>
 * 
 * @author xumingmingv May 19, 2012 11:10:43 AM
 */
public class NetworkScheduler implements IScheduler {

	private static Logger LOG = LoggerFactory
			.getLogger(NeuralNetworkScheduler.class);

	public void prepare(Map conf) {
	}

	public void schedule(Topologies topologies, Cluster cluster) {
		LOG.info("DemoScheduler: begin scheduling");
		// Gets the topology which we want to schedule
		TopologyDetails topology = topologies.getByName("network");
		SupervisorDetails specialSupervisor = null;

		// make sure the special topology is submitted,
		if (topology != null) {
			boolean needsScheduling = cluster.needsScheduling(topology);

			if (!needsScheduling) {
				LOG.info("Our special topology DOES NOT NEED scheduling.");
			} else {
				LOG.info("Our special topology needs scheduling.");
				// find out all the needs-scheduling components of this topology
				Map<String, List<ExecutorDetails>> componentToExecutors = cluster
						.getNeedsSchedulingComponentToExecutors(topology);

				LOG.info("needs scheduling(component->executor): "
						+ componentToExecutors);
				LOG.info("needs scheduling(executor->compoenents): "
						+ cluster
								.getNeedsSchedulingExecutorToComponents(topology));
				SchedulerAssignment currentAssignment = cluster
						.getAssignmentById(topologies.getByName(
								"neural-network").getId());
				if (currentAssignment != null) {
					LOG.info("current assignments: "
							+ currentAssignment.getExecutorToSlot());
				} else {
					LOG.info("current assignments: {}");
				}

				// find out the our "special-supervisor" from the supervisor
				// metadata
				Collection<SupervisorDetails> supervisors = cluster
						.getSupervisors().values();

				for (SupervisorDetails supervisor : supervisors) {

					Map meta = (Map) supervisor.getSchedulerMeta();
					LOG.info("#meta:" + meta);

					if (meta != null
							&& meta.get("name").equals("special-supervisor")) {
						specialSupervisor = supervisor;
						break;
					}
				}

				LOG.info("Special supervisor was:" + specialSupervisor);

				String[] componentName = new String[] { "data-generator",
						"measure-bolt" };
				int slotNumber = 0;
				for (int i = 0; i < componentName.length; i++) {
					if (!componentToExecutors.containsKey(componentName[i])) {
						LOG.info("Our " + componentName[i]
								+ " DOES NOT NEED scheduling.");
					} else {
						LOG.info("Our " + componentName[i]
								+ " needs scheduling.");
						List<ExecutorDetails> executors = componentToExecutors
								.get(componentName[i]);

						allocateSlotsForExecutor(cluster, topology, executors,
								specialSupervisor, slotNumber);
					}
				}
			}
		}

		// let system's even scheduler handle the rest scheduling work
		// you can also use your own other scheduler here, this is what
		// makes storm's scheduler composable.

		if (specialSupervisor != null) {
			LOG.info("Cluster supervsiors #before "
					+ cluster.getSupervisors().values());
			cluster.getSupervisors().values().remove(specialSupervisor);
			LOG.info("Cluster supervsiors #after #remove "
					+ cluster.getSupervisors().values());
		}
		new EvenScheduler().schedule(topologies, cluster);
	}

	private void allocateSlotsForExecutor(Cluster cluster,
			TopologyDetails topology, List<ExecutorDetails> executors,
			SupervisorDetails specialSupervisor, int slotNumber) {
		// found the special supervisor
		if (specialSupervisor != null) {
			LOG.info("Found the special-supervisor");
			List<WorkerSlot> availableSlots = cluster
					.getAvailableSlots(specialSupervisor);

			// if there is no available slots on this supervisor, free some.
			// TODO for simplicity, we free all the used slots on the
			// supervisor.
			if (availableSlots.isEmpty() && !executors.isEmpty()) {
				for (Integer port : cluster.getUsedPorts(specialSupervisor)) {
					cluster.freeSlot(new WorkerSlot(specialSupervisor.getId(),
							port));
				}
			}

			// re-get the aviableSlots
			availableSlots = cluster.getAvailableSlots(specialSupervisor);

			// try to put the data-spout and measurement-bolt on separated JVM
			// in order to not disturb the performance
			// since it is just a demo, to keep things simple, we assign all
			// the
			// executors into one slot.

			cluster.assign(availableSlots.get(0), topology.getId(), executors);
			LOG.info("We assigned executors:" + executors + " to slot: ["
					+ availableSlots.get(0).getNodeId() + ", "
					+ availableSlots.get(0).getPort() + "] to host cd "
					+ specialSupervisor.getHost());
		} else {
			LOG.info("There is no supervisor named special-supervisor!!!");
		}
	}

}