This is Part II of the Databricks Certified Associate Developer for Apache Spark preparation series. In Part I we discussed exam details, prerequisites, and recommended preparation.
We are following the preparation materials recommended by Databricks (covered in Part I). In this article we will start with Spark’s architecture and cover it from the certification’s point of view. I will also add helpful resources wherever possible.
Basic Architecture
Apache Spark is a distributed processing engine. It is very fast due to its in-memory parallel computation framework. Keep in mind that Spark is just the processing engine; it needs separate storage (e.g. HDFS) to write data permanently. A typical Spark application runs on a cluster of machines (also called nodes). It breaks a bigger task into multiple smaller ones and distributes them among these machines. However, this creates the challenge of coordinating and managing work across these machines. Spark has this functionality built in, and its architecture is designed to handle large distributed applications. Several core components, each with an assigned role, help execute this distributed work.
Driver
The driver (or driver program) is a process running on one of the machines that is responsible for the whole application lifecycle. It maintains the information about the Spark application. It analyzes, distributes, and schedules tasks (the processing to be done) on executors (worker processes). It also responds to the user after the success or failure of the application. The driver is analogous to an orchestrator. Like an orchestrator, the driver doesn’t perform the distributed computations itself; it just manages the overall application lifecycle (Figure 1).
Executors
Executors are the worker processes that run on other machines in the cluster. There can be multiple executors running on the same machine. Each executor holds collections of data, called partitions, on which it executes code as assigned by the Driver. Executors also report the state (success/failure) of the assigned work back to the Driver (Figure 1).
Slots
Each executor can have multiple slots in which tasks (as assigned by the Driver) can run. The number of slots depends on the cores dedicated by the user to the Spark application. This means that there are two levels of parallelism: first, work is distributed among executors, and then each executor may have multiple slots to distribute it further (Figure 1).
Tasks
A task is the work to be done on a partition (a collection of rows). Tasks are assigned by the Driver to executors and run in the slots available on those executors (Figure 1).
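To make the two levels of parallelism concrete, here is a rough back-of-the-envelope sketch in plain Python. It is not a Spark API: the function names are hypothetical, and it assumes one slot per core and one task per partition.

```python
import math


def total_slots(num_executors: int, cores_per_executor: int) -> int:
    """Slots available to the application, assuming one slot per core."""
    return num_executors * cores_per_executor


def task_waves(num_partitions: int, slots: int) -> int:
    """Rounds needed to run one task per partition when only `slots` tasks fit at once."""
    return math.ceil(num_partitions / slots)


# Hypothetical cluster: 3 executors with 4 cores each, processing 30 partitions.
slots = total_slots(num_executors=3, cores_per_executor=4)
waves = task_waves(num_partitions=30, slots=slots)
print(f"{slots} slots, {waves} waves of tasks")
```

With these made-up numbers, 30 tasks cannot all run at once, so some of them wait until a slot becomes free.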
Jobs
A job is the parallel computation that Spark runs when an action is called. A Spark application, which is maintained by the Driver, can contain multiple jobs.
SparkSession
The SparkSession is the object, created in the Driver process, through which you control your Spark application. It is the entry point to all of Spark’s functionality. In Databricks notebooks and REPL environments, it is already created under the variable “spark” (Figure 2). However, when writing applications you have to create it yourself, as shown below.
## Create a SparkSession when building applications
from pyspark.sql import SparkSession
spark = (SparkSession.builder.master("local")
    .appName("My First App")
    .config("config.name", "config.value")
    .getOrCreate())
After creating the SparkSession, you will be able to run PySpark code. SparkSession was added in Spark 2.x. Earlier versions had two separate contexts, SparkContext and SQLContext, which provided functionality for interacting with RDDs and with DataFrames/Hive respectively. Both of these have now been merged into SparkSession. However, the SparkContext can still be accessed individually, and SQLContext is kept for backward compatibility (Figure 3).
Transformations, Actions and Execution
Transformations are the functions with which you implement your logic. A transformation does not change the underlying data in place; it describes how to derive a new DataFrame from an existing one. Examples of transformations are select, filter, groupBy, orderBy, limit etc. Transformations are lazy in nature, which means that no processing is performed until an action is called. Spark just builds a lineage and waits for an action to be called before executing the transformations in that lineage (Figure 4). Lazy evaluation makes it easy to perform operations in parallel and allows for various optimizations.
Transformations can be narrow or wide in nature. Narrow transformations do not incur a shuffle (movement of data among machines over the network), i.e. the data required to compute each output partition resides in at most one input partition. Wide transformations, on the other hand, cause a shuffle because the required data resides in many partitions and has to be redistributed across machines.
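The difference can be sketched in plain Python, with lists standing in for partitions. This is only a toy model and not how Spark is implemented: the helper names and the byte-sum partitioner are assumptions made for illustration.

```python
from collections import defaultdict

# Two input partitions of (key, value) rows; in Spark these would live on executors.
partitions = [
    [("a", 1), ("b", 2)],
    [("a", 3), ("b", 4), ("c", 5)],
]


def narrow_map(parts, fn):
    """Narrow: every output partition is computed from exactly one input partition."""
    return [[fn(row) for row in part] for part in parts]


def wide_group_sum(parts, num_output_partitions):
    """Wide: rows are redistributed by key, so an output partition may need
    rows from every input partition (this redistribution is the shuffle)."""
    shuffled = [defaultdict(int) for _ in range(num_output_partitions)]
    for part in parts:
        for key, value in part:
            # Deterministic toy partitioner: the same key always lands in the same place.
            target = sum(key.encode()) % num_output_partitions
            shuffled[target][key] += value
    return [dict(p) for p in shuffled]


doubled = narrow_map(partitions, lambda kv: (kv[0], kv[1] * 2))
summed = wide_group_sum(partitions, num_output_partitions=2)
print(doubled)
print(summed)
```

In the narrow case the partition layout is untouched. In the wide case the rows for key "a" start in two different partitions and must be brought together before they can be summed.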
Actions
Actions are the statements that, when encountered, trigger the computation described by the lineage. While transformations are lazy in nature, actions are eager. Examples of actions are show(), count(), collect() etc. For example:
1. data_file = spark.read.csv("/home/users/some_random_data.csv", header=True)
2. selection_df = data_file.select("first column", "second column")
3. filter_df = selection_df.where("`second column` IS NOT NULL")
4. group_df = filter_df.groupBy("first column").count()
5. group_df.show()
The above code example shows a typical Spark application. In steps 1 to 4 we read data from a CSV file and apply a series of transformations. Note that count() on grouped data is itself a transformation: it returns a new DataFrame instead of a number. For these steps Spark only builds a lineage in memory and does not process the data. Step 5 calls show(), which is an action. When Spark reaches it, it traces the lineage back to step 1, performs all the processing, and prints the counts.
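The same behaviour can be mimicked in a few lines of plain Python. This is a toy stand-in for a DataFrame, not Spark code: the LazyFrame class and the sample rows are made up for illustration.

```python
class LazyFrame:
    """Toy DataFrame: transformations are only recorded, actions replay them."""

    def __init__(self, rows, lineage=()):
        self._rows = rows
        self._lineage = lineage  # recorded steps, oldest first

    def _then(self, step):
        # Return a new frame with a longer lineage; no data is touched here.
        return LazyFrame(self._rows, self._lineage + (step,))

    def where(self, predicate):
        return self._then(lambda rows: [r for r in rows if predicate(r)])

    def select(self, *columns):
        return self._then(lambda rows: [{c: r[c] for c in columns} for r in rows])

    def count(self):
        # Action: walk the lineage from the source and do the actual work.
        rows = self._rows
        for step in self._lineage:
            rows = step(rows)
        return len(rows)


calls = []  # records every row the predicate is evaluated on


def has_salary(row):
    calls.append(row["name"])
    return row["salary"] is not None


rows = [
    {"name": "a", "salary": 10},
    {"name": "b", "salary": None},
    {"name": "c", "salary": 30},
]
df = LazyFrame(rows).where(has_salary).select("name")
calls_before_action = len(calls)  # the predicate has not run yet
result = df.count()               # now the whole lineage is executed
print(calls_before_action, result, len(calls))
```

The predicate is not evaluated while the transformations are being chained. It only runs once count() is called.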
Pipelining
Whenever a shuffle is encountered in a Spark application, the data being shuffled is written to the executors’ local disks. However, all the steps before the shuffle can be combined and performed in a single pass over the data. This is called pipelining. Combined with the fact that Spark does its processing in memory and does not spill to disk immediately, it makes queries even faster.
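As an analogy only (this is not Spark's internal implementation), Python generators show what it means to combine steps into a single pass. The function names and sample rows below are hypothetical.

```python
def select_columns(rows, columns):
    """Yield each row reduced to the requested columns."""
    for row in rows:
        yield {c: row[c] for c in columns}


def keep_where(rows, predicate):
    """Yield only the rows that satisfy the predicate."""
    for row in rows:
        if predicate(row):
            yield row


partition = [
    {"name": "a", "age": 25, "salary": 10},
    {"name": "b", "age": 35, "salary": 20},
    {"name": "c", "age": 40, "salary": 30},
]

# The two steps are chained; each row flows through both of them before the
# next row is read, and no intermediate list is ever built.
pipeline = keep_where(
    select_columns(partition, ["name", "age"]),
    lambda row: row["age"] >= 30,
)
result = list(pipeline)
print(result)
```

Here the select and the filter behave like two steps of one stage: the intermediate result of the select is never materialized on its own.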
Catalyst Optimizer
Spark DataFrames, which are built on top of Spark SQL, get their performance from the underlying Catalyst optimizer. The Catalyst optimizer finds the most efficient way to apply your transformations and actions. It is the reason why DataFrames have better performance than RDDs.
Note: RDDs are not included in the Databricks Spark certification, so we will not be discussing them here.
Whenever we run a query using Spark SQL (it can be DataFrame code in PySpark as well), it goes through several planning stages before being converted into a physical plan and executed. Using DataFrames and Spark SQL means that you are relying on the Catalyst optimizer to optimize your query plan instead of using RDDs and doing it yourself. For example (in Scala):
// Example by: Sumit M
// Goal: calculate the average salary of employees by age
import org.apache.spark.sql.functions.avg
import spark.implicits._

// Using RDDs
val fileRdd = sc.textFile("/employeeData.csv")
fileRdd.map { x =>
    val fields = x.split(",")
    (fields(1), fields(2).toDouble)   // (age, salary)
  }
  .map(x => (x._1, (x._2, 1)))                         // (age, (salary, 1))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))   // sum salaries and counts per age
  .map(x => (x._1, x._2._1 / x._2._2))                 // (age, average salary)

// Using DataFrames
val dataDf = spark.read.csv("/employeeData.csv").toDF("username", "age", "salary")
dataDf.groupBy($"age").agg(avg($"salary".cast("double")))
In the RDD approach:
We are telling Spark exactly “how to do” it using lambda functions, and Spark can’t optimize that. It sends those functions directly to the executors to work on the data. If any optimization is possible, we have to do it ourselves.
In the dataframes approach:
We are using the declarative way, in which we tell Spark “what to do” and leave the “how to do” part to Spark’s optimizer. This is what makes optimization of DataFrame code possible via the Catalyst optimizer.
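To see how much of the “how” is spelled out by hand in the first approach, here is the same map, reduce-by-key, map sequence written in plain Python. The three sample lines are made up for illustration, and nothing here is a Spark API.

```python
# Hypothetical contents of the employee file: username,age,salary
lines = ["alice,30,100", "bob,30,200", "carol,40,300"]

# Step 1 (map): parse each line into (age, (salary, 1)).
pairs = []
for line in lines:
    _username, age, salary = line.split(",")
    pairs.append((age, (float(salary), 1)))

# Step 2 (reduce by key): add up the salaries and the counts for each age.
totals = {}
for age, (salary, count) in pairs:
    running_salary, running_count = totals.get(age, (0.0, 0))
    totals[age] = (running_salary + salary, running_count + count)

# Step 3 (map): divide each salary total by its count to get the average.
averages = {age: total / count for age, (total, count) in totals.items()}
print(averages)
```

Every design decision, such as carrying a (sum, count) pair so that the average can be computed at the end, is made by the programmer. In the declarative version those decisions are left to the optimizer.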
How the Catalyst Optimizer works
When we submit a query using Spark SQL, it goes through the following steps.
- Spark first creates an unresolved logical plan. At this point the column names and table names used in the query have not yet been checked.
- The names are then validated and a resolved logical plan is created. At this step the operations might get reorganized to optimize performance.
- From the optimized logical plan, the Catalyst optimizer generates one or more physical plans to execute the query. A physical plan represents what Spark will actually do, after optimizations have been applied.
- If there is more than one physical plan, the cost of each plan is evaluated using a cost model. The plan with the best estimated performance is selected, compiled to Java bytecode, and executed.
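The idea of reorganizing operations can be illustrated with a toy rule in plain Python. This is not Catalyst's code or API: the Step class, the operation names, and the single rule (run a filter before a sort, so that fewer rows need sorting) are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Step:
    op: str   # "scan", "sort" or "filter"
    arg: str  # table name, sort column, or filter condition


def push_filters_before_sorts(plan):
    """Toy optimization rule: swap any (sort, filter) pair so the filter runs first.

    Filtering and then sorting yields the same rows in the same order as sorting
    and then filtering, but it sorts fewer rows.
    """
    steps = list(plan)
    changed = True
    while changed:
        changed = False
        for i in range(len(steps) - 1):
            if steps[i].op == "sort" and steps[i + 1].op == "filter":
                steps[i], steps[i + 1] = steps[i + 1], steps[i]
                changed = True
    return steps


# The plan as the user wrote it: scan, then sort, then filter.
logical_plan = [
    Step("scan", "employees"),
    Step("sort", "salary"),
    Step("filter", "age > 30"),
]
optimized_plan = push_filters_before_sorts(logical_plan)
print([step.op for step in optimized_plan])
```

The user's code is unchanged, but the plan that gets executed does less work. The real optimizer applies many such rules, which is only possible because the DataFrame API describes what to compute and leaves the how to Spark.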
Caching
Spark can store data in memory during computations. This is a great way to speed up queries even further. We know that Spark is an in-memory processing engine, but it still has to read the data from disk once before it can start processing. For example:
Data_file = spark.read.csv("/home/users/some_random_data.csv")
This is the first read from disk, and every lineage that this read is a part of will have to read the file from disk again. You can store the data in memory instead, using caching, to speed up your processing.
df_cached = Data_file.cache()
df_persisted = Data_file.persist()
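cache() and persist() are built-in Spark methods for storing data. cache() always uses the default storage level, which for DataFrames is memory and disk (memory only is the default for RDDs), whereas persist() accepts a storage level as an argument, e.g. memory only or disk only. The Spark documentation lists all the available storage levels.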
You can do the same for any data that is frequently accessed in your application logic. In applications that reuse the same datasets over and over, caching is one of the most powerful optimization techniques. When you cache a DataFrame, each of its partitions is temporarily stored in the memory of its executor, which makes subsequent reads faster. Caching is itself lazy: the data is only stored once an action computes it for the first time.
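The effect of caching on repeated reads can be sketched in plain Python. This is a toy model, not Spark code: the CountingSource and ToyFrame classes are hypothetical and only count how often the “disk” is read.

```python
class CountingSource:
    """Pretends to be a file on disk and counts how often it is read."""

    def __init__(self, rows):
        self._rows = rows
        self.reads = 0

    def read(self):
        self.reads += 1
        return list(self._rows)


class ToyFrame:
    """Toy DataFrame over a source, with a lazy cache()."""

    def __init__(self, source):
        self._source = source
        self._cache_requested = False
        self._cached_rows = None

    def cache(self):
        # Only marks the frame; nothing is read or stored yet.
        self._cache_requested = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows   # served from memory
        rows = self._source.read()     # goes back to the source
        if self._cache_requested:
            self._cached_rows = rows   # stored on the first action
        return rows

    def count(self):
        return len(self._rows())


# Without caching, every action reads the source again.
uncached_source = CountingSource([1, 2, 3])
uncached = ToyFrame(uncached_source)
uncached.count()
uncached.count()

# With caching, only the first action reads the source.
cached_source = CountingSource([1, 2, 3])
cached = ToyFrame(cached_source).cache()
reads_after_cache_call = cached_source.reads
cached.count()
cached.count()
print(uncached_source.reads, reads_after_cache_call, cached_source.reads)
```

Calling cache() on its own does not read anything. The first action pays the cost of the read, and later actions are served from memory.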
Lifecycle of a Spark application
We have covered the core components of a Spark application. Let’s have a look at the lifecycle of an application itself. Assume that you created a PySpark application my_first_app.py and submitted it to the cluster.
spark-submit \
  --master <url> \
  --deploy-mode cluster \
  --conf <some_key>=<some_value> \
  my_first_app.py
First, the application communicates with the resource manager and asks for resources to run. If that request is successful, the Driver program is started on one of the nodes (Figure 6). As this is a packaged application, the first thing in the code should be the creation of a SparkSession.
Once the SparkSession is created, it communicates with the cluster manager and asks it to launch Spark executor processes across the cluster. Keep in mind that the number of executors and their configuration (cores, RAM etc.) are set by the user when submitting the application. The cluster manager responds by launching the executor processes and sends the relevant information back to the driver process. After that, the driver and executors communicate and move data around, and the driver schedules tasks onto each executor (Figure 7). Each executor responds with the status (success or failure) and the result. In Figure 7, we have three nodes in total. The driver process is running on Node 01. Two executors are running on Worker 01, while Worker 02 has only one executor process running.
After completion, the driver exits with success or failure and the cluster manager shuts down the executors in the cluster.
The actual code that you wrote in my_first_app.py defines your Spark application, and each application can have one or more jobs.
In general, there is one Spark job per action. A Spark job is broken down into a series of stages. A stage represents a group of tasks that can be executed together, e.g. a select followed by a where. Whenever data needs to be shuffled among executors (e.g. in join queries), Spark creates a new stage. Each stage consists of several tasks that run in the available slots on the executors.
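The rule that a shuffle starts a new stage can be sketched in plain Python. This is a simplification for illustration, not Spark's scheduler: the function name and the small set of operations treated as shuffling are assumptions (a join, for instance, does not always shuffle).

```python
# Toy assumption: these operations always require a shuffle.
WIDE_OPS = {"groupBy", "join", "orderBy"}


def split_into_stages(operations):
    """Group operations into stages, starting a new stage at every shuffle."""
    stages, current = [], []
    for op in operations:
        if op in WIDE_OPS and current:
            stages.append(current)  # close the stage before the shuffle
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages


# The operations behind a single hypothetical job (one action).
job = ["read", "select", "where", "groupBy", "orderBy"]
stages = split_into_stages(job)
for number, stage in enumerate(stages, start=1):
    print(f"Stage {number}: {stage}")
```

The read, select and where end up in one stage because none of them moves data between executors. Each of the two shuffling operations begins a stage of its own.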
Summary:
The Driver is a program that sends tasks to multiple executors. The secret to Spark’s performance is parallelism, i.e. the ability to assign work to multiple machines and to multiple slots within each executor.
Recall that Spark does not execute operations as soon as they are read (lazy evaluation). Instead, it builds up a plan for data transformations that will be applied to your source data. That plan is executed only when you call an action.
The Catalyst Optimizer is designed to find the most efficient plan for applying the transformations and actions you called for in your code.
Congratulations! You have covered the basics of Spark architecture. There are still some things left, especially about the Spark UI, which I will cover some other time. In the upcoming articles of this series I will focus on Spark programming with DataFrames, as it is the biggest part of the certification.
Finally, I’d encourage you to follow JustEnough Spark on LinkedIn (btw, it’s NOT my page :p) for daily MCQs about Spark and Databricks.
Happy Learning 🙂