Hi all,

My team uses Spark Streaming to implement the batch processing component of a 
lambda architecture with 5 min intervals. We process roughly 15 TB/day using 
three discrete Spark clusters and about 250 receivers per cluster. We've been 
having some issues migrating our platform from Spark 1.4.x to Spark 1.5.x.

The first issue we've been having relates to receiver scheduling. Under Spark 
1.4.x, each receiver becomes active almost immediately and the application 
quickly reaches its peak input throughput. Under the new receiver scheduling 
mechanism introduced in Spark 1.5.x 
(SPARK-8882: https://issues.apache.org/jira/browse/SPARK-8882), we see that it 
takes quite a while for our receivers to become active. I haven't gathered 
hard numbers on this yet, but my estimate is that it takes over half an hour 
for half of the receivers to become active and well over an hour for all of 
them.

I spent some time digging into the code for the ReceiverTracker, 
ReceiverSchedulingPolicy, and ReceiverSupervisor classes and recompiling Spark 
with some added debug logging. As far as I can tell, this is what is happening:

  *   On program start, the ReceiverTracker RPC endpoint receives a 
StartAllReceivers message via its own launchReceivers() method (itself invoked 
by start())
  *   The handler for StartAllReceivers invokes 
ReceiverSchedulingPolicy.scheduleReceivers() to generate a desired receiver to 
executor mapping and calls ReceiverTracker.startReceiver() for each receiver
  *   startReceiver() uses the SparkContext to submit a job that creates an 
instance of ReceiverSupervisorImpl to run the receiver on a random executor
  *   While bootstrapping the receiver, 
ReceiverSupervisorImpl.onReceiverStart() sends a RegisterReceiver message to 
the ReceiverTracker RPC endpoint
  *   The handler for RegisterReceiver checks if the randomly-selected executor 
was the one the receiver was assigned to by 
ReceiverSchedulingPolicy.scheduleReceivers() and fails the job if it isn't
  *   ReceiverTracker restarts the failed receiver job and this process 
continues until all receivers are assigned to their proper executor
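To make the cost of that loop concrete, here is a small self-contained Scala sketch (not Spark code; the executor counts are hypothetical) that simulates the process described above: each receiver has an assigned target executor, but each submitted job lands on a random executor and is failed and retried until it happens to land on its target.

```scala
import scala.util.Random

object ReceiverPlacementSim {
  // Number of job submissions until a randomly-placed receiver job
  // happens to land on its assigned executor.
  def attemptsUntilPlaced(numExecutors: Int, rng: Random): Int = {
    val target = rng.nextInt(numExecutors) // executor chosen by scheduleReceivers()
    var attempts = 1
    while (rng.nextInt(numExecutors) != target) attempts += 1
    attempts
  }

  def main(args: Array[String]): Unit = {
    val rng = new Random(42)
    val numExecutors = 100 // hypothetical cluster size
    val numReceivers = 250 // as in our deployment
    val attempts = Seq.fill(numReceivers)(attemptsUntilPlaced(numExecutors, rng))
    // With purely random placement, the expected number of attempts per
    // receiver is on the order of the executor count.
    println(s"mean attempts per receiver: ${attempts.sum.toDouble / numReceivers}")
  }
}
```

If this mental model is right, the number of restarts per receiver grows with cluster size, which would explain why a 250-receiver deployment takes so long to ramp up.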

Assuming this order of operations is correct, I have the following questions:

  1.  Is there any way to coerce SparkContext.submitJob() into scheduling a job 
on a specific executor? Put another way, is there a mechanism we can use to 
ensure that each receiver job is run on the executor it was assigned to on the 
first call to ReceiverSchedulingPolicy.scheduleReceivers()?
  2.  If (1) is not possible, is there anything we can do to speed up the 
StartReceiver -> RegisterReceiver -> RestartReceiver loop? Right now, it seems 
to take about 30-40 sec between attempts to invoke RegisterReceiver on a given 
receiver.
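As a sanity check on question (2), here is the back-of-envelope arithmetic (the retry count is an assumption on my part, based on the simulation above; only the 30-40 sec interval is observed):

```scala
object StartupEstimate {
  def main(args: Array[String]): Unit = {
    val retryIntervalSec = 35.0  // observed: ~30-40 sec between attempts
    val expectedRetries  = 100.0 // hypothetical: on the order of the executor count
    val estimatedSec = retryIntervalSec * expectedRetries
    println(f"estimated startup time for a slow receiver: ${estimatedSec / 60}%.0f min")
  }
}
```

At ~35 sec per cycle and on the order of a hundred retries, a slow receiver takes close to an hour to start, which lines up with what we observe. So shrinking either the retry interval or the number of retries should help substantially.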

Thanks for the help!

Adam
