You can register a streaming listener; the BatchInfo it receives exposes a number of statistics (including the count of received records) that you can base your logic on:

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.scheduler.BatchInfo
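For illustration, here is a minimal sketch of that approach in Scala against the 1.5-era developer API (the class name, thresholds, and registration details are assumptions, not a tested implementation). It counts consecutive empty batches via BatchInfo.numRecords and sheds one executor at a time through the @DeveloperApi SparkContext.killExecutors, tracking executor IDs with the plain SparkListener hooks since killExecutors needs them:

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

import scala.collection.mutable

class EmptyBatchScaler(sc: SparkContext, maxEmptyBatches: Int, minExecutors: Int)
    extends StreamingListener with SparkListener {

  private val liveExecutors = mutable.Set.empty[String]
  private var emptyBatches = 0

  // killExecutors needs executor IDs, so track them as executors come and go.
  override def onExecutorAdded(e: SparkListenerExecutorAdded): Unit =
    liveExecutors.synchronized { liveExecutors += e.executorId }

  override def onExecutorRemoved(e: SparkListenerExecutorRemoved): Unit =
    liveExecutors.synchronized { liveExecutors -= e.executorId }

  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    // BatchInfo.numRecords is the record count summed over all input streams.
    if (batch.batchInfo.numRecords == 0) emptyBatches += 1 else emptyBatches = 0

    if (emptyBatches >= maxEmptyBatches) liveExecutors.synchronized {
      if (liveExecutors.size > minExecutors) {
        sc.killExecutors(Seq(liveExecutors.head)) // returns false if the request is rejected
        emptyBatches = 0                          // back off before shedding another one
      }
    }
  }
}

// Registration needs both hooks:
//   val scaler = new EmptyBatchScaler(ssc.sparkContext, maxEmptyBatches = 10, minExecutors = 2)
//   ssc.sparkContext.addSparkListener(scaler)
//   ssc.addStreamingListener(scaler)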
From: Kyle Lin
Date: Friday, November 6, 2015 at 11:48 AM
To: Tathagata Das
Cc: robert towne, user
Subject: Re: Dynamic Allocation & Spark Streaming

Hey there

I run Spark Streaming 1.5.1 on YARN with dynamic allocation and use the direct stream API to read data from Kafka. The job dynamically requests an executor when it reaches spark.dynamicAllocation.schedulerBacklogTimeout. However, it never dynamically removes executors when there is no more data from Kafka, because the executors are never idle: they continually receive empty RDDs.

Is it possible to detect that there have been more than N consecutive empty RDDs and then remove executors manually? How could I keep track of how many empty RDDs I have received and remove the executors?

Kyle

2015-10-20 4:48 GMT+08:00 Tathagata Das <t...@databricks.com<mailto:t...@databricks.com>>:

Unfortunately the title on the JIRA was extremely confusing; I have fixed it.

The reason dynamic allocation does not work well with streaming is that the heuristic used to automatically scale the number of executors up or down fits the pattern of task scheduling in batch jobs, not in streaming jobs. We will definitely solve this in the future, maybe in 1.7.0 or later.

In the meantime, there are developer API functions that let you add and remove executors explicitly: see sparkContext.requestExecutors() and sparkContext.killExecutors(). With these you can write your own scaling logic. In your case I would do the following:

1. Ask for a large number of executors/cores through spark-submit.
2. Use a StreamingListener to monitor whether the job has caught up.
3. Then call killExecutors to slowly kill a few of them, using the listener to make sure the scheduling delay does not go up (a minimal sketch of this recipe appears at the end of the thread).

Hope this helps. Let me know if this works for you.

On Mon, Oct 19, 2015 at 1:13 PM, robert towne <binarymecha...@gmail.com<mailto:binarymecha...@gmail.com>> wrote:

I have watched a few videos from Databricks/Andrew Or around the Spark 1.2 release, and at that time dynamic allocation was not yet available for Spark Streaming. I now see SPARK-10955<https://issues.apache.org/jira/browse/SPARK-10955>, which is tied to 1.5.2 and allows disabling dynamic allocation for Spark Streaming.

I use Spark Streaming with a receiverless/direct Kafka connection. When I start up an app reading from the beginning of the topic, I would like to have more resources than once I have caught up. Is it possible to use dynamic allocation for this use case?

thanks,
Robert
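For reference, a minimal sketch of the three-step recipe Tathagata describes above (Scala; the class name, the maxDelayMs threshold, and the pickExecutor callback are invented for illustration, and none of this is tested):

import org.apache.spark.SparkContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Step 1: over-provision at submit time, e.g.
//   spark-submit --num-executors 50 --executor-cores 2 ...
// Steps 2 and 3: watch the per-batch scheduling delay and only shed
// executors while the job has caught up.
class CatchUpScaler(
    sc: SparkContext,
    maxDelayMs: Long,                   // "caught up" threshold, e.g. a fraction of the batch interval
    pickExecutor: () => Option[String]) // supplied by the caller, e.g. IDs tracked via a SparkListener
  extends StreamingListener {

  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val delayMs = batch.batchInfo.schedulingDelay.getOrElse(Long.MaxValue)
    if (delayMs < maxDelayMs) {
      // Caught up: shed capacity slowly, at most one executor per completed batch.
      pickExecutor().foreach(id => sc.killExecutors(Seq(id)))
    } else {
      // Scheduling delay is climbing: ask for capacity back.
      sc.requestExecutors(1)
    }
  }
}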