
Feature Store


Using a feature store to connect the DataOps and MLOps pipelines to enable collaborative teams to develop efficiently.
Goku Mohandas
Repository · Notebook


Intuition

Let's motivate the need for a feature store by chronologically looking at what challenges developers face in their current workflows.

  1. Isolation: feature development in isolation (for each unique ML application) can lead to duplication of efforts (setting up ingestion pipelines, feature engineering, etc.).
    • Solution: create a central feature repository where the entire team contributes maintained features that anyone can use for any application.
  2. Skew: we may have different pipelines for generating features for training and serving which can introduce skew through the subtle differences.
    • Solution: create features using a unified pipeline and store them in a central location that the training and serving pipelines pull from.
  3. Values: once we set up our data pipelines, we need to ensure that our input feature values are up-to-date so we aren't working with stale data, while maintaining point-in-time correctness so we don't introduce data leaks.
    • Solution: retrieve input features for the respective outcomes by pulling what's available when a prediction would be made.

Note

Point-in-time correctness refers to mapping the appropriately up-to-date input feature values to an observed outcome at \(t_{n+1}\). This involves knowing the time (\(t_n\)) that a prediction is needed so we can collect feature values (\(X\)) at that time.
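
To make this concrete, below is a minimal sketch (with hypothetical feature values and outcomes for a single entity) of a point-in-time join using pandas.merge_asof, where each outcome receives the latest feature value available at or before its prediction time:

import pandas as pd

# Hypothetical feature values logged over time for a single entity
features = pd.DataFrame({
    "id": [1, 1, 1],
    "created_on": pd.to_datetime(["2021-06-01", "2021-06-03", "2021-06-05"]),
    "text": ["v1", "v2", "v3"],
})

# Hypothetical observed outcomes and the times predictions were needed
labels = pd.DataFrame({
    "id": [1, 1],
    "event_timestamp": pd.to_datetime(["2021-06-02", "2021-06-04"]),
    "tag": ["nlp", "cv"],
})

# Point-in-time join: each outcome gets the latest feature value available
# at (or before) its prediction time, which avoids data leaks
training_df = pd.merge_asof(
    labels.sort_values("event_timestamp"),
    features.sort_values("created_on"),
    left_on="event_timestamp",
    right_on="created_on",
    by="id",
)
print(training_df[["id", "event_timestamp", "text", "tag"]])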


When actually constructing our feature store, there are several core components we need to have to address these challenges:

  • data ingestion: ability to ingest data from various sources (databases, data warehouse, etc.) and keep them updated.
  • feature definitions: ability to define entities and corresponding features.
  • historical features: ability to retrieve historical features to use for training.
  • online features: ability to retrieve features from a low latency origin for inference.

Each of these components is fairly easy to set up, but connecting them all together requires a managed service, an SDK layer for interactions, etc. Instead of building from scratch, it's best to leverage one of the production-ready feature store options such as Feast, Hopsworks, Tecton, Rasgo, etc. And of course, the large cloud providers have their own feature store options as well (Amazon's SageMaker Feature Store, Google's Vertex AI, etc.).

Feast

All the code accompanying this lesson can be found in this notebook.

We're going to leverage Feast as the feature store for our application because of its ease of local setup, its SDK for training/serving, etc.

# Install Feast
pip install feast==0.10.5 -q
pip freeze | grep feast
feast==0.10.5

We're going to create a feature repository at the root of our project. Feast will create a configuration file for us and we're going to add an additional features.py file to define our features.

feast init --minimal --template local features
cd features
touch features.py
Creating a new Feast repository in /Users/goku/Documents/madewithml/mlops/features.

Note

Traditionally, the feature repository would be its own isolated repository that other services use to read/write features from, but we're going to simplify things and create it directly inside our application's repository.

The initialized feature repository (with the additional file we've added) will include:

features/
├── feature_store.yaml  - configuration
└── features.py         - feature definitions

We're going to configure the locations for our registry and online store in our feature_store.yaml file.

  • registry: contains information about our feature repository, such as data sources, feature views, etc. Since it's stored in a database instead of a Python file, it can be accessed very quickly in production.
  • online store: DB (SQLite for local) that stores the (latest) features for defined entities (users, projects, etc.) to be used for online inference.

If all definitions look valid, Feast will sync the metadata about Feast objects to the registry. This step is necessary because the production feature serving infrastructure won't be able to access Python files in the feature repository at run time, but it will be able to efficiently and securely read the feature definitions from the registry.

# Inside feature_store.yaml
project: features
registry: ../stores/feature/registry.db
provider: local
online_store:
    path: ../stores/feature/online_store.db

Components

We're going to start creating the components of the feature store, one at a time, using Feast.

Data ingestion

The first step is to establish connections with our data sources (databases, data warehouse, etc.). Feast requires its data sources to either come from a file (Parquet), data warehouse (BigQuery) or data stream (Kafka / Kinesis). We'll convert our generated features file from the DataOps pipeline (features.json) into a Parquet file, which is a column-major data format that allows fast feature retrieval and caching benefits (contrary to row-major data formats such as CSV where we have to traverse every single row to collect feature values).

Read more about these data sources in our pipelines and infrastructure lessons.

import pandas as pd
from pathlib import Path
from tagifai import config, utils
# Load features to df
features_fp = Path(config.DATA_DIR, "features.json")
features = utils.load_dict(filepath=features_fp)
df = pd.DataFrame(features)
# Format timestamp
df.created_on = pd.to_datetime(df.created_on)
# Convert to parquet
df.to_parquet(
    Path(config.DATA_DIR, "features.parquet"),
    compression=None,
    allow_truncated_timestamps=True,
)

Feature definitions

Now that we have our data source prepared, we can define our features for the feature store.

from datetime import datetime
from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource
from google.protobuf.duration_pb2 import Duration
from tagifai import config

The first step is to define the location of the features (FileSource in our case) and the timestamp column for each data point.

# Read data
START_TIME = "2020-02-17"
project_details = FileSource(
    path=str(Path(config.DATA_DIR, "features.parquet")),
    event_timestamp_column="created_on",
)

Next, we need to define the main entity that each data point pertains to. In our case, each project has a unique ID with features such as text and tags.

# Define an entity
project = Entity(
    name="id",
    value_type=ValueType.INT64,
    description="project id",
)

Finally, we're ready to create a FeatureView that loads specific features (features), of various value types, from a data source (input) for a specific period of time (ttl).

# Define a Feature View for each project
project_details_view = FeatureView(
    name="project_details",
    entities=["id"],
    ttl=Duration(
        seconds=(datetime.today() - datetime.strptime(START_TIME, "%Y-%m-%d")).days * 24 * 60 * 60
    ),
    features=[
        Feature(name="text", dtype=ValueType.STRING),
        Feature(name="tags", dtype=ValueType.STRING_LIST),
    ],
    online=True,
    input=project_details,
    tags={},
)

Once we've defined our feature view, we can run feast apply to push a version-controlled definition of our features to the registry for fast access. This will also configure the registry and online store locations that we defined in our feature_store.yaml file.

cd features
feast apply
Registered entity id
Registered feature view project_details
Deploying infrastructure for project_details

Historical features

Once we've registered our feature definition, along with the data source, entity definition, etc., we can use it to fetch historical features. This is done via point-in-time joins on the provided timestamps, using pandas for our local setup or BigQuery, Hive, etc. as the offline store in production.

import pandas as pd
from feast import FeatureStore
# Identify entities
project_ids = [1, 2, 3]
now = datetime.now()
timestamps = [datetime(now.year, now.month, now.day)]*len(project_ids)
entity_df = pd.DataFrame.from_dict({"id": project_ids, "event_timestamp": timestamps})
entity_df.head()
id event_timestamp
0 1 2021-06-07
1 2 2021-06-07
2 3 2021-06-07
# Get historical features
store = FeatureStore(repo_path=Path(config.BASE_DIR, "features"))
training_df = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=["project_details:text", "project_details:tags"],
).to_df()
training_df.head()
event_timestamp id project_details__text project_details__tags
0 2021-06-07 00:00:00+00:00 1 Machine Learning Basics A practical set of not... [code, tutorial, keras, pytorch, tensorflow, d...
1 2021-06-07 00:00:00+00:00 2 Deep Learning with Electronic Health Record (E... [article, tutorial, deep-learning, health, ehr]
2 2021-06-07 00:00:00+00:00 3 Automatic Parking Management using computer vi... [code, tutorial, video, python, machine-learni...

Online features

For online inference, we want to retrieve features very quickly via our online store, as opposed to fetching them from slow joins. However, the features are not in our online store just yet, so we'll need to materialize them first.

cd features
CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
feast materialize-incremental $CURRENT_TIME
Materializing 1 feature views to 2021-06-07 13:14:52-07:00 into the sqlite online store.
project_details from 2020-02-17 13:14:53-08:00 to 2021-06-07 13:14:52-07:00:
100%|████████████████████████████████████████████████████████| 2030/2030 [00:00<00:00, 14949.12it/s]

This has moved the features for all of our projects into the online store since this was the first time materializing to the online store. When we subsequently run the materialize-incremental command, Feast keeps track of previous materializations, so we'll only materialize the new data since the last attempt.

# Get online features
store = FeatureStore(repo_path=Path(config.BASE_DIR, "features"))
feature_vector = store.get_online_features(
    feature_refs=["project_details:text", "project_details:tags"],
    entity_rows=[{"id": 1000}],
).to_dict()
feature_vector
{'project_details__tags': [['code',
   'course',
   'tutorial',
   'video',
   'natural-language-processing',
   'low-resource']],
 'id': [1000],
 'project_details__text': ['CMU LTI Low Resource NLP Bootcamp 2020 A low-resource natural language and speech processing bootcamp held by the Carnegie Mellon University Language Technologies Institute in May 2020.']}

Note

We can establish all of these components by running Feast on Kubernetes as well, which I highly recommend when dealing with data streams (Kafka / Kinesis).

Architecture

Batch processing

The feature store we implemented above assumes that our task requires batch processing. This means that inference requests on specific entity instances can use features that have been materialized from the offline store. Note that they may not be the most recent feature values for that entity.

  1. Application data is stored in a database and/or a data warehouse, etc. It then goes through the DataOps pipeline to validate the data and engineer the features.
  2. These features are written to the offline store, which can then be used to retrieve historical training data to train a model with. In our local setup, this was done via Pandas DataFrame joins for given timestamps and entity IDs, but in a production setting, something like Google BigQuery or Hive would receive the feature requests.
  3. Once we have our training data, we can start the MLOps pipeline to optimize, train and validate a model.
  4. We can incrementally materialize features to the online store so that we can retrieve an entity's feature values with low latency. In our local setup, this was done via SQLite for a given set of entities, but in a production setting, something like Redis or DynamoDB would be used (a hypothetical production configuration is sketched after this list).
  5. These online features are passed on to the deployed model to generate predictions that would be used downstream.
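
As a rough illustration of steps 2 and 4 in a production setting, here's a hypothetical feature_store.yaml using Feast's gcp provider (the bucket name is made up), where BigQuery acts as the offline store and Datastore as the online store:

# Hypothetical production feature_store.yaml (gcp provider)
project: features
registry: gs://acme-feature-store/registry.db  # registry kept in a (made-up) GCS bucket
provider: gcp  # BigQuery offline store + Datastore online store by default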

Warning

If our entity (projects) had features that change over time, we would incrementally materialize them to the online store. Since they don't, doing so would be over-engineering here, but it's still important to know how to leverage a feature store for entities whose features do change over time.

Stream processing

Some applications may require stream processing where we require near real-time feature values to deliver up-to-date predictions at low latency. While we'll still utilize an offline store for retrieving historical data, our application's real-time event data will go directly through our data streams to an online store for serving.

  1. Real-time event data enters our running data streams (Kafka / Kinesis, etc.) where they can be processed to generate features.
  2. These features are written to the online store, which can then be used to retrieve online features for serving at low latency. In our local setup, this was done via SQLite for a given set of entities, but in a production setting, something like Redis or DynamoDB would be used (a conceptual sketch follows this list).
  3. Streaming features are also written from the data stream to the batch data source (data warehouse, db, etc.) to be processed for generating training data later on.
  4. Historical data will be validated and used to generate features for training a model. The cadence for how often this happens depends on whether there are data annotation lags, compute constraints, etc.
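
To sketch the idea behind steps 1 and 2 (this is a conceptual illustration, not Feast's streaming API; the topic name, event schema, and key format are all made up), a stream consumer could compute a feature from each real-time event and write the latest value for that entity directly to a Redis online store:

import json

import redis
from kafka import KafkaConsumer  # kafka-python

# Hypothetical stream of project events and a Redis online store
consumer = KafkaConsumer("project-events", bootstrap_servers="localhost:9092")
online_store = redis.Redis(host="localhost", port=6379)

for message in consumer:
    event = json.loads(message.value)  # hypothetical event schema: {"id": ..., "text": ...}
    feature_value = event["text"].lower()  # compute a feature from the raw event
    # Overwrite the latest feature value for this entity so it can be served at low latency
    online_store.hset(f"project_details:{event['id']}", "text", feature_value)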

Note

There are a few more components we're not visualizing here, such as the unified ingestion layer (Spark) that connects data from the various data sources (warehouse, DB, etc.) to the offline/online stores, or low-latency serving (<10 ms). We can read more about all of these in the official Feast documentation.

Additional functionality

Additional functionality that many feature store providers are currently (or have recently been) trying to integrate into the feature store platform includes:

  • transform: ability to directly apply global preprocessing or feature engineering on top of raw data extracted from data sources.
    • Current solution: apply transformations as a separate Spark, Python, etc. workflow task before writing to the feature store.
  • validate: ability to assert expectations and identify data drift on the feature values.
    • Current solution: apply data testing and monitoring as upstream workflow tasks before the features are written to the feature store (a minimal sketch follows this list).
  • discover: ability for anyone in our team to easily discover features that they can leverage for their application.
    • Current solution: add a data discovery engine, such as Amundsen, on top of our feature store to enable others to search for features.
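
For the validation piece, a minimal sketch of such an upstream check (using Great Expectations' pandas API; the specific expectations and file path are illustrative) might look like this, run before the features are written to the feature store's data source:

import great_expectations as ge
import pandas as pd

# Hypothetical upstream validation task on the generated features
df = ge.from_pandas(pd.read_parquet("features.parquet"))
results = [
    df.expect_column_values_to_not_be_null("text"),
    df.expect_column_values_to_be_unique("id"),
]
assert all(r.success for r in results), "Feature validation failed"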

Reproducibility

Though we could continue to version our training data with DVC whenever we release a version of the model, it might not be necessary. We generated our historical training data to train a model using a specific set of entities, feature definitions, timestamps, etc. This request, combined with the time-travel feature available in many databases and data warehouses, will allow us to reproduce the exact data that was used to train our model. We can effectively execute our command on the exact prior state of the system, which only requires that we keep track of the requests and the timestamps of execution.
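
As a minimal sketch of this idea (the log location and structure are illustrative, reusing the entity_df and config objects from above), we could record each historical-feature request alongside its execution timestamp so that the exact training data can be regenerated later:

import json
from datetime import datetime
from pathlib import Path

# Hypothetical request log entry: enough information to replay get_historical_features()
request = {
    "executed_at": datetime.utcnow().isoformat(),
    "feature_refs": ["project_details:text", "project_details:tags"],
    "entity_df": entity_df.to_dict(orient="list"),
}
with open(Path(config.BASE_DIR, "stores", "feature", "requests.jsonl"), "a") as fp:
    fp.write(json.dumps(request, default=str) + "\n")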

Resources