Best Practices for Deploying Spark Applications

There is a lot of confusion swirling around deploying Spark Applications.

This article explicitly defines the components involved in deploying your Spark Application across the configurations most popular among Spark users.

 

Spark Applications

Every Spark Application's main() method is converted by the Spark Execution Engine into the following hierarchy:

Application

  -> Jobs (one Job per action, such as count() or save())

      -> Stages (defined by shuffle boundaries)

          -> Tasks (each Task is a thread within an Executor JVM Process)

A Job contains Stages, and Stage boundaries are defined by shuffles.

 

Physical Resources

Worker Node

Forget everything you know about Spark Deployment and just accept that there is only 1 type of Physical Node in Spark:  the Worker Node.  Everything else is a Process that could be running on any Physical Node.

The Worker Node has a fixed number of CPUs (e.g., 32) and a fixed amount of memory (e.g., 244 GB).

Spark creates Processes that run on these Worker Nodes and carry out a specific function.  More about these Processes in the next section.
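
As a concrete illustration, under the Spark Standalone manager the resources a Worker Node advertises to the cluster can be declared in conf/spark-env.sh on that node (the values below are examples, not recommendations):

# conf/spark-env.sh on a Physical Worker Node (Spark Standalone; example values)
# Advertise 32 CPU cores and 240 GB of the node's 244 GB to Spark,
# leaving headroom for the Operating System and Page Cache.
SPARK_WORKER_CORES=32
SPARK_WORKER_MEMORY=240g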

CPU Core

Every Task that executes on behalf of your Spark Application requires a Thread within an Executor Process.

Thread

Every Thread requires an available CPU on a Physical Worker Node within the cluster.

Memory

Spark has 2 types of memory regions:

  • Data Cache (storage memory):  Stores your data in-process (on-heap) or off-heap for fast retrieval in iterative workloads
  • Spark Execution Engine (execution memory):  Working memory and network buffers for data shuffles, joins, aggregations, etc., used by Spark internally

As of Spark 1.6, these memory regions are unified and adaptive: the boundary between them can shift at runtime based on the needs of each Spark Application.
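
A minimal sketch of tuning these regions in Spark 1.6+ via spark-submit (the class, jar path, and values below are illustrative placeholders):

# spark.memory.fraction sets the share of the heap used for execution + storage;
# spark.memory.storageFraction sets the portion of that pool protected for cached data.
spark-submit \
  --conf spark.memory.fraction=0.75 \
  --conf spark.memory.storageFraction=0.5 \
  --class com.example.MyApp \
  /path/to/myapp.jar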

 

Processes

Cluster Resource Manager Process (Usually JVM, but could be C++ or other runtime)

A Cluster Resource Manager is responsible for managing all available cluster resources - specifically, CPU and Memory - among all Physical Spark Worker Nodes.

The Cluster Resource Manager allocates Physical Spark Worker Node resources to your Spark Application when you call spark-submit.

There are 3 Cluster Resource Managers that are popular with the Spark ecosystem:

  • Spark Standalone (default)
  • YARN (hybrid MapReduce and Spark workloads)
  • Mesos (hybrid workloads, plus coarse-grained and fine-grained modes)

We'll cover each of these in more detail later.
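
For reference, the Cluster Resource Manager is selected with the --master argument to spark-submit; the host names below are placeholders:

# Spark Standalone (host:port of the Master Process)
spark-submit --master spark://master-host:7077 --class com.example.MyApp /path/to/myapp.jar

# YARN (cluster location is read from the Hadoop configuration;
# older releases used the yarn-client / yarn-cluster master forms)
spark-submit --master yarn --class com.example.MyApp /path/to/myapp.jar

# Mesos
spark-submit --master mesos://mesos-host:5050 --class com.example.MyApp /path/to/myapp.jar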

Note:  In Spark Standalone, the Cluster Resource Manager Process is often called the Master Process or Master Node.  This is a source of much confusion as this is usually not a separate Physical Node, but rather a JVM Process running on a Physical Worker Node that handles resource allocations.

Application Driver Process

A single Application Driver Process is created each time you run spark-submit.

The Application Driver Process is responsible for the following:

  • Work with the Spark Execution Engine to break your Spark Application into Jobs -> Stages -> Tasks.
  • Work with the Cluster Resource Manager Process to find Physical Worker Nodes with enough CPU and Memory to carry out the Tasks that make up your Spark Application.
  • Serve up any library dependencies needed by the Executor Processes performing Tasks on behalf of your Spark Application.

Executor Process

The Executor Process is a process that runs on the Physical Worker Node - potentially alongside many other processes including the Application Driver Process or the Cluster Resource Manager Process.

Within a Spark Application, Executor Processes are reused across Jobs and Tasks, so they do not incur the per-Task JVM startup and teardown costs of a typical MapReduce task process.

Ideally, the Executor Process is running on the same node - or in the same rack - where the data lives.  This is called data locality and is one of the following (in order of most to least desirable):

  • PROCESS_LOCAL (data is retrieved from in-memory cache on the same Physical Worker Node and in the same Physical Process that is executing the Task)
  • NODE_LOCAL (data is retrieved from a disk on the same Physical Worker Node that runs the Physical Process that is executing the Task)
  • RACK_LOCAL (data is retrieved from a disk of a Physical Worker Node in the same rack as the Physical Worker Node that is executing the Task) 
  • ANY (data is retrieved from disk of a Physical Worker Node outside the rack of the Physical Worker Node that is executing the Task)

Note:  ANY usually requires going through network switches to other racks within a Data Center - or to other Data Centers entirely.  This is the slowest data locality and should be avoided where possible.  The Spark UI shows the data locality of each Task that executes on behalf of your application.
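
How long the scheduler waits for a slot at a better locality level before falling back is tunable via spark.locality.wait. A hedged sketch (3s is the documented default; the class and jar are placeholders):

# Wait up to 3 seconds for a free slot at a better locality level
# (PROCESS_LOCAL -> NODE_LOCAL -> RACK_LOCAL -> ANY) before falling back.
spark-submit \
  --conf spark.locality.wait=3s \
  --class com.example.MyApp \
  /path/to/myapp.jar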

The Executor Process is responsible for executing Tasks.  Each Task runs on a separate Thread within the Executor Process (ie. JVM Thread).  Each Thread occupies a CPU and Memory for the duration of the Task.

The number of Threads and the amount of Memory needed by a Spark Application either default to the values in spark-defaults.conf or are specified explicitly when spark-submit is called.
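
For example (the values, class, and jar below are illustrative), the per-Executor Thread and Memory request can be expressed directly on spark-submit:

# Request 4 Threads (cores) and 8 GB of memory per Executor Process,
# capped at 16 cores total across the cluster (Standalone/Mesos).
spark-submit \
  --executor-cores 4 \
  --executor-memory 8g \
  --total-executor-cores 16 \
  --class com.example.MyApp \
  /path/to/myapp.jar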

The Application Driver Process works with the Cluster Resource Manager Process to fulfill the Thread/Memory request and to find an Executor Process on a Physical Worker Node that offers the best data locality for each Task based on that Task's data needs - ideally, PROCESS_LOCAL or NODE_LOCAL.

 

Deployment Modes

When launching a Spark Application (aka "Spark Job"), you use the `spark-submit` script as follows:

spark-submit \
  --jars $SPARK_SUBMIT_JARS \
  --packages $SPARK_SUBMIT_PACKAGES \
  --class com.advancedspark.streaming.rating.store.Cassandra \
  $PIPELINE_HOME/myapps/streaming/target/scala-2.10/streaming_2.10-1.0.jar \
  1>$PIPELINE_HOME/logs/streaming/ratings-cassandra.log 2>&1 &

You can also use the experimental REST API equivalent as follows:

curl -X POST http://127.0.0.1:6066/v1/submissions/create \
  --header "Content-Type:application/json;charset=UTF-8" \
  --data '{
  "action" : "CreateSubmissionRequest",
  "appArgs" : [ "10" ],
  "appResource" : "file:/root/spark-1.5.1-bin-fluxcapacitor/lib/spark-examples-1.5.1-hadoop2.6.0.jar",
  "clientSparkVersion" : "1.5.1",
  "environmentVariables" : {
    "SPARK_ENV_LOADED" : "1"
  },
  "mainClass" : "org.apache.spark.examples.SparkPi",
  "sparkProperties" : {
    "spark.jars" : "file:/root/spark-1.5.1-bin-fluxcapacitor/lib/spark-examples-1.5.1-hadoop2.6.0.jar",
    "spark.app.name" : "SparkPi",
    "spark.submit.deployMode" : "client",
    "spark.master" : "spark://127.0.0.1:7077",
    "spark.executor.cores" : "2",
    "spark.executor.memory" : "2048m",
    "spark.cores.max" : "2"
  }
}'


All available properties for spark-submit are detailed here:

  http://spark.apache.org/docs/latest/submitting-applications.html 

When reasoning about the deployment mode of a Spark Application, think about where the spark-submit script physically creates the Application Driver Process - the Application-specific logic controller - regardless of where you run the spark-submit call itself (e.g., from your laptop).

Client Deployment Mode

In Client Deployment Mode, the spark-submit script creates the Application Driver in the same Process as the spark-submit script itself.  The console (ie. your laptop's terminal) becomes the input and output of the job.  This makes debugging easier as any output is written directly to the console.

This mode is typically used for REPL-based experimentation and local development, but it is not well-suited for long-running production Spark Applications.  Also, remember that the Application Driver is a logic coordinator and therefore should be co-located with the Physical Worker Nodes; otherwise, network communication between the Application Driver (running on your local laptop) and the Physical Worker Nodes (in the cluster) could dominate the total processing time of the Spark Application.

Recovering from a failure is not automated in Client Deployment Mode.  If, for example, you shut down your laptop, the spark-submit process - as well as the Spark Application Driver running within that Process - will die.  This will cause the Spark Application to fail - even if the Physical Worker Nodes are up and running.
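
A minimal Client Deployment Mode submission looks like the following (the master host, class, and jar are placeholders); the Driver runs inside this spark-submit process and its output streams to your terminal:

# Client mode: the Application Driver runs inside this spark-submit process.
# If this process (e.g., your laptop's terminal session) dies, so does the Application.
spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode client \
  --class com.example.MyApp \
  /path/to/myapp.jar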

Cluster Deployment Mode

In Cluster Deployment Mode, the spark-submit script runs the Application Driver Process on a Physical Worker Node in the cluster.  The choice of Physical Worker Node cannot be controlled.  

This requires that all dependent libraries (jars/pys) exist on all Physical Worker Nodes - otherwise, the Application Driver JVM Process may not find the libraries - and therefore won't be able to serve them up when the Executor JVM Processes request them.

In all Spark deployment scenarios, dependent libraries are requested by the Executor Processes and served up by an embedded HttpServer running inside the Application Driver Process.

The dependent libraries are stored in a per-Spark-Application work directory on the Physical Worker Node running each Executor Process.  These libraries are cleaned up periodically according to the spark.worker.cleanup.appDataTtl property.
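
A sketch of enabling that cleanup on a Spark Standalone Worker Node via conf/spark-env.sh (the TTL value is illustrative; 7 days is the documented default):

# conf/spark-env.sh on each Physical Worker Node (Spark Standalone).
# Enable periodic cleanup of per-Application work directories and purge
# anything older than 7 days (604800 seconds).
SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true \
  -Dspark.worker.cleanup.appDataTtl=604800"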

Cluster Deployment Mode also lets you specify "supervise" mode which will restart the Application Driver Process if it fails for whatever reason.  This is especially useful for long-running Spark Applications such as Spark Streaming.

Cluster Deployment Mode lets you close your laptop without worrying about killing the Spark Application as all Driver controller logic is performed in the cluster.
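
A sketch of the equivalent Cluster Deployment Mode submission with supervision (again, the master host, class, and jar are placeholders):

# Cluster mode: the Application Driver runs on a Physical Worker Node in the
# cluster; --supervise restarts the Driver if it fails for any reason.
spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --supervise \
  --class com.example.MyApp \
  /path/to/myapp.jar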

 

Cluster Resource Managers

Spark Standalone

Spark's default Cluster Resource Manager.  Only supports Spark-based workloads.

YARN

Hadoop's Yet Another Resource Negotiator (YARN) supports hybrid Spark and MapReduce workloads such as Pig, Hive, Cascading, etc.

Mesos

Also supports hybrid Spark and MapReduce workloads as well as the following types of resource allocation:

  • Coarse-grained:  Once resources are allocated to a Spark Application, they must be explicitly released by the Spark Application when it completes
  • Fine-grained:  Resources can be released by the Mesos Cluster Resource Manager if they are not actively being used.  

Note:  In Fine-grained mode, if a user comes back to his/her spark-shell (a Spark Application, itself) after a long lunch, there may be a delay before the spark-shell will respond to commands as resources need to be reallocated from the cluster.
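
The mode is selected per Application with the spark.mesos.coarse property; a sketch with placeholder host, class, and jar:

# Coarse-grained: hold allocated resources for the Application's full lifetime.
spark-submit --master mesos://mesos-host:5050 \
  --conf spark.mesos.coarse=true \
  --class com.example.MyApp /path/to/myapp.jar

# Fine-grained: let the Mesos Cluster Resource Manager reclaim idle resources.
spark-submit --master mesos://mesos-host:5050 \
  --conf spark.mesos.coarse=false \
  --class com.example.MyApp /path/to/myapp.jar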

 

Best Practices, Capacity Planning, and Tuning 

While any serious tuning and capacity planning should be done empirically against your specific workloads, here are a couple of things to remember when deciding between running a small number of large Executor Processes or a large number of small Executor Processes (a worked sizing sketch follows the list):

  • Large JVMs (heaps > 32 GB) can suffer from performance issues, including the loss of Compressed OOPs and excessive Garbage Collection
  • The larger the Executor Process (i.e., the JVM), the higher the chance that you will achieve PROCESS_LOCAL data locality, where the data is cached in the same Executor Process that is executing the Task.
  • As of Spark 1.2, network I/O used for shuffling data between Executor Processes is handled by Netty.  Sharing data between Executor Processes on the same Physical Worker Node uses efficient memory mapping and zero-copy techniques.  In other words, don't be shy about loading up a single Physical Worker Node with as many Executor Processes as the Node's CPU and Memory resources will allow.
  • Don't forget to allow enough memory for the underlying Operating System to effectively manage Page Cache.

 

Related Links

Spark Standalone Documentation

http://spark.apache.org/docs/latest/spark-standalone.html#launching-spark-applications

http://spark.apache.org/docs/latest/submitting-applications.html

Hidden, Experimental Spark REST API for Job Submission

http://arturmkrtchyan.com/apache-spark-hidden-rest-api

Cloudera Blog Post:  Spark on YARN

http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/

Cloudera Blog Post:  Tuning Spark Jobs (2 Parts)

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
