Spark on EMR
High Level Problem
Our Dynamic Ads product is designed to show a user ads with specific, recommended products based on their browsing history. Each advertiser gives us a list of all of its products beforehand so that we can generate a “product catalog” for the advertiser. Sometimes, however, the product catalog that the advertiser has given us may not accurately reflect all of the products that exist on their website, and our recommendations become less accurate. Thus, we wanted to run a daily data processing job that would look at all products viewed on a given day and determine if we had a matching record in the advertiser’s product catalog. Then, we could aggregate that data to calculate a “match rate” for each advertiser, allowing us to easily target and troubleshoot troubled product catalogs.
In order to accomplish this task, we needed to perform the following actions:
- Query AdRoll's raw logs via Presto for the initial data set (8 million rows).
- For each row in the data set, query DynamoDB to check for a matching record in the advertiser's product catalog.
- Add each new row to a table in a PostgreSQL database (Amazon RDS).
- Summarize (reduce) the data set to get an aggregated view.
- Add each row to another aggregated table in the PostgreSQL database.
Why PySpark?
Given that we were faced with a large-scale data processing task, it was clear that we would need a cluster computing solution. As a new engineer most familiar with Python, I was initially drawn to Spark (as opposed to Hadoop) because I could use PySpark, a Python wrapper around Spark. While there are other Python libraries for MapReduce-style tasks (like mrjob), PySpark was the most attractive of the Python options due to its intuitive, concise syntax and easy maintainability. The performance gains from using Spark were also a major contributing factor.
While the PySpark code needed to build the necessary RDDs and write to the database ended up being rather trivial, I ran into numerous operational roadblocks around deploying and running PySpark on EMR. Many of these issues stemmed from the fact that, despite the new EMR 4.1.0 AMI supporting Spark and PySpark out of the box, it was difficult to find documentation on configuring Spark to optimize resource allocation and on deploying batch applications. The issues I encountered and ultimately solved included installing Python dependencies, setting Python 2.7 as the default, configuring executor and driver memory, and running the spark-submit step. In this post, I present a guide based on my experience that I hope will smooth the development process for future users.
Spark
- Transformations vs. Actions
  - Spark RDDs support two types of operations: transformations and actions. As the documentation clearly explains, transformations (like `map`) create a new dataset from an existing one, but they are lazy, meaning that they do not compute their results right away. Instead, the RDD simply remembers which transformations were applied to the base data. Actions (like `foreachPartition`), on the other hand, return a value to the driver after running a computation. The computations associated with a transformation are not actually performed until an action is called.
  - An understanding of this difference is hugely important in actually leveraging Spark's performance gains. For example, prior to understanding this crucial difference, I was making the 8 million DynamoDB calls twice instead of just once. With this understanding, I was able to decrease my job runtime by more than 50%.
  - Below, I have shown the difference between the code before and after this realization. Depending on the size of your data set and the memory available, you can either call `.persist()` on the RDD that you will reuse (`rdd1` in our case) or write the data to disk. Due to the size of our data set, and because the intermediary RDD was precisely what I wanted to store in the database, I chose to write `rdd1` to the database and then query the database to initiate a new RDD.
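The original before-and-after snippet is not reproduced verbatim here; the following is a minimal sketch of the pattern, where `base_rdd` is the RDD built from the Presto results and `check_dynamo`, `write_partition_to_db`, and `to_advertiser_pair` are hypothetical helpers:

```python
# Before: rdd1 is never persisted, so each action below re-runs the map,
# i.e. the 8 million DynamoDB lookups happen twice.
rdd1 = base_rdd.map(check_dynamo)                 # lazy: nothing runs yet
rdd1.foreachPartition(write_partition_to_db)      # action #1: lookups run
match_counts = (rdd1.map(to_advertiser_pair)
                    .reduceByKey(lambda a, b: a + b)
                    .collect())                   # action #2: lookups run again

# After: persist rdd1 so the mapping is only computed once...
rdd1 = base_rdd.map(check_dynamo).persist()
rdd1.foreachPartition(write_partition_to_db)
match_counts = (rdd1.map(to_advertiser_pair)
                    .reduceByKey(lambda a, b: a + b)
                    .collect())

# ...or, as we did, write rdd1 to the database and build a fresh RDD from a
# query against the rows that were just written.
```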
- Partitions
  - An important parameter when considering how to parallelize your data is the number of partitions used to split up your dataset.
  - Spark runs one task per partition, so it's important to partition your data appropriately.
  - Spark's documentation recommends 2-4 partitions for each CPU in the cluster.
  - RDDs created from `textFile` or `hadoopFile` use the MapReduce API to determine the number of partitions and thus have a reasonable predetermined number of partitions.
  - On the other hand, the data set returned from my Presto query gets a number of partitions determined by the `spark.default.parallelism` setting. By default, this setting sets the number of partitions to the number of executors (`spark.executor.instances`, 20 in our case) or 2, whichever is greater. Thus, in order to optimize, we manually tuned the number of partitions to 80 (20 executors * 4 partitions each), as sketched below.
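A minimal sketch of pinning the partition count, with `presto_rows` standing in for the Presto result set; beyond the 20 * 4 = 80 figure from the post, the names and values here are assumptions:

```python
from pyspark import SparkConf, SparkContext

NUM_EXECUTORS = 20
PARTITIONS_PER_EXECUTOR = 4
NUM_PARTITIONS = NUM_EXECUTORS * PARTITIONS_PER_EXECUTOR  # 80

# Option 1: set the default so parallelize() and shuffles use 80 partitions.
conf = SparkConf().set("spark.default.parallelism", str(NUM_PARTITIONS))
sc = SparkContext(conf=conf, appName="catalog-match-rate")

presto_rows = []  # placeholder: in the real job this is the Presto result set

# Option 2: be explicit when creating (or reshaping) the RDD.
base_rdd = sc.parallelize(presto_rows, numSlices=NUM_PARTITIONS)
# or: base_rdd = base_rdd.repartition(NUM_PARTITIONS)
```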
- Importing from Self-created Modules
  - Unlike in base Python, where one is able to import from any other Python file within a given repository, in PySpark, self-created libraries need to be zipped up and then added to the global SparkContext object.
  - First the SparkContext object must be created, and then the zip file is added to it via the addPyFile method, as sketched below.
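A minimal sketch, assuming the bootstrap action has placed the zip at `/home/hadoop/spark_app.zip` (the exact path is an assumption):

```python
from pyspark import SparkContext

sc = SparkContext(appName="catalog-match-rate")

# Ship the zipped package to every executor; functions that run on the
# executors can then do `from spark_app.model import ...` and the like.
sc.addPyFile("/home/hadoop/spark_app.zip")
```

The same effect can also be achieved by passing the zip to spark-submit with the `--py-files` option.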
- Using SQLAlchemy
  - Because of the distributed nature of Spark, a SQLAlchemy `Session` object cannot be shared across multiple executors or partitions.
  - For this reason, a new `Session` object must be created per partition, and `sqlalchemy.create_engine` and `sqlalchemy.orm.sessionmaker` must be imported within each partition. All of this is done within a function called by `foreachPartition`, as sketched below.
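A minimal sketch of the per-partition writer; the connection string, the `ProductView` mapped class, and the row fields are all illustrative assumptions:

```python
DATABASE_URL = "postgresql://user:password@host:5432/dbname"  # placeholder


def write_partition_to_db(rows):
    # Import inside the function so the imports are resolved on the executor
    # that processes this partition, not just on the driver.
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from spark_app.model import ProductView  # hypothetical mapped class

    engine = create_engine(DATABASE_URL)  # one engine/session per partition
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        for row in rows:
            session.add(ProductView(product_id=row["product_id"],
                                    view_count=row["view_count"],
                                    matched=row["matched"]))
        session.commit()
    finally:
        session.close()
        engine.dispose()


# rdd1 is the per-product RDD described earlier; each of its partitions is
# handed to the function above as an iterator of rows.
rdd1.foreachPartition(write_partition_to_db)
```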
EMR
- Packaging & Deploying Code
  - The PySpark code was organized as follows (a sketch of the pipeline follows this list):
    - Use Presto to query raw logs on S3 each day for the count (`view_count`) of product views per `product_id`.
    - Convert the results of this query into an RDD.
    - Using Spark, map over this RDD to query DynamoDB and see whether each row in the RDD has a matching record in DynamoDB. The result of this mapping function is a new RDD (named "Product RDD" in the diagram below), which now has a boolean field `matched` to indicate whether or not a matching record was found.
    - Write the contents of this new RDD to the Postgres database, an AWS RDS instance.
    - Reduce this data to the advertiser level, giving a summary of the number of matched views vs. total views by advertiser by day.
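The main `process_data.py` logic is not reproduced verbatim; the sketch below shows the shape of that flow, with `run_presto_query`, `DAILY_VIEW_COUNTS_SQL`, `check_dynamo`, and `write_aggregates_to_db` as hypothetical helpers (and `write_partition_to_db` as sketched earlier):

```python
from pyspark import SparkContext

sc = SparkContext(appName="catalog-match-rate")
sc.addPyFile("/home/hadoop/spark_app.zip")  # self-created modules, see above

# 1. Presto query: (product_id, advertiser_id, view_count) per product per day.
presto_rows = run_presto_query(DAILY_VIEW_COUNTS_SQL)  # hypothetical helper

# 2. Convert the result set into an RDD (80 partitions, per the tuning above).
base_rdd = sc.parallelize(presto_rows, numSlices=80)

# 3. Map over the RDD, querying DynamoDB to attach a boolean `matched` field.
#    (The post wrote this RDD to the database and re-queried it; .persist()
#    keeps the sketch short while still avoiding a second round of lookups.)
product_rdd = base_rdd.map(check_dynamo).persist()

# 4. Write the per-product rows to the Postgres (RDS) table.
product_rdd.foreachPartition(write_partition_to_db)

# 5. Reduce to the advertiser level: (matched views, total views) per advertiser.
advertiser_rdd = (
    product_rdd
    .map(lambda r: (r["advertiser_id"],
                    (r["view_count"] if r["matched"] else 0, r["view_count"])))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
)

# 6. Write the aggregated match-rate rows to the second Postgres table.
advertiser_rdd.foreachPartition(write_aggregates_to_db)  # hypothetical writer
```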
  - As discussed above, in order to access self-created Python modules from within my main PySpark script, the modules had to be zipped up. My solution to this was to create a main module called `spark_app`, within which I had sub-modules `model` and `utils`. I then created a `deploy.py` script that serves to zip up the `spark_app` directory and push the zipped file to a bucket on S3.
  - Similarly, the `deploy.py` script also pushes `process_data.py` (the main PySpark script) and `setup.sh` (the bootstrap action script) to the same bucket on S3. A sketch of `deploy.py` follows below.
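The full `deploy.py` isn't shown here; a minimal sketch of such a script, assuming a hypothetical bucket name, could look like this:

```python
import os
import zipfile

import boto3

BUCKET = "my-spark-deploy-bucket"  # hypothetical bucket name


def zip_spark_app(src_dir="spark_app", zip_path="spark_app.zip"):
    """Zip the spark_app package so it can be shipped via addPyFile."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(src_dir):
            for name in files:
                path = os.path.join(root, name)
                zf.write(path, arcname=path)
    return zip_path


def deploy():
    s3 = boto3.client("s3")
    for filename in (zip_spark_app(), "process_data.py", "setup.sh"):
        s3.upload_file(filename, BUCKET, filename)


if __name__ == "__main__":
    deploy()
```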
(the bootstrap action script) to the same bucket on S3. - Bootstrap actions are run before your steps run. In this way, they are used to set up your cluster appropriately. For our purposes, that involved installing my necessary Python dependencies and copying
spark_app.zip
andprocess_data.py
to the right directory. - Below is our
setup.sh
script:
- Running the Job
  - Ultimately, I chose to run this Spark application using a scheduled Lambda function on AWS (discussed below), but for the purposes of this blog post, I have included both the Lambda function, which utilizes boto3, and the equivalent cluster launch using the AWS CLI.
  - AWS Lambda Function:
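The original function isn't reproduced verbatim; below is a minimal sketch of such a handler built on boto3's `run_job_flow`. Instance types, memory settings, the bucket name, and the on-cluster paths are illustrative assumptions; the `Configurations`, `Steps`, and `Applications` parameters are discussed in the notes that follow.

```python
import boto3


def lambda_handler(event, context):
    emr = boto3.client("emr")
    response = emr.run_job_flow(
        Name="catalog-match-rate",              # illustrative name
        ReleaseLabel="emr-4.1.0",
        Applications=[{"Name": "Spark"}],       # see "Applications" below
        Instances={
            "MasterInstanceType": "m3.xlarge",  # assumption
            "SlaveInstanceType": "m3.xlarge",   # assumption
            "InstanceCount": 21,                # assumption: 1 master + 20 core
            "KeepJobFlowAliveWhenNoSteps": False,
        },
        BootstrapActions=[{
            "Name": "setup",
            "ScriptBootstrapAction": {
                "Path": "s3://my-spark-deploy-bucket/setup.sh",  # assumption
            },
        }],
        Configurations=[
            {   # force Python 2.7 for PySpark (see "Configurations" below)
                "Classification": "spark-env",
                "Configurations": [{
                    "Classification": "export",
                    "Properties": {
                        "PYSPARK_PYTHON": "/usr/bin/python2.7",
                        "PYSPARK_DRIVER_PYTHON": "/usr/bin/python2.7",
                    },
                }],
                "Properties": {},
            },
            {   # raise the Akka frame size to its maximum
                "Classification": "spark-defaults",
                "Properties": {"spark.akka.frameSize": "2047"},
            },
        ],
        Steps=[{
            "Name": "Run process_data.py",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--driver-memory", "10g",      # assumption
                    "--executor-memory", "10g",    # assumption
                    "--executor-cores", "4",       # assumption
                    "--num-executors", "20",
                    "/home/hadoop/process_data.py",  # path is an assumption
                ],
            },
        }],
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
        VisibleToAllUsers=True,
    )
    return response["JobFlowId"]
```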
  - AWS CLI Equivalent:
  - Notes on Parameters:
    - Configurations
      - Because we were also accessing data from Presto within the main PySpark script, it was necessary that all of our machines use Python 2.7 as the default (a Hive/Presto requirement). This involved configuring the `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` environment variables.
      - Additionally, we needed to add a special configuration to increase the `spark.akka.frameSize` parameter. The maximum frame size is 2047, so that is what we set it to. This may be unnecessary for some applications, but if you receive an error about frameSize, you will know that you need to add in a configuration for this parameter.
      - Note: the `--configurations` parameter in the AWS CLI example simply provides a URL to a JSON file stored on S3. That file should contain the JSON blob from `Configurations` in the boto3 example above.
    - Steps
      - Within the Spark step, you can pass in Spark parameters to configure the job to meet your needs. In our case, I needed to increase both the driver and executor memory parameters, along with specifying the number of cores to use on each executor. We also found that we needed to explicitly stipulate that Spark use all 20 executors we had provisioned.
      - The final argument in the list is the path to the main PySpark script (`process_data.py`) that runs the actual data processing job.
    - Applications
      - You must explicitly specify that you intend to run a Spark application.
- Scheduling the Job
  - With the recent announcement of Amazon's 'Scheduled Event' feature within Lambda, we were able to very easily set up a recurring cluster launch on a cron schedule.
  - As shown above, the code to launch the cluster was extremely simple, leveraging the power of boto3's `run_job_flow` method.
  - The only wrinkle here was properly configuring the role for the Lambda function so that it would have the right permissions to launch an EMR cluster. It was necessary that the "elasticmapreduce:RunJobFlow" action had the "iam:PassRole" permission associated with it.
  - Below is the policy for the role:
Closing Thoughts
While there were several hurdles to overcome in order to get this PySpark application running on EMR, we are now extremely happy with the smooth, reliable operation of the daily job. I hope this guide proves helpful for future PySpark and EMR users.
Finally, after taking a one-day Spark course, I have realized that for performance reasons, it likely makes sense to use Spark's DataFrames instead of RDDs for some parts of this job. Stay tuned for a future post on transitioning to DataFrames and (hopefully) the resulting performance gains!