Partition Index - Selective Queries On Really Big Tables
In this article I’m going to explain how we solved the problem of running selective queries on MPP engines (Impala, Presto, Drill, etc.) over >10TB tables without performing a full table scan.
This article will describe the idea of what we call “partition index” in a very simple way. The detailed architecture and implementation are subjects for a whole new open-source project.
Partitions are a great optimization if we know which columns we’re going to filter by and what kind of questions are going to be asked on that table.
But sometimes we don’t know what the most common BI questions on a new kind of data are going to be. And sometimes we do know them, but the column has so many distinct values that partitioning by it is impractical.
So let’s say I have 1,000,000 event-driven sensors (they send data every time a certain event occurs around them). Every hour I get records streamed to my Hive table from only 1,000 of them. After one year I’ll have a really big table.
My table is partitioned by hourly DT (date time). That helps when I want to ask questions that filter on time. But what if I want to perform a certain aggregation on the values I got from a specific sensor in the past year?
I can’t partition my table by the sensor ID, because I have 1,000,000 of those and that’s too many partitions.
If I run the aggregation above with Impala/Presto/Drill, a full table scan will occur. That will be too expensive, and it will probably fail because the whole table (>10TB) doesn’t fit in memory.
So now you see the problem: analysts need to perform selective queries over really big tables, and their queries cause full (or almost full) table scans.
The Solution: Partition Index
We found a simple solution to that problem. We created something we call a ‘partition index’.
What is a Partition Index?
To explain the idea I’ll use the sensors example again. We created a dataset that is basically a dictionary in which the key is a sensor ID and the value is a list of all the DTs that sensor ID appears in. That dataset is called the partition index.
Conceptually, it looks like this: sensor ID -> [DT, DT, DT, …]
Of course there are much more optimized models for such an index, but for the simplicity of this article we’ll stick to the dictionary model.
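A minimal sketch of the dictionary model in Python follows. The sensor IDs and the assignment of DTs to sensors are made up for illustration; the DT strings use the hourly YYYYMMDDHH format that appears in the query later in this article.

```python
# Hypothetical partition index: sensor ID -> list of the hourly DT
# partitions (YYYYMMDDHH) in which that sensor has at least one record.
# All IDs and DT assignments here are illustrative, not real data.
partition_index = {
    "sensor-a": ["2017101005", "2018021614", "2018031201"],
    "sensor-b": ["2018021614"],
    "sensor-c": ["2017120100", "2018031201"],
}


def partitions_for(index, sensor_id):
    """Return the DTs that contain the sensor, or an empty list if unknown."""
    return index.get(sensor_id, [])


print(partitions_for(partition_index, "sensor-b"))
```

A sensor that reported in only a handful of hours maps to only a handful of DTs, which is exactly what makes the later partition pruning effective.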
Generating & Maintaining the Index
The process of creating this partition index for the first time is pretty heavy, as it has to process each and every record in the table. It can be done with a relatively simple Spark job.
After that’s done, the only thing we need to remember is to keep updating the partition index as new partitions are added to the table.
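The two phases, the heavy initial build and the cheap incremental update, can be sketched in plain Python. This is an in-memory illustration under the dictionary model, not the Spark job itself; in Spark the same logic would roughly amount to grouping by sensor ID and collecting the distinct DTs. The function names and the (dt, sensor_id) record shape are assumptions made for the example.

```python
from collections import defaultdict


def build_index(records):
    """Initial build: a single pass over (dt, sensor_id) pairs from the whole table."""
    index = defaultdict(set)
    for dt, sensor_id in records:
        index[sensor_id].add(dt)
    return index


def add_partition(index, dt, sensor_ids):
    """Incremental update: register one newly added DT partition.

    Only the sensors seen in the new partition are touched, and using a
    set keeps the update idempotent if the same partition is processed twice.
    """
    for sensor_id in sensor_ids:
        index[sensor_id].add(dt)


# Hypothetical data: two existing partitions, then a new one arrives.
records = [("2018031201", "a"), ("2018031201", "b"), ("2018031202", "a")]
index = build_index(records)
add_partition(index, "2018031203", ["b", "c"])
print(sorted(index["b"]))
```

The update only needs the distinct sensor IDs of the new partition, so its cost depends on the size of one partition rather than the size of the table.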
What Kind Of Tables Need a Partition Index?
Partition Index is for tables with a large number of partitions and diverse values among them.
It’s important to note that this is not a solution for all use cases. For example, if your data is not partitioned by DT, or if it is but each partition contains all the IDs (in our example, the sensor IDs), this solution is not going to work.
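One rough way to check whether a table fits this description is to measure what share of the partitions a typical key appears in. The function below is an illustrative assumption, not part of the implementation described in this article, and it presumes an index (or an index built on a sample) is already available.

```python
def average_partition_fraction(index, total_partitions):
    """Average share of the table's partitions that a single key appears in.

    Values close to 0 mean each key lives in few partitions, so the index
    prunes a lot. Values close to 1 mean every key is in almost every
    partition, so filtering by DT would prune almost nothing.
    """
    if not index or total_partitions == 0:
        return 0.0
    total_entries = sum(len(dts) for dts in index.values())
    return total_entries / (len(index) * total_partitions)


# Hypothetical index over a table with 4 partitions.
sample_index = {"a": {"d1"}, "b": {"d1", "d2", "d3", "d4"}}
print(average_partition_fraction(sample_index, 4))
```

In the sensors example, where only 1,000 of 1,000,000 sensors report in any given hour, this fraction would be very small, which is the favorable case.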
Using The Partition Index
We created a simple application that keeps the partition index in memory and can be queried through a REST API. For optimal performance, the partition index is loaded into memory when the service starts.
So now imagine I have a 10TB table, partitioned by DT and I want to perform an aggregation on the values of a specific sensor from the past year. I first query the partition index with the sensor ID and get all the relevant DTs. Now I perform the Impala query while filtering on those DTs and instead of a full table scan, I scan only the relevant partitions (a fraction of the data) and get the answer 100x faster.
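The lookup step can be sketched as a plain function over the in-memory dictionary, leaving out the REST layer. The function name and the half-open time range are assumptions for the example. Because hourly DT strings in YYYYMMDDHH format have a fixed width, comparing them as strings matches chronological order.

```python
def relevant_dts(index, sensor_id, start_dt, end_dt):
    """Return the sorted DTs for sensor_id that fall within [start_dt, end_dt)."""
    return sorted(
        dt for dt in index.get(sensor_id, ()) if start_dt <= dt < end_dt
    )


# Hypothetical index entry; "s1" is a made-up sensor ID.
index = {"s1": ["2017011000", "2017101005", "2018021614", "2018031201"]}

# "Past year" relative to an assumed current hour of 2018031300.
print(relevant_dts(index, "s1", "2017031300", "2018031300"))
```

The returned DTs are what goes into the filter of the Impala query, so only those partitions are scanned.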
We found the partition index really useful, but we wanted our analysts to simply perform a query, without even knowing that the partition index exists.
So we implemented an application layer in the load balancer, between the client and the Impala daemons, that analyzes the incoming query and generates a new query that uses the partition index.
Basically, the user sends a query to the load balancer:
SELECT avg(s.temperature) FROM sensors s WHERE s.sensor_id = 'f4c43b5f-b631-48b4-bf1b-22d174a6b6e4'
And in the load balancer we added code that takes the query and checks that:
- It’s on a table that has a partition index.
- It filters by the relevant column (i.e. sensor_id).
Then it uses the partition index to generate and submit an optimized query with the relevant partitions in the where clause:
SELECT avg(s.temperature) FROM sensors s WHERE s.sensor_id = 'f4c43b5f-b631-48b4-bf1b-22d174a6b6e4' AND dt IN ('2018031201', '2018021614', '2017101005', …)
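The two checks and the rewrite can be sketched as follows. This is a deliberately naive illustration, not the code running in our load balancer: it uses regular expressions instead of a real SQL parser, handles only an equality filter on a string literal, and assumes the query ends with its WHERE clause (no GROUP BY, ORDER BY, or LIMIT). The INDEXED_TABLES registry and the function name are hypothetical.

```python
import re

# Hypothetical registry: table name -> (indexed column, partition column).
INDEXED_TABLES = {"sensors": ("sensor_id", "dt")}


def rewrite_query(sql, index):
    """Append a partition filter if the query qualifies; otherwise return it unchanged."""
    # Check 1: the query is on a table that has a partition index.
    table_match = re.search(r"\bFROM\s+(\w+)", sql, re.IGNORECASE)
    if not table_match or table_match.group(1).lower() not in INDEXED_TABLES:
        return sql
    key_col, part_col = INDEXED_TABLES[table_match.group(1).lower()]

    # Check 2: it filters the indexed column by equality, e.g. s.sensor_id = '...'.
    key_match = re.search(
        r"\b(?:\w+\.)?" + re.escape(key_col) + r"\s*=\s*'([^']*)'",
        sql,
        re.IGNORECASE,
    )
    if not key_match:
        return sql

    dts = sorted(index.get(key_match.group(1), ()))
    if not dts:
        return sql  # unknown key: fall back to the original query

    # Assumes the WHERE clause is the last clause of the query.
    in_list = ", ".join("'" + dt + "'" for dt in dts)
    return f"{sql.rstrip().rstrip(';')} AND {part_col} IN ({in_list})"


# Hypothetical index entry with a made-up sensor ID.
index = {"abc": ["2018031201", "2017101005"]}
query = "SELECT avg(s.temperature) FROM sensors s WHERE s.sensor_id = 'abc'"
print(rewrite_query(query, index))
```

Returning the original query whenever a check fails keeps the layer transparent: queries it does not understand simply run as they would have without the index.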
That way, analysts experience 100x faster performance on selective queries over big tables without any change in their workflow.
Partition Index is a component used in an application layer between the client and the MPP engine that makes selective queries run much faster by reading only the relevant partitions.
This idea can be implemented in various ways, but overall it’s a pretty easy solution. It requires no change to the data, and that’s what I like about it.