An ETL (extract, transform, load) pipeline is a fundamental type of workflow in data engineering. The goal is to take data that might be unstructured or difficult to use or access and serve it as a source of clean, structured data.
It’s also straightforward to build a simple pipeline as a Python script. The full source code for this exercise is here.
An ETL pipeline consists of three general components:
Extract — get data from a source such as an API. In this exercise, we’ll only be pulling data once to show how it’s done. In many cases we’ll need to poll the API at regular intervals to get new data (called batching), which we do by creating scheduled ETL workflows. We might also be working with streaming (continuous) data sources that we manage through messaging systems.
Transform — structure, format, or clean the data, depending on what we need it for and how it needs to be delivered. Data transformation can happen at many stages in the lifecycle of the data, and different users may need to transform it for different reasons. In this exercise, we’ll be formatting data from JSON responses to pandas dataframes and writing them to CSV.
Load — write the data to an external destination where it can be used by another application. In this exercise, we’ll be writing each table we create to CSV. In a real-world data pipeline we would write to databases or other data stores, store our files in different formats, or send data to different locations around the world.
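Before we start, here is one way these three stages might be laid out in a script. This is only a sketch: the exercise below is written as a flat sequence of steps rather than as functions, and the function names here are purely illustrative.

# Illustrative ETL skeleton (not part of the exercise code below).
def extract():
    # Pull raw records from the source, e.g. an API.
    ...

def transform(raw_records):
    # Clean and reshape the raw records into tables.
    ...

def load(tables):
    # Write the tables to their destination, e.g. CSV files or a database.
    ...

raw = extract()
tables = transform(raw)
load(tables)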
We’re going to request records from The Movie Database API. Before we get started, you will need to obtain an API key to make requests. You can find the instructions for obtaining a key here.
Once you have your key, you will need to make sure you don’t put it directly into your source code. One quick way to do this is to create a file called config.py in the same directory as your ETL script, and put this into it:
# config.py
api_key = 'YOUR API KEY HERE'  # paste your TMDB API key here, as a string
If you’re publishing your code anywhere, you should add config.py to your .gitignore (or an equivalent ignore file) to make sure it doesn’t get pushed to any remote repositories. You can also store the API key as an environment variable, or use another method to hide it. The goal is to protect it from being exposed in the ETL script.
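For example, the environment-variable approach might look like this; the variable name TMDB_API_KEY is just an assumption for illustration:

import os

# Read the key from the environment instead of importing it from config.py.
# TMDB_API_KEY is an assumed name; export it in your shell before running.
api_key = os.environ.get('TMDB_API_KEY')
if api_key is None:
    raise RuntimeError('TMDB_API_KEY environment variable is not set')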
Now create a file called tmdb.py and import the necessary items.
import pandas as pd
import requests
import config
Here’s how we send a single GET request to the API. In the response, we receive a JSON record for the movie_id we specify:
API_KEY = config.api_key
movie_id = 550  # an example id; below we request ids 550 through 555
url = 'https://api.themoviedb.org/3/movie/{}?api_key={}'.format(movie_id, API_KEY)
r = requests.get(url)
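Before using the response body, it’s worth checking that the request succeeded. This check isn’t part of the original script, just a defensive sketch:

# Defensive check: only parse the body on success.
if r.status_code == 200:
    movie = r.json()  # the parsed JSON record for this movie
else:
    print('Request failed with status code', r.status_code)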
For this exercise, we’re going to request 6 movies with movie_id
ranging from 550 to 555. We create a loop that requests each movie one at a time and appends the response to a list.
response_list = []
API_KEY = config.api_key

for movie_id in range(550, 556):
    url = 'https://api.themoviedb.org/3/movie/{}?api_key={}'.format(movie_id, API_KEY)
    r = requests.get(url)
    response_list.append(r.json())
We now have a list of long, unwieldy JSON records delivered to us from the API. Create a pandas dataframe from the records using from_dict():
df = pd.DataFrame.from_dict(response_list)
If all goes well, you should have a cleanly formatted dataframe with 6 rows and 38 columns.
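A quick sanity check (optional, and the exact column count may differ if the API adds or removes fields):

print(df.shape)                             # expect (6, 38) if all requests succeeded
print(df[['id', 'original_title']].head())  # a peek at a couple of familiar columns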
We aren’t going to be working with all of these columns in this dataset, so let’s select the ones we are going to use. Here are the ones we’re interested in for this exercise:
budget
id
imdb_id
genres
original_title
release_date
revenue
runtime
We create a list of column names called df_columns that allows us to select the columns we want from the main dataframe.
df_columns = ['budget', 'genres', 'id', 'imdb_id', 'original_title', 'release_date', 'revenue', 'runtime']
Notice what the genres column looks like:
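A single entry in that column looks roughly like this; the id and name shown are only an illustration of the shape of the data, and the actual values come from the API:

print(df['genres'].iloc[0])
# e.g. [{'id': 18, 'name': 'Drama'}]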
It’s a column of lists of JSON records, which is hard to read or quickly understand in this format. We want to expand this column out so we can easily see and make use of the internal records.
One way to do this is by exploding out the column of lists into one-hot categorical columns. This is done by creating a single column for each categorical value and setting the row value to 1 if the movie belongs to that category and 0 if it doesn’t. We’ll see the results of this shortly.
We’ll only be using the name property, not the id. We’ll be able to access the id value in a separate table.
genres_list = df['genres'].tolist()
flat_list = [item for sublist in genres_list for item in sublist]
We want to create a separate table for genres and a column of lists to explode out. We’ll create a temporary column called genres_all as a list of lists of genres that we can later expand out into a separate column for each genre.
result = []
for l in genres_list:
    r = []
    for d in l:
        r.append(d['name'])
    result.append(r)

df = df.assign(genres_all=result)
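The same transformation can also be written as a nested list comprehension; the behavior is identical, so this is purely a stylistic choice:

# Equivalent one-liner: one list of genre names per movie.
df = df.assign(genres_all=[[d['name'] for d in genres] for genres in genres_list])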
Here’s where we create the genres table:
df_genres = pd.DataFrame.from_records(flat_list).drop_duplicates()
This gives us a table of the genre properties name and id. We attach the list of genre names onto our df_columns list as shown below and remove the original genres column from the list.
The last two lines are where we use the explode() and crosstab() functions to create the genre columns and join them onto the main table. explode() turns the column of lists into one row per genre name, and crosstab() pivots those rows into a set of frequency columns (0 or 1 here, since each genre appears at most once per movie).
df_columns = ['budget', 'id', 'imdb_id', 'original_title', 'release_date', 'revenue', 'runtime']
df_genre_columns = df_genres['name'].to_list()
df_columns.extend(df_genre_columns)
s = df['genres_all'].explode()
df = df.join(pd.crosstab(s.index, s))
Our movies table now contains the columns in df_columns plus one new column for each genre. If a movie belongs to a genre, the row value is 1, and if not, the value is 0. Now it’s easy for us to filter on specific genres and to quickly tell whether a movie belongs to a genre or not.
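For example, filtering might look like this; the 'Drama' column is an assumption that only holds if at least one of the requested movies is tagged with that genre:

# Keep only movies tagged as Drama (the column name is illustrative).
drama_movies = df[df['Drama'] == 1][['original_title', 'release_date']]
print(drama_movies)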
Finally, we’ll expand out the datetime column into a table. Pandas has built-in functions to extract specific parts of a datetime. Notice we need to convert the release_date column into a datetime first.
df['release_date'] = pd.to_datetime(df['release_date'])
df['day'] = df['release_date'].dt.day
df['month'] = df['release_date'].dt.month
df['year'] = df['release_date'].dt.year
df['day_of_week'] = df['release_date'].dt.day_name()
df_time_columns = ['id', 'release_date', 'day', 'month', 'year', 'day_of_week']
We can take a quick look at the dataframe defined by that code:
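print(df[df_time_columns].head())
# One row per movie: id, release_date, day, month, year, and day_of_week.
# This is just an inspection step, not part of the pipeline itself.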
We ended up creating 3 tables for the tmdb schema that we’ll call movies, genres, and datetimes. We export our tables by writing them to file. This will create 3 .csv files in the same directory our script is in.
df[df_columns].to_csv('tmdb_movies.csv', index=False)
df_genres.to_csv('tmdb_genres.csv', index=False)
df[df_time_columns].to_csv('tmdb_datetimes.csv', index=False)
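In a real-world pipeline, the load step might write to a database instead of local CSV files. Here is a rough sketch using SQLite through SQLAlchemy; the tmdb.db filename and the table names are assumptions made for illustration:

from sqlalchemy import create_engine

# Hypothetical alternative load step: write the same three tables to a local
# SQLite database instead of CSV files.
engine = create_engine('sqlite:///tmdb.db')
df[df_columns].to_sql('movies', engine, if_exists='replace', index=False)
df_genres.to_sql('genres', engine, if_exists='replace', index=False)
df[df_time_columns].to_sql('datetimes', engine, if_exists='replace', index=False)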
That’s it! We’ve created our first ETL pipeline. We built a structured schema from a list of JSON records and transformed our data into a clean, usable format. You can experiment with many different ways to explore and structure this data, as we have only scratched the surface here and used a small part of what we had available.