Pipelines

Create, schedule and monitor workflows by creating scalable pipelines.
Goku Mohandas

Intuition

So far we've implemented the components of our DataOps (cleaning, feature engineering, preprocessing, etc.) and MLOps (optimization, training, evaluation, etc.) workflows as end-to-end Python function calls. This has worked well since our dataset is small and we're only dealing with one version of our data. But what happens when we need to:

• trigger or schedule these workflows as new data arrives?
• scale these workflows as our data grows?
• share these workflows so others can use their outputs?
• monitor these workflows separately?

We'll need to break down our end-to-end ML pipeline into its constituent DataOps and MLOps pipelines that can be orchestrated and scaled as needed. There are several tools that can help us create these pipelines and orchestrate our workflows, such as Airflow, Luigi and even some ML-focused options such as Kubeflow Pipelines, Vertex Pipelines, etc. We'll be creating our pipelines using Airflow because of its:

• wide adoption in industry and open source
• Python-based software development kit (SDK)
• integration with the ecosystem (data ingestion, processing, etc.)
• ability to run locally and scale easily

Note

We'll be running Airflow locally, but we can easily scale it by running on a managed cluster platform where we can run Python, Hadoop, Spark, etc. on large batch processing jobs (AWS EMR, Google Cloud's Dataproc, on-prem hardware, etc.).

Airflow

Before we create our specific pipelines, let's understand and implement Airflow's overarching concepts that will allow us to "author, schedule, and monitor workflows".

Install

We can install and run Airflow either locally or with Docker, and then set up a database backend (the default is SQLite) and/or establish connections.

Warning

If you do decide to use docker-compose to run Airflow inside Docker containers, you'll want to allocate at least 4 GB of memory.

# Configurations
export AIRFLOW_HOME=${PWD}/airflow
AIRFLOW_VERSION=2.0.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# Install Airflow (may need to upgrade pip)
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Initialize DB (SQLite by default)
airflow db init

This will create an airflow directory with the following components:

airflow/
├── logs/
├── airflow.cfg
├── airflow.db
├── unittests.cfg
└── webserver_config.py

We're going to edit the airflow.cfg file to best fit our needs:

# Inside airflow.cfg (both settings live under the [core] section)
# Needed for the Great Expectations Airflow provider
enable_xcom_pickling = True
# Don't clutter the webserver with examples
load_examples = False


And we'll perform a reset to account for the configuration changes.

airflow db reset

Now we're ready to initialize our database with an admin user, which we'll use to log in and access our workflows in the webserver.

# We'll be prompted to enter a password
airflow users create \
    --username admin \
    --firstname Goku \
    --lastname Mohandas \
    --role Admin \
    --email [email protected]

Webserver

Once we've created a user, we're ready to launch the webserver and log in using our credentials.

# Launch webserver
export AIRFLOW_HOME=${PWD}/airflow
airflow webserver --port 8080  # http://localhost:8080

The webserver allows us to run and inspect workflows, establish connections to external data storage, manage users, etc. through a UI. Similarly, we could also use Airflow's REST API or command-line interface (CLI) to perform the same operations. However, we'll largely be using the webserver because it's convenient to visually inspect our workflows.

We'll explore the different components of the webserver as we learn about Airflow and implement our workflows.

Scheduler

Next, we need to launch our scheduler, which will execute and monitor the tasks in our workflows. The scheduler executes tasks by reading from the metadata database and ensures each task has what it needs to finish running.

# Launch scheduler (in separate terminal)
export AIRFLOW_HOME=${PWD}/airflow
airflow scheduler


Executor

As our scheduler reads from the metadata database, the executor determines what worker processes are necessary for the task to run to completion. Since our default database is SQLite, which can't support multiple connections, our default executor is the SequentialExecutor. However, if we choose a more production-grade database such as PostgreSQL or MySQL, we can choose scalable executor backends such as Celery, Kubernetes, etc. For example, running Airflow with Docker uses PostgreSQL as the database and so uses the Celery executor backend to run tasks in parallel.

DAGs

Workflows are defined by directed acyclic graphs (DAGs), whose nodes represent tasks and whose edges represent the relationships between the tasks. Directed and acyclic implies that workflows can only execute in one direction and a previous, upstream task cannot run again once a downstream task has started.
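To make the "directed acyclic" property concrete, here's a minimal pure-Python sketch (not Airflow's scheduler) that models a DAG as a mapping from each task to its upstream tasks and uses the standard library's graphlib (Python 3.9+) to derive a valid execution order. The task names are hypothetical placeholders.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of its upstream (parent) tasks.
# Task names are hypothetical placeholders.
dag = {
    "extract": set(),
    "validate": {"extract"},
    "compute": {"validate"},
    "cache": {"compute"},
}

def execution_order(graph):
    """Return an order in which every task runs after all of its upstream tasks."""
    return list(TopologicalSorter(graph).static_order())

def is_acyclic(graph):
    """A graph is a valid DAG only if such an order exists."""
    try:
        execution_order(graph)
        return True
    except CycleError:
        return False

print(execution_order(dag))  # ['extract', 'validate', 'compute', 'cache']

# Making "extract" depend on "cache" closes a loop, which a DAG forbids.
cyclic = {**dag, "extract": {"cache"}}
print(is_acyclic(cyclic))  # False
```

If a cycle were allowed, an upstream task could be re-triggered by its own downstream task, which is exactly what the acyclic constraint rules out.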

DAGs can be defined inside Python workflow scripts inside the airflow/dags directory and they'll automatically appear (and continuously be updated) on the webserver. Inside each workflow script, we can define some default arguments that will apply to all DAGs within that workflow.

# Default DAG args
default_args = {
    "owner": "airflow",
}

Note

There are many more default arguments and we'll cover them as we go through the concepts.

We can initialize DAGs with many parameters (which will override the same parameters in default_args) and in several different ways:

• using a with statement

from airflow import DAG

with DAG(
    dag_id="dataops",
    description="Data related operations.",
    default_args=default_args,
    tags=["dataops"],
) as dag:
    # Define tasks
    pass

• using the dag decorator

from airflow.decorators import dag

@dag(
    dag_id="dataops",
    description="Data related operations.",
    default_args=default_args,
    tags=["dataops"],
)
def dataops():
    # Define tasks
    pass

Note

There are many parameters that we can initialize our DAGs with, including a start_date and a schedule_interval. While we could have our workflows execute on a temporal cadence, many ML workflows are initiated by events, which we can map using sensors and hooks to external databases, file systems, etc.

Tasks are the operations that are executed in a workflow and are represented by nodes in a DAG. Each task should be a clearly defined single operation and it should be idempotent, which means we can execute it multiple times and expect the same result. This is important because, if we need to retry a failed task, we don't have to worry about resetting the state of our system. Like DAGs, there are several different ways to implement tasks:

• Using Operators

from airflow.decorators import dag
from airflow.operators.bash import BashOperator

@dag(
    dag_id="example",
    default_args=default_args,
    schedule_interval=None,
    tags=["example"],
)
def example():
    # Define tasks
    task_1 = BashOperator(task_id="task_1", bash_command="echo 1")
    task_2 = BashOperator(task_id="task_2", bash_command="echo 1")

• Using the task decorator

from airflow.decorators import dag, task

@dag(
    dag_id="example",
    default_args=default_args,
    schedule_interval=None,
    tags=["example"],
)
def example():
    @task
    def task_1():
        return 1

    @task
    def task_2(x):
        return x+1
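Whichever way we define tasks, idempotency is what makes retries safe. A minimal sketch of the difference, assuming a hypothetical task that writes daily counts to an in-memory store: the non-idempotent version appends on every run, so a retry duplicates data, while the idempotent version overwrites the partition keyed by the run date, so a retry leaves the same state behind.

```python
def append_counts(store, run_date, counts):
    """Non-idempotent: every retry appends another copy of the rows."""
    store.setdefault("rows", []).extend((run_date, c) for c in counts)

def upsert_counts(store, run_date, counts):
    """Idempotent: the partition for run_date is overwritten on each run."""
    store.setdefault("partitions", {})[run_date] = list(counts)

store_a, store_b = {}, {}
for _ in range(2):  # simulate the original attempt plus one retry
    append_counts(store_a, "2021-01-01", [3, 5])
    upsert_counts(store_b, "2021-01-01", [3, 5])

print(len(store_a["rows"]))   # 4 -> duplicated by the retry
print(store_b["partitions"])  # {'2021-01-01': [3, 5]} -> unchanged by the retry
```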

Note

Though the graphs are directed, we can establish certain trigger rules for each task to execute on conditional successes or failures of the parent tasks.
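As a rough illustration of how a trigger rule decides whether a task runs, here's a simplified sketch (not Airflow's implementation) using rule names that Airflow supports, such as all_success (the default), one_failed and none_failed. It only models three terminal upstream states (success, failed, skipped), which is a simplification.

```python
# Simplified sketch: decide whether a task should run given the final
# states of its upstream tasks. Not Airflow's actual implementation.
def should_run(trigger_rule, upstream_states):
    states = list(upstream_states)
    if trigger_rule == "all_success":
        return all(s == "success" for s in states)
    if trigger_rule == "all_failed":
        return all(s == "failed" for s in states)
    if trigger_rule == "all_done":
        return all(s in {"success", "failed", "skipped"} for s in states)
    if trigger_rule == "one_success":
        return any(s == "success" for s in states)
    if trigger_rule == "one_failed":
        return any(s == "failed" for s in states)
    if trigger_rule == "none_failed":
        return all(s in {"success", "skipped"} for s in states)
    raise ValueError(f"Unknown trigger rule: {trigger_rule}")

# A notification task could use one_failed to run only when something breaks.
print(should_run("all_success", ["success", "failed"]))  # False
print(should_run("one_failed", ["success", "failed"]))   # True
```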

Operators

The first method of creating tasks involves using Operators, which define exactly what the task will be doing. Airflow has many built-in Operators such as the BashOperator or PythonOperator, which allow us to execute bash and Python commands respectively.

# BashOperator
from airflow.operators.bash import BashOperator
task_1 = BashOperator(task_id="task_1", bash_command="echo 1")

# PythonOperator
from airflow.operators.python import PythonOperator
task_2 = PythonOperator(
    task_id="task_2",
    python_callable=foo,
    op_kwargs={"arg1": ...})

There are also many other Airflow-native Operators (email, S3, MySQL, Hive, etc.), as well as community-maintained provider packages (Kubernetes, Snowflake, Azure, AWS, Salesforce, Tableau, etc.), to execute tasks specific to certain platforms or tools.

Note

We can also create our own custom Operators by extending the BaseOperator class.

Relationships

Once we've defined our tasks using Operators or as decorated functions, we need to define the relationships between them (edges). The way we define the relationships depends on how our tasks were defined:

• if defined using Operators

# Task relationships
task_1 >> task_2  # same as task_1.set_downstream(task_2) or
                  # task_2.set_upstream(task_1)

• if defined using decorated functions

# Task relationships
x = task_1()
y = task_2(x=x)

In both scenarios, we're setting task_2 as the downstream task of task_1. We can create intricate DAGs by using these notations to define the relationships.

task_1 >> [task_2_1, task_2_2] >> task_3
task_2_2 >> task_4
[task_3, task_4] >> task_5
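The `>>` notation works through Python operator overloading. Airflow's operators overload `>>` in a similar spirit; the toy Task class below is a hypothetical illustration, not Airflow's code. The key detail is that a list has no `>>` of its own, so `[a, b] >> task` falls back to the task's reflected `__rrshift__` method.

```python
class Task:
    """Toy stand-in for an Airflow task, used only to illustrate `>>` chaining."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def _link(self, other):
        # Record the edge self -> other on both ends.
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)

    def __rshift__(self, other):
        # task >> task  or  task >> [tasks]
        for t in (other if isinstance(other, list) else [other]):
            self._link(t)
        return other  # returning the right operand lets chains continue

    def __rrshift__(self, other):
        # [tasks] >> task: Python calls this because list has no __rshift__
        for t in other:
            t._link(self)
        return self

names = ["task_1", "task_2_1", "task_2_2", "task_3", "task_4", "task_5"]
t1, t21, t22, t3, t4, t5 = (Task(n) for n in names)

t1 >> [t21, t22] >> t3
t22 >> t4
[t3, t4] >> t5

print(sorted(t5.upstream))     # ['task_3', 'task_4']
print(sorted(t22.downstream))  # ['task_3', 'task_4']
```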

XComs

When we use decorated functions, we can see how values can be passed between tasks. But how can we pass values when using Operators? Airflow uses XCom (cross-communication) objects, defined with a key, value, timestamp and task_id, to push and pull values between tasks. When we use decorated functions, XComs are used under the hood but abstracted away, allowing us to pass values between Python functions seamlessly. But when using Operators, we'll need to explicitly push and pull the values as we need them.

from airflow.decorators import dag
from airflow.operators.python import PythonOperator

def _task_1(ti):
    x = 2
    ti.xcom_push(key="x", value=x)

def _task_2(ti):
    x = ti.xcom_pull(key="x", task_ids=["task_1"])[0]
    y = x + 3
    ti.xcom_push(key="y", value=y)

@dag(
    dag_id="example",
    default_args=default_args,
    schedule_interval=None,
    tags=["example"],
)
def example2():
    # Tasks
    task_1 = PythonOperator(task_id="task_1", python_callable=_task_1)
    task_2 = PythonOperator(task_id="task_2", python_callable=_task_2)
    task_1 >> task_2
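To demystify what push and pull are doing, here's a toy in-memory store with records made of a key, value, timestamp and task_id. The ToyXComStore class is hypothetical and only mimics the idea; Airflow actually persists XComs in its metadata database.

```python
from datetime import datetime, timezone

class ToyXComStore:
    """In-memory stand-in for an XCom table (illustration only)."""

    def __init__(self):
        self.records = []

    def push(self, task_id, key, value):
        self.records.append({"task_id": task_id, "key": key, "value": value,
                             "timestamp": datetime.now(timezone.utc)})

    def pull(self, key, task_ids):
        # One value per requested task_id; the most recent push wins.
        values = []
        for task_id in task_ids:
            matches = [r for r in self.records
                       if r["task_id"] == task_id and r["key"] == key]
            values.append(matches[-1]["value"] if matches else None)
        return values

xcoms = ToyXComStore()
xcoms.push("task_1", "x", 2)                     # like _task_1
x = xcoms.pull(key="x", task_ids=["task_1"])[0]  # like _task_2
xcoms.push("task_2", "y", x + 3)
print(xcoms.pull(key="y", task_ids=["task_2"]))  # [5]
```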

We can also view our XComs on the webserver by going to Admin >> XComs.

Warning

The data we pass between tasks should be small (metadata, metrics, etc.) because Airflow's metadata database is not equipped to hold large artifacts. However, if we do need to store and use the large results of our tasks, it's best to use an external data storage (blob storage, databases, etc.) by creating an interface via hooks.
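A common pattern is to write the large result to external storage and pass only its location between tasks. In this minimal sketch a local temporary directory stands in for blob storage (in practice we'd write through a hook to S3, GCS, etc.), and both functions are hypothetical task callables.

```python
import json
import tempfile
from pathlib import Path

def produce_artifact(output_dir):
    """Write a (potentially large) artifact externally and return only its location."""
    artifact = {"weights": list(range(10_000))}  # stand-in for a large result
    path = Path(output_dir, "artifact.json")
    path.write_text(json.dumps(artifact))
    return str(path)  # only this small string would be pushed as an XCom

def consume_artifact(path):
    """Downstream task: receive the path via XCom and load the artifact itself."""
    return json.loads(Path(path).read_text())

with tempfile.TemporaryDirectory() as tmp:
    reference = produce_artifact(tmp)
    artifact = consume_artifact(reference)

print(len(artifact["weights"]))  # 10000
```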

DAG runs

Once we've defined the tasks and their relationships, we're ready to run our DAGs. We'll start by instantiating our DAG like so:

# Define DAG
example_dag = example()

If we refresh our webserver page (http://localhost:8080/), the new DAG should appear.

Manual

Our DAG is initially paused since dags_are_paused_at_creation = True by default in our airflow.cfg configuration, so we'll have to manually execute this DAG by clicking on it >> unpausing it (toggle) >> triggering it (button). To view the logs for any of the tasks in our DAG run, we can click on the task >> Log.

Note

We could also use Airflow's REST API (with configured authorization) or command-line interface (CLI) to inspect and trigger workflows (and a whole lot more). Or we could even use the TriggerDagRunOperator to trigger DAGs from within another workflow.

# CLI to run dags
airflow dags trigger dataops
airflow dags trigger mlops


Interval

Had we specified a start_date and schedule_interval when defining the DAG, it would have automatically executed at the appropriate times. For example, the DAG below has a start_date of two days ago and will be triggered at the start of every day.

from datetime import timedelta
from airflow.decorators import dag
from airflow.utils.dates import days_ago

@dag(
    dag_id="example",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=["example"],
)
def example():
    # Define tasks
    pass

Warning

Depending on the start_date and schedule_interval, our workflow may have been triggered several times, and Airflow will try to catch up to the current time. We can avoid this by setting catchup=False when defining the DAG. However, if we did want to execute particular runs in the past, we can backfill what we need.

We could also specify a cron expression for our schedule_interval parameter or even use cron presets.
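For reference, the common presets expand to standard five-field cron expressions (minute, hour, day of month, month, day of week). The toy matcher below is a hypothetical sketch that only understands `*` and single integers; it ignores ranges, steps, lists and cron's special OR rule between day-of-month and day-of-week.

```python
from datetime import datetime

# Presets and the five-field cron expressions they stand for.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}

def matches(expression, dt):
    """Toy cron matcher: each field must be '*' or a single integer."""
    expression = CRON_PRESETS.get(expression, expression)
    fields = expression.split()
    # cron counts Sunday as 0, whereas Python's weekday() counts Monday as 0.
    cron_weekday = (dt.weekday() + 1) % 7
    values = [dt.minute, dt.hour, dt.day, dt.month, cron_weekday]
    return all(f == "*" or int(f) == v for f, v in zip(fields, values))

print(matches("@daily", datetime(2021, 3, 1, 0, 0)))        # True
print(matches("@weekly", datetime(2021, 3, 1, 0, 0)))       # False (a Monday)
print(matches("30 6 * * *", datetime(2021, 3, 1, 6, 30)))   # True
```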

Note

Airflow's scheduler will run our workflows one schedule_interval after the start_date. For example, if we want our workflow to start on 01-01-1983 and run @daily, then the first run will be triggered immediately after 01-01-1983T23:59, once that first day has ended.
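The interval semantics (and what catchup implies) can be sketched with plain datetime arithmetic. This is a simplified model, not Airflow's scheduler: each run covers [interval_start, interval_start + interval) and is only triggered once that interval has ended, and the catchup=False branch roughly approximates keeping only the latest due interval.

```python
from datetime import datetime, timedelta

def scheduled_runs(start_date, interval, now, catchup=True):
    """Return (interval_start, trigger_time) pairs that are due by `now`."""
    runs = []
    interval_start = start_date
    while interval_start + interval <= now:
        runs.append((interval_start, interval_start + interval))
        interval_start += interval
    # Roughly what catchup=False does: only the most recent due interval runs.
    return runs if catchup else runs[-1:]

start = datetime(1983, 1, 1)
now = datetime(1983, 1, 4, 12, 0)
runs = scheduled_runs(start, timedelta(days=1), now)
print(runs[0][1])  # 1983-01-02 00:00:00 -> fires once Jan 1 has ended
print(len(runs))   # 3 runs would be caught up
```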

Sensors

While it may make sense to execute many data processing workflows on a scheduled interval, machine learning workflows may require more nuanced triggers. We shouldn't be wasting compute by executing our DataOps and MLOps pipelines just in case we have new data. Instead, we can use sensors to trigger workflows when some external condition is met. For example, we can initiate data processing when annotated data appears in a database, or when a specific file appears in a file system.
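Conceptually, a poke-style sensor re-checks a condition at a fixed interval until it holds or a timeout expires. Airflow's sensors (e.g. FileSensor) expose poke_interval and timeout parameters in a similar spirit; the function below is a pure-Python sketch, and the "new data" condition is hypothetical.

```python
import time

def poke_until(condition, poke_interval=60.0, timeout=3600.0):
    """Re-check `condition` every `poke_interval` seconds until it returns True
    or `timeout` seconds have elapsed."""
    started = time.monotonic()
    while True:
        if condition():
            return True
        if time.monotonic() - started >= timeout:
            raise TimeoutError("Sensor timed out before the condition was met")
        time.sleep(poke_interval)

# Hypothetical condition: new annotated data "appears" on the third check.
checks = {"count": 0}

def new_data_available():
    checks["count"] += 1
    return checks["count"] >= 3

print(poke_until(new_data_available, poke_interval=0.01, timeout=5))  # True
print(checks["count"])  # 3
```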

Note

There's so much more to Airflow (monitoring, task groups, smart sensors, etc.), so be sure to explore these features as you need them using the official documentation.

DataOps

Now that we've reviewed Airflow's major concepts, we're ready to create the DataOps pipeline for our application. It involves a series of tasks: extracting the data, validating it and storing it in the right place for others to use in downstream workflows and applications.

# Task relationships
extract_data >> [validate_projects, validate_tags] >> compute_features >> cache

To keep things simple, we'll continue to keep our data as a local file:

# Extract data from DWH, blob storage, etc.
extract_data = BashOperator(
    task_id="extract_data",
    bash_command=f"cd {config.BASE_DIR} && dvc pull",
)

... but in a real production setting, our data can come from a wide variety of sources. For example, we have a dataset prepared for the purposes of this course but where does this data originate from and where does it end up before we're ready to use it for machine learning?

Extraction

When we talk about data that we want to process in our DataOps pipelines, we need to think about both its origin and where it lives.

• Database: most applications (including the old Made With ML that our dataset comes from) have a database to store and read information from. Typical choices are PostgreSQL, MySQL, MongoDB, Cassandra, etc. The specific choice depends on the schemas, data scale, etc. We can also have auxiliary data coming from other services around our main application such as user analytics (ex. Google Analytics or Segment), financial transactions (ex. Stripe), CRM services (ex. Salesforce), etc. Data can also be taken from the origin database and moved to another specialized database after it goes through labeling/annotation and QA pipelines.

• Warehouse: data is often moved from the database to a data warehouse (DWH) for the added benefits of a powerful analytics engine, front-end clients, etc. to make it very easy for downstream developers to efficiently use the data at scale. Typical choices are Google BigQuery, Amazon Redshift, Snowflake, Hive, etc.

Note

While both databases and data warehouses hold data, they're different kinds of processing systems. A database is an online transaction processing (OLTP) system because it's typically used for day-to-day CRUD (create, read, update, delete) operations connected to the main application. A data warehouse, on the other hand, is an online analytical processing (OLAP) system because it's used for ad-hoc querying on aggregate views of the data. Data moves from a database to a warehouse through ETL (extract, transform, load) pipelines; the warehouse is then used to drive business insights, which can in turn drive changes back in the database (schemas, values, etc.).
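The difference in access patterns is easy to see in SQL. In this sketch a single in-memory SQLite database stands in for both systems (in practice the aggregate query would run on a warehouse after ETL), and the table and column names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE projects (id INTEGER PRIMARY KEY, tag TEXT, views INTEGER)")

# OLTP-style: small, row-level CRUD operations issued by the application.
conn.execute("INSERT INTO projects (tag, views) VALUES (?, ?)", ("nlp", 10))
conn.execute("INSERT INTO projects (tag, views) VALUES (?, ?)", ("cv", 4))
conn.execute("INSERT INTO projects (tag, views) VALUES (?, ?)", ("nlp", 6))
conn.execute("UPDATE projects SET views = views + 1 WHERE id = ?", (2,))

# OLAP-style: an ad-hoc query over an aggregate view of all rows.
summary = conn.execute(
    "SELECT tag, COUNT(*), SUM(views) FROM projects GROUP BY tag ORDER BY tag"
).fetchall()
print(summary)  # [('cv', 1, 5), ('nlp', 2, 16)]
```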

Typically we'll use sensors to trigger workflows when a condition is met or trigger them directly from the external source via API calls, etc. Our workflows can communicate with the different platforms by establishing a connection and then using hooks to interface with the database, data warehouse, etc.

Validation

The specific process of where and how we extract our data can be bespoke, but what's important is that we have continuous integration to execute our workflows. A key aspect of trusting this continuous integration is validation at every step of the way. We'll once again use Great Expectations, as we did in our testing lesson, to validate our incoming data before processing it.

With the Airflow concepts we've learned so far, there are many ways to use our data validation library to validate our data. Regardless of what data validation tool we use (ex. Great Expectations, TFX, AWS Deequ, etc.) we could use the BashOperator, PythonOperator, etc. to run our tests. Luckily, Great Expectations has a recommended Airflow Provider package. This package contains a GreatExpectationsOperator which we can use to execute specific checkpoints as tasks.

Recall from our testing lesson that we used the following CLI commands to perform our data validation tests:

great_expectations checkpoint run projects
great_expectations checkpoint run tags

We can perform the same operations as Airflow tasks within our DataOps workflow, either as the bash commands above using the BashOperator or with the custom Great Expectations operator like below:

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

# Validate projects
validate_projects = GreatExpectationsOperator(
    task_id="validate_projects",
    checkpoint_name="projects",
    data_context_root_dir="great_expectations",
    fail_task_on_validation_failure=True,
)

# Validate tags
validate_tags = GreatExpectationsOperator(
    task_id="validate_tags",
    checkpoint_name="tags",
    data_context_root_dir="great_expectations",
    fail_task_on_validation_failure=True,
)

And we want both tasks to pass so we set the fail_task_on_validation_failure parameter to True so that downstream tasks don't execute if they fail.
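The reason failing loudly matters: in Airflow, a task whose callable raises an exception is marked as failed, and with the default all_success trigger rule its downstream tasks don't run. Here's a hypothetical validation callable (loosely inspired by "not null" and "unique" expectations, not the Great Expectations API) that follows this fail-fast pattern.

```python
class ValidationError(Exception):
    """Raised so that the orchestrator marks the task as failed."""

def validate_rows(rows, required=("id", "tag")):
    """Hypothetical checks: required columns are not null and ids are unique."""
    failures = []
    for column in required:
        if any(row.get(column) is None for row in rows):
            failures.append(f"{column} has null values")
    ids = [row.get("id") for row in rows]
    if len(ids) != len(set(ids)):
        failures.append("id values are not unique")
    if failures:
        # Raising (rather than just logging) is what stops downstream tasks.
        raise ValidationError("; ".join(failures))
    return True

print(validate_rows([{"id": 1, "tag": "nlp"}, {"id": 2, "tag": "cv"}]))  # True
```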

Note

As a reminder, we previously set the following configuration in our airflow.cfg file since the output of the GreatExpectationsOperator is not JSON serializable.

# Inside airflow.cfg
enable_xcom_pickling = True


Compute

Once we have validated our data, we're ready to compute features. We have a wide variety of Operators to choose from depending on the tools we're using for compute (ex. Python, Spark, DBT, etc.). And of course we can easily scale all of this by running on a managed cluster platform (AWS EMR, Google Cloud's Dataproc, on-prem hardware, etc.). For our task, we'll just need a PythonOperator to execute our feature engineering CLI command:

from pathlib import Path
from airflow.operators.python import PythonOperator

# Compute features
compute_features = PythonOperator(
    task_id="compute_features",
    python_callable=cli.compute_features,
    op_kwargs={"params_fp": Path(config.CONFIG_DIR, "params.json")},
)

Cache

When we establish our DataOps pipeline, it's not something that's specific to any one application. Instead, it's its own repository that's responsible for extracting, transforming and loading (ETL) data for downstream pipelines that depend on it for their own unique applications. This is one of the most important benefits of not building a single end-to-end ML application, because it allows for true continued collaboration. And so we need to cache our computed features to a central feature store, database or data warehouse (depending on whether our ML task involves entity features that change over time). This way, downstream developers can easily access features and use them to build applications without having to do much heavy lifting with data processing.

# Feature store
cache = BashOperator(
    task_id="cache",
    bash_command=f"cd {config.BASE_DIR}/features && feast materialize-incremental {END_TS}",
)

Note

Learn all about what feature stores are, why we need them and how to implement them in our feature stores lesson.

MLOps (model)

Once we have our features in our feature store, we can use them for MLOps tasks responsible for model creation, such as optimization, training, validation, serving, etc.

# Task relationships
optimization >> train_model >> evaluate_model >> [improved, regressed]
improved >> [set_monitoring_references, deploy_model, notify_teams]
regressed >> [notify_teams, file_report]

Extract data

The first step is to extract the relevant historical features from our feature store/DB/DWH that our DataOps workflow cached to. We'd normally need to provide a set of entities and the timestamp to extract point-in-time features for each of them.

# Extract features
extract_features = PythonOperator(
    task_id="extract_features",
    python_callable=cli.get_historical_features,
)
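Point-in-time correctness means each training example only sees feature values recorded at or before its own timestamp, so future information doesn't leak into training. Feature stores such as Feast handle this for us; the naive sketch below just shows the idea on a hypothetical feature log.

```python
from datetime import datetime

# Hypothetical feature log: (entity_id, event_timestamp, feature_value) rows.
feature_log = [
    (1, datetime(2021, 1, 1), 0.1),
    (1, datetime(2021, 1, 5), 0.7),
    (2, datetime(2021, 1, 3), 0.4),
]

def point_in_time_features(entity_df, log):
    """For each (entity_id, timestamp) request, return the latest feature value
    recorded at or before that timestamp (None if nothing was recorded yet)."""
    results = []
    for entity_id, ts in entity_df:
        candidates = [(t, v) for e, t, v in log if e == entity_id and t <= ts]
        results.append(max(candidates, key=lambda c: c[0])[1] if candidates else None)
    return results

entities = [(1, datetime(2021, 1, 4)), (1, datetime(2021, 1, 6)), (2, datetime(2021, 1, 2))]
print(point_in_time_features(entities, feature_log))  # [0.1, 0.7, None]
```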

Training

Once we have our features, we can use them to optimize and train the best models. Since these tasks can require lots of compute, we would typically run this entire pipeline in a managed cluster platform which can scale up as our data and models grow.

# Optimization
optimization = BashOperator(
    task_id="optimization",
    bash_command="tagifai optimize",
)

# Training
train_model = BashOperator(
    task_id="train_model",
    bash_command="tagifai train-model",
)

Note

An interesting trend in continual learning involves algorithmic advancements where models build on their knowledge without retraining from scratch and suffering from catastrophic forgetting. Check out this survey (rev. 04/2021) for a review of methods: A continual learning survey: Defying forgetting in classification tasks.

Evaluation

It's imperative that we evaluate our trained models so we can be confident in their ability. We've extensively covered model evaluation in our testing lesson, so here we'll talk about what happens after evaluation. We want to execute a chain of tasks depending on whether the model improved or regressed. To do this, we're using a BranchPythonOperator for the evaluation task so it can return the appropriate task id.

from airflow.operators.python import BranchPythonOperator

# Evaluate
evaluate_model = BranchPythonOperator(  # BranchPythonOperator returns a task_id or [task_ids]
    task_id="evaluate_model",
    python_callable=_evaluate_model,
)

This Operator will execute a function whose return value will be a single task id (or a list of task ids).

def _evaluate_model():
    # improvement_criteria() is a placeholder for our own evaluation logic
    if improvement_criteria():
        return "improved"  # improved is a task id
    else:
        return "regressed"  # regressed is a task id

# Improved or regressed
improved = BashOperator(
    task_id="improved",
    bash_command="echo IMPROVED",
)
regressed = BashOperator(
    task_id="regressed",
    bash_command="echo REGRESSED",
)

The returned task ids can correspond to tasks that are simply used to direct the workflow towards a certain set of tasks based on upstream results. In our case, we want to log a report in the event of a regression.

# Task relationships
extract_data >> optimization >> train >> evaluate >> [improved, regressed]
improved >> serve
regressed >> report

Serving

If our model passed our evaluation criteria, then we can deploy and serve our model. Again, there are many different options here, such as using our CI/CD Git workflows to deploy the model wrapped as a scalable microservice or, for more streamlined deployments, using a purpose-built model server such as MLflow, TorchServe, RedisAI or Nvidia's Triton Inference Server. These servers have a registry with an API layer to seamlessly inspect, update, serve, roll back, etc. multiple versions of models. Typically a model artifact will be loaded into an inference app, which interfaces directly with the user-facing application to fulfill any requests.

# Serve
serve_model = BashOperator(
    task_id="serve_model",
    # push to GitHub to kick off serving workflows
    bash_command="echo served model",  # or to a purpose-built model server, etc.
)

MLOps (update)

Once we've validated and served our model, how do we know when and how it needs to be updated? We'll need to compose a set of workflows that reflect the update policies we want to set in place.

# Task relationships
monitoring >> update_policy_engine >> [_continue, inspect, improve, rollback]
improve >> compose_retraining_dataset >> retrain

Monitoring

Our inference application will receive requests from our user-facing application and we'll use our versioned model artifact to return inference responses. All of these inputs, predictions and other values will also be sent (batch/real-time) to a monitoring workflow that ensures the health of our system. We have already covered the foundations of monitoring in our monitoring lesson but here we'll look at how triggered alerts fit with the overall operational workflow.

Policies

Based on the metrics we're monitoring using various thresholds, window sizes, frequencies, etc., we'll be able to trigger events based on our update policy engine.

• continue: with the currently deployed model without any updates. However, an alert was raised, so it should be analyzed later to reduce false positive alerts.
• improve: by retraining the model to avoid performance degradation caused by meaningful drift (data, target, concept, etc.).
• inspect: to make a decision. Typically expectations are reassessed, schemas are reevaluated for changes, etc.
• rollback: to a previous version of the model because of an issue with the current deployment. Typically these can be avoided using robust deployment strategies (ex. dark canary).
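An update policy engine can start out as a simple ordered set of rules over monitoring signals. The alert keys and thresholds below are illustrative assumptions, not recommended values; real policies would be tuned on our own monitoring data.

```python
def update_policy(alert):
    """Hypothetical update policy engine mapping a monitoring alert to an action."""
    if alert.get("deployment_error"):
        return "rollback"  # problem with the current deployment itself
    if alert.get("schema_changed"):
        return "inspect"   # expectations/schemas need to be reassessed
    if alert.get("drift_score", 0.0) > 0.3 and alert.get("performance_drop", 0.0) > 0.05:
        return "improve"   # meaningful drift that is hurting performance
    return "continue"      # keep the current model; review the alert later

print(update_policy({"drift_score": 0.5, "performance_drop": 0.1}))  # improve
print(update_policy({"drift_score": 0.5, "performance_drop": 0.0}))  # continue
```

Checking rollback first reflects that a broken deployment should be handled before any slower, data-dependent decision.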

Retraining

If we need to improve on the existing version of the model, it's not simply a matter of rerunning the model creation workflow on the new dataset. We need to carefully compose the training data in order to avoid issues such as catastrophic forgetting (forgetting previously learned patterns when presented with new data).

• labeling: new incoming data may need to be properly labeled before being used (we cannot just depend on proxy labels).
• active learning: we may not be able to explicitly label every single new data point so we have to leverage active learning workflows to complete the labeling process.
• QA: quality assurance workflows to ensure that labeling is accurate, especially for known false positives/negatives and historically poorly performing slices of data.
• augmentation: increasing our training set with augmented data that's representative of the original dataset.
• sampling: upsampling and downsampling to address imbalanced data slices.
• evaluation: creation of an evaluation dataset that's representative of what the model will encounter once deployed.
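As one example of the sampling step, here's a minimal sketch that upsamples every data slice (with replacement) to the size of the largest slice; downsampling would be the mirror image. The slice_key function and the tag-based rows are hypothetical.

```python
import random

def upsample_slices(rows, slice_key, seed=42):
    """Upsample every slice (with replacement) to the size of the largest slice."""
    rng = random.Random(seed)
    slices = {}
    for row in rows:
        slices.setdefault(slice_key(row), []).append(row)
    target = max(len(members) for members in slices.values())
    balanced = []
    for members in slices.values():
        balanced.extend(members)
        balanced.extend(rng.choices(members, k=target - len(members)))
    return balanced

rows = [{"tag": "nlp"}] * 6 + [{"tag": "cv"}] * 2
balanced = upsample_slices(rows, slice_key=lambda r: r["tag"])
print(sum(r["tag"] == "cv" for r in balanced), len(balanced))  # 6 12
```

Upsampling only duplicates existing examples, so it's usually combined with the labeling and augmentation steps above rather than used on its own.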

Once we have the proper dataset for retraining, we can kick off the featurization and model training workflows, where a new model will be trained and evaluated before being deployed and receiving new inference requests. In the next lesson, we'll discuss how to combine these pipelines together to create a continual learning system.

References

To cite this lesson, please use:

@article{madewithml,
    author       = {Goku Mohandas},
    title        = { Pipelines - Made With ML },
    howpublished = {\url{https://madewithml.com/}},
    year         = {2021}
}