These are both really good posts: you should try to get them into the documentation.
With anything implementing dynamicness, there are some fun problems:

(a) Detecting the delays in the workflow. There are some good ideas here.

(b) Deciding where to address it. That means you need to monitor the entire pipeline, which you should be doing in production anyway.

(c) Choosing the action. More nodes, more memory & CPU (not that useful for Java code, even when YARN adds support for dynamic container resize).

(d) Choosing the size of the action. In a shared cluster, extra resources for one app come at the expense of others. If you have pre-emption turned on in YARN, the scheduler can take containers off lower-priority work, which automates a lot of this decision making. That will lose other work though, so to justify it you'd better hang on to those containers.

(e) Deciding if/when to hand things back. Scaling things down can be very expensive if lots of state has to get rebuilt elsewhere.

I think Apache Helix from LinkedIn has done some good work here; worth looking at to see what lessons & code to lift. And as you'd expect, it sits right behind Kafka in production. I think it gets away with low scale-up/down delays by relying on low rebuild costs.

[In the work I've been doing with colleagues on dynamic HBase and Accumulo clusters, we've not attempted any autoscale, because scale-down is an expensive decision... we're focusing on liveness detection and reaction, then publishing the metrics needed to allow people or cross-application tools to make the decision.]

On 12 Jun 2015, at 04:38, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

Yes, Tathagata, thank you. For #1, the 'need detection', one idea we're entertaining is timestamping the messages coming into the Kafka topics. The consumers would check the interval between the time they receive a message and that message's origination timestamp. As the Kafka topics start to fill up, we would presumably see longer and longer delays before messages get processed by the consumers. The consumers would then start firing off critical events into an Event Analyzer/Aggregator, which would decide that more resources are needed and ask the Provisioning Component to allocate N new machines.

We do want to set maxRatePerPartition in order not to overwhelm the consumers and run out of memory. Machine provisioning may take a while, and if left with no maxRate guards, our consumers could run out of memory.

"Since there are no receivers, if the cluster gets a new executor, it will automatically start getting used to run tasks... no need to do anything further." This is great, actually. We were wondering whether we'd need to restart the consumers once the new machines have been added. Tathagata's point implies, as I read it, that no further orchestration is needed; the load will start getting redistributed automatically. This makes the implementation of autoscaling a lot simpler, as far as #3 goes.

One issue that hasn't been covered much yet is the scenario where *fewer* cluster resources become required (a system load valley rather than a peak). To detect a low volume, we'd need to measure the throughput in messages per second over time. Genuinely low volumes would fire critical events signaling to the Analyzer that machines could be decommissioned. If machines are being decommissioned, it would seem that the consumers would need to be quiesced (allowed to process any current batch, then shut down), then restart themselves or be restarted. Thoughts on this?
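A minimal sketch of the lag-based detection described above (not the poster's actual code): it assumes, purely for illustration, that every message starts with its origination timestamp in epoch millis followed by a '|' separator, and fireCriticalEvent stands in for the hand-off to the Event Analyzer/Aggregator.

import org.apache.spark.streaming.dstream.DStream

object LagMonitor {

  // Placeholder for firing a critical event at the Event Analyzer/Aggregator;
  // in practice this might publish to a monitoring topic or call the Provisioning Component.
  def fireCriticalEvent(kind: String, detail: String): Unit =
    println(s"[$kind] $detail")

  // Attach lag/throughput checks to a (key, message) stream. Thresholds are made up.
  def attach(messages: DStream[(String, String)],
             batchIntervalSec: Long,
             maxLagMs: Long,
             minMsgsPerSec: Double): Unit = {
    messages.foreachRDD { rdd =>
      val count = rdd.count()
      if (count > 0) {
        // Worst-case delay: now minus the oldest origination timestamp in the batch.
        val oldestTs = rdd.map { case (_, msg) => msg.takeWhile(_ != '|').toLong }.min()
        val lagMs = System.currentTimeMillis() - oldestTs
        if (lagMs > maxLagMs)
          fireCriticalEvent("SCALE_UP", s"lag=${lagMs}ms across $count messages")
      }
      // A sustained low rate could signal that machines can be decommissioned.
      if (count / batchIntervalSec.toDouble < minMsgsPerSec)
        fireCriticalEvent("SCALE_DOWN", s"only $count messages in this batch")
    }
  }
}

The maxRate guard mentioned above corresponds to the spark.streaming.kafka.maxRatePerPartition setting for the direct approach.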
There is also a hefty #4 here, which is the "hysteresis" of this: the system operates adaptively and learns over time, remembering the history of cluster expansions and contractions, allowing a certain slack for letting things cool down or heat up more gradually, and not contracting or expanding too frequently. PID controllers and thermostat-style design patterns have been mentioned before in this discussion.

If you look at the big cloud apps, they dynamically reallocate VM images based on load history, with Netflix being the poster user: Hadoop work in the quiet hours, user interaction in the evenings and at weekends. Excluding special events (including holidays), there's a lot of regularity over time, which lets you predict workload in advance. It's like your thermostat knowing Fridays are cold and cranking up the heating in advance.

On Thu, Jun 11, 2015 at 11:08 PM, Tathagata Das <t...@databricks.com> wrote:

Let me try to add some clarity to the different thought directions going on in this thread.

1. HOW TO DETECT THE NEED FOR MORE CLUSTER RESOURCES?

If there are no rate limits set up, the most reliable way to detect whether the current Spark cluster is insufficient to handle the data load is to use the StreamingListener interface, which gives all the information about when batches start and end. See the internal implementation of StreamingListener called StreamingJobProgressListener; this is the one that drives the streaming UI. You can get the scheduling delay (the time taken for a batch to start processing) from it and use that as a reliable indicator that Spark Streaming is not able to process data as fast as it is being received.

But if you have already set rate limits based on the maximum load the cluster can handle, then you will probably never detect that the actual input rate into Kafka has gone up and data is getting buffered inside Kafka. In that case, you have to monitor Kafka load to correctly detect the high load. You may want to use a combination of both techniques for a robust and safe elastic solution: have rate limits set, use the StreamingListener to detect early that the processing load is increasing (it can increase without an actual increase in data rate), and also make sure from Kafka monitoring that the whole end-to-end system is keeping up.

2. HOW TO GET MORE CLUSTER RESOURCES?

Currently, for YARN, you can use the developer API of dynamic allocation that Andrew Or has introduced to ask for more executors from YARN. Note that the existing dynamic allocation solution is unlikely to work for streaming and should not be used. Rather, I recommend building your own logic that watches the streaming scheduling delay and accordingly uses the low-level developer API to directly ask for more executors (sparkContext.requestExecutors). In other approaches, the Provisioning Component idea can also work.
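Putting points 1 and 2 together, here is a rough sketch (not a drop-in solution) of a StreamingListener that watches the scheduling delay and asks for more executors via the developer API; the threshold and step size are arbitrary placeholders.

import org.apache.spark.SparkContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class ElasticListener(sc: SparkContext,
                      maxSchedulingDelayMs: Long = 30000L,
                      executorsToAdd: Int = 2) extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // Scheduling delay: how long the batch waited before it started processing.
    val delayMs = batchCompleted.batchInfo.schedulingDelay.getOrElse(0L)
    if (delayMs > maxSchedulingDelayMs) {
      // Falling behind: ask the cluster manager (e.g. YARN) for more executors
      // through the low-level developer API mentioned above.
      sc.requestExecutors(executorsToAdd)
    }
  }
}

// Wiring it up, assuming ssc is the running StreamingContext:
//   ssc.addStreamingListener(new ElasticListener(ssc.sparkContext))

In practice you would probably want to smooth over several batches (the "hysteresis" point above) rather than reacting to a single slow batch.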
3. HOW TO TAKE ADVANTAGE OF MORE CLUSTER RESOURCES?

There are two approaches, depending on receiver-based vs Kafka direct. I am assuming the number of topic partitions is pre-determined to be large enough to handle peak load.

(a) Kafka direct: This is the simpler scenario. Since there are no receivers, if the cluster gets a new executor, it will automatically start getting used to run tasks, including reading from Kafka (remember, the Kafka direct approach reads from Kafka like a file system, from any node that runs the task). So it will immediately start using the extra resources; no need to do anything further.

(b) Receiver: This is definitely tricky. If you don't need to increase the number of receivers, then a new executor will start getting used for computations (shuffles, writing out, etc.), but the parallelism in receiving will not increase. If you do need to increase that, then it's best to shut down the context gracefully (so that no data is lost), and a new StreamingContext can be started with more receivers (# receivers <= # executors), and maybe more #partitions for shuffles. You have to call stop on the currently running streaming context to start a new one. If a context is stopped, any thread stuck in awaitTermination will get unblocked.

Does that clarify things?
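For the receiver case, a rough sketch of the graceful stop-and-restart described above; setupStreams is a placeholder for the application's own pipeline definition (creating the receiver input streams, unioning them, transformations and output operations).

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverScaling {

  def restartWithMoreReceivers(sc: SparkContext,
                               oldSsc: StreamingContext,
                               numReceivers: Int,
                               batchIntervalSec: Long,
                               setupStreams: (StreamingContext, Int) => Unit): StreamingContext = {
    // Graceful stop: process all data already received, but keep the SparkContext alive.
    oldSsc.stop(stopSparkContext = false, stopGracefully = true)
    // Any thread blocked in oldSsc.awaitTermination() is unblocked at this point.

    val newSsc = new StreamingContext(sc, Seconds(batchIntervalSec))
    setupStreams(newSsc, numReceivers) // remember: # receivers <= # executors
    newSsc.start()
    newSsc
  }
}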