At Zeotap, we source data from numerous enterprise partners, then refine, blend and convert it into high-performing data segments used for targeting. Last year, the number of profiles under management grew tenfold, from 250M to 2.5Bn.
An introduction to our data pipeline was given in “Oozie! It’s pouring data!”. In this article, we elaborate on our profile store, its purposes and our choice of technology. Our profile store is where every data feed finds its place. It is the heart of our advertising intelligence.
Profile – all attributes of a user ranging from demographic to interest and intent.
Profile store – a denormalized store of all user profiles.
Segment – a collection of users which belong to a particular group satisfying a criterion.
We already have dozens of data points for each user, and this number is constantly growing. Some regions hold more than 2 Bn rows, and by now more than 7000 queries process this data.
Following are some of the significant data segment requirements of this database / store:
The below table depicts the overall structure of our data profile store:
There are three main types of columns in this table:
Assuming that the table name is profileStore and that the attributes are the ones listed above, typical queries to build a meaningful user segment for the advertising market look like this.
Segment creation and export queries
SELECT zuid FROM profileStore WHERE age > 25 AND gender = 'Male'
SELECT zuid FROM profileStore WHERE country = 'IND' AND gender = 'Female'
Segment revenue attribution queries
SELECT gender_dp AS ldpid, COUNT(*) AS COUNT FROM profileStore WHERE zuid IS NOT NULL AND gender_dp IS NOT NULL AND gender = 'Female' GROUP BY gender_dp
The query above counts, for a specific user segment (here “All female users in the profileStore”), how many of the segment’s attribute values were contributed by each data partner. From these counts we derive each partner’s share of the segment. Imagine this running for 500+ columns and you can picture the complexity and scale of this task.
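To make the attribution idea concrete, here is a minimal sketch that runs the same GROUP BY query against a toy table and turns the counts into shares. It assumes an in-memory SQLite database as a stand-in for Redshift; the rows and the partner ids dp1 and dp2 are invented for illustration, and the count alias is renamed to cnt.

```python
import sqlite3

# Toy stand-in for the profile store. Column names follow the article's query;
# the rows and the data-partner ids (dp1, dp2) are invented for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE profileStore (zuid TEXT, gender TEXT, gender_dp TEXT)")
conn.executemany(
    "INSERT INTO profileStore VALUES (?, ?, ?)",
    [
        ("z1", "Female", "dp1"),
        ("z2", "Female", "dp1"),
        ("z3", "Female", "dp2"),
        ("z4", "Male", "dp2"),    # not part of the segment
        ("z5", "Female", None),   # no partner recorded: excluded by the filter
    ],
)

ATTRIBUTION_SQL = """
    SELECT gender_dp AS ldpid, COUNT(*) AS cnt
    FROM profileStore
    WHERE zuid IS NOT NULL AND gender_dp IS NOT NULL AND gender = 'Female'
    GROUP BY gender_dp
"""


def attribution_shares(connection):
    """Return {data_partner_id: share of the segment's attributed rows}."""
    counts = dict(connection.execute(ATTRIBUTION_SQL).fetchall())
    total = sum(counts.values())
    if total == 0:
        return {}
    return {partner: n / total for partner, n in counts.items()}


shares = attribution_shares(conn)
print(shares)
```

In a real setup the same computation would have to be repeated for every attribute column that carries a partner id, which is where the cost adds up.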
Segment size estimation queries
We estimate the size of a specific segment by using a representative data sample from the profile store. The query on this sample would look like this:
SELECT COUNT(DISTINCT zuid) FROM profileStoreSample WHERE age > 25 AND gender = 'Male'
Note that the filters used to create a segment vary enormously between applications, so any of the hundreds of columns can be relevant, in arbitrary AND / OR / other combinations.
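The sample-based estimate can be sketched as follows. This is an illustration under stated assumptions, not our production logic: it assumes profileStoreSample is a uniform random sample with a known sampling rate (the 1% used here is hypothetical), and it uses an in-memory SQLite table in place of Redshift.

```python
import sqlite3


def estimate_segment_size(connection, where_clause, sampling_rate):
    """Scale the distinct-user count found in the sample up to the full store.

    where_clause is trusted SQL in this sketch; a real service would have to
    validate or build it from structured filters.
    """
    sql = f"SELECT COUNT(DISTINCT zuid) FROM profileStoreSample WHERE {where_clause}"
    (sample_count,) = connection.execute(sql).fetchone()
    return round(sample_count / sampling_rate)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE profileStoreSample (zuid TEXT, age INTEGER, gender TEXT)")
conn.executemany(
    "INSERT INTO profileStoreSample VALUES (?, ?, ?)",
    [
        ("z1", 31, "Male"),
        ("z1", 31, "Male"),    # duplicate row, counted once by DISTINCT
        ("z2", 40, "Male"),
        ("z3", 22, "Male"),    # fails the age filter
        ("z4", 35, "Female"),  # fails the gender filter
    ],
)

estimate = estimate_segment_size(
    conn, "age > 25 AND gender = 'Male'", sampling_rate=0.01
)
print(estimate)
```

The accuracy of such an estimate depends on how representative the sample is, especially for small or highly filtered segments.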
The result of any such query is a list of ZUIDs. The fewer filters we apply, the bigger the segment. A simple segment comprising only males can easily contain billions of users. At this scale, we need an efficient way to export the results to an easily accessible storage format.
Now that we have described our technical, and to some degree commercial, use cases and some sample query patterns, let’s take a look at the core technology behind the ProfileStore: column-oriented databases and MPP (Massively Parallel Processing).
Column-oriented databases store all the values of a particular column together on disk. When a query runs, only the disk blocks that contain the required columns are fetched to calculate the result. This requires significantly less disk I/O than in row-oriented databases. Techniques such as compression and late materialization further reduce disk I/O and improve speed.
The main difference between row and column-oriented storage types is shown below:
Figure 1: Difference in storage mechanism for row and column-oriented database for a sample table
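The difference can also be sketched in a few lines of Python. This is a simplified in-memory model, not how Redshift stores blocks: it counts how many values each layout has to read to answer "zuid of all males older than 25" on an invented four-row table.

```python
rows = [
    {"zuid": "z1", "age": 31, "gender": "Male", "country": "IND"},
    {"zuid": "z2", "age": 24, "gender": "Male", "country": "USA"},
    {"zuid": "z3", "age": 45, "gender": "Female", "country": "IND"},
    {"zuid": "z4", "age": 52, "gender": "Male", "country": "DEU"},
]

# Row-oriented: the values of one record are stored next to each other.
row_store = [tuple(r.values()) for r in rows]

# Column-oriented: the values of one column are stored next to each other.
column_store = {name: [r[name] for r in rows] for name in rows[0]}


def select_row_store(store):
    """Scan full records, even though the filter needs only two fields."""
    values_read = 0
    result = []
    for zuid, age, gender, country in store:
        values_read += 4  # the whole record is fetched
        if age > 25 and gender == "Male":
            result.append(zuid)
    return result, values_read


def select_column_store(store):
    """Scan only the filter columns, then fetch zuid for the matches."""
    ages, genders = store["age"], store["gender"]
    values_read = len(ages) + len(genders)
    hits = [
        i for i, (age, gender) in enumerate(zip(ages, genders))
        if age > 25 and gender == "Male"
    ]
    # Late materialization: zuid values are read only for matching positions.
    result = [store["zuid"][i] for i in hits]
    values_read += len(hits)
    return result, values_read


row_result, row_reads = select_row_store(row_store)
col_result, col_reads = select_column_store(column_store)
print(row_result, row_reads)
print(col_result, col_reads)
```

Both layouts return the same users, but the columnar scan never touches the country column and reads zuid only for the rows that pass the filter. With hundreds of columns per profile, that gap grows accordingly.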
In Massively Parallel Processing (MPP) databases, data is partitioned across multiple servers or nodes, each with its own memory and processors to process its share of the data. All communication happens via a network interconnect. Disk-level sharing and contention are therefore of no concern, which is why this approach is also referred to as a ‘shared-nothing’ architecture. By leveraging parallelism, queries can run many times faster than on a single host.
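The shared-nothing pattern can be illustrated with a small sketch: rows are hash-distributed across nodes, every node filters and counts only its own slice, and a leader adds up the partial results. The node count, the CRC32-based distribution and the use of threads as stand-ins for nodes are assumptions made for this example; they show the pattern, not Redshift internals or a real speed-up.

```python
import zlib
from concurrent.futures import ThreadPoolExecutor

NUM_NODES = 4  # hypothetical cluster size


def node_for(zuid):
    """Pick a node from a stable hash of the distribution key."""
    return zlib.crc32(zuid.encode("utf-8")) % NUM_NODES


def distribute(profiles):
    """Partition the rows so that each node owns a disjoint slice."""
    slices = [[] for _ in range(NUM_NODES)]
    for profile in profiles:
        slices[node_for(profile["zuid"])].append(profile)
    return slices


def local_count(node_slice):
    """Work done on one node, using only that node's own data."""
    return sum(
        1 for p in node_slice if p["age"] > 25 and p["gender"] == "Male"
    )


def parallel_count(slices):
    """Leader step: run the local counts concurrently and sum the partials."""
    with ThreadPoolExecutor(max_workers=NUM_NODES) as pool:
        return sum(pool.map(local_count, slices))


# Invented data set of 1000 profiles.
profiles = [
    {"zuid": f"z{i}", "age": 20 + i % 30, "gender": "Male" if i % 2 else "Female"}
    for i in range(1000)
]
slices = distribute(profiles)
total = parallel_count(slices)
print(total)
```

Because the slices are disjoint, the sum of the per-node counts equals the count over the full data set, and no node ever needs to read another node's data.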
During our scaling journey, we carefully chose Redshift as our ProfileStore. Redshift is a database provided by AWS for data warehousing and OLAP workloads. It stores its data on disk in the column-oriented fashion laid out above, and it is also an MPP database with a shared-nothing architecture.
At zeotap, using Redshift comes with the following main benefits:
Redshift is fully managed by AWS. Operating it is easier than maintaining a self-hosted equivalent such as Greenplum.
Bulk Load and Unload
Data can be copied from S3 to Redshift with the COPY command in multiple formats such as CSV and AVRO, optionally GZIP-compressed. Since most of our data is stored in S3, this is very convenient. In our experience, loading plain CSV data into Redshift is also much faster than loading AVRO.
Redshift also has a feature to automatically store the query results to an S3 location using the UNLOAD command. This suits us very well since the results from our queries are usually very large and need to be consumed across multiple channels.
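As a rough sketch of what bulk load and unload look like in practice, the helpers below only assemble COPY and UNLOAD statements as strings; they would then be sent to the cluster through any PostgreSQL-compatible client. The bucket, prefixes and IAM role ARN are hypothetical placeholders, and the chosen options (gzipped CSV, parallel unload) are one possible configuration, not necessarily the one we run.

```python
# Hypothetical placeholders, not real resources.
IAM_ROLE = "arn:aws:iam::123456789012:role/example-redshift-role"
BUCKET = "s3://example-bucket"


def copy_statement(table, s3_path, iam_role):
    """Build a COPY statement that loads gzipped CSV files from S3."""
    return (
        f"COPY {table} FROM '{s3_path}' "
        f"IAM_ROLE '{iam_role}' FORMAT AS CSV GZIP;"
    )


def unload_statement(query, s3_prefix, iam_role):
    """Build an UNLOAD statement that writes query results to S3.

    UNLOAD takes the query as a quoted string, so single quotes inside the
    query are doubled.
    """
    escaped = query.replace("'", "''")
    return (
        f"UNLOAD ('{escaped}') TO '{s3_prefix}' "
        f"IAM_ROLE '{iam_role}' GZIP PARALLEL ON;"
    )


copy_sql = copy_statement("profileStore", f"{BUCKET}/profiles/", IAM_ROLE)
unload_sql = unload_statement(
    "SELECT zuid FROM profileStore WHERE age > 25 AND gender = 'Male'",
    f"{BUCKET}/segments/males_over_25_",
    IAM_ROLE,
)
print(copy_sql)
print(unload_sql)
```

With parallel unload, the result is written as multiple files under the given prefix, which suits segments that are consumed by several downstream channels.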
You can query Redshift directly from Postgres using extensions such as dblink and postgres_fdw. This is useful in one of our products, which constantly samples data from Redshift to estimate segment sizes. For more details about these extensions, take a look at our dedicated blog post.
The largest cluster we currently have serves the US. It runs on 10 nodes and also holds the largest number of user records. We use dc2.large nodes since they come with SSDs and are faster than Dense Storage nodes.
While Redshift is generally a very good fit, it also has limitations. One of the biggest is that it does not support data structures such as JSON or Map. As a result, table modelling is limited to a flat structure of columns with simple types. To support a one-to-many relationship, we have to model two separate tables with a loosely imposed foreign key and use JOIN to combine multiple layers of results. When one of the tables is much smaller than the other, Redshift uses a HASH JOIN, and otherwise a MERGE JOIN, to compute the results. One problem we have observed is that MERGE JOIN is slow when both tables are large enough to strain the available memory.
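A minimal sketch of this two-table modelling, assuming a hypothetical profileInterests table that holds one row per user and interest. It runs on an in-memory SQLite database, so it shows only the schema and the JOIN, not Redshift's join strategies or performance.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Flat profile table: one row per user, simple column types only.
    CREATE TABLE profileStore (zuid TEXT PRIMARY KEY, gender TEXT);

    -- Hypothetical child table for the one-to-many attribute "interest".
    -- zuid acts as a loosely imposed foreign key (not enforced here).
    CREATE TABLE profileInterests (zuid TEXT, interest TEXT);

    INSERT INTO profileStore VALUES
        ('z1', 'Male'), ('z2', 'Female'), ('z3', 'Female');
    INSERT INTO profileInterests VALUES
        ('z1', 'travel'), ('z2', 'travel'), ('z2', 'sports'), ('z3', 'sports');
    """
)

SEGMENT_SQL = """
    SELECT DISTINCT p.zuid
    FROM profileStore AS p
    JOIN profileInterests AS i ON i.zuid = p.zuid
    WHERE p.gender = 'Female' AND i.interest = 'travel'
    ORDER BY p.zuid
"""

segment = [row[0] for row in conn.execute(SEGMENT_SQL)]
print(segment)
```

Every additional one-to-many attribute adds another table and another JOIN to segment queries, which is why join performance matters so much at this scale.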
Another limitation is that COPY is the main way to bulk-load data into Redshift, and its processing time increases with growing amounts of data. Redshift is therefore not well suited to streaming or near-real-time use cases.
Now luckily, at zeotap we don’t like to live with limitations, which is why our next blog article will dive into how we overcame these challenges using other technologies such as Presto (a SQL query engine for files stored on HDFS or S3). Stay tuned, and reach out if you also never settle for the average. It’s how innovation is born and how mind and skillset evolve.
About the author:
Chaitanya Bendre, Senior Software Engineer at zeotap, is one of our colleagues who handles tremendous amounts of data on a daily basis. He works on our core data engineering platform, zeoCore, which ingests and refines data from multiple partners to distribute it across a wide range of buying and other application platforms. After completing his degree in Engineering at BITS Pilani, Chaitanya had the opportunity to work with companies like Goldman Sachs and Juspay on cloud computing and data platform related projects. His areas of interests include data engineering, functional programming and distributed systems.