On 27 Aug 2015, at 08:42, Ashish Rawat <ashish.ra...@guavus.com> wrote:

Hi Patrick,

As discussed in another thread, we are looking for a solution to the problem
of lost state on Spark Driver failure. Can you please share Spark's long-term
strategy for resolving this problem?

<-- Original Mail Content Below -->

We have come across the problem of Spark applications (on YARN) requiring a
restart when the Spark Driver (or Application Master) goes down. This is
hugely inconvenient for long-running applications that maintain a large state
in memory. Repopulating that state can itself require a downtime of many
minutes, which is not acceptable for most live systems.

As you may have noticed, the YARN community has acknowledged "long running
services" as an important class of use cases, and has accordingly identified
and removed problems in running long-running services on YARN.
http://hortonworks.com/blog/support-long-running-services-hadoop-yarn-clusters/


Yeah, I spent a lot of time on that, or at least using the features, in other 
work under YARN-896, summarised in 
http://www.slideshare.net/steve_l/yarn-services

It would be great if Spark, which is the most important processing engine on
YARN,

If you look at the CPU-hours going into the big Hadoop clusters, it's actually
MR work and things behind Hive. But these apps don't attempt HA.

Why not? It requires whatever maintains the overall app status (in Spark: the
driver) to persist that state in a way that lets it be rebuilt. A restarted AM
with the "retain containers" feature turned on gets nothing back from YARN
except the list of previously allocated containers, and is left to sort itself
out.
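
Roughly, the calls involved look like this: a sketch in Scala against the
Hadoop 2.4+ YARN client API. setKeepContainersAcrossApplicationAttempts and
getContainersFromPreviousAttempts are the real API calls; everything wired
around them here is illustrative only.

  import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext
  import org.apache.hadoop.yarn.client.api.AMRMClient
  import scala.collection.JavaConverters._

  // Submission side: ask YARN to keep containers alive across AM restarts,
  // and allow more than one AM attempt.
  def enableContainerRetention(appContext: ApplicationSubmissionContext): Unit = {
    appContext.setKeepContainersAcrossApplicationAttempts(true)
    appContext.setMaxAppAttempts(3)
  }

  // AM side: on re-registration YARN hands back only the containers that
  // survived the previous attempt; reconnecting to whatever runs in them
  // (e.g. executors) is entirely the application's problem.
  def recoverSurvivingContainers(amrm: AMRMClient[AMRMClient.ContainerRequest]): Unit = {
    val response = amrm.registerApplicationMaster("", 0, "")
    response.getContainersFromPreviousAttempts.asScala.foreach { container =>
      println(s"still running from previous attempt: ${container.getId}")
    }
  }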

also figured out the issues in running long-lived Spark applications and
published recommendations or made framework changes to remove them. The need
to keep the application running in case of Driver or Application Master
failure seems to be an important requirement from this perspective. The two
most compelling use cases are:

  1.  Huge state of historical data in Spark Streaming, required for stream 
processing
  2.  Very large cached tables in Spark SQL (very close to our use case where 
we periodically cache RDDs and query using Spark SQL)


Generally Spark Streaming is viewed as the big need here, but yes, long-lived
cached data matters.

Bear in mind that before Spark 1.5, you can't run any Spark-on-YARN app for
longer than the expiry time of your delegation tokens, so in a secure cluster
you have a limit of a couple of days anyway. Unless your cluster is
particularly unreliable, AM failures are usually pretty unlikely in such a
short timespan. Container failure is more likely because 1) you have more of
them and 2) if you have pre-emption turned on in the scheduler, or are pushing
the work out to a label containing spot VMs, they will fail.
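
(For reference: Spark 1.5's YARN support adds keytab-based login so the tokens
can be refreshed for long-lived apps. Normally you pass --principal and
--keytab to spark-submit; the snippet below only names what I believe are the
underlying configuration keys, with made-up values.)

  import org.apache.spark.SparkConf

  // Keytab-based login for long-running secure YARN apps (Spark 1.5+).
  // The principal and keytab path are illustrative; use your own.
  val conf = new SparkConf()
    .set("spark.yarn.principal", "longlived-app@EXAMPLE.COM")
    .set("spark.yarn.keytab", "/etc/security/keytabs/longlived-app.keytab")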

In our analysis, for both of these use cases, a working HA solution can be
built by:

  1.  Preserving the state of executors (not killing them on driver failures)

This is a critical one.


  2.  Persisting some metadata required by Spark SQL and the Block Manager.

Again, this needs a failure-tolerant storage mechanism. HDFS and ZK can work
together here, but your code needs to handle all the corner cases of
inconsistency, including the "AM failure partway through state update"
scenario. One pattern is sketched below.
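
Purely as an illustration of that pattern (not anything Spark does today; the
paths and checkpoint format are made up): write each new version of the state
to a temporary file, rename it into place (rename is atomic in HDFS), and on
recovery read the newest complete version, ignoring half-written temp files.

  import java.nio.charset.StandardCharsets
  import org.apache.hadoop.fs.{FileSystem, Path}

  class StateStore(fs: FileSystem, dir: Path) {

    // Publish a new version atomically: half-finished writes only ever exist
    // under a .tmp name, so readers never see a partially updated state.
    def save(version: Long, stateJson: String): Unit = {
      val tmp  = new Path(dir, s".state-$version.tmp")
      val dest = new Path(dir, s"state-$version")
      val out = fs.create(tmp, true)
      try out.write(stateJson.getBytes(StandardCharsets.UTF_8)) finally out.close()
      if (!fs.rename(tmp, dest)) {
        throw new java.io.IOException(s"could not publish $dest")
      }
    }

    // Newest fully written state, if any.
    def loadLatest(): Option[String] = {
      val versions = fs.listStatus(dir).map(_.getPath)
        .filter(_.getName.startsWith("state-"))
      if (versions.isEmpty) None
      else {
        val newest = versions.maxBy(_.getName.stripPrefix("state-").toLong)
        val in = fs.open(newest)
        try Some(scala.io.Source.fromInputStream(in, "UTF-8").mkString)
        finally in.close()
      }
    }
  }

ZK would typically sit alongside something like this for leader election, so
that only one live AM instance is ever writing.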

Sometimes you even need to reach for the mathematics, with TLA+ being the
language of choice. Start with the ZK proof paper to see if you can get a
vague idea of what it's up to, as that gives hints about how its behaviour may
not be what you expect.

  3.  Restarting and reconnecting the Driver to the AM and executors

I don't know how Akka can recover from this. Existing long-lived YARN services
use the Hadoop 2.6+ YARN registry, which was designed with this purpose in
mind. Slider, for example: when its containers lose contact with the AM, they
poll the registry to await a new AM entry.
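
A rough sketch of that Slider-style pattern, using the Hadoop 2.6 registry
client API (RegistryOperations, RegistryOperationsFactory and RegistryUtils
are the real classes; the service path, instance name and retry policy here
are entirely illustrative):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.registry.client.api.{RegistryOperations, RegistryOperationsFactory}
  import org.apache.hadoop.registry.client.binding.RegistryUtils

  // A container that has lost its AM waits for a new AM to publish its
  // service record, then reconnects using the endpoints in that record.
  object AwaitNewAM {
    def main(args: Array[String]): Unit = {
      val conf = new Configuration()
      val registry: RegistryOperations = RegistryOperationsFactory.createInstance(conf)
      registry.start()

      // e.g. /users/appuser/services/org-example-sparkapp/instance-1
      val path = RegistryUtils.servicePath("appuser", "org-example-sparkapp", "instance-1")

      while (!registry.exists(path)) {
        Thread.sleep(5000)                 // no AM entry yet; keep polling
      }
      val record = registry.resolve(path)  // the new AM's service record
      println(s"new AM record: $record")   // reconnect using its endpoints
      registry.stop()
    }
  }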

This would preserve the cached data, enabling the application to come back
quickly. This can further be extended to preserving and recovering the
computation state.


There's also

  1.  Credential recovery. Restarted AMs get an updated HDFS delegation token 
by way of YARN, but nothing else.
  2.  Container/AM failure tracking to identify failing clusters. YARN uses a
weighted moving average to decide when an AM is unreliable; on long-lived
services the service itself should reach the same decisions about containers
and nodes (a rough sketch of such tracking follows this list).
  3.  Testing. You need to be confident that things are resilient to failure
and network partitions. Don't underestimate the effort here; Jepsen shows what
is needed (https://aphyr.com/). Saying "ZooKeeper handles it all" doesn't
magically fix things.
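
On point 2, something along these lines is what I mean. This is an
illustrative sketch only, not YARN's actual algorithm, and the decay factor
and threshold are made up: keep an exponentially weighted moving average of
failures per node, and treat nodes whose recent failure rate stays high as
suspect.

  import scala.collection.mutable

  class NodeFailureTracker(alpha: Double = 0.3, threshold: Double = 0.5) {
    // EWMA of failures per node: 1.0 = failing constantly, 0.0 = healthy.
    private val score = mutable.Map.empty[String, Double].withDefaultValue(0.0)

    // Record the outcome of a container that ran on `node`.
    def record(node: String, failed: Boolean): Unit = {
      val sample = if (failed) 1.0 else 0.0
      score(node) = alpha * sample + (1 - alpha) * score(node)
    }

    // Nodes whose recent failure rate exceeds the threshold; candidates to
    // avoid when asking YARN for new containers.
    def suspectNodes: Set[String] =
      score.collect { case (node, s) if s > threshold => node }.toSet
  }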

An HA runtime is ultimately a great thing to have, but don't underestimate
the effort.




I would request you to share your thoughts on this issue and possible future 
directions.

Regards,
Ashish
