Spark on EMR

Written by Sadie Wilhelm, January 25, 2016

High Level Problem

Our Dynamic Ads product is designed to show a user ads with specific, recommended products based on their browsing history. Each advertiser gives us a list of all of its products beforehand so that we can generate a “product catalog” for the advertiser. Sometimes, however, the catalog an advertiser gives us does not accurately reflect all of the products that exist on its website, and our recommendations suffer as a result. We therefore wanted to run a daily data processing job that would look at every product viewed on a given day and determine whether we had a matching record in the advertiser’s product catalog. We could then aggregate that data into a “match rate” for each advertiser, making it easy to identify and troubleshoot problematic product catalogs.

In order to accomplish this task, we needed to perform the following actions:

- Pull the day's viewed products from our Presto data.
- Look up each viewed product in the advertiser's product catalog (stored in DynamoDB) to see whether we have a matching record.
- Aggregate the results into a per-advertiser match rate and write it to our database.

Why PySpark?

Given that we were faced with a large-scale data processing task, it became clear that we would need a cluster computing solution. As a new engineer most familiar with Python, I was initially drawn to Spark (as opposed to Hadoop MapReduce) because I would be able to use PySpark, Spark's Python API. While there are other Python libraries for MapReduce-style jobs (like mrjob), PySpark was the most attractive of the Python options due to its intuitive, concise syntax and easy maintainability. The performance gains from using Spark were also a major contributing factor.
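To give a sense of that syntax, here is a minimal, illustrative sketch (not our actual job; the record layout and variable names are invented for this example) of the kind of map/reduce pipeline the rest of this post builds on:

  # Illustrative only. Assumes `sc` is a SparkContext and `records` is a list of
  # (advertiser_id, matched_flag) pairs, where matched_flag is 1 if the viewed
  # product was found in the catalog and 0 otherwise.
  rates = (sc.parallelize(records)
             .mapValues(lambda flag: (flag, 1))
             .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
             .mapValues(lambda t: float(t[0]) / t[1]))
  print(rates.collect())  # [(advertiser_id, match_rate), ...]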

While the PySpark code needed to build the necessary RDDs and write to the database ended up being rather trivial, I ran into numerous operational roadblocks deploying and running PySpark on EMR. Many of these issues stemmed from the fact that, although the new EMR 4.1.0 AMI supports Spark and PySpark out of the box, it was difficult to find documentation on configuring Spark to optimize resource allocation and on deploying batch applications. Among the issues I encountered and ultimately solved were installing Python dependencies, setting Python 2.7 as the default, configuring executor and driver memory, and running the spark-submit step. In this post, I present a guide based on my experience that I hope smooths the development process for future users.

Spark

The first lesson was Spark's lazy evaluation: transformations such as map are only recorded in an RDD's lineage, and that lineage is re-executed every time an action runs against the RDD. In the first version of our job, this meant the DynamoDB lookups ran a second time when we called foreachPartition on an RDD derived from rdd1. Rebuilding the second RDD from the rows already written to the database avoided the repeated calls:

    ##### INEFFICIENT CODE #####
    base_rdd = sc.parallelize(presto_data)

    # We do not actually query DynamoDB until foreachPartition is called,
    # as map is a transformation and foreachPartition is an action.
    rdd1 = base_rdd.map(query_dynamo_db)
    rdd1.foreachPartition(add_to_db)

    # At this point, because query_dynamo_db was within a map function
    # (a transformation, which is remembered by rdd1), we actually query
    # DynamoDB all over again when foreachPartition is called.
    rdd2 = rdd1.map(lambda x: (x[0], x[1])).reduceByKey(lambda x,y: x+y)
    rdd2.foreachPartition(add_to_db)
    
    ##### IMPROVED CODE #####
    base_rdd = sc.parallelize(presto_data)
    rdd1 = base_rdd.map(query_dynamo_db)
    rdd1.foreachPartition(add_to_db)

    # Here we build the second RDD based on the data which was added
    # to the database in the first foreachPartition action, instead of
    # based upon rdd1. Thus, we do not repeat the DynamoDB calls.
    db_rows = get_rows_from_db()
    new_rdd = sc.parallelize(db_rows)
    rdd2 = new_rdd.map(lambda x: (x[0], x[1])).reduceByKey(lambda x,y: x+y)
    rdd2.foreachPartition(add_to_db)
    
Partitioning matters as well: parallelize accepts an explicit partition count, which let us spread the work evenly across the cluster rather than relying on the default.

  # Explicitly request 80 partitions -- this matches the 20 executors x 4 cores
  # configured in the spark-submit step below.
  base_rdd = sc.parallelize(presto_data, 80)
  
Our application code is zipped up as spark_app.zip (downloaded by the bootstrap script shown later); addPyFile ships the zip to the executors and puts it on their Python path so that our modules can be imported inside transformations.

  from pyspark import SparkContext
  sc = SparkContext()
  sc.addPyFile('/home/hadoop/spark_app.zip')
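With the zip on the executors' Python path, code inside it can then be imported from within transformations. A hypothetical example (catalog and lookup_product are placeholder names, not modules from our actual application):

  # Hypothetical usage: assumes spark_app.zip contains a top-level catalog.py
  # module that defines lookup_product().
  def check_product(record):
      from catalog import lookup_product
      return lookup_product(record)

  matched_rdd = base_rdd.map(check_product)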
  
When writing results out, foreachPartition lets each partition create a single SQLAlchemy session, add all of its rows, and commit once, rather than paying connection overhead per record. The engine and session are created inside the function so that each executor opens its own database connection (connections cannot be pickled and shipped from the driver), and SQLAlchemy itself is installed on every node by the bootstrap script below.

  def add_to_db(iterator):
      from sqlalchemy import create_engine
      from sqlalchemy.orm import sessionmaker

      # One engine and one session per partition, not per record.
      engine = create_engine(DATABASE_URL)
      Session = sessionmaker(bind=engine)
      session = Session()
      for obj in iterator:
          session.add(obj)
      session.commit()

  my_rdd.foreachPartition(add_to_db)
  

EMR

Getting the cluster itself set up took more trial and error. The first piece is a bootstrap action, a shell script that EMR runs on every node before the cluster starts; we use it to install our Python dependencies and to copy the application code down from S3:

  #!/bin/bash

  # Install our dependencies - swap these for the libraries your job needs!
  sudo yum install -y gcc python-setuptools python-devel postgresql-devel
  sudo python2.7 -m pip install SQLAlchemy==1.0.8
  sudo python2.7 -m pip install boto==2.38.0
  sudo python2.7 -m pip install funcsigs==0.4
  sudo python2.7 -m pip install pbr==1.8.1
  sudo python2.7 -m pip install psycopg2==2.6.1
  sudo python2.7 -m pip install six==1.10.0
  sudo python2.7 -m pip install wsgiref==0.1.2
  sudo python2.7 -m pip install requests[security]

  # Download code from S3 and set up cluster
  aws s3 cp s3://your-bucket/spark_app.zip /home/hadoop/spark_app.zip
  aws s3 cp s3://your-bucket/process_data.py /home/hadoop/process_data.py
  
We launch the cluster from an AWS Lambda function using boto3's run_job_flow. This one call wires everything together: 21 m3.xlarge instances (one master plus 20 core nodes), the bootstrap action, a spark-env configuration that points PySpark at Python 2.7, and a spark-submit step with explicit resource settings (20 executors with 4 GB of memory and 4 cores each, and a 10 GB driver).

  import boto3

  def lambda_handler(json_input, context):
      client = boto3.client('emr', region_name='us-west-2')

      client.run_job_flow(
          Name='YourApp',
          ReleaseLabel='emr-4.1.0',
          Instances={
              'MasterInstanceType': 'm3.xlarge',
              'SlaveInstanceType': 'm3.xlarge',
              'InstanceCount': 21,
              'Ec2KeyName': 'ops',
              'KeepJobFlowAliveWhenNoSteps': False,
              'TerminationProtected': False,
              'Ec2SubnetId': 'subnet-XXXX',
          },
          Steps=[
              {
                  'Name': 'YourStep',
                  'ActionOnFailure': 'TERMINATE_CLUSTER',
                  'HadoopJarStep': {
                      'Jar': 'command-runner.jar',
                      'Args': [
                          'spark-submit',
                          '--driver-memory','10G',
                          '--executor-memory','4G',
                          '--executor-cores','4',
                          '--num-executors','20',
                          '/home/hadoop/process_data.py'
                      ]
                  }
              },
          ],
          BootstrapActions=[
              {
                  'Name': 'cluster_setup',
                  'ScriptBootstrapAction': {
                      'Path': 's3://your-bucket/subfolder/setup.sh',
                      'Args': []
                  }
              }
          ],
          Applications=[
              {
                  'Name': 'Spark'
              },
          ],
          Configurations=[
              {
                  "Classification": "spark-env",
                  "Properties": {

                  },
                  "Configurations": [
                      {
                          "Classification": "export",
                          "Properties": {
                              "PYSPARK_PYTHON": "/usr/bin/python2.7",
                              "PYSPARK_DRIVER_PYTHON": "/usr/bin/python2.7"
                          },
                          "Configurations": [

                          ]
                      }
                  ]
              },
              {
                  "Classification": "spark-defaults",
                  "Properties": {
                      "spark.akka.frameSize": "2047"
                  }
              }
          ],
          VisibleToAllUsers=True,
          JobFlowRole='EMR_EC2_DefaultRole',
          ServiceRole='EMR_DefaultRole'
      )
  
The same cluster can also be launched from the AWS CLI, which is convenient for testing; here --configurations points at a JSON file in S3 containing the same Python 2.7 settings:

  aws emr create-cluster --name "YourApp" --release-label emr-4.1.0 \
  --use-default-roles \
  --ec2-attributes KeyName=YourKey,SubnetId=YourSubnet \
  --applications Name=Spark \
  --configurations https://bucket.s3.amazonaws.com/key/pyspark_py27.json \
  --region us-west-2 --instance-count 21 --instance-type m3.xlarge \
  --bootstrap-action Path="s3://your-bucket/path/to/script" \
  --steps Type=Spark,Name="StepName",ActionOnFailure=TERMINATE_CLUSTER,Args=[--driver-memory,10G,--executor-memory,4G,--executor-cores,4,--num-executors,20,/home/hadoop/process_data.py]
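The pyspark_py27.json file referenced by --configurations is just the Configurations section from the run_job_flow call above, saved to S3 as a JSON array; something along these lines (the spark-defaults entry is optional and shown only for completeness):

  [
    {
      "Classification": "spark-env",
      "Properties": {},
      "Configurations": [
        {
          "Classification": "export",
          "Properties": {
            "PYSPARK_PYTHON": "/usr/bin/python2.7",
            "PYSPARK_DRIVER_PYTHON": "/usr/bin/python2.7"
          }
        }
      ]
    },
    {
      "Classification": "spark-defaults",
      "Properties": {
        "spark.akka.frameSize": "2047"
      }
    }
  ]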
  
Finally, the Lambda function's execution role needs permission to write its own logs and to start the cluster: elasticmapreduce:RunJobFlow, plus iam:PassRole so that it can hand the EMR service and instance roles to the cluster.

  {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:RunJobFlow",
                "iam:PassRole"
            ],
            "Resource": "*"
        }
    ]
  }
  

Closing Thoughts

While there were several hurdles to overcome to get this PySpark application running smoothly on EMR, we are now extremely happy with how the daily job has been operating. I hope this guide proves helpful to future PySpark and EMR users.

Finally, after taking a one-day Spark course, I have realized that for performance reasons it likely makes sense to use Spark's DataFrames instead of RDDs for some parts of this job. Stay tuned for a future post on transitioning to DataFrames and the (hopefully) resulting performance gains!
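As a rough, untested sketch of what that transition might look like (the RDD contents and column names here are simplified assumptions, not our production code), the per-advertiser aggregation could be expressed as:

  # Untested sketch against the Spark 1.5-era API. Assumes `pairs_rdd` holds
  # (advertiser_id, matched_flag) tuples.
  from pyspark.sql import SQLContext
  from pyspark.sql import functions as F

  sqlContext = SQLContext(sc)
  df = sqlContext.createDataFrame(pairs_rdd, ['advertiser_id', 'matched'])
  match_rates = df.groupBy('advertiser_id').agg(F.avg('matched').alias('match_rate'))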