These are both really good posts: you should try and get them into the 
documentation.

With anything implementing dynamic scaling, there are some fun problems:

(a) detecting the delays in the workflow. There are some good ideas here.
(b) deciding where to address it. That means you need to monitor the entire 
pipeline, which you should be doing in production anyway.
(c) choosing the action. More nodes, more memory & CPU (not that useful for 
Java code, even when YARN adds support for dynamic container resize)
(d) choosing the size of the action. In a shared cluster, extra resources for 
one app come at the expense of others. If you have pre-emption turned on in 
YARN, the scheduler can take containers off lower priority work, which 
automates a lot of this decision making. That will lose other work though, so 
to justify it you'd better hang on to those containers.
(e) deciding if/when to hand things back. Scaling things down can be very 
expensive if lots of state has to get rebuilt elsewhere.

I think Apache Helix from LinkedIn has done some good work here - worth looking 
at to see what lessons & code to lift. And as you'd expect, it sits right 
behind Kafka in production. I think it gets away with low delays to scale 
up/down by relying on low rebuild costs. (In the work I've been doing with 
colleagues on dynamic HBase and Accumulo clusters, we've not attempted to do 
any autoscale, because scale down is an expensive decision... we're focusing on 
liveness detection and reaction, then publishing the metrics needed to allow 
people or cross-application tools to make the decision.)

On 12 Jun 2015, at 04:38, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

Yes, Tathagata, thank you.

For #1, the 'need detection', one idea we're entertaining is timestamping the 
messages coming into the Kafka topics. The consumers would check the interval 
between the time they receive a message and that message's origination 
timestamp. As Kafka topics start to fill up, we would presumably see longer and 
longer delays before messages get processed by the consumers. The consumers 
would then start firing off critical events into an Event Analyzer/Aggregator, 
which would decide that more resources are needed and then ask the Provisioning 
Component to allocate N new machines.
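
A rough sketch of that detection idea, assuming each Kafka message carries an 
origination timestamp (epoch millis) set by the producer; the field layout, the 
threshold, and the event-firing hook are all placeholders, not an existing API:

    import org.apache.spark.streaming.dstream.DStream

    // messages: (payload, originTimestampMillis) pairs, timestamp set by the producer
    def checkLag(messages: DStream[(String, Long)], lagThresholdMs: Long = 30000L): Unit = {
      messages.foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          val now = System.currentTimeMillis()
          val maxLagMs = rdd.map { case (_, originTs) => now - originTs }.max()
          if (maxLagMs > lagThresholdMs) {
            // placeholder: fire a critical event at the Event Analyzer/Aggregator
            println(s"Max lag ${maxLagMs} ms exceeds threshold; more resources may be needed")
          }
        }
      }
    }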

We do want to set maxRatePerPartition so as not to overwhelm the consumers and 
run out of memory. Machine provisioning may take a while, and without that 
guard our consumers could run out of memory in the meantime.
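
For reference, this is the standard configuration key for the Kafka direct 
approach; the value below is only an illustrative number to tune per workload:

    val conf = new org.apache.spark.SparkConf()
      .set("spark.streaming.kafka.maxRatePerPartition", "1000") // max records/sec per partition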

"Since there are no receivers, if the cluster gets a new executor, it will 
automatically start getting used to run tasks... no need to do anything 
further."  This is great, actually. We were wondering whether we'd need to 
restart the consumers once the new machines have been added. Tathagata's point 
implies, as I read it, that no further orchestration is needed: the load will 
start getting redistributed automatically. This makes the implementation of 
autoscaling a lot simpler, as far as #3 goes.

One issue that's not yet been covered much is the scenario when *fewer* cluster 
resources become required (a system load valley rather than a peak). To detect 
a low volume, we'd need to measure the throughput in messages per second over 
time. Sustained low volumes would trigger critical events signaling to the 
Analyzer that machines could be decommissioned.
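
A minimal way to measure that throughput is to count records per batch inside 
the stream itself; the low-water mark below is a made-up number, and judging 
whether low volume is sustained would be left to the Analyzer:

    import org.apache.spark.streaming.dstream.DStream

    // Report records/sec for each batch; flag batches that fall below the low-water mark
    def watchThroughput(messages: DStream[String], batchIntervalSec: Long, lowWatermark: Double): Unit = {
      messages.foreachRDD { rdd =>
        val recordsPerSec = rdd.count().toDouble / batchIntervalSec
        if (recordsPerSec < lowWatermark) {
          // placeholder: notify the Analyzer that this consumer is under-utilised
          println(s"Low volume: ${recordsPerSec} records/sec")
        }
      }
    }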

If machines are being decommissioned, it would seem that the consumers would 
need to be quiesced (allowed to process any current batch, then shut down), 
then they would restart themselves or be restarted. Thoughts on this?

There is also a hefty #4 here, which is the "hysteresis" of this: the system 
operates adaptively and learns over time, remembering the history of cluster 
expansions and contractions and allowing a certain slack for letting things 
cool down or heat up more gradually, and not contracting or expanding too 
frequently. PID controllers and thermostat types of design patterns have been 
mentioned before in this discussion.
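
A toy illustration of that hysteresis/cool-down behaviour; none of this is an 
existing API, and the band thresholds and cool-down period are arbitrary:

    object ScalingAction extends Enumeration { val ScaleUp, ScaleDown, Hold = Value }

    // Only act when the delay leaves the [lowMs, highMs] band, and never more often than cooldownMs
    class HysteresisController(highMs: Long, lowMs: Long, cooldownMs: Long) {
      private var lastActionAt = 0L

      def decide(schedulingDelayMs: Long, now: Long = System.currentTimeMillis()): ScalingAction.Value = {
        if (now - lastActionAt < cooldownMs) return ScalingAction.Hold // still cooling down
        val action =
          if (schedulingDelayMs > highMs) ScalingAction.ScaleUp
          else if (schedulingDelayMs < lowMs) ScalingAction.ScaleDown
          else ScalingAction.Hold
        if (action != ScalingAction.Hold) lastActionAt = now
        action
      }
    }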


If you look at the big cloud apps, they dynamically reallocate VM images based 
on load history, with Netflix being the poster user: Hadoop work in the quiet 
hours, user interaction in the evenings and at weekends. Excluding special 
events (including holidays), there's a lot of regularity over time, which lets 
you predict workload in advance. It's like your thermostat knowing Fridays are 
cold and cranking up the heating in advance.

On Thu, Jun 11, 2015 at 11:08 PM, Tathagata Das <t...@databricks.com> wrote:
Let me try to add some clarity to the different thought directions going on in 
this thread.

1. HOW TO DETECT THE NEED FOR MORE CLUSTER RESOURCES?

If there are no rate limits set up, the most reliable way to detect whether the 
current Spark cluster is insufficient to handle the data load is to use the 
StreamingListener interface, which gives all the information about when batches 
start and end. See the internal implementation of StreamingListener called 
StreamingJobProgressListener. This is the one that drives the streaming UI. You 
can get the scheduling delay (the time taken for a batch to start processing) 
from it and use that as a reliable indicator that Spark Streaming is not able 
to process as fast as data is being received.
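
A minimal sketch of that listener; the one-minute threshold and the reporting 
hook are assumptions, not part of the API:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    class SchedulingDelayListener(thresholdMs: Long = 60000L) extends StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        // schedulingDelay = how long the batch waited before it started processing
        val delayMs = batchCompleted.batchInfo.schedulingDelay.getOrElse(0L)
        if (delayMs > thresholdMs) {
          // placeholder: report to whatever component decides on scaling
          println(s"Scheduling delay ${delayMs} ms: streaming is falling behind")
        }
      }
    }

    // Register it on the StreamingContext:
    // ssc.addStreamingListener(new SchedulingDelayListener())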

But if you have already set rate limits based on the max load that the cluster 
can handle, then you will probably never detect that the actual input rate into 
Kafka has gone up and data is getting buffered inside Kafka. In that case, you 
have to monitor Kafka load to correctly detect the high load. You may want to 
use a combination of both techniques for a robust and safe elastic solution: 
have rate limits set, use the StreamingListener for early detection that the 
processing load is increasing (it can increase without an actual increase in 
data rate), and also make sure from Kafka monitoring that the whole end-to-end 
system is keeping up.


2. HOW TO GET MORE CLUSTER RESOURCES?

Currently for YARN, you can use the developer API of dynamic allocation that 
Andrew Or has introduced to ask for more executors from YARN. Note that the 
existing dynamic allocation solution is unlikely to work for streaming, and 
should not be used. Rather, I recommend building your own logic that watches 
the streaming scheduling delay and accordingly uses the low-level developer API 
to directly ask for more executors (sparkContext.requestExecutors). Among other 
approaches, the Provisioning Component idea can also work.
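
A rough sketch of that custom logic; the delay threshold and executor increment 
are assumptions to tune, and requestExecutors only takes effect on cluster 
managers that support it (e.g. YARN):

    import org.apache.spark.SparkContext

    // Ask the cluster manager for a couple more executors when the stream is falling behind
    def maybeScaleUp(sc: SparkContext, schedulingDelayMs: Long, thresholdMs: Long = 60000L): Unit = {
      if (schedulingDelayMs > thresholdMs) {
        val acknowledged = sc.requestExecutors(2) // low-level developer API
        if (!acknowledged) println("Cluster manager did not acknowledge the executor request")
      }
    }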


3. HOW TO TAKE ADVANTAGE OF MORE CLUSTER RESOURCES?

There are two approaches depending on receiver vs Kafka direct. I am assuming 
the number of topic partitions is pre-determined to be large enough to handle 
peak load.

(a) Kafka Direct: This is the simpler scenario. Since there are no receivers, 
if the cluster gets a new executor, it will automatically start getting used to 
run tasks, including reading from Kafka (remember, the Kafka direct approach 
reads from Kafka like a file system, from any node that runs the task). So it 
will immediately start using the extra resources, with no need to do anything 
further.

(b) Receiver: This is definitely tricky. If you don't need to increase the 
number of receivers, then a new executor will start getting used for 
computations (shuffles, writing out, etc.), but the parallelism in receiving 
will not increase. If you need to increase that, then it's best to shut down 
the context gracefully (so that no data is lost), and a new StreamingContext 
can be started with more receivers (# receivers <= # executors), and maybe more 
#partitions for shuffles. You have to call stop on the currently running 
streaming context to start a new one. If a context is stopped, any thread stuck 
in awaitTermination will get unblocked.
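
A minimal sketch of that graceful restart, where createContextWithMoreReceivers 
is a placeholder for your own setup code:

    import org.apache.spark.streaming.StreamingContext

    def restartWithMoreReceivers(current: StreamingContext,
                                 createContextWithMoreReceivers: () => StreamingContext): StreamingContext = {
      // Let in-flight data finish processing; keep the underlying SparkContext alive
      current.stop(stopSparkContext = false, stopGracefully = true)
      val fresh = createContextWithMoreReceivers() // sets up more receivers (# receivers <= # executors)
      fresh.start()
      fresh
    }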

Does that clarify things?

