zeotap makes large-scale, deterministic data assets easily available to the digital advertising ecosystem and other industries. Our data engineering team transforms the unstructured raw data we receive from a multitude of data partners into structured, readily queryable data, which can then be monetized through different distribution channels. In this article, I begin with a high-level introduction to our data pipeline and then explain how we use Apache Oozie as a distributed controller for our data platform, including the customisations we needed to make it work for our use case.
Our entire data platform runs on Amazon Web Services (AWS) and makes heavy use of a diverse set of AWS tools and services. Every day, we receive data from our data partners in designated S3 buckets, in pre-agreed formats. We then process it with Apache Spark on AWS Elastic MapReduce (EMR) through several pipeline stages. The processed data is uploaded to Amazon Redshift clusters (Redshift is an OLAP, massively parallel processing (MPP) database), where it can be queried efficiently by different teams and by the internal APIs we have built on top of it. All of this is scheduled and monitored with Apache Oozie, which comes bundled with Amazon's EMR service.
Apache Oozie is a workflow scheduler for the Hadoop ecosystem that schedules jobs across a cluster such as EMR with the help of the YARN ResourceManager. Each Oozie workflow is a directed acyclic graph (DAG) of action nodes, which do the actual work, connected by control flow nodes such as start, end, fork, join and decision. A workflow can be run once on demand or scheduled to run periodically by an Oozie coordinator. Workflows are defined in XML. XML is rarely anyone's favourite format to work with, but Oozie backs it with some really powerful abstractions, such as workflow parametrization, fork and join nodes for parallel execution, a rich library of EL (Expression Language) functions, and email notifications, so the verbosity is quickly forgotten.
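To make the fork and join idea concrete, here is a minimal sketch in Python that generates a tiny workflow definition with the standard library's ElementTree. It assumes the uri:oozie:workflow:0.5 schema and uses fs mkdir actions as stand-ins for real work; the workflow name and directory paths are hypothetical, and ${nameNode} is left as a workflow parameter for Oozie to resolve at submission time.

```python
import xml.etree.ElementTree as ET
from typing import Sequence

WF_NS = "uri:oozie:workflow:0.5"


def fork_join_workflow(name: str, dirs: Sequence[str]) -> str:
    """Return workflow XML that creates every directory in `dirs` in parallel."""
    if len(dirs) < 2:
        raise ValueError("an Oozie fork needs at least two paths")
    app = ET.Element("workflow-app", {"name": name, "xmlns": WF_NS})
    ET.SubElement(app, "start", to="fork-mkdirs")

    # Control flow node: the fork starts one branch per action.
    fork = ET.SubElement(app, "fork", name="fork-mkdirs")
    for i in range(len(dirs)):
        ET.SubElement(fork, "path", start=f"mkdir-{i}")

    # Action nodes: each branch is an fs action that rejoins at the join node.
    for i, path in enumerate(dirs):
        action = ET.SubElement(app, "action", name=f"mkdir-{i}")
        fs = ET.SubElement(action, "fs")
        ET.SubElement(fs, "mkdir", path=path)
        ET.SubElement(action, "ok", to="join-mkdirs")
        ET.SubElement(action, "error", to="fail")

    # Control flow node: the join waits for all branches, then moves to end.
    ET.SubElement(app, "join", name="join-mkdirs", to="end")
    kill = ET.SubElement(app, "kill", name="fail")
    ET.SubElement(kill, "message").text = "Failed: ${wf:errorMessage(wf:lastErrorNode())}"
    ET.SubElement(app, "end", name="end")
    return ET.tostring(app, encoding="unicode")


if __name__ == "__main__":
    print(fork_join_workflow("demo-fork-join", ["${nameNode}/tmp/a", "${nameNode}/tmp/b"]))
```

The actions and control nodes are kept separate on purpose: the fork and join only describe the shape of the DAG, while each action only describes its own work and where to go on success or failure.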
To set up our data pipeline in Oozie, we composed each stage of the pipeline as a sub-workflow consisting of one or more Spark actions. We also used HDFS (fs) actions to create the necessary folder structure, and shell actions to verify that the expected data was actually available. We then wrapped all the sub-workflows in a parent workflow that runs them in sequence and performs bookkeeping operations as needed. Decoupling the parent workflow from the per-stage sub-workflows lets us run one or more stages independently for testing and debugging.
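The parent-and-sub-workflow structure described above can be sketched as follows: a Python helper, assuming the uri:oozie:workflow:0.5 schema, that chains sub-workflow actions so that each stage's ok transition points at the next stage. The stage names and the ${appRoot} application paths are hypothetical, and stage names must not clash with the fail and end node names used here.

```python
import xml.etree.ElementTree as ET
from typing import List, Tuple

WF_NS = "uri:oozie:workflow:0.5"


def sequential_parent_workflow(name: str, stages: List[Tuple[str, str]]) -> str:
    """Chain sub-workflows so each stage starts only after the previous one succeeds.

    `stages` is a list of (action_name, sub_workflow_app_path) pairs.
    """
    if not stages:
        raise ValueError("at least one stage is required")
    app = ET.Element("workflow-app", {"name": name, "xmlns": WF_NS})
    ET.SubElement(app, "start", to=stages[0][0])
    for i, (stage, app_path) in enumerate(stages):
        next_node = stages[i + 1][0] if i + 1 < len(stages) else "end"
        action = ET.SubElement(app, "action", name=stage)
        sub = ET.SubElement(action, "sub-workflow")
        ET.SubElement(sub, "app-path").text = app_path
        # Hand the parent's parameters (partner, date, ...) down to the stage.
        ET.SubElement(sub, "propagate-configuration")
        ET.SubElement(action, "ok", to=next_node)
        ET.SubElement(action, "error", to="fail")
    kill = ET.SubElement(app, "kill", name="fail")
    ET.SubElement(kill, "message").text = "Stage failed: ${wf:lastErrorNode()}"
    ET.SubElement(app, "end", name="end")
    return ET.tostring(app, encoding="unicode")


if __name__ == "__main__":
    print(sequential_parent_workflow("partner-parent", [
        ("ingest", "${appRoot}/ingest"),        # hypothetical stage names and paths
        ("transform", "${appRoot}/transform"),
        ("export", "${appRoot}/export"),
    ]))
```

Because every stage is a self-contained workflow application, debugging a single stage amounts to submitting that stage's app path directly instead of the parent.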
We parametrized the parent workflow with Oozie variables so that the same workflow can process data for any data partner. We then set up Oozie coordinators to run each partner's parent workflow every day at specific times. In the coordinators, we use EL expressions to compute the path of the S3 folder that holds the latest data for a given data partner. Finally, we added another parent workflow consisting of a Spark action that aggregates data across all data partners, followed by a shell action that uploads the aggregated data to Redshift.
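As an illustration of the date arithmetic such an EL expression performs, the sketch below reproduces it in Python. The bucket name, the partner/yyyy/MM/dd folder layout and the one-day lag are all assumptions made for the example; inside a coordinator, a similar value could be built from coordinator EL functions such as coord:nominalTime() and coord:formatTime() and passed to the workflow as a property.

```python
from datetime import datetime, timedelta


def partner_input_path(bucket: str, partner: str, nominal_time: datetime, lag_days: int = 1) -> str:
    """Return the S3 folder expected to hold a partner's latest daily drop.

    Assumes drops land under s3://<bucket>/<partner>/yyyy/MM/dd/ and that a run
    scheduled at `nominal_time` processes the data from `lag_days` days earlier.
    """
    day = nominal_time - timedelta(days=lag_days)
    return f"s3://{bucket}/{partner}/{day:%Y/%m/%d}/"


if __name__ == "__main__":
    # A run nominally scheduled for 2017-07-01 06:00 reads the 2017-06-30 folder.
    print(partner_input_path("raw-partner-data", "partner-a", datetime(2017, 7, 1, 6, 0)))
```

Deriving the path from the run's nominal time rather than the wall clock keeps reruns reproducible: rerunning an old coordinator action reads the same folder it read originally.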
We wanted to use data triggers in our pipeline so that a workflow would run as soon as all of its input data was available. Oozie's synchronous data triggers did not fit our use case, because with them a workflow can only be triggered at a fixed time interval. To work around this, we wrote a simple shell script that polls every 5 minutes to check whether the previous workflow has written its processed data.
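The polling logic described above is sketched below in Python rather than shell. The 5-minute interval comes from the text; the 6-hour timeout is an assumption, as is the idea of polling for marker paths (for example a _SUCCESS file written by the upstream Spark job). The default existence check shells out to hadoop fs -test -e, which exits with status 0 when a path exists; sleep and clock are injectable so the loop can be tested without waiting.

```python
import subprocess
import time
from typing import Callable, Iterable


def hadoop_path_exists(path: str) -> bool:
    """`hadoop fs -test -e` exits with status 0 when the path exists."""
    return subprocess.run(["hadoop", "fs", "-test", "-e", path]).returncode == 0


def wait_for_inputs(
    paths: Iterable[str],
    exists: Callable[[str], bool] = hadoop_path_exists,
    interval_s: float = 300,
    timeout_s: float = 6 * 3600,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll until every path exists; return False if the timeout expires first."""
    pending = list(paths)
    deadline = clock() + timeout_s
    while True:
        # Only re-check paths that were still missing in the previous round.
        pending = [p for p in pending if not exists(p)]
        if not pending:
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)
```

Run as an Oozie shell action, a wrapper script could exit with a non-zero status when wait_for_inputs returns False; Oozie then treats the action as failed and follows its error transition instead of starting the downstream work on incomplete data.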
To minimize cost, we use EMR's autoscaling feature for task nodes, which caused a problem at the final stage of the pipeline. While the upload shell script was running, the cluster's memory demand was low, so EMR scaled the cluster down, which caused the YARN container running the script to fail. Although YARN restarts the container on a different node, the retry delays the pipeline unnecessarily.
Digging deeper, we found that Oozie runs two containers for each shell action: a driver container and a task container (the ApplicationMaster and the single map task of Oozie's launcher job). EMR schedules the driver container only on core nodes, whereas task containers can run anywhere in the cluster. Because we do not autoscale the core nodes, the driver container survives a scale-down while the task container dies, and the still-running driver then restarts the failed task container.
To resolve the issue, we needed a way to keep the task's work on core nodes as well. Conveniently, Oozie lets you uberize the launcher for shell actions, which means the action is executed inside the driver container itself, without any separate task container. Since the driver container is only ever scheduled on core nodes, scale-downs no longer kill the running script.
This is enabled by setting the property oozie.action.launcher.mapreduce.job.ubertask.enable to true in the Oozie configuration.
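One possible way to apply this on EMR, assuming the property is set cluster-wide through EMR's oozie-site configuration classification, is sketched below. The Python helper only builds the configuration object; the property name and value come from the text, while the helper name and the output file name in the usage note are hypothetical.

```python
import json
from typing import Dict, List


def oozie_uber_config(enable: bool = True) -> List[Dict]:
    """EMR configuration object that sets the Oozie launcher uber-mode property."""
    return [
        {
            "Classification": "oozie-site",
            "Properties": {
                # Run the launcher's work inside its driver container.
                "oozie.action.launcher.mapreduce.job.ubertask.enable": "true" if enable else "false",
            },
        }
    ]


if __name__ == "__main__":
    print(json.dumps(oozie_uber_config(), indent=2))
```

The printed JSON could be saved, for example as oozie-config.json, and passed when creating a cluster with aws emr create-cluster --configurations file://oozie-config.json. EMR configuration properties are strings, which is why the value is "true" rather than a JSON boolean.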
Out of the box, the Oozie shared library did not include the Spark libraries required to run Spark actions. We resolved the issue by manually adding or replacing the following libraries in the Oozie shared library folder (Oozie 4.3.0 on EMR release emr-5.6.0):
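A minimal sketch of how such a sharelib update could be scripted is shown below. It only builds the commands (a dry run) and does not execute them. The jar paths, the timestamped sharelib directory and the Oozie URL are placeholders; hdfs dfs -put -f overwrites a file of the same name, and oozie admin -sharelibupdate asks a running Oozie server to reload its shared libraries.

```python
import shlex
from typing import List, Sequence


def sharelib_update_commands(jars: Sequence[str], sharelib_dir: str, oozie_url: str) -> List[List[str]]:
    """Build (but do not run) the commands that copy jars into the Spark sharelib.

    `sharelib_dir` is the timestamped HDFS sharelib directory (placeholder).
    """
    spark_dir = sharelib_dir.rstrip("/") + "/spark/"
    # -f overwrites an existing jar with the same file name.
    commands = [["hdfs", "dfs", "-put", "-f", jar, spark_dir] for jar in jars]
    # Ask the running Oozie server to reload the sharelib without a restart.
    commands.append(["oozie", "admin", "-oozie", oozie_url, "-sharelibupdate"])
    return commands


if __name__ == "__main__":
    for cmd in sharelib_update_commands(
        ["/tmp/jars/example-spark-dependency.jar"],  # hypothetical jar path
        "/user/oozie/share/lib/lib_20170601000000",  # hypothetical sharelib dir
        "http://localhost:11000/oozie",              # hypothetical Oozie URL
    ):
        print(shlex.join(cmd))
```

Replacing a jar whose file name differs from the one already in the sharelib (for example, a different version number) would additionally require removing the old file, which this sketch does not do.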
While running the pipeline, we also observed that Oozie's HDFS (fs) actions did not work with EMRFS (EMR's HDFS-compatible file system for S3), because the EMRFS assembly JAR was missing from Oozie's local libraries. To fix this, we simply added a symbolic link in Oozie's local JAR directory pointing to the EMRFS assembly JAR that is already present on the EMR master node.
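The symlink step could look like the Python sketch below. Both directories are passed in by the caller because their exact locations are assumptions here: on EMR the EMRFS jars commonly live under /usr/share/aws/emr/emrfs/lib, and the Oozie local JAR directory depends on the release, so verify both on your cluster. The emrfs-hadoop-assembly-*.jar file name pattern is likewise an assumption.

```python
import glob
import os


def link_emrfs_jar(emrfs_lib_dir: str, oozie_lib_dir: str) -> str:
    """Symlink the EMRFS assembly JAR into Oozie's local JAR directory.

    Both directory paths are supplied by the caller; returns the link path.
    """
    pattern = os.path.join(emrfs_lib_dir, "emrfs-hadoop-assembly-*.jar")
    jars = sorted(glob.glob(pattern))
    if not jars:
        raise FileNotFoundError(f"no EMRFS assembly JAR matches {pattern}")
    target = jars[-1]  # last match by name; normally there is exactly one
    link = os.path.join(oozie_lib_dir, os.path.basename(target))
    # lexists() is also true for a dangling link, so reruns never fail here.
    if not os.path.lexists(link):
        os.symlink(target, link)
    return link
```

Using a link rather than a copy means the Oozie side picks up whatever JAR EMR ships at that path. The script would need to run on the master node with write access to the Oozie directory, and the Oozie server may need a restart before it loads the new JAR.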
This is, of course, just a sneak peek at the challenges we face while dealing with data at zeotap. The fun part is that, besides being constantly challenged, we overcome these problems together as a team, learning from each other and strengthening our relationships along the way. This definitely keeps motivation high, and you can feel it in the air.