Hi all,

My team uses Spark Streaming to implement the batch-processing component of a lambda architecture with 5-minute batch intervals. We process roughly 15 TB/day across three discrete Spark clusters, with about 250 receivers per cluster. We've run into some issues migrating our platform from Spark 1.4.x to Spark 1.5.x.
The first issue relates to receiver scheduling. Under Spark 1.4.x, each receiver becomes active almost immediately and the application quickly reaches its peak input throughput. Under the new receiver scheduling mechanism introduced in Spark 1.5.x (SPARK-8882<https://issues.apache.org/jira/browse/SPARK-8882>), it takes quite a while for our receivers to become active. I haven't spent much time gathering hard numbers, but my estimate is that it takes over half an hour for half of the receivers to become active and well over an hour for all of them. I spent some time digging into the code for the ReceiverTracker, ReceiverSchedulingPolicy, and ReceiverSupervisor classes and recompiling Spark with some added debug logging. As far as I can tell, this is what is happening:

* On program start, the ReceiverTracker RPC endpoint receives a StartAllReceivers message via its own launchReceivers() method (itself invoked by start())
* The handler for StartAllReceivers invokes ReceiverSchedulingPolicy.scheduleReceivers() to generate a desired receiver-to-executor mapping and calls ReceiverTracker.startReceiver() for each receiver
* startReceiver() uses the SparkContext to submit a job that creates an instance of ReceiverSupervisorImpl to run the receiver on a random executor
* While bootstrapping the receiver, ReceiverSupervisorImpl.onReceiverStart() sends a RegisterReceiver message to the ReceiverTracker RPC endpoint
* The handler for RegisterReceiver checks whether the randomly selected executor is the one the receiver was assigned to by ReceiverSchedulingPolicy.scheduleReceivers() and fails the job if it isn't
* ReceiverTracker restarts the failed receiver job, and this process continues until every receiver is assigned to its proper executor

Assuming this order of operations is correct, I have the following questions:

1. Is there any way to coerce SparkContext.submitJob() into scheduling a job on a specific executor? Put another way, is there a mechanism we can use to ensure that each receiver job runs on the executor it was assigned to on the first call to ReceiverSchedulingPolicy.scheduleReceivers()?
2. If (1) is not possible, is there anything we can do to speed up the StartReceiver -> RegisterReceiver -> RestartReceiver loop? Right now, it seems to take about 30-40 seconds between attempts to invoke RegisterReceiver for a given receiver.

Thanks for the help!
Adam
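P.S. In case it helps frame question (2), here's a quick back-of-the-envelope simulation of the random-placement retry loop described above. This is plain Scala with no Spark dependency; the names (scheduleReceivers, executors, etc.) are just stand-ins for the real classes, and the round-robin assignment is my own simplification of whatever ReceiverSchedulingPolicy actually does:

```scala
import scala.util.Random

// Toy model of the Spark 1.5 receiver-start loop described above:
// each receiver job lands on a random executor and is failed/restarted
// until it happens to hit the executor it was assigned to.
object ReceiverSchedulingSim {
  type Receiver = Int
  type Executor = String

  // Round-robin "desired" assignment, standing in for
  // ReceiverSchedulingPolicy.scheduleReceivers().
  def scheduleReceivers(receivers: Seq[Receiver],
                        executors: Seq[Executor]): Map[Receiver, Executor] =
    receivers.zipWithIndex.map { case (r, i) =>
      r -> executors(i % executors.size)
    }.toMap

  // Count attempts until a random placement matches the desired executor.
  def attemptsUntilPlaced(desired: Executor,
                          executors: Seq[Executor],
                          rng: Random): Int = {
    var attempts = 1
    while (executors(rng.nextInt(executors.size)) != desired) attempts += 1
    attempts
  }

  def main(args: Array[String]): Unit = {
    val executors = (1 to 50).map(i => s"exec-$i") // hypothetical cluster size
    val receivers = 1 to 250
    val desired   = scheduleReceivers(receivers, executors)
    val rng       = new Random(42)
    val attempts  = receivers.map(r => attemptsUntilPlaced(desired(r), executors, rng))
    // With N executors, random placement needs ~N attempts per receiver on
    // average; at 30-40 s per attempt that alone is in the right ballpark
    // for the multi-hour ramp-up we're seeing.
    println(f"mean attempts per receiver: ${attempts.sum.toDouble / attempts.size}%.1f")
  }
}
```

The executor count above is made up; the point is just that the expected number of restarts grows linearly with the number of executors, which would explain why the ramp-up hurts so much at our scale.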