
Wrangling a 100TB data set for Machine Learning


Disclaimer: All material in this post has been used with permission. Certain details modified for client confidentiality.

Also — Ryan (our Founder) is giving a talk on this topic on June 11th, 2024. If you are in the Boston area, swing by!

The First Major Project

Our first major task at Fan Pier Labs was to process a massive 100-terabyte dataset (100 million images) and prepare it for training machine learning models. Handling such a large volume of data is challenging, but we successfully built a cloud infrastructure capable of processing all that data in just 12 hours per job. Not only could our infrastructure handle this enormous amount of data, but it was also designed to (hopefully) scale indefinitely!

Shameless plug: if you need a hand with managing a massive data set, contact us!

About the Client

We built this project for a YCombinator-backed startup that wanted to generate images from user-entered prompts (much like MidJourney or DALL-E). To develop this product, they decided to train their own models instead of using a pre-trained base model.

The Starting Point

Our client had a large data set of over 100 million images and was storing them in an S3 bucket. They didn’t have anything set up outside of the S3 bucket.

Optimization Opportunity: Technically, we used Cloudflare's R2 and not AWS S3 for this project. R2 exposes an S3-compatible API (so it was an easy swap) and is roughly 25% cheaper. (R2 pricing, S3 pricing).

The Challenge

In order to prepare the images for training, we had to process each image multiple times to extract a number of additional pieces of information. We used some of this information to filter out images that we didn’t want to train on, and other pieces we used in the training step itself.

Deciding on a Database

First, we had to pick a database to store all of the information about the images. Four databases stood out as good candidates: Redshift, Postgres, Spanner, and Bigtable.

Redshift is well equipped to handle this project because it excels at storing large quantities of data, has a strong ecosystem, and is, of course, well supported by AWS. Just like Spanner and Bigtable, Redshift can scale a single table to petabytes of data. This is possible because Redshift can shard a single large table horizontally across a fleet of servers, instead of relying on vertical scaling like older databases (eg. MySQL) do. (link) Redshift is also a columnar database, which means that it can skip non-relevant data when running a query that processes just one or two columns. (link) However, a columnar database tends to be more difficult to insert data into. Amazon, for instance, recommends against using the INSERT command for large quantities of data, and instead recommends uploading the data to AWS S3 and then using a special COPY SQL command to copy it into Redshift. (link)
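
For reference, a Redshift bulk load via COPY typically looks something like this (a sketch; the table name, bucket path, and IAM role below are placeholders):

COPY image_metadata
FROM 's3://my-bucket/staging/metadata/'
IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-copy-role'
FORMAT AS CSV;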

Postgres also stands out because it is a very powerful database with a strong ecosystem. Additionally, AWS has great support for running and managing Postgres instances inside of AWS Relational Database Service (RDS). AWS RDS makes it easy to set up a Postgres instance, configure daily backups, and scale the DB up and down when necessary (link). Postgres, however, is a row-oriented database, which means it will always read an entire row of data from disk when you query any column in that row.

Spanner and Bigtable would also be able to handle this workload. However, most of the rest of our client’s infrastructure was already located on Amazon Web Services, so we decided to stick with a database that was also on AWS.

We decided to use Postgres for this project due to its popularity, robust ecosystem, ease of setup with AWS RDS, and excellent support within AWS. However, if we were to undertake this project again, using Redshift might be worth considering. Redshift could potentially be less expensive due to its dynamic pricing model when using Redshift Serverless (link). Additionally, it might execute some queries faster than Postgres because of its columnar storage format. However, the columnar storage could make inserting new rows more challenging. Despite these considerations, Postgres worked great for this project, and we were able to successfully build everything we needed.

Setting up the Database

The first step was to add the S3 Object URLs to the database. We assigned a unique UUID to each image and renamed the images in the storage bucket to match the assigned UUID (plus the image extension).
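
S3 (and R2) don't have a true rename operation, so renaming an object means copying it to a new key and then deleting the old one. Here's a minimal sketch of that step using boto3 (the bucket name and key handling are simplified placeholders):

import uuid
import boto3

s3 = boto3.client('s3')
bucket = 'my-bucket'  # placeholder bucket name

def rename_to_uuid(old_key):
    # Assign a UUID (32 hex characters, no dashes) and keep the original extension
    image_id = uuid.uuid4().hex
    extension = old_key.rsplit('.', 1)[-1]
    new_key = image_id + '.' + extension

    # S3 has no rename: copy the object to the new key, then delete the old one
    s3.copy_object(Bucket=bucket, CopySource={'Bucket': bucket, 'Key': old_key}, Key=new_key)
    s3.delete_object(Bucket=bucket, Key=old_key)
    return image_id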

Structuring the Database

The next step was deciding how to structure the data in the database. We first created the table with only two columns: one to hold the UUID of each image and another to hold the S3 Object URL of each image.

Later, we will add more columns to keep track of the information derived from each image. Stay tuned!

                uuid              |                       s3_path
----------------------------------+-----------------------------------------------------
4283d3832db64599b619a464197b7eb6 | s3://my-bucket/4283d3832db64599b619a464197b7eb6.jpg
a04d043d837844838751d10ac8f9183a | s3://my-bucket/a04d043d837844838751d10ac8f9183a.jpg
60cf95a813f14415a8f0e117523ccac5 | s3://my-bucket/60cf95a813f14415a8f0e117523ccac5.jpg
261bde2535e148b192d7de9c7759cfbb | s3://my-bucket/261bde2535e148b192d7de9c7759cfbb.jpg
6cfd03af21d64db1a095699a5a3caba8 | s3://my-bucket/6cfd03af21d64db1a095699a5a3caba8.jpg
d332c42f405c403bb4d288c0cc538145 | s3://my-bucket/d332c42f405c403bb4d288c0cc538145.jpg
ce880aa84caf47c6976502c3bf54dce2 | s3://my-bucket/ce880aa84caf47c6976502c3bf54dce2.jpg
5569b7179eeb458d9a6ae68ac648caa8 | s3://my-bucket/5569b7179eeb458d9a6ae68ac648caa8.jpg
152ffb1801064af1a7b284661201993e | s3://my-bucket/152ffb1801064af1a7b284661201993e.jpg
1d012178b4d7481089fc48c7066544e2 | s3://my-bucket/1d012178b4d7481089fc48c7066544e2.jpg
(10 rows)
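
For reference, the initial table could be created with something along these lines (a sketch; the exact column types are assumptions):

CREATE TABLE my_table (
    uuid    varchar(32) PRIMARY KEY,  -- UUID stored as 32 hex characters (no dashes)
    s3_path text NOT NULL             -- full S3 Object URL of the image
);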

Optimization Opportunity: If you’ve worked with architecting databases before, you might have noticed that it is possible to derive the s3_path from the uuid. Because of this, we don’t need to store these two columns separately; we can store just the uuid column and then calculate the s3_path when necessary.

s3_path = "s3://my-bucket/" + uuid + ".jpg"

Therefore, we were able to store just a single column in the DB (the UUIDs of the images). After we remove the s3_path column, our table is down to one column.

              uuid
---------------------------------
4283d3832db64599b619a464197b7eb6
a04d043d837844838751d10ac8f9183a
60cf95a813f14415a8f0e117523ccac5
261bde2535e148b192d7de9c7759cfbb
6cfd03af21d64db1a095699a5a3caba8
d332c42f405c403bb4d288c0cc538145
ce880aa84caf47c6976502c3bf54dce2
5569b7179eeb458d9a6ae68ac648caa8
152ffb1801064af1a7b284661201993e
1d012178b4d7481089fc48c7066544e2
(10 rows)

Which additional fields do we need?

Next, we needed to calculate a number of additional columns to hold all sorts of different information about the images. For instance, we needed to filter out low quality images, generate annotations for the images, extract features from the images, and more. We also realized that some images with extensions other than jpg had snuck into our data set, so we had to keep track of each image's extension in a separate column as well.

Specifically, we wanted to populate columns for:

  • AI-description of the image (GPU required)
  • AI-Aesthetic ranking of the image (GPU required)
  • Determine how many people are in the image
  • Hash of image for de-duplication
  • Size of images in bytes
  • Width of image
  • Height of image
  • OCR text from the image
  • File Extension of each image
  • Extract EXIF data (see the sketch after this list)
    — Camera make
    — Camera model
    — Was flash used
    — Aperture
    — Shutter speed
    — Focal Length
    — ISO
    — Location of image (sometimes embedded in EXIF)
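
Many of the CPU-only fields above (dimensions, file size, and the EXIF fields) can be pulled with a small amount of Python. Here's a minimal sketch using Pillow; the GPU-backed fields (AI description, aesthetic score), OCR, and the people count each used their own models and tooling, which I won't cover here:

import os
from PIL import Image, ExifTags

def extract_basic_metadata(path):
    # Collect file size, dimensions, and a few EXIF fields from a local image file
    metadata = {'size_bytes': os.path.getsize(path)}
    with Image.open(path) as img:
        metadata['width'], metadata['height'] = img.size
        # Map numeric EXIF tag ids to human-readable names (Make, Model, etc.)
        exif = img.getexif()
        named = {ExifTags.TAGS.get(tag_id, tag_id): value for tag_id, value in exif.items()}
    metadata['camera_make'] = named.get('Make')
    metadata['camera_model'] = named.get('Model')
    return metadata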

Adding additional columns to the Database

In order to support these additional columns, we needed to update the database:

ALTER TABLE my_table ADD COLUMN extension varchar(4);
ALTER TABLE my_table ADD COLUMN aesthetic_score int;
etc...

Which resulted in a table like this:


+----------------------------------+-------------+-----------------+--------------+--------+-------+------+----------+--------------+-------------+---------+-----+-----+
| uuid | description | aesthetic_score | people_count | height | width | size | ocr_text | camera_model | camera_make | aperture | iso | etc |
+----------------------------------+-------------+-----------------+--------------+--------+-------+------+----------+--------------+-------------+---------+-----+-----+
| 4283d3832db64599b619a464197b7eb6 | | | | | | | | | | | | |
| a04d043d837844838751d10ac8f9183a | | | | | | | | | | | | |
| 60cf95a813f14415a8f0e117523ccac5 | | | | | | | | | | | | |
| 261bde2535e148b192d7de9c7759cfbb | | | | | | | | | | | | |
| 6cfd03af21d64db1a095699a5a3caba8 | | | | | | | | | | | | |
| d332c42f405c403bb4d288c0cc538145 | | | | | | | | | | | | |
| ce880aa84caf47c6976502c3bf54dce2 | | | | | | | | | | | | |
| 5569b7179eeb458d9a6ae68ac648caa8 | | | | | | | | | | | | |
| 152ffb1801064af1a7b284661201993e | | | | | | | | | | | | |
| 1d012178b4d7481089fc48c7066544e2 | | | | | | | | | | | | |
+----------------------------------+-------------+-----------------+--------------+--------+-------+------+----------+--------------+-------------+---------+-----+-----+

Populating the additional columns (the wrong way!)

Now, we needed to populate these additional columns. The quick and dirty way is to set up a short Python script to download a few of the rows that need processing, process those images, and then update the columns. It would look roughly like this:

import boto3
import psycopg2

# Set up the database connection
conn = psycopg2.connect(...)  # connection details omitted
cursor = conn.cursor()

# Set up the S3 client
s3 = boto3.client('s3')
s3_bucket_name = 'my-bucket-name'

# DON'T RUN THIS CODE ON MULTIPLE COMPUTERS, IT DOESN'T SCALE
def main():
    # Grab a small batch of rows that still need processing
    cursor.execute("SELECT uuid FROM my_table WHERE description IS NULL LIMIT 1000")
    rows = cursor.fetchall()

    for (uuid,) in rows:
        s3_key = uuid + ".jpg"
        print(f"Processing image {uuid} with S3 key {s3_key}")

        # Get the image from S3
        response = s3.get_object(Bucket=s3_bucket_name, Key=s3_key)
        data = response['Body'].read()

        # Process the image
        processed_data = process_data(data)

        # Update the database
        cursor.execute(
            "UPDATE my_table SET description = %s WHERE uuid = %s",
            (processed_data, uuid),
        )
        conn.commit()

However, there’s a major problem with this approach: it doesn’t run fast enough and cannot scale. If we ran this on an individual computer, it would take weeks to process the entire dataset. Our goal was to process the entire dataset in 12 hours, which required multiple app servers all working in parallel.

A number of issues would arise if we ran this script on multiple app servers at the same time. Specifically, with no coordination between servers, multiple app servers would end up processing the same images.

This happens because additional app servers may begin processing rows that the first app server is already working on; there is no way to tell the additional servers that another server is already processing those rows (and that they should hold off).

To break this issue down a bit more:

  1. Server A starts running the python script
  2. Server A runs the SQL query to return 1000 rows that need processing
  3. Server A begins processing the images.
  4. Server B starts running the python script
  5. Server B runs the SQL query to return 1000 rows that need processing (!!! Many of the same rows being processed by server A are returned !!!)
  6. Server A and server B are now processing the same rows
  7. Server A finishes processing the rows
  8. Server B finishes processing the rows

We needed a way to coordinate between servers so we could avoid wasting computational resources.

Enter Queues

Queues allow us to coordinate between servers to ensure two different servers don’t attempt to process the same rows in the DB.

A queue is a service that keeps track of a set of jobs that need to be processed. A number of different app servers can make an API call to fetch a job, and the queue will keep track of which jobs haven’t been started, which jobs are currently processing, and which jobs have been completed.

Deciding which queue to use

There are a number of solid queue systems that are more than capable of solving this problem. We had a very typical use case and didn’t need any of the advanced features offered by the more complex queues.

Notably, we considered using Celery, AWS SQS, and RabbitMQ. Celery and RabbitMQ offer a number of advanced queue management features. AWS SQS is much simpler and has great support from AWS. All of them could provide the functionality we needed. We ended up going with AWS SQS just because it was within the AWS ecosystem and integrated well with other AWS products.

The only concerning limitation of SQS is that it can retain each message in the queue for at most 14 days. In order to avoid the queue deleting data, we needed to ensure every job was processed within 14 days.
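
For reference, the retention period is a queue attribute that can be raised to the 14-day maximum when the queue is created; a minimal sketch (the default retention is 4 days, and the queue name here is a placeholder):

import boto3

sqs = boto3.client('sqs')
sqs.create_queue(
    QueueName='image-processing-buckets',             # placeholder queue name
    Attributes={'MessageRetentionPeriod': '1209600'}  # 14 days (in seconds), the SQS maximum
)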

Loading the items into the queue

After setting up the queue, we had to figure out how to populate the queue with the data in the database. Unfortunately, there is too much data in Postgres to download and upload to SQS with a single query. This isn’t like Google Drive — we can’t just download and upload a file to a different folder.

The solution here is to break the data in the DB down into manageable buckets and re-upload each bucket to the queue individually. If each bucket of data is small enough, we’ll be able to download each bucket and re-upload it to the queue.

Breaking down the data into discrete buckets

In order to assign each image to a discrete bucket we will return to the UUID that we assigned to each image earlier.

e0028e91a0354ad390af60d4a6a9a51c

Specifically, we are going to assign each image to a bucket based on the first few characters of its UUID. The longer the prefix, the smaller the buckets, and vice versa.

For instance, if we used just the first hexadecimal character of the UUID, we would have 16 different buckets (0,1,2,3,4,5,6,7,8,9,a,b,c,d,e,f). If we evenly divided our 100 million images into 16 buckets, we would end up with roughly 6.25 million images per bucket.

6.25 million rows is still a bit much to download and upload, so we decided to use the first 4 characters instead of just 1 character. This gave us 65k buckets:

16 (possibilities per hexadecimal character) ^ 4 (characters per batch) = 65,536 batches

The batches would be

0000
0001
0002
0003
...
fffc
fffd
fffe
ffff

A UUID that starts with 0002 (eg. 00028e91a0354ad390af60d4a6a9a51c) would go into the 0002 batch, 00038e91a0354ad390af60d4a6a9a51c would go into the 0003 batch, and so on. Therefore, if we had 100 million images to divide into ~65k different batches, we would have:

100 million images / 65,536 batches = ~1,526 images per batch

Of course, the UUIDs may not be perfectly evenly distributed, but as long as they are generated randomly, each bucket should end up with a fairly consistent number of rows.

In addition, if we wanted larger or smaller batches, we could have decreased or increased the number of hexadecimal characters in the prefix. We found that processing ~1,500 images at a time worked well for most of our workloads.
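
As a quick illustration, the bucket for a given UUID is just its first few hex characters (the prefix length of 4 here matches what we used, but it is easy to tune):

PREFIX_LENGTH = 4  # 4 hex characters -> 16**4 = 65,536 buckets

def bucket_for(image_uuid):
    # The bucket is simply the first PREFIX_LENGTH characters of the UUID
    return image_uuid[:PREFIX_LENGTH]

print(bucket_for("00028e91a0354ad390af60d4a6a9a51c"))  # -> "0002"
print(100_000_000 // 16 ** PREFIX_LENGTH)              # -> ~1525 images per bucket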

The first way we built this infrastructure was to copy each row from the DB into the queue, but we quickly realized there’s a better way to build this part of the infrastructure.

Optimization Opportunity: We don’t actually need to re-upload the entire database into the queue! Instead, we can just upload the buckets themselves into the queue. Later on, in the main Python processing code, we can query the database to determine which images are in which bucket.

This alternate approach significantly reduces both the complexity and the number of items we have to put into SQS (instead of loading 100 million rows into the queue, we now only load ~65k).

A script like this can be used to load each bucket into SQS:

import boto3

# Replace 'your_queue_name' with the name of your SQS queue
queue_name = 'your_queue_name'

# Initialize SQS client
sqs = boto3.client('sqs')

# Create a SQS queue
response = sqs.create_queue(
    QueueName=queue_name
)

# Get the queue URL
queue_url = response['QueueUrl']

# Function to generate hex strings from 0000 to ffff
def generate_hex_strings():
    for i in range(65536):
        hex_string = format(i, '04x')
        yield hex_string

# Populate the SQS queue with hex strings
for hex_string in generate_hex_strings():
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=hex_string
    )

print("Hex strings populated to the SQS queue successfully.")

Then, when the servers run the processing jobs themselves, they first query the Postgres database to find which rows are in a specific bucket. There are a number of optimizations we made in Postgres to ensure this lookup is blazingly fast.

This approach also allows us to add rows after jobs have already been completed. For instance, if we ran the processing job once to process all the rows in the DB, and then later added more images to the data set, we could run the same job again and only the images that had not yet been processed would be processed.

Optimizing the lookup queries

The quick-and-dirty query for determining which rows are in a bucket (here, the ffff bucket) is:

select uuid from my_table where uuid like 'ffff%';

Note that, unlike the query from earlier, there is specifically no limit clause here. A limit clause would mean some rows in this bucket may be skipped unnecessarily.

This approach will be somewhat fast, especially when a B-Tree index is set up on the uuid column in Postgres. However, we wanted to squeeze out every last drop of performance, so we explored other options.

For exact-match lookups, the fastest type of index in Postgres is usually the hash index. However, a hash index only supports querying by exact matches (eg bucket = 'ffff') and doesn’t support like operations.

In order to make use of a hash index, we had to add an additional column for each bucket size that we might want to use for processing. For this project, we configured three additional columns, for bucket prefixes of length 4, 5, and 6. Our updated table looks like this:

               uuid               | bucket_4 | bucket_5 | bucket_6
----------------------------------+----------+----------+----------
4283d3832db64599b619a464197b7eb6 | 4283 | 4283d | 4283d3
a04d043d837844838751d10ac8f9183a | a04d | a04d0 | a04d04
60cf95a813f14415a8f0e117523ccac5 | 60cf | 60cf9 | 60cf95
261bde2535e148b192d7de9c7759cfbb | 261b | 261bd | 261bde
6cfd03af21d64db1a095699a5a3caba8 | 6cfd | 6cfd0 | 6cfd03
d332c42f405c403bb4d288c0cc538145 | d332 | d332c | d332c4
ce880aa84caf47c6976502c3bf54dce2 | ce88 | ce880 | ce880a
5569b7179eeb458d9a6ae68ac648caa8 | 5569 | 5569b | 5569b7
152ffb1801064af1a7b284661201993e | 152f | 152ff | 152ffb
1d012178b4d7481089fc48c7066544e2 | 1d01 | 1d012 | 1d0121
(10 rows)
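
These prefix columns could be added and backfilled with something along these lines (a sketch; they could equally be computed in the application code at insert time):

ALTER TABLE my_table ADD COLUMN bucket_4 varchar(4);
ALTER TABLE my_table ADD COLUMN bucket_5 varchar(5);
ALTER TABLE my_table ADD COLUMN bucket_6 varchar(6);

UPDATE my_table SET
    bucket_4 = left(uuid, 4),
    bucket_5 = left(uuid, 5),
    bucket_6 = left(uuid, 6);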

Then, we set up a hash index on each of the three bucket columns:

CREATE INDEX my_table_bucket_4_hash ON my_table USING HASH (bucket_4);
CREATE INDEX my_table_bucket_5_hash ON my_table USING HASH (bucket_5);
CREATE INDEX my_table_bucket_6_hash ON my_table USING HASH (bucket_6);

Now, we can use those new columns and indices to speed up our lookup queries:

SELECT uuid FROM my_table where bucket_4 = 'ffff' and job_column is null;

It is possible to confirm that the query is using the hash index by running:

explain SELECT * from my_table where bucket_4 = '0001';

Which should output

postgres=> explain SELECT * from my_table where bucket_4 = '0001';
QUERY PLAN
------------------------------------------------------------------------------------------
Index Scan using my_table_bucket_4_hash on my_table (cost=0.00..8.02 rows=1 width=128)
Index Cond: ((bucket_4)::text = '0001'::text)
(2 rows)

For testing purposes only, you can run this command as well to ensure the index is in use. (Sometimes, especially for small tables, Postgres will decide against using the index even when there is one set up).

SET enable_seqscan = OFF;

The Main Processing Code

Now that the queue and the database are set up, we can start working on the main image processing code. Most of the data processing jobs we built only required a CPU to run, but a few required an NVIDIA GPU, which took extra effort to set up. I’ll talk about how we set up GPUs in a future blog post.

Core Processing Step

Almost every processing job will follow the same list of steps:

  1. Download a bucket from the queue
  2. Find the rows in that bucket by querying the DB
  3. Fetch the images from S3 (or R2)
  4. Process each image
  5. Upload the results to the DB
  6. Mark the job as completed in the Queue.

Here’s the code for this pattern:

from io import BytesIO

import boto3

sqs = boto3.resource('sqs')
s3 = boto3.client('s3')

queue = sqs.get_queue_by_name(QueueName=SOURCE_QUEUE)

def process_one_bucket():
    # Fetch a bucket (UUID prefix) from SQS
    message = queue.receive_messages(MaxNumberOfMessages=1)[0]
    bucket = message.body

    # Query postgres to find all the images with this prefix
    # that have not been processed yet.
    query = "select uuid from my_table where bucket_4 = '" + bucket + "' and job_column is null"
    rows = runPostgresQuery(query)

    # Loop through the rows and process each image.
    # This can be done in parallel for most types of jobs.
    for row in rows:
        uuid = row['uuid']

        # Fetch the image from AWS S3 or Cloudflare R2
        buffer = BytesIO()
        s3.download_fileobj("images-for-training", uuid + ".jpg", buffer)  # or use the stored extension column
        buffer.seek(0)

        new_column_info = process_image(buffer)

        # Upload the results to Postgres
        cur = postgres.cursor()
        cur.execute("UPDATE my_table SET " + column_name + " = %s WHERE uuid = %s",
                    (new_column_info, uuid))
        postgres.commit()

    # Delete the SQS queue message once all items
    # in the bucket have been successfully processed.
    message.delete()

Deploying on a fleet of servers

The next step was to deploy our processing code to a fleet of servers. We used Docker and Kubernetes for this, which allowed us to spin up ~20 app servers to process a job and then spin them down 12 hours later when the jobs were all processed.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: processing-code
spec:
  strategy:
    type: Recreate
  replicas: 2
  selector:
    matchLabels:
      app.kubernetes.io/name: processing-code
  template:
    metadata:
      labels:
        app.kubernetes.io/name: processing-code
    spec:
      imagePullSecrets:
        - name: ghcr-deploy
      containers:
        - name: processing-code-2
          image: ghcr.io/my_organization/my_repo/worker
          env:
            - name: MY_SECRETS
              valueFrom:
                secretKeyRef:
                  name: my-secrets
                  key: secrets
          resources:
            limits:
              cpu: 16
              memory: 86Gi
              # Remove this limit if a GPU is not necessary
              nvidia.com/gpu: 1
      # Remove this affinity block if a GPU is not necessary
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: gpu.nvidia.com/class
                    operator: In
                    values:
                      - A40 # (or Quadro_RTX_4000, etc)

Storing the secrets with Kubernetes

When deploying code with Kubernetes, we had to store the secrets the code needs to access AWS, R2, etc. For security reasons, these are usually not stored in git and are usually not uploaded to the cloud alongside the rest of the code.

Fortunately, there are usually easy ways to upload secrets. Some other deployment options, such as AWS Elastic Beanstalk, provide a UI where we can enter these secrets. Kubernetes requires that we run a few kubectl commands on the CLI to upload the secrets.

It is possible to upload each secret separately with kubectl, but instead of doing that, we decided to just add all of the secrets to a single JSON object, base64 encode it, and then upload the base64-encoded string as one secret.

We can use this command to encode the local secrets file (keys.json) as base64 and upload it to the Kubernetes cluster.

kubectl create secret generic my-secrets --from-literal=secrets="$(cat keys.json | base64)"

If you want to check that it uploaded correctly, you can use this command to download it (it decodes twice: once to reverse Kubernetes' own base64 encoding of secret data, and once to reverse our encoding of keys.json):

kubectl get secret my-secrets -o jsonpath="{.data.secrets}" | base64 --decode | base64 --decode

You can also use this command to delete the secret

kubectl delete secret my-secrets

https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#define-container-environment-variables-using-secret-data
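
Inside the worker, the application can then read the secrets back out of the MY_SECRETS environment variable defined in the Deployment above. A minimal sketch, assuming keys.json is a flat JSON object (the key names below are hypothetical):

import base64
import json
import os

def load_secrets():
    # Kubernetes injects the stored value (our base64-encoded keys.json) into MY_SECRETS
    encoded = os.environ["MY_SECRETS"]
    decoded = base64.b64decode(encoded)  # reverse our own base64 encoding
    return json.loads(decoded)           # back to the original keys.json contents

secrets = load_secrets()
# e.g. secrets["AWS_ACCESS_KEY_ID"], secrets["R2_ACCESS_KEY"]  (hypothetical key names)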

Completed jobs

Now that we have built a system to deploy app servers, process jobs, and upload the results to Postgres, we end up with a table like this:

                uuid              | extension | bucket_5 | bucket_6
----------------------------------+-----------+----------+----------
4283d3832db64599b619a464197b7eb6 | jpg | 4283d | 4283d3
a04d043d837844838751d10ac8f9183a | jpg | a04d0 | a04d04
60cf95a813f14415a8f0e117523ccac5 | jpg | 60cf9 | 60cf95
261bde2535e148b192d7de9c7759cfbb | jpg | 261bd | 261bde
6cfd03af21d64db1a095699a5a3caba8 | jpg | 6cfd0 | 6cfd03
d332c42f405c403bb4d288c0cc538145 | jpg | d332c | d332c4
ce880aa84caf47c6976502c3bf54dce2 | jpg | ce880 | ce880a
5569b7179eeb458d9a6ae68ac648caa8 | jpg | 5569b | 5569b7
152ffb1801064af1a7b284661201993e | png | 152ff | 152ffb
1d012178b4d7481089fc48c7066544e2 | png | 1d012 | 1d0121
(10 rows)

And that’s it! The data is now ready for training.

After all the data was ready in Postgres, we ended up setting up additional queues with the same bucket-based design for the machine learning team as well. Then, after pulling a bucket from the queue, their codebase would query the database for rows that matched both the bucket and additional dynamic criteria of images they wanted to train on.
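
For example, after pulling the 0002 bucket off their queue, a training job might run a filter query along these lines (the columns come from the table above, but the thresholds and filters here are hypothetical):

SELECT uuid
FROM my_table
WHERE bucket_4 = '0002'        -- the bucket pulled from the queue
  AND aesthetic_score >= 7     -- hypothetical quality threshold
  AND people_count = 0;        -- hypothetical filter: only images without people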

Conclusion

Overall, the project was a great success. We hit all of the goals we set and were able to process 100 terabytes of data (100 million images) in 12 hours. It was also interesting to build out all this infrastructure, and it was very rewarding once we had it all working.

Thanks for reading!! 🎉

Stay tuned for part 2, where I’ll talk about how to monitor this entire infrastructure deployment with Datadog and Grafana. I’ll explain how to get continuously updating graphs with aggregate statistics on the job outputs, app server health, and even the estimated time of completion of active jobs.

In Part 3, I’ll discuss how we integrated NVIDIA GPUs for some specific jobs and how we squeezed out every last drop of performance from these expensive chips.

Hire us!! 🚀

We’ve got decades of experience working with venture backed startups and specialize in Web Development, AWS Infrastructure and Machine Learning. If you need a hand with managing a massive data set, contact us!
