Pipelines
Intuition
So far we've implemented the components of our DataOps (cleaning, feature engineering, preprocessing, etc.) and MLOps (optimization, training, evaluation, etc.) workflows as Python function calls. This has worked well since our dataset is not large and because we're only dealing with one version of data. But what happens when we need to:
- trigger or schedule these workflows as new data arrives?
- scale these workflows as our data grows?
- share these workflows so others can use their outputs?
- monitor these workflows individually?
We'll need to break down our end-to-end ML pipeline into its constituent DataOps and MLOps pipelines that can be orchestrated and scaled as needed. There are several tools that can help us create these pipelines and orchestrate our workflows, such as Airflow, Prefect, Luigi and even some ML-focused options such as KubeFlow Pipelines, Vertex Pipelines, etc. We'll be creating our pipelines using Airflow because of its:
- wide adoption of the open source platform in industry
- Python based software development kit (SDK)
- integration with the ecosystem (data ingestion, processing, etc.)
- ability to run locally and scale easily
We'll be running Airflow locally, but we can easily scale it by running it on a managed cluster platform where we can run Python, Hadoop, Spark, etc. on large batch processing jobs (AWS EMR, Google Cloud's Dataproc, on-prem hardware, etc.). This is typically set up by your DevOps team, and you should be able to add your workflows accordingly using operators.
Airflow
Before we create our specific pipelines, let's understand and implement Airflow's overarching concepts that will allow us to "author, schedule, and monitor workflows".
Install
To install and run Airflow, we can either do so locally or with Docker and set up a database backend (default is SQLite) and/or establish connections.
Warning
If you do decide to use docker-compose to run Airflow inside Docker containers, you'll want to allocate at least 4 GB of memory.
# Configurations
export AIRFLOW_HOME=${PWD}/airflow
AIRFLOW_VERSION=2.0.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# Install Airflow (may need to upgrade pip)
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# Initialize DB (SQLite by default)
airflow db init
This will create an airflow directory with the following components:
airflow/
├── airflow.cfg
├── airflow.db
├── logs/
├── unittests.cfg
└── webserver_config.py
We're going to edit the airflow.cfg file to best fit our needs:
# Inside airflow.cfg
enable_xcom_pickling = True # needed for Great Expectations airflow provider
load_examples = False # don't clutter webserver with examples
And we'll perform a reset to apply these configuration changes.
airflow db reset
Now we're ready to initialize our database with an admin user, which we'll use to log in and access our workflows in the webserver.
# We'll be prompted to enter a password
airflow users create \
--username admin \
--firstname Goku \
--lastname Mohandas \
--role Admin \
--email [email protected]
Webserver
Once we've created a user, we're ready to launch the webserver and log in using our credentials.
# Launch webserver
export AIRFLOW_HOME=${PWD}/airflow
airflow webserver --port 8080 # http://localhost:8080
The webserver allows us to run and inspect workflows, establish connections to external data storage, manage users, etc. through a UI. Similarly, we could also use Airflow's REST API or command-line interface (CLI) to perform the same operations. However, we'll be using the webserver because it's convenient to visually inspect our workflows.

We'll explore the different components of the webserver as we learn about Airflow and implement our workflows.
Scheduler
Next, we need to launch our scheduler, which will execute and monitor the tasks in our workflows. The scheduler executes tasks by reading from the metadata database and ensures the task has what it needs to finish running.
# Launch scheduler (in separate terminal)
export AIRFLOW_HOME=${PWD}/airflow
airflow scheduler
Executor
As our scheduler reads from the metadata database, the executor determines what worker processes are necessary for the task to run to completion. Since our default database is SQLite, which can't support multiple connections, our default executor is the Sequential Executor. However, if we choose a more production-grade database option such as PostgreSQL or MySQL, we can choose scalable executor backends such as Celery, Kubernetes, etc. For example, running Airflow with Docker uses PostgreSQL as the database and so uses the Celery executor backend to run tasks in parallel.
DAGs
Workflows are defined by directed acyclic graphs (DAGs), whose nodes represent tasks and whose edges represent the data flow relationships between the tasks. Directed and acyclic implies that workflows can only execute in one direction and that a previous, upstream task cannot run again once a downstream task has started.

DAGs can be defined inside Python workflow scripts inside the airflow/dags directory and they'll automatically appear (and continuously be updated) on the webserver.
mkdir airflow/dags
touch airflow/dags/example.py
Before we start creating our DataOps and MLOps workflows, we'll learn about Airflow's concepts via an example DAG outlined in airflow/dags/example.py.
Inside each workflow script, we can define some default arguments that will apply to all DAGs within that workflow.
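A minimal sketch of these defaults (the owner value here is just a placeholder; days_ago is imported for the DAG definitions below):

from airflow.utils.dates import days_ago

# Default DAG args (placeholder values)
default_args = {
    "owner": "airflow",
}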
There are many more default arguments and we'll cover them as we go through the concepts.
We can initialize DAGs with many parameters (which will override the same parameters in default_args) and in several different ways:

- using a with statement

from airflow import DAG

with DAG(
    dag_id="example",
    description="Example DAG",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["example"],
) as example:
    # Define tasks
    pass
- using the dag decorator

from airflow.decorators import dag

@dag(
    dag_id="example",
    description="Example DAG",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["example"],
)
def example():
    # Define tasks
    pass
There are many parameters that we can initialize our DAGs with, including a start_date and a schedule_interval. While we could have our workflows execute on a temporal cadence, many ML workflows are initiated by events, which we can map using sensors and hooks to external databases, file systems, etc.
Tasks
Tasks are the operations that are executed in a workflow and are represented by nodes in a DAG. Each task should be a clearly defined single operation and it should be idempotent, which means we can execute it multiple times and expect the same result and system state. This is important in the event we need to retry a failed task and don't have to worry about resetting the state of our system. Like DAGs, there are several different ways to implement tasks:
- Using Operators

from airflow.operators.bash_operator import BashOperator

@dag(
    dag_id="example",
    description="Example DAG",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["example"],
)
def example():
    # Define tasks
    task_1 = BashOperator(task_id="task_1", bash_command="echo 1")
    task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
- Using the task decorator

from airflow.decorators import dag, task

@dag(
    dag_id="example",
    description="Example DAG",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["example"],
)
def example():
    @task
    def task_1():
        return 1

    @task
    def task_2(x):
        return x+1
Though the graphs are directed, we can establish certain trigger rules for each task to execute on conditional successes or failures of the parent tasks.
Operators
The first method of creating tasks involved using Operators, which define exactly what the task will be doing. Airflow has many built-in Operators such as the BashOperator or the PythonOperator, which allow us to execute bash and Python commands respectively.
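As a rough sketch (task ids, commands and the callable are placeholders), these two Operators can be used like this:

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python import PythonOperator

def _hello():
    print("hello from Python")

# Execute a bash command
task_1 = BashOperator(task_id="print_with_bash", bash_command="echo 'hello from bash'")
# Execute a Python callable
task_2 = PythonOperator(task_id="print_with_python", python_callable=_hello)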
There are also many other Airflow native Operators (email, S3, MySQL, Hive, etc.), as well as community maintained provider packages (Kubernetes, Snowflake, Azure, AWS, Salesforce, Tableau, etc.), to execute tasks specific to certain platforms or tools.
We can also create our own custom Operators by extending the BaseOperator class.
Relationships
Once we've defined our tasks using Operators or as decorated functions, we need to define the relationships between them (edges). The way we define the relationships depends on how our tasks were defined:
- if defined using Operators

# Task relationships
task_1 >> task_2  # same as task_1.set_downstream(task_2)
                  # or task_2.set_upstream(task_1)
- if defined using decorated functions

# Task relationships
x = task_1()
y = task_2(x=x)
In both scenarios, we're setting task_2 as the downstream task to task_1.
Note
We can even create intricate DAGs by using these notations to define the relationships.
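For example, a hypothetical fan-out/fan-in relationship (task names assumed) could be expressed as:

# One task fanning out to two parallel tasks, which both feed into a final task
task_1 >> [task_2_1, task_2_2] >> task_3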

XComs
When we use decorated functions, we can see how values can be passed between tasks. But how can we pass values when using Operators? Airflow uses XComs (cross communications) objects, defined with a key, value, timestamp and task_id, to push and pull values between tasks. When we use decorated functions, XComs are used under the hood but abstracted away, allowing us to pass values amongst Python functions seamlessly. But when using Operators, we'll need to explicitly push and pull the values as needed.
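A minimal sketch (task ids and the key name are placeholders) of explicitly pushing and pulling a value between two PythonOperator tasks:

from airflow.operators.python import PythonOperator

def _push(ti):
    ti.xcom_push(key="x", value=1)  # store a small value in the metadata database

def _pull(ti):
    x = ti.xcom_pull(key="x", task_ids="push_task")  # retrieve it in a downstream task
    print(x)

push_task = PythonOperator(task_id="push_task", python_callable=_push)
pull_task = PythonOperator(task_id="pull_task", python_callable=_pull)
push_task >> pull_task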
We can also view our XComs on the webserver by going to Admin >> XComs:

Warning
The data we pass between tasks should be small (metadata, metrics, etc.) because Airflow's metadata database is not equipped to hold large artifacts. However, if we do need to store and use the large results of our tasks, it's best to use external data storage (blob storage, databases, etc.) by creating an interface via hooks.
DAG runs
Once we've defined the tasks and their relationships, we're ready to run our DAGs. We'll start defining our DAG like so:
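With the decorator approach, this amounts to calling the decorated function so the DAG is registered when the script is loaded (the variable name is just illustrative):

# Run DAG (the @dag-decorated function returns a DAG object when called)
example_dag = example()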
If we refresh our webserver page (http://localhost:8080/), the new DAG will have appeared.
Manual
Our DAG is initially paused since we specified dags_are_paused_at_creation = True inside our airflow.cfg configuration, so we'll have to manually execute this DAG by clicking on it > unpausing it (toggle) > triggering it (button). To view the logs for any of the tasks in our DAG run, we can click on the task > Log.

Note
We could also use Airflow's REST API (with configured authorization) or command-line interface (CLI) to inspect and trigger workflows (and a whole lot more). Or we could even use the TriggerDagRunOperator to trigger DAGs from within another workflow.
# CLI to run dags
airflow dags trigger dataops
airflow dags trigger mlops
Interval
Had we specified a start_date and schedule_interval when defining the DAG, it would have automatically executed at the appropriate times. For example, the DAG below will have started two days ago and will be triggered at the start of every day.
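A sketch of such a DAG, reusing the earlier example definition with assumed values:

from airflow import DAG
from airflow.utils.dates import days_ago

with DAG(
    dag_id="example",
    description="Example DAG",
    default_args=default_args,
    schedule_interval="@daily",  # run at the start of every day
    start_date=days_ago(2),      # started two days ago
    tags=["example"],
) as example:
    # Define tasks
    pass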
Warning
Depending on the start_date and schedule_interval, our workflow should have been triggered several times and Airflow will try to catch up to the current time. We can avoid this by setting catchup=False when defining the DAG. We can also set this configuration as part of the default arguments:
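A minimal sketch of that (the key name is an assumption and should mirror the DAG's catchup parameter):

# Default DAG args
default_args = {
    "owner": "airflow",
    "catchup": False,  # assumed key name; prevents backfilling of missed runs
}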
However, if we did want to run particular runs in the past, we can manually backfill what we need.
We could also specify a cron expression for our schedule_interval parameter or even use cron presets.
Airflow's Scheduler will run our workflows one schedule_interval from the start_date. For example, if we want our workflow to start on 01-01-1983 and run @daily, then the first run will be immediately after 01-01-1983T11:59.
Sensors
While it may make sense to execute many data processing workflows on a scheduled interval, machine learning workflows may require more nuanced triggers. We shouldn't waste compute by executing our DataOps and MLOps pipelines just in case we have new data. Instead, we can use sensors to trigger workflows when some external condition is met. For example, we can initiate data processing when a new batch of annotated data appears in a database or when a specific file appears in a file system, etc.
There's so much more to Airflow (monitoring, task groups, smart sensors, etc.), so be sure to explore these features as you need them by using the official documentation.
DataOps
Now that we've reviewed Airflow's major concepts, we're ready to create the DataOps pipeline for our application. It involves a series of tasks, starting with extracting the data, then validating it and storing it in the right place for others to use in downstream workflows and applications.

Extraction
To keep things simple, we'll continue to keep our data as a local file:
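A sketch of this step as an Airflow task (the task id and command are placeholders, since the data already lives locally):

from airflow.operators.bash_operator import BashOperator

# "Extraction" is trivial here because the data is already a local file
extract = BashOperator(
    task_id="extract",
    bash_command="echo 'data is already available locally'",
)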
In a real production setting, our data can come from a wide variety of data management systems.
Typically we'll use sensors to trigger workflows when a condition is met or trigger them directly from the external source via API calls, etc. Our workflows can communicate with the different platforms by establishing a connection and then using hooks to interface with the database, data warehouse, etc.
Validation
The specific process of where and how we extract our data can be bespoke but what's important is that we have a continuous integration to execute our workflows. A key aspect to trusting this continuous integration is validation at every step of the way. We'll once again use Great Expectations, as we did in our testing lesson, to validate our incoming data before processing it.
With the Airflow concepts we've learned so far, there are many ways to use our data validation library to validate our data. Regardless of what data validation tool we use (ex. Great Expectations, TFX, AWS Deequ, etc.), we could use the BashOperator, PythonOperator, etc. to run our tests. Luckily, Great Expectations has a recommended Airflow Provider package. This package contains a GreatExpectationsOperator which we can use to execute specific checkpoints as tasks.
Recall from our testing lesson that we used the following CLI commands to perform our data validation tests:
great_expectations checkpoint run projects
great_expectations checkpoint run tags
We can perform the same operations as Airflow tasks within our DataOps workflow, either as the bash commands above using the BashOperator or with the custom Great Expectations operator like below:
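A sketch of these validation tasks using the provider's operator (the data_context_root_dir value is an assumption about where our Great Expectations project lives):

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

validate_projects = GreatExpectationsOperator(
    task_id="validate_projects",
    checkpoint_name="projects",
    data_context_root_dir="great_expectations",  # assumed project location
    fail_task_on_validation_failure=True,
)
validate_tags = GreatExpectationsOperator(
    task_id="validate_tags",
    checkpoint_name="tags",
    data_context_root_dir="great_expectations",  # assumed project location
    fail_task_on_validation_failure=True,
)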
And we want both tasks to pass, so we set the fail_task_on_validation_failure parameter to True so that downstream tasks don't execute if they fail.
Note
Reminder that we previously set the following configuration in our airflow.cfg file since the output of the GreatExpectationsOperator is not JSON serializable.
# Inside airflow.cfg
enable_xcom_pickling = True
Compute
Once we have validated our data, we're ready to compute features. We have a wide variety of Operators to choose from depending on the tools we're using for compute (ex. Python, Spark, DBT, etc.). And of course we can easily scale all of this by running on a managed cluster platform (AWS EMR, Google Cloud's Dataproc, on-prem hardware, etc.). For our task, we'll just need a PythonOperator to execute our feature engineering CLI command:
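A sketch of that task (the module and function names are placeholders standing in for our project's feature engineering entry point):

from airflow.operators.python import PythonOperator
from tagifai import main  # assumed project module

compute_features = PythonOperator(
    task_id="compute_features",
    python_callable=main.compute_features,  # assumed function wrapping our CLI command
)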
Cache
When we establish our DataOps pipeline, it's not something that's specific to any one application. Instead, it's its own repository that's responsible for extracting, transforming and loading (ETL) data for downstream pipelines that depend on it for their own unique applications. This is one of the most important benefits of not doing an end-to-end ML application because it allows for true continued collaboration. And so we need to cache our computed features to a central feature store, database or data warehouse (depending on whether our ML task involves entity features that change over time). This way, downstream developers can easily access features and use them to build applications without having to worry about doing much heavy lifting with data processing.
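A sketch of the caching task (the callable is a hypothetical helper that writes the computed features to our store):

from airflow.operators.python import PythonOperator

cache = PythonOperator(
    task_id="cache_to_feature_store",
    python_callable=cache_features,  # hypothetical helper that loads features into the store
)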
Learn all about what feature stores are, why we need them and how to implement them in our feature stores lesson.
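Putting it together, the DataOps DAG's task relationships might look like this (using the task names assumed above):

# Assumed task relationships for the DataOps DAG
extract >> [validate_projects, validate_tags] >> compute_features >> cache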
MLOps (model)
Once we have our features in our feature store, we can use them for MLOps tasks responsible for model creation such as optimization, training, validation, serving, etc.

Extract data
The first step is to extract the relevant historical features from our feature store/DB/DWH that our DataOps workflow cached to.
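A sketch of that task (names assumed; the helper itself is sketched right after this):

from airflow.operators.python import PythonOperator

extract_features = PythonOperator(
    task_id="extract_features",
    python_callable=get_features,  # hypothetical helper, sketched below
)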
We'd normally need to provide a set of entities and a timestamp to extract point-in-time features for each of them, but here's what a simplified version that extracts the most up-to-date features for a set of entities would look like:
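A simplified sketch, assuming a Feast feature store with a project_details feature view and integer id entities (all names and the repo path are placeholders):

from feast import FeatureStore

def get_features():
    # Connect to the feature repository (path assumed)
    store = FeatureStore(repo_path="features")
    # Pull the latest feature values for a set of entities
    feature_vector = store.get_online_features(
        features=["project_details:text"],  # called feature_refs in older Feast versions
        entity_rows=[{"id": 1}, {"id": 2}, {"id": 3}],
    ).to_dict()
    return feature_vector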
Training
Once we have our features, we can use them to optimize and train the best models. Since these tasks can require lots of compute, we would typically run this entire pipeline in a managed cluster platform which can scale up as our data and models grow.
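A sketch of the optimization and training tasks (function names assumed; each would typically just wrap the corresponding CLI command):

from airflow.operators.python import PythonOperator
from tagifai import main  # assumed project module

optimize = PythonOperator(task_id="optimize", python_callable=main.optimize)      # assumed function
train = PythonOperator(task_id="train_model", python_callable=main.train_model)  # assumed function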
Evaluation
It's imperative that we evaluate our trained models so we can be confident in their ability. We've extensively covered model evaluation in our testing lesson, so here we'll talk about what happens after evaluation. We want to execute a chain of tasks depending on whether the model improved or regressed. To do this, we're using a BranchPythonOperator for the evaluation task so it can return the appropriate task id.
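A sketch of the evaluation task (the callable is a hypothetical function, sketched below):

from airflow.operators.python import BranchPythonOperator

evaluate = BranchPythonOperator(
    task_id="evaluate",
    python_callable=_evaluate_model,  # hypothetical function that returns a task id
)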
This Operator will execute a function whose return value is a single task id (or a list of task ids).
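A sketch of the branching function and the downstream "direction" tasks (the comparison logic and task names are assumptions):

from airflow.operators.bash_operator import BashOperator

def _evaluate_model():
    # Hypothetical comparison against the currently deployed model's performance
    if new_model_is_better():
        return "improved"
    return "regressed"

improved = BashOperator(task_id="improved", bash_command="echo improved")
regressed = BashOperator(task_id="regressed", bash_command="echo regressed")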
The returning task ids can correspond to tasks that are simply used to direct the workflow towards a certain set of tasks based on upstream results. In our case, we want to serve the improved model or log a report in the event of a regression.
Serving
If our model passed our evaluation criteria, then we can deploy and serve it. Again, there are many different options here, such as using our CI/CD Git workflows to deploy the model wrapped as a scalable microservice or, for more streamlined deployments, using a purpose-built model server such as MLflow, TorchServe, RedisAI or Nvidia's Triton Inference Server. These servers have a registry with an API layer to seamlessly inspect, update, serve, rollback, etc. multiple versions of models. Typically a model artifact will be loaded into an inference app, which directly interfaces with the user-facing application to fulfill its requests.
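A sketch of the remaining tasks and the overall MLOps DAG wiring (the serve/report callables are hypothetical helpers, and the relationships are assumed):

from airflow.operators.python import PythonOperator

serve = PythonOperator(task_id="serve_model", python_callable=serve_model)   # hypothetical helper
report = PythonOperator(task_id="file_report", python_callable=file_report)  # hypothetical helper

# Assumed task relationships for the MLOps DAG
extract_features >> optimize >> train >> evaluate >> [improved, regressed]
improved >> serve
regressed >> report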
MLOps (update)
Once we've validated and served our model, how do we know when and how it needs to be updated? We'll need to compose a set of workflows that reflect the update policies we want to set in place.

Monitoring
Our inference application will receive requests from our user-facing application and we'll use our versioned model artifact to return inference responses. All of these inputs, predictions and other values will also be sent (batch/real-time) to a monitoring workflow that ensures the health of our system. We have already covered the foundations of monitoring in our monitoring lesson but here we'll look at how triggered alerts fit with the overall operational workflow.
Policies
Based on the metrics we're monitoring using various thresholds, window sizes, frequencies, etc., we'll be able to trigger events based on our update policy engine.
- continue: with the currently deployed model without any updates. However, an alert was raised, so it should be analyzed later to reduce false positive alerts.
- improve: by retraining the model to avoid performance degradation caused by meaningful drift (data, target, concept, etc.).
- inspect: to make a decision. Typically expectations are reassessed, schemas are reevaluated for changes, slices are reevaluated, etc.
- rollback: to a previous version of the model because of an issue with the current deployment. Typically these can be avoided using robust deployment strategies (ex. dark canary).
Retraining
If we need to improve on the existing version of the model, it's not just a matter of rerunning the model creation workflow on the new dataset. We need to carefully compose the training data in order to avoid issues such as catastrophic forgetting (forgetting previously learned patterns when presented with new data).
- labeling: new incoming data may need to be properly labeled before being used (we cannot just depend on proxy labels).
- active learning: we may not be able to explicitly label every single new data point, so we have to leverage active learning workflows to complete the labeling process.
- QA: quality assurance workflows to ensure that labeling is accurate, especially for known false positives/negatives and historically poorly performing slices of data.
- augmentation: increasing our training set with augmented data that's representative of the original dataset.
- sampling: upsampling and downsampling to address imbalanced data slices.
- evaluation: creation of an evaluation dataset that's representative of what the model will encounter once deployed.
Once we have the proper dataset for retraining, we can kick off the featurization and model training workflows, where a new model will be trained and evaluated before being deployed and receiving new inference requests. In the next lesson, we'll discuss how to combine these pipelines to create a continual learning system.