Ravi Manjunatha
6 min readOct 18, 2021

--

Spark on EMR Data Pipeline with Apache Airlfow

Large enterprises with big on-premise footprint of Hadoop clusters powering Spark, prefer Amazon EMR as their cloud migration solution. This lift and shift approach ensures, the developers and the end user community do not get hazzled by new ways of doing things. They get easy options to scale their clutsers based on the workload and can decouple their storage and compute as well. This is a game changer in terms of the support and maintenance of the Hadoop cluster and the use of resevred and spot instances can reduce the cost by 40 -50%.

AWS provides multiple options to orchestrate these pipelines such as Lambda, AWS Data Pipeline and Managed Apache Airflow.

In this article, i will share how Apache Airflow can be used to build data pipelines using data from S3, process the data from Spark application luanched in EMR and how the processed data can be stored back to S3.

Following are the pre-requistes,

  1. Create Apache Airflow managed instance on AWS using the Managed Apache Airflow (MWAA) detailed article regarding this can be found here.
  2. Create a bucket in S3 with ‘Source’ folder (where source data will be stored), ‘Destination’ Folder (where processed data from EMR will be stored) , ‘Dags’ folder (where the Airflow code will be saved), ‘Scripts folder ( where the Pyspark script will be saved).
  3. We will use the Spark case study with pyspark code as in the AWS EMR tutorial here. The input data is a modified version of Health Department inspection results in King County, Washington, from 2006 to 2020. The script processes food establishment inspection data and returns a results file in your S3 bucket. The results file lists the top ten establishments with the most “Red” type violations.

4. Source dataset, Airflow Code and Pyspark code used for this demo can be found in this git link.

Case Study :

A sample case study for this demo is as follows, data from multiple internal and external sources could land in S3 (data lake). This raw data needs to be processed by using Spark on EMR. The processed data should then be stored back to S3. Athena or Redshift can be used to query the processed data in S3.EMR cluster itself should be operated in a transient way.

Solution :

Airflow will be used to luanch EMR cluster with Spark installed. The cluster will be luanched in step mode. As a first step, data from S3 will be copied to EMR, in the second step, PySpark will process the The Pyspark script will then process the raw data in S3. The processed data will be stored back in S3. Once this is completed, the EMR cluster will be terminated.

The steps illustraing this solution are mentioned below,

  1. Make sure you have the Managed Airflow set up, the source data is in source folder of S3 bucket, Airflow code is in dags folder of S3 bucket, Pyspark code is in scripts folder of S3 bucket. Your Airflow environemnt in AWS should look like this,

2. Since Airflow should luanch EMR and then terminate it, Airflow Role should have full access on EMR. Make sure you attach this policy for the Airflow Role,

3. Create these default roles for EMR to be accessed and EMR to access files in S3,

4. Unpause the Airflow Dag, since it is the first run, the Dag will start to run automatically,

5. You can now head to the EMR console, to see the EMR Cluster getting created,

6. It takes about 5–6 minutes for the Cluster to be set up. I have taken On-Demand Instance of EC2 as i had a limit on the Spot instances.

7. Once the Cluster is ready, the cluster state changes from ‘Starting’ to ‘Running’

8. Each of the steps now start to execute one after the other as shown below,

9. If you click on the Log of the ‘Watch Step’ of the Airflow Job, you can notice the step poking each of the EMR steps to check for completion,

10. Once all the Steps are completed, the Cluster is termintaed, by the ‘terminate emr cluster’ step of the Airflow Job.

11. One can see all traffic lights turned to green, in the Airflow UI,

12. The output file is loaded the output folder under the S3 bucket,

The Top 10 violations can be seen as below,

13. You can now load the output file from S3 to Athena or Redhshift and then to Quicksight or Tableau based on your use case.

14. Make sure you delete the Airflow Enviornment once you are done with the poc or demo or you no longer require it. Check all associated VPCs, NAT Gateways of the Airflow environment. It costs real money !

15. In Real world scenario, you can schedule the Airflow job to run at pre-defined intervals, so the entire flow is automated. Your actual data could be in a SFTP server or in a RDBMS. Add a copy step to this Airflow code, to copy the file from the SFTP server or RDBMS server to S3 and then resuse the rest of the steps from this Airflow DAG.

16. If all goes well, the time taken for the entire set up right from creating Airflow Environemnt to the DAG and lunaching EMR cluster and processing and terminating should be completed in about an hour’s time. Of course, this is the happy path scenario, we may run into erros, please feel free to reach out to me for any issues. Happy to Help!

--

--