Introduction
When I started working in machine learning engineering many years ago, designing machine learning systems seemed like black magic, since there wasn't much public material on the subject.
However, in the last couple of years several good books on machine learning system design have been published, such as Designing Machine Learning Systems and Machine Learning Design Patterns. This e-book is complementary to these, providing additional design patterns that I have found useful during my career when designing machine learning systems.
Training workflow
A common structure for a training pipeline is visualized below:
graph LR
    A[Data ingestion] --> B[Data validation]
    B --> C[Feature engineering]
    C --> D[Train model]
    D --> E[Model analysis]
I first saw this structure in the TFX project, but it might originate somewhere else. We could simply use TensorFlow Extended (TFX) for our training workflow or implement this structure in another framework, such as Kubeflow Pipelines.
Data ingestion
In this task we load the data needed for training our machine learning model from an external system, e.g. a feature-store or a data warehouse. Typically in this step we also split the data into separate sets: training, validation and test.
Data validation
In this task we will validate that our data follows some pre-defined structure. One common case is that the data follows a given schema. For example, we might expect that:
- The column 'prices' should be of type float and should not be negative.
- The column 'country' should be of type string and can only have the following values: ['US', 'CA', 'MX']
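The two schema rules above can be sketched in plain Python. This is a minimal illustration, not how TFX implements validation; the `SCHEMA` structure and the `validate_rows` helper are assumptions made for this example.

```python
from typing import Any

# Hypothetical schema: column name -> (expected type, extra check on the value)
SCHEMA = {
    "prices": (float, lambda v: v >= 0),
    "country": (str, lambda v: v in {"US", "CA", "MX"}),
}


def validate_rows(rows: list[dict[str, Any]]) -> list[str]:
    """Return a list of human-readable schema violations (empty if valid)."""
    errors = []
    for i, row in enumerate(rows):
        for column, (expected_type, check) in SCHEMA.items():
            if column not in row:
                errors.append(f"row {i}: missing column '{column}'")
                continue
            value = row[column]
            if not isinstance(value, expected_type):
                errors.append(f"row {i}: '{column}' should be {expected_type.__name__}")
            elif not check(value):
                errors.append(f"row {i}: '{column}' has invalid value {value!r}")
    return errors


rows = [
    {"prices": 9.99, "country": "US"},
    {"prices": -1.0, "country": "CA"},  # negative price
    {"prices": 5.0, "country": "SE"},   # country outside the allowed set
]
errors = validate_rows(rows)
print(errors)
```

In a real workflow the schema would typically be stored as an artifact so that the same rules can be re-used during serving.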
During this step we can also compute statistics about the data, such as histograms of numerical columns. These statistics will be useful during serving when we are trying to detect training-serving skew.
We can also compare the distribution between the training set and the validation & test sets. This can be useful to detect any issues created when splitting the data.
For example, imagine that we decide to train on data from users in the USA and evaluate on data from users in Canada and Mexico. However, the behaviour of users in the USA might be completely different from that of users in Canada and Mexico, and therefore our model might perform poorly on the validation set. Comparing the distributions of the training and validation sets can give us useful information about how we need to change our splitting strategy.
Feature engineering
In this task we will do the feature creation and the feature pre-processing.
By feature creation I mean constructing the actual features. For example, calculating a rolling average of bought items for a given user during the last 14 days. Note that in general we would like to do this outside of the training workflow, see Decoupling feature creation from training & serving workflow.
However, there might be special cases where we have to do it in the training workflow. As an example, imagine that as part of the inference request we get a list of outlier days that the user has provided and we have a rolling average feature. In this case, we need to calculate the rolling average after removing the outliers, otherwise they will influence this feature. Since this feature needs to be calculated at "runtime" for inference it makes sense to do the same for training, allowing us to re-use the functionality and avoid skew.
But in general this should be avoided if possible, as described in Decoupling feature creation from training & serving workflow. And sometimes we could re-frame the feature to avoid having to compute it at "runtime". In the example above, we could potentially use the median instead of the mean to remove the need for the user to provide the outlier days.
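To make the mean-versus-median point concrete, here is a small sketch using made-up purchase counts. The numbers and the `outlier_days` set are assumptions for illustration only.

```python
from statistics import mean, median

# Hypothetical daily purchase counts for one user over 14 days;
# day 5 is an outlier (e.g. a bulk order).
purchases = [2, 3, 2, 4, 3, 250, 3, 2, 4, 3, 2, 3, 4, 3]
outlier_days = {5}  # would have to be provided in the inference request

# Option 1: mean after removing the user-provided outliers (needs runtime input)
cleaned = [p for day, p in enumerate(purchases) if day not in outlier_days]
mean_without_outliers = mean(cleaned)

# Option 2: median over all days (no runtime input needed)
median_all = median(purchases)

print(f"naive mean:            {mean(purchases):.2f}")
print(f"mean without outliers: {mean_without_outliers:.2f}")
print(f"median of all days:    {median_all}")
```

The median stays close to the cleaned mean without knowing which days are outliers, so in this toy case the feature could be pre-computed outside the training and serving workflows.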
The second part is feature pre-processing. Here we transform the features into a more desirable format. For example, we could transform a column of words to integers through tokenization, Z-normalize a numerical column etc. A common pattern is to include this logic in the model, see Embed pre & post processing logic in the model.
However, we might do the pre-processing in this task and then embed it into the model at a later time. The reason why we might want to do that is that this processing can be computationally heavy, especially if there is a large amount of data. This is actually what TensorFlow Transform does.
Train model
In this step we have assembled everything needed to start training our machine learning model. To start with, we might begin with hyperparameter tuning where we try to find a good combination of hyper-parameters (e.g. learning-rate, batch-size etc).
Once those have been discovered we can do our final training with the "best" hyper-parameters we found.
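The tune-then-retrain flow can be sketched with a simple grid search. The toy model (a single weight fitted by gradient descent), the search space, and the choice to retrain on training plus validation data are all assumptions for illustration; a real workflow would more likely use a dedicated tuning library.

```python
from itertools import product


def train(xs, ys, learning_rate, epochs):
    """Fit y = w * x with plain gradient descent on mean squared error."""
    w = 0.0
    n = len(xs)
    for _ in range(epochs):
        grad = sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / n
        w -= learning_rate * grad
    return w


def mse(w, xs, ys):
    return sum((w * x - y) ** 2 for x, y in zip(xs, ys)) / len(xs)


# Toy data following y = 3x, split into training and validation sets
train_x, train_y = [1.0, 2.0, 3.0, 4.0], [3.0, 6.0, 9.0, 12.0]
val_x, val_y = [5.0, 6.0], [15.0, 18.0]

# Hypothetical search space
grid = {"learning_rate": [0.001, 0.01, 0.05], "epochs": [10, 100]}

results = []
for lr, epochs in product(grid["learning_rate"], grid["epochs"]):
    w = train(train_x, train_y, lr, epochs)
    results.append((mse(w, val_x, val_y), {"learning_rate": lr, "epochs": epochs}))

# Pick the combination with the lowest validation error
best_score, best_params = min(results, key=lambda r: r[0])

# Final training run with the "best" hyper-parameters
final_w = train(train_x + val_x, train_y + val_y, **best_params)
print(best_params, final_w)
```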
Model analysis
The final step in our workflow is the model analysis. Here we will inspect the model's performance, such as accuracy, F1-score, root-mean-squared error etc. Furthermore, it's common to check the performance on different slices of the data to find potential imbalances in performance between the slices.
For example, let's say our data comes from users in two different regions: US and EU. Now it can be valuable to check what the accuracy of the model is for users in the US and in the EU. If there is a large discrepancy between these two groups it can flag that we might have an issue with the model. As an example, the average accuracy across the two groups might be good, while the model performs much worse for users in the EU than in the US. TensorFlow Model Analysis is a tool that helps us do this model analysis.
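A minimal sketch of slice-based evaluation, using made-up evaluation records. This is not the TensorFlow Model Analysis API; the `accuracy_by_slice` helper and the data are assumptions for illustration.

```python
from collections import defaultdict

# Hypothetical evaluation records: (region, true label, predicted label)
records = [
    ("US", 1, 1), ("US", 0, 0), ("US", 1, 1), ("US", 0, 0),
    ("US", 1, 1), ("US", 0, 0), ("US", 1, 1), ("US", 0, 1),
    ("EU", 1, 0), ("EU", 0, 0),
]


def accuracy_by_slice(records):
    """Return overall accuracy and accuracy per slice (here: region)."""
    correct, total = defaultdict(int), defaultdict(int)
    for region, y_true, y_pred in records:
        total[region] += 1
        correct[region] += int(y_true == y_pred)
    overall = sum(correct.values()) / sum(total.values())
    per_slice = {region: correct[region] / total[region] for region in total}
    return overall, per_slice


overall, per_slice = accuracy_by_slice(records)
print(f"overall: {overall:.3f}")
for region, acc in per_slice.items():
    print(f"{region}: {acc:.3f}")
```

Here the overall accuracy hides that the smaller EU slice performs considerably worse than the US slice.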
Decoupling feature creation from training & serving workflow
This design pattern aims to decouple the feature creation from the training and serving workflows.
It allows for separation of concerns: people with expertise in data engineering can focus on creating scalable and robust feature creation pipelines, while experts in model building can focus on the modelling parts.
It also decreases the risk of training-serving skew. A common cause of bugs in machine learning systems is a difference in data processing between training and serving.
For example, imagine that we are building a machine learning system for recommending products to users. During our modelling phase we have discovered that the price is an important feature. However, in our training pipeline we have accidentally defined the price in American dollars (USD), while in the serving workflow we have defined it in the native currency (i.e. for Swedish users we define it in Swedish krona). This would not necessarily break our system, but our predictions would most likely be worse than if we had implemented both in the same currency, e.g. USD.
This might seem like an obvious example, but these types of issues crop up all the time. I have seen cases where simply unifying the feature creation between the training and serving workflows substantially increased the accuracy of the system.
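One way to picture the unification is a single feature definition that both workflows call. The sketch below is illustrative only: the exchange rates, `price_feature`, and the two row builders are assumptions, not part of any specific framework.

```python
# Hypothetical exchange rates; in practice these would come from a data source.
USD_PER_UNIT = {"USD": 1.0, "SEK": 0.1, "EUR": 1.1}


def price_feature(amount: float, currency: str) -> float:
    """Single definition of the price feature: always expressed in USD."""
    return amount * USD_PER_UNIT[currency]


def build_training_row(raw: dict) -> dict:
    # The training workflow calls the shared feature definition
    return {"price": price_feature(raw["amount"], raw["currency"]), "label": raw["label"]}


def build_serving_row(raw: dict) -> dict:
    # The serving workflow calls the exact same function
    return {"price": price_feature(raw["amount"], raw["currency"])}


train_row = build_training_row({"amount": 100.0, "currency": "SEK", "label": 1})
serve_row = build_serving_row({"amount": 100.0, "currency": "SEK"})
print(train_row["price"], serve_row["price"])
```

Because the conversion lives in one place, a change to the feature definition automatically applies to both training and serving.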
A common term within machine learning systems is the feature store. Does that mean we always need to include a feature store in our systems? Not necessarily.
For example, imagine that we have a recommendation problem where we pre-compute the recommendations beforehand, i.e. we do batch inference ahead of time and store the predictions in a database. When we want to serve these recommendations, we simply do a lookup in our database. Furthermore, assume that the cardinality of users & products is not too big. Then for our feature creation we could simply store these features in a data warehouse / data lake, where each row contains all the features for the user, the article, and the combination of user & article.
Below is an example table with this structure:
article_id | user_id | article_type | user_previous_purchases | user_article_affinity |
---|---|---|---|---|
1582 | 5768 | electronics | 192.0 | 0.8 |
1582 | 9922 | electronics | 29.0 | 0.3 |
1988 | 9922 | clothes | 29.0 | 0.1 |
Whether we should implement this ourselves like above or use an external platform like Feast is context dependent. The above approach is good since we don't have to learn a new tool and integrate with it. We can simply use our preferred data warehouse / data lake solution.
However, if we instead would like to do online inference, the approach above most likely has too high latency for feature retrieval. To improve the latency of retrieval, we could store the features for inference in a key-value store, like Redis. However, adding these capabilities would require a lot of engineering effort, and simply using a third-party system like Feast is most likely easier.
Embed pre & post processing logic in the model
Before we feed the data into our machine learning model we usually have pre-processing steps. For example, we could transform a column of words to integers through tokenization, Z-normalize a numerical column etc. It's very important that these steps get applied in the same way for training as for serving.
For example, assume we have a feature that follows a normal distribution with mean 5 and standard deviation 2. Before feeding this column into our machine learning model (say a decision tree) we Z-normalize the column to mean 0 and standard deviation 1. Now, during training, our decision tree has learned a split such that if the value is smaller than 0.5 we take the left branch, otherwise the right. If we forget to apply this Z-normalization during serving, almost all of the data points will be larger than 0.5, causing the model to behave completely differently.
So how do we make sure that the pre-processing steps get applied in the same way for training and serving? A common pattern is to embed this step into the model object. Now when we save the model after training in order to use it for serving, this step gets automatically included. An example of this approach is sklearn pipelines. Below is an example of a sklearn pipeline where we Z-normalize the feature "revenue" as part of our pipeline:
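The effect of a forgotten normalization step can be checked numerically. The sketch below simulates the feature with the standard library; the sample size and seed are arbitrary choices for illustration.

```python
import random
from statistics import mean, stdev

random.seed(0)
raw = [random.gauss(5, 2) for _ in range(10_000)]

# Statistics learned during training
mu, sigma = mean(raw), stdev(raw)
normalized = [(x - mu) / sigma for x in raw]

THRESHOLD = 0.5  # split learned by the tree on the normalized feature


def fraction_left(values):
    """Share of data points that take the left branch (value < threshold)."""
    return sum(v < THRESHOLD for v in values) / len(values)


print(f"left branch with normalization:    {fraction_left(normalized):.3f}")
print(f"left branch without normalization: {fraction_left(raw):.3f}")
```

With normalization roughly two thirds of the points go left, while without it only a percent or so do, so the tree routes almost everything down the right branch.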
import pickle

from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

num_cols = ["revenue"]
numerical_pipeline = Pipeline(steps=[
    ("Z-normalize", StandardScaler())
])
col_transform = ColumnTransformer(transformers=[
    ("num_pipeline", numerical_pipeline, num_cols),
])
model = LinearRegression()
model_pipeline = Pipeline(steps=[
    ("col_trans", col_transform),
    ("model", model)
])

# Fit the pipeline on the training data (not shown) before saving,
# e.g. model_pipeline.fit(X_train, y_train)

# The serialized model will contain the z-normalize step
with open("model.pkl", "wb") as fp:
    pickle.dump(model_pipeline, fp)
Frameworks such as TensorFlow and PyTorch have similar support for this pattern. In this case, we embed the pre-processing steps into the computational graph. Then when we save the model these steps will be included automatically, since they are part of the computational graph.
When we have a large amount of data for training it can be a good idea to do the pre-processing outside of the model (preferably with a distributed computing framework) and later embed the pre-processing into the model object before we save it for serving. This is actually what TensorFlow Transform does. Here the pre-processing step is done with Apache Beam and once we have finished training we embed these steps into the computational graph which gets exported.
This pattern can also be applied to the post-processing step. For example, imagine that we want to make sure our predictions are non-negative. We could add a post-processing step where we take the maximum of our predictions and 0. To make sure that this gets applied in a similar fashion in both training and serving we should embed this step into our model object, as we did for pre-processing.
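A minimal sketch of the non-negative post-processing step as a wrapper around the model object. `LinearModel` is a hypothetical stand-in for a trained model; any object with a `predict` method would work the same way.

```python
class LinearModel:
    """Stand-in for a trained model (hypothetical)."""

    def __init__(self, weight, bias):
        self.weight, self.bias = weight, bias

    def predict(self, xs):
        return [self.weight * x + self.bias for x in xs]


class NonNegativeModel:
    """Wraps a model and clips its predictions at zero."""

    def __init__(self, model):
        self.model = model

    def predict(self, xs):
        # Post-processing: take the maximum of each prediction and 0
        return [max(p, 0.0) for p in self.model.predict(xs)]


wrapped = NonNegativeModel(LinearModel(weight=2.0, bias=-3.0))
predictions = wrapped.predict([0.0, 1.0, 2.0])
print(predictions)
```

If the wrapped object is what gets serialized and registered, callers in both training evaluation and serving only ever see the clipped predictions.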
Model registry
A central part of any machine learning infrastructure is the model registry. The registry is mainly responsible for keeping track of trained models and their versions. There are principally two parts to keep track of:
- Model binary
- Dependencies for the model
The model binary is a serialized model produced during the training phase. It can be a pickled scikit-learn model, a TensorFlow model in SavedModel format etc. These can be saved in object storage, like S3 or GCS.
The second part is the dependencies needed to load and serve the model. For example, if we trained our scikit-learn model with version 1.3.2 we should use the same version during inference as well. The reasons for using the same version for training and inference are twofold. Firstly, to avoid incompatibility between versions. For example, sometimes the internal implementation of the same model (e.g. LinearRegression) changes between versions, which can cause our model to not be loadable. Secondly, the implementation of certain calculations may be updated. For example, imagine that the maintainers of scikit-learn updated the calculation of the sample mean which we use in our model. Now the mean calculation is different between training and serving, causing training-serving skew.
Now the question arises of how to manage these dependencies. There are three common ways:
- Docker container
- Pinned dependencies
- Export the model to a self-contained format
With the Docker container approach, we define a Dockerfile which specifies the library versions needed for training and serving our model. This container image is then built and uploaded to our Docker registry. In our model registry, we add a reference to this specific build, ensuring that the same library versions are installed during training and serving. This approach is used in Vertex AI's model registry.
Usually you will have certain libraries that should only be installed for training and others that should only be installed for serving. This can be achieved with a multi-stage build:
FROM python:3.12-bullseye AS base
# Install dependencies used in both training & serving
RUN pip install scikit-learn==1.3.2

FROM base AS train
# Install dependencies used only in training, like a hyper-parameter tuning library
RUN pip install hyperopt==0.2.7

FROM base AS serve
# Install dependencies used only in serving, like a REST web framework
RUN pip install fastapi==0.109.0
The second approach is simply pinning the dependencies in a text format, like YAML. These pinned versions are recorded during our training process, ensuring that we use the same versions during training and serving. This approach is supported in MLflow:
dependencies:
- scikit-learn==1.3.2
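As an illustration of how pinned versions could be enforced, the sketch below compares the versions recorded at training time with what is installed in the serving environment. This is not MLflow's mechanism; `installed_versions` and `find_mismatches` are hypothetical helpers built on the standard library's `importlib.metadata`.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Look up the installed version of each package (None if not installed)."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


def find_mismatches(pinned, installed):
    """Return {package: (pinned, installed)} for every version that differs."""
    return {
        name: (wanted, installed.get(name))
        for name, wanted in pinned.items()
        if installed.get(name) != wanted
    }


# Versions recorded at training time (hypothetical)
pinned = {"scikit-learn": "1.3.2"}

mismatches = find_mismatches(pinned, installed_versions(pinned))
if mismatches:
    print(f"Version mismatch, refusing to load model: {mismatches}")
else:
    print("Serving environment matches the training environment")
```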
Lastly, we could also export the model to a self-contained model format. When exporting a model to this format, we should be able to set a version for the different operations (e.g. batch norm) used within the model. This ensures that the computations are the same for training & serving.
One example of this format is ONNX. With ONNX we can simply set the target version when exporting a model. This version will be saved within the model binary. When loading the model for inference, the inference runtime will make sure that appropriate operations are used during inference calls.
You can easily convert models from PyTorch, TensorFlow, sklearn and more with the provided converters. Below is an example of exporting a sklearn model to ONNX:
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from skl2onnx import to_onnx
iris = load_iris()
X, y = iris.data, iris.target
X = X.astype(np.float32)
X_train, X_test, y_train, y_test = train_test_split(X, y)
clr = RandomForestClassifier()
clr.fit(X_train, y_train)
onx = to_onnx(clr, X[:1], target_opset=21)  # Convert the sklearn model to ONNX with opset 21
with open("rf_iris.onnx", "wb") as f:
    f.write(onx.SerializeToString())
Serving patterns
Once a model is trained we want to make it available for serving. There are typically three common ways of doing it:
- Batch serving
- Online serving
- Hybrid serving
Each of them has its own advantages and disadvantages.
Batch serving
During batch serving we typically receive a large number of rows and want to do inference on them. One example where this arises is when we want to forecast sales for different products on a weekly basis so that we can allocate appropriate volumes to different stores. The prediction job is executed every Saturday at midnight. Here we make daily predictions for seven days, for each product and store. Imagine we have 100 stores, each with 1000 products. We would then need to make 100 × 1000 × 7 = 700,000 predictions. Here the batch-serving pattern fits well, since the number of rows is quite large and there are no tight latency requirements (it probably doesn't matter if the job takes 60 minutes or 90 minutes).
Now for actually serving the requests to our users we typically save these predictions in a key-value cache (e.g. Redis). When a user requests the forecasted sales for a given store, product and date, we simply look it up in the cache and return the value, as depicted below:
graph LR
    A[Batch job] --> B[Key-Value cache]
    B --> C[User request]
One advantage of batch serving is that we receive all rows at once and therefore it is easier to make efficient use of the hardware. Also, since we pre-compute the predictions, returning them when a user requests them will be very quick, since fetching items from a key-value store is fast (typically ~1 ms).
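The batch-job-then-lookup flow can be sketched as follows. A plain dictionary stands in for the key-value store, and the key format and function names are assumptions for illustration; a real system would use a client for a store such as Redis.

```python
from datetime import date

# Stand-in for a key-value store such as Redis (plain dict for illustration)
cache = {}


def make_key(store_id, product_id, day):
    """Build the lookup key from store, product and date."""
    return f"{store_id}:{product_id}:{day.isoformat()}"


def batch_job(predictions):
    """Write pre-computed forecasts to the cache."""
    for store_id, product_id, day, forecast in predictions:
        cache[make_key(store_id, product_id, day)] = forecast


def handle_request(store_id, product_id, day):
    """Serve a forecast with a single lookup; None if it was not pre-computed."""
    return cache.get(make_key(store_id, product_id, day))


batch_job([
    (1, 42, date(2024, 1, 1), 17.0),
    (1, 42, date(2024, 1, 2), 12.5),
])
print(handle_request(1, 42, date(2024, 1, 2)))
print(handle_request(9, 9, date(2024, 1, 1)))
```

The second lookup returns `None`, which illustrates the main limitation: anything the batch job did not pre-compute cannot be served.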
One disadvantage is that we need to pre-compute the predictions for all possible combinations of feature values, a number that grows exponentially with the number of features. For example, imagine we instead wanted to forecast per hour. That would now require 16,800,000 predictions. And if we wanted more prediction columns, such as country, the number of rows we need to pre-compute grows quickly.
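The row counts follow directly from multiplying the sizes of each dimension, which a few lines of arithmetic make explicit (using the store, product and day counts from the forecasting example):

```python
stores, products, days, hours_per_day = 100, 1000, 7, 24

# One prediction per store, product and day
daily_rows = stores * products * days

# Forecasting per hour multiplies the row count by 24
hourly_rows = daily_rows * hours_per_day

print(f"daily forecasts:  {daily_rows:,}")
print(f"hourly forecasts: {hourly_rows:,}")
```

Every additional dimension multiplies the total again, which is why the number of rows to pre-compute grows so quickly.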
Note that pre-computing the predictions is not always possible. For example, imagine that one of the features of the model above is the product description, which is free-flowing text of type string. We simply could not pre-compute predictions for all possible descriptions ahead of time, since the number of possible combinations is enormous. So in this case we would need to drop this feature from the model in order to pre-compute the predictions, which might be an acceptable solution, depending on how important this feature is for the model performance.
Online serving
Hybrid approach
Online serving structure
When deploying our machine learning model for online inference we typically divide the system into two parts: our backend service and our machine learning inference service. Below is a high-level overview of this setup:
graph LR
    A[User] <--> B[Backend service]
    B <--> C[Machine learning inference service]
Our backend service is responsible for authenticating the request, fetching necessary data from the feature store etc. The machine learning inference service is tasked with doing the actual inference of the machine learning model. It can be broken up into three parts: pre-process, predict and post-process.
The pre-process step includes things like validating that the data required for the request is available, that the data types are correct etc. We try to avoid including model-specific pre-processing steps here, see Embed pre & post processing logic in the model for clarification.
The predict part is where we do the actual model inference. We send the data produced by the pre-process step into the model to get the predictions.
Lastly, we have the post-process step. Here we transform the outputted predictions into a format which can be used by the system as a whole. For example, we might transform predicted class-ids into class names, such as 0 into the name 'Dog'.
Now with these three steps, how should they be deployed? As one service or spread out into multiple? There are a few different approaches, each with its own advantages and disadvantages, which we will delve into now.
Group all steps together in one service
The first approach we will discuss is to group all the steps into one service. Below is an example code structure:
class InferenceHandler:
    def __init__(self, model_path):
        # load the model, e.g. with pickle
        pass

    def pre_process(self, x):
        # define the pre-process logic in this function
        pass

    def predict(self, x):
        # define how the model's predict function should be called in this function
        pass

    def post_process(self, x):
        # define the post-process logic in this function
        pass

    def infer(self, x):
        # in this function we glue all of the above steps together
        return self.post_process(self.predict(self.pre_process(x)))
The class above would then be included in a web server which exposes an endpoint that allows users to call it through HTTP or gRPC.
The advantage of this approach is that it is simple. You only need to deploy one service, making the deployment process a lot smoother. Furthermore, keeping the different steps in sync is a lot easier. For example, imagine we wanted to update our predict function to accept batches of inputs (i.e. from an input shape of (num_features) to (num_samples, num_features)). This change would require us to also update the pre & post-processing steps. Now, if these were deployed separately, we would need to re-deploy all the services, which is more cumbersome.
Vertex AI model serving and Ray Serve are examples of frameworks that use this approach.
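To show how the three steps fit together in a single service, here is a filled-in toy version of such a handler. `ToyClassifier`, the request format and the class names are all assumptions for illustration; the web server layer is left out.

```python
class ToyClassifier:
    """Hypothetical stand-in model: class 1 if the feature sum is positive."""

    def predict(self, batch):
        return [int(sum(features) > 0) for features in batch]


class ToyInferenceHandler:
    CLASS_NAMES = {0: "Dog", 1: "Cat"}

    def __init__(self, model):
        self.model = model

    def pre_process(self, x):
        # Validate that the required data is present and has the correct types
        if "features" not in x:
            raise ValueError("missing 'features'")
        if not all(isinstance(v, (int, float)) for v in x["features"]):
            raise ValueError("'features' must be numeric")
        return [x["features"]]  # wrap into a batch of one

    def predict(self, x):
        return self.model.predict(x)

    def post_process(self, x):
        # Map class ids to class names for the caller
        return [self.CLASS_NAMES[class_id] for class_id in x]

    def infer(self, x):
        return self.post_process(self.predict(self.pre_process(x)))


handler = ToyInferenceHandler(ToyClassifier())
print(handler.infer({"features": [0.5, 1.5]}))
print(handler.infer({"features": [-1.0]}))
```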
Separate each step into a service
The next approach is to separate each step into a service. These would then communicate over HTTP / gRPC. Below is a diagram of the communication flow:
graph LR
    A[External service] --> B[Pre-process service]
    B --> C[Predict service]
    C --> D[Post-process service]
    D --> A
Now you might ask "why would we want to do this?" To answer that question, let's imagine the following case study:
Let's say we have a large convolutional neural network that we want to deploy. To speed up the inference part we decide to run the model on a GPU. Furthermore, our pre-processing step is quite time-consuming, e.g. we want to crop the image and convert it from RGB to HSV. Moreover, let's say we are getting an increase in requests so we need to scale up this solution. If all the steps are grouped together, we need to deploy additional replicas of the same service, regardless of which part is the bottleneck. Furthermore, during the pre & post-processing steps our GPU is idle, which is wasteful since GPUs are expensive. Now imagine these were separate services; then we could scale them separately depending on which part was the bottleneck.
Another advantage of this separation is that we can co-locate our models on a model inference server, such as Triton Inference Server or TensorFlow Serving. Now we could load multiple replicas of a model and/or multiple models. This is advantageous since it allows us to utilize our GPU better.
Lastly, most of these inference servers allow us to off-load models that haven't been used in a while, and load them again once a request comes in for that specific model (of course with a latency cost). This feature can be helpful if we need to serve a large number of models. For example, imagine that we have a model that forecasts how much we will sell of an item in a given country. Through our experiments we have determined that having one model per country performs better than a global one. Furthermore, assume that we operate in many different countries. If we have included all steps in one service, we would have to deploy one of these services for each country (and potentially more if the number of requests is large). However, when the steps are separated, we could co-locate all of the models on an inference server, potentially getting away with fewer deployments.
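The off-loading behaviour can be pictured as a least-recently-used cache of loaded models. The sketch below is a conceptual illustration only and does not reflect how any particular inference server implements it; `ModelCache` and `load_country_model` are hypothetical names.

```python
from collections import OrderedDict


class ModelCache:
    """Keeps at most `capacity` models in memory, evicting the least recently used."""

    def __init__(self, load_fn, capacity):
        self.load_fn = load_fn  # hypothetical loader, e.g. reads from the model registry
        self.capacity = capacity
        self.models = OrderedDict()

    def get(self, name):
        if name in self.models:
            self.models.move_to_end(name)  # mark as recently used
        else:
            if len(self.models) >= self.capacity:
                self.models.popitem(last=False)  # off-load the least recently used model
            self.models[name] = self.load_fn(name)  # pays the loading latency
        return self.models[name]


loads = []


def load_country_model(country):
    """Pretend to load the per-country forecasting model and record the load."""
    loads.append(country)
    return lambda x: f"forecast for {country}: {x}"


model_cache = ModelCache(load_country_model, capacity=2)
for country in ["SE", "NO", "SE", "DK", "NO"]:
    model_cache.get(country)

print(loads)
print(list(model_cache.models))
```

The model for "NO" is loaded twice because it was evicted to make room for "DK", which is the latency cost mentioned above.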
KServe and Seldon are examples of frameworks that use this approach.
When to use which approach?
So now naturally the question arises: "Which of these approaches should I take?"
I would choose "grouping all steps together" if the following conditions are met:
- The number of served models is small for the application
- The model doesn't require an accelerator, such as GPUs, TPUs etc.
Otherwise I would go with "separate each step".
Data validation for inference
During our training workflow we produce the data validation artifact which describes the data schema and the dataset distribution, see training workflow. Now during serving we want to validate that our serving data has the same data schema and comes from a similar distribution to our training data in order to avoid training-serving skew. Comparing distributions can be done with measures such as Jensen–Shannon divergence.
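As a rough sketch of how such a comparison could look, the snippet below computes the Jensen–Shannon divergence between histograms of a feature from training and serving. The bin edges and price values are made up for illustration, and the helper functions are assumptions rather than part of any validation library.

```python
import math


def js_divergence(p, q):
    """Jensen-Shannon divergence (base 2) between two discrete distributions."""

    def kl(a, b):
        # Terms with a_i == 0 contribute nothing to the sum
        return sum(ai * math.log2(ai / bi) for ai, bi in zip(a, b) if ai > 0)

    m = [(pi + qi) / 2 for pi, qi in zip(p, q)]
    return 0.5 * kl(p, m) + 0.5 * kl(q, m)


def histogram(values, edges):
    """Normalized histogram over the half-open bins [edges[i], edges[i+1])."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        for i in range(len(counts)):
            if edges[i] <= v < edges[i + 1]:
                counts[i] += 1
                break
    total = sum(counts)
    return [c / total for c in counts]


edges = [0, 10, 20, 30, 40]  # hypothetical bin edges for a price feature
training_prices = [5, 12, 15, 18, 22, 25, 28, 35]
serving_prices = [22, 25, 31, 33, 35, 36, 38, 39]

p = histogram(training_prices, edges)
q = histogram(serving_prices, edges)
print(f"JS divergence: {js_divergence(p, q):.3f}")
```

With base-2 logarithms the divergence lies between 0 (identical distributions) and 1, so a threshold for alerting can be chosen on a fixed scale.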
Below we describe how to do this validation for batch & online inference.
Batch inference
A typical batch-inference workflow has the following structure:
graph LR
    A[Data ingestion] --> B[Data validation]
    B --> C[Model inference]
In the data validation task we will perform the validation. That is, we compare the data schema of the data ingested for inference with the data schema produced during training. We will also compare the data distribution between the two datasets. If the schemas match and the data distributions are sufficiently similar we will continue to the model inference task, otherwise we might fail the workflow and/or notify the developers that something is wrong.
Online inference
Doing the data validation for online inference is a bit trickier. First off, most metrics for detecting distribution differences require a sizeable amount of data. For online inference we might only have one or a couple of data points (i.e. batched requests) at hand, so computing this difference is not meaningful. Doing this data validation would also introduce more latency to our system since it's an additional step we need to perform.
So how can we solve this? One approach is to take the requests we get in the online inference service and forward them to an additional service, called the data validation service. Here we will store the requests, and once we have reached an adequate amount of data we will trigger a batch data validation job where we compare the data schema and distribution for these requests with the ones produced by the training workflow. If the schema & data distribution are different, we will notify the developers that something is wrong. We could also run this as a cron job such that we do the data validation on new requests every X hours or days.
Furthermore, if the amount of requests is large we could sample these requests randomly such that we only do the data validation on a percentage of the requests.
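The buffering and sampling logic of such a data validation service could look roughly like the sketch below. The class name, the `validate_fn` callback and the parameters are hypothetical; persistence and the actual validation job are left out.

```python
import random


class DataValidationBuffer:
    """Collects a random sample of requests and triggers validation in batches."""

    def __init__(self, validate_fn, batch_size, sample_rate=1.0, seed=None):
        self.validate_fn = validate_fn  # hypothetical batch validation job
        self.batch_size = batch_size
        self.sample_rate = sample_rate
        self.rng = random.Random(seed)
        self.buffer = []

    def record(self, request):
        # Keep only a fraction of the requests when traffic is high
        if self.rng.random() >= self.sample_rate:
            return
        self.buffer.append(request)
        # Trigger the batch validation once enough requests have been collected
        if len(self.buffer) >= self.batch_size:
            batch, self.buffer = self.buffer, []
            self.validate_fn(batch)


validated_batches = []
validation_buffer = DataValidationBuffer(validated_batches.append, batch_size=3)
for price in [10.0, 12.5, 9.0, 11.0]:
    validation_buffer.record({"price": price})

print(len(validated_batches), len(validation_buffer.buffer))
```

In this run the first three requests form one validated batch and the fourth waits in the buffer for the next trigger.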
Below is an overview of this architecture:
graph LR
    A[User] <--> B[Online inference service]
    B --> C[Data validation service]
Distribution skew versus drift
Training-serving skew occurs when the data distributions for training and serving are different from the beginning. As an example, imagine that one of our input features is the price of a product. During training, we have assumed that the price is in euros. However, during serving we accidentally input prices in USD, which causes the price feature to have different distributions between training and serving.
Sometimes, however, the distributions are the same in the beginning but drift apart over time. Imagine the price feature described above. Now instead assume that the price is in euros for both training and serving. But due to inflation, the prices increase over time. Therefore, the distributions drift apart, causing model performance to degrade. This is called distribution drift.
Metrics to determine distribution skew / drift
In a supervised learning setting, we are typically trying to estimate \( P(Y|X) \) where \( X \) is our features and \( Y \) is the target. That is, we are trying to estimate the probability distribution of the target variable, given our features. For a classification problem, we get the discrete probability distribution over the class given a feature input.
Using Bayes' theorem we get that:
\[ P(Y|X) = \frac{P(X|Y) P(Y)}{P(X)} \]
Now, if any of the probabilities \( P(Y|X) \), \( P(X|Y) \), \( P(Y) \) or \( P(X) \) changes between training and serving, the model performance usually degrades. Since \( P(X|Y) \) is neither estimated during training nor observed directly, it's usually skipped when monitoring distribution drift. So during serving we can compare:
- \( P(X) \), called covariate drift.
- \( P(Y) \), called label drift.
- \( P(Y|X) \), called concept drift.
The difference in \( P(X) \) between training and serving can always be compared since we observe the features before they are sent into our model.
\( P(Y) \) and \( P(Y|X) \) are potentially a bit trickier to observe. This is because we need access to the ground truth of the target values, which is not always possible to obtain. This usually depends on the task at hand. Imagine the following two systems:
- A machine learning system that is trying to predict whether a user will click on an ad or not. Now in our internal system, we will most likely keep track of whether the user clicked on it or not. Therefore we can estimate \( P(Y) \) and \( P(Y|X) \) and compare them between training and serving.
- A machine learning system trying to predict whether an image is a dog or a cat. Now unless the user gives us feedback on whether the image was correctly labelled or not, we cannot determine the ground truth. Therefore, estimating \( P(Y) \) and \( P(Y|X) \) during serving will be impossible, unless we label the incoming images ourselves.
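For the ad-click case, where ground truth is logged, a simple check could compare the click rate and the model accuracy between training and serving. The data below is made up, and using the accuracy drop as a rough proxy for a change in \( P(Y|X) \) is an assumption of this sketch rather than a rigorous test.

```python
def positive_rate(labels):
    """Estimate P(Y = 1) from observed binary labels."""
    return sum(labels) / len(labels)


def accuracy(labels, predictions):
    """Share of predictions that match the observed labels."""
    return sum(y == p for y, p in zip(labels, predictions)) / len(labels)


# Hypothetical click logs: 1 = clicked, 0 = not clicked
train_labels = [1, 0, 0, 0, 1, 0, 0, 0, 0, 0]
train_preds = [1, 0, 0, 0, 1, 0, 0, 0, 1, 0]
serve_labels = [1, 1, 0, 1, 0, 1, 0, 0, 1, 0]
serve_preds = [1, 0, 0, 0, 0, 0, 0, 0, 1, 0]

# Label drift: change in the estimated P(Y = 1)
label_shift = positive_rate(serve_labels) - positive_rate(train_labels)

# Rough proxy for concept drift: drop in accuracy on labelled serving data
accuracy_drop = accuracy(train_labels, train_preds) - accuracy(serve_labels, serve_preds)

print(f"label shift:   {label_shift:+.2f}")
print(f"accuracy drop: {accuracy_drop:+.2f}")
```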
Infrastructure setup
Developing machine learning models requires a lot of trial and error. For example, we might try different features, model architectures and so on. However, when we have models in production we also want to make sure that we don't accidentally make changes in production, which can cause an outage of our service. Therefore it's recommended to use clearly separated environments for experimentation and production.
The different environments should also be completely separated. This includes model serving, feature creation jobs, training jobs and so on.
What do we mean by environments here? In Google Cloud & Amazon Web Services this could be different projects. In Kubernetes this could be different namespaces.
Different environments
Sandbox
The sandbox environment is used for testing out new ideas where we are unsure whether they will work out or not. For example, we might introduce a new feature that we hope will increase the accuracy of the model. This would require us to change the feature creation job, as described in Decoupling feature creation from training & serving workflow. If we ran this in the production environment and accidentally introduced a bug into the job, it could cause issues. Therefore, it's recommended to try these experimental changes in our sandbox environment.
Staging
The staging environment is simply used to verify changes before they go into production. Staging should mirror production as much as possible.
Production
Production is used to serve actual users. For example, let's say you have a model that serves recommendations for a video sharing site. When a user opens the app and sees video recommendations, these come from our production environment.
Before changes go into production we should be sure that they work, which we should test in staging.
Promotion to production
If this had been a non-machine-learning service, we could simply deploy the code to production. What makes machine learning products a bit more complicated is that we have two parts that need to go into production: code and a trained model. So how can we do this?
There are two common patterns for this:
Completely separated
Here we have completely separated staging & production. This means that we will train our model in staging for testing. If everything works as expected, we can run the training workflow again in production in order to get the production model, which will be used for serving. Below is an overview of this setup:
graph TB
    subgraph Production
        H[Training pipeline] --> G[Model registry]
        G --> J[Model endpoint]
    end
    subgraph Staging
        D[Training pipeline] --> E[Model registry]
        E --> F[Test endpoint]
    end
    subgraph Sandbox
        A[Training pipeline] --> B[Model registry]
    end
The drawback of this approach is that we need to train the model twice, once in staging and once in production. If the training job is computationally heavy this can become expensive. The advantage is that we have two completely separate environments, making accidental changes less likely.
Transfer model from staging to production
The other approach is to train the model in staging. Furthermore, we will validate that the model works as expected in staging. Once we are confident that the model works as we want, we can transfer the model into the production environment (with the corresponding serving code). Once in production, it will be used for serving.
graph TD
    subgraph Staging
        D[Training pipeline] --> E[Model registry]
        E --> F[Test endpoint]
    end
    subgraph Production
        E -->|copy| H[Model endpoint]
    end
    subgraph Sandbox
        A[Training pipeline] --> B[Model registry]
    end
The drawback is that we couple staging & production; however we only need to train the model once.