How We Applied Client-Side Caching to Improve Feature Store Performance by 70%

May 3, 2022

Kornel Csernai

At DoorDash, we make millions of predictions every second to power machine learning applications that enhance our search, recommendation, logistics, and fraud areas, and scaling these complex systems along with our feature store is a continual challenge. To make predictions, we use a microservice called Sibyl Prediction Service (SPS), written in Kotlin and deployed in Kubernetes pods with load balancing, to serve all our prediction requests. These requests typically take features as inputs and produce a return value that drives a product decision. A typical feature value might capture the number of items in someone’s order (numerical), identify the market they are in (categorical), or represent a menu item they are ordering (embedding). We can also represent more complex data types (such as images) as vectors.

These feature values are frequently loaded from our feature store, which is powered by a set of large Redis clusters (in-memory key-value stores). We use multiple TBs of memory in our Redis clusters, so this service is a significant source of cost and maintenance. Additionally, prediction requests are performed online and need to be served in a matter of milliseconds with strict integrity requirements.

We investigated the idea of adding a caching layer at the microservice level to achieve better request efficiency and ease the load on Redis, expecting it to make our system more scalable. We instrumented SPS with some logging to understand request patterns. After our initial analysis, we found that a large fraction of requests are repeated and wasteful, making a strong case for the efficiency of an in-process caching layer that we proposed.

Our feature store has certain optimizations in place, such as encoding, compression, and batching. We will explain these in detail so you can better understand our experiment setup. Once we decided on the instrumentation steps, we investigated various caching libraries to get the most performance out of the system.

To maintain system stability, we rolled out the caching solution in phases, with rigorous verification steps along the way. After a few iterations, we found that the real-world results closely matched our simulations, achieving a cache hit rate above 70%.

How we use ML features to make predictions in Sibyl Prediction Service today

How do features end up in the feature store? Using a centralized method, we take ground truth data from our database and upload them using a specific format to our feature store.

For batch features (such as long-term statistical features), this happens periodically, for example daily, orchestrated by our internal integration called Fabricator. Real-time features, on the other hand, are uploaded continuously using Kafka queues orchestrated by Riviera, our feature engineering framework.

In both cases, the features are stored in Redis in a specific format, which provides important context for understanding the design principles behind our feature caching solution.

Internally, we name features using a convention that encodes the aggregation type, a brief description, and the feature type. We then hash these feature names using xxHash to reduce the repetitive storage costs of long strings:

xxHash32(daf_cs_p6m_consumer2vec_emb) = 3842923820

We then associate each of these features with entity_ids, which are internal identifiers of consumers, Dashers (delivery drivers), merchants, deliveries, etc. To be able to retrieve multiple feature values for a given entity ID in one request, we create hashmaps in Redis keyed by the entity ID, as follows:

HSET entity_id xxHash32(feature_name) feature_value

During a prediction request, we retrieve one or more feature values for all of the entity IDs involved:

HMGET entity_id1 xxHash32(feature_name1) xxHash32(feature_name2) …
HMGET entity_id2 xxHash32(feature_name3) …
…

which returns the mapping

{
entity_id1: {feature_name1: feature_value1,
             feature_name2: feature_value2, …},
entity_id2: {feature_name3: feature_value3, …},
…
}
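
As a rough illustration of this layout, the sketch below mimics the Redis hashes with a plain dictionary. The `FakeFeatureStore` class and the `fetch_features` helper are hypothetical names made up for this example, and `zlib.crc32` is only a stand-in for xxHash32 (any stable 32-bit hash shows the same idea).

```python
import zlib


def feature_key(feature_name):
    # Stand-in for xxHash32: a stable 32-bit hash of the feature name.
    return zlib.crc32(feature_name.encode("utf-8"))


class FakeFeatureStore:
    """In-memory stand-in for Redis hashes: entity_id -> {hashed_name: value}."""

    def __init__(self):
        self._hashes = {}

    def hset(self, entity_id, feature_name, value):
        self._hashes.setdefault(entity_id, {})[feature_key(feature_name)] = value

    def hmget(self, entity_id, feature_names):
        # Like HMGET, returns one value per requested field, None when absent.
        fields = self._hashes.get(entity_id, {})
        return [fields.get(feature_key(name)) for name in feature_names]


def fetch_features(store, requests):
    """requests: {entity_id: [feature_name, ...]} -> {entity_id: {feature_name: value}}."""
    result = {}
    for entity_id, names in requests.items():
        values = store.hmget(entity_id, names)
        result[entity_id] = dict(zip(names, values))
    return result
```

One lookup is issued per entity ID, and the caller gets the values back keyed by the original (unhashed) feature names.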

Some nontrivial feature values (such as lists) can be very long, which we then serialize using protocol buffers. All of the traffic sent to the feature store is then compressed using the Snappy compression method.

The design principles behind our feature store can be found in more detail in an earlier post about our feature store.

The lifecycle of feature store requests

In a prediction request, SPS can receive multiple prediction queries, and for each of those it takes as input a set of feature names and the corresponding entity IDs of interest. Some of these features are provided by the upstream client since it has the context (for example, the current search query term provided by our customer), but most features need to be retrieved from the feature store. Stored features are typically statistical features computed over a period of time and can be aggregated; one example is the average order size from a merchant in the past seven days.

During the prediction request, the feature values get populated in the following order:

  1. Features passed in from the request
  2. Features retrieved from Redis
  3. Default feature value, if the feature is not in Redis

Figure 1: The process of feature retrieval throughout the lifecycle of a prediction request
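
The population order above can be sketched as a small function. This is a minimal illustration, not the SPS implementation; the function name `resolve_features` and the feature names in the example are hypothetical.

```python
def resolve_features(required, request_features, store_features, defaults):
    """Fill each required feature using the precedence: request, store, default."""
    resolved = {}
    for name in required:
        if name in request_features:
            resolved[name] = request_features[name]   # 1. passed in by the client
        elif store_features.get(name) is not None:
            resolved[name] = store_features[name]     # 2. retrieved from Redis
        else:
            resolved[name] = defaults[name]           # 3. fall back to the default
    return resolved


example = resolve_features(
    required=["query_term", "avg_order_size_7d", "num_items"],
    request_features={"query_term": "pizza"},
    store_features={"avg_order_size_7d": 23.5, "num_items": None},
    defaults={"query_term": "", "avg_order_size_7d": 0.0, "num_items": 1},
)
print(example)
```

Here `num_items` is missing from the store response, so it falls back to its default, while the other two values come from the request and the store respectively.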

We use a set of sharded Redis clusters, where we partition data by use cases. For example, we have a separate shard for the use cases of search and DeepRed (the system at the center of our last-mile logistics platform). This allows us to scale the storage cluster for each use case independently. We make batch requests to Redis for each entity ID, which typically have 10-100 feature names. We retrieve tens of millions of key/value pairs from our Redis clusters on average per second.

The feature store’s scalability problem

Our feature store is a massive and very costly database that needs to support billions of requests we make each day to retrieve ML features. We store features for DoorDash customers, merchants, Dashers, and deliveries. These features can be either batch or real-time features and account for billions of rows in our feature store.

Our primary way of storing data is Redis, which fits our needs well since there is a strong requirement for low latency predictions. These predictions need to pull in hundreds of feature values for each prediction. At the same time, this cluster has to be highly available for our prediction requests so we use a large number of instances to balance the load. Since Redis is an in-memory store, we need to host our data in servers with large amounts of memory. Having servers with lots of memory in turn drives up our compute costs.

Cost efficiency is an important aspect of any platform, and DoorDash’s prediction platform is no different. The feature store is a large contributor to our costs, so tackling this scalability problem has been the focus of our team. To this end, we ventured to explore the possibility of using alternative storage systems (that might be slower but more cost effective) for our features and explore caching. We hypothesized that most requests to our feature store are repeated and therefore are wasteful, making caching an attractive way forward.

By implementing caching, we expected to significantly improve: 

  • Latency: The time it takes to perform predictions. Faster predictions can unlock more use cases and lead to a better end-user experience.
  • Reliability: Consistently performing predictions in a timely manner means that our predictions are higher quality since they fall back to a baseline less often.
  • Scalability: As the scale of DoorDash grows and we are using machine learning for more use cases, we can make predictions in a more cost-effective manner.

Ultimately, even if we don’t realize the above gains immediately, caching is a very promising path forward for us because it unlocks storage backends that are slower but more cost effective than Redis.

How caching can help us improve online prediction scalability and reliability

Before embarking on a large migration project, we wanted to gather more information to estimate the return on investment on our idea. Building out a functional testing environment in production was a larger investment and comes with the risk of negatively impacting production traffic, so instead we first set out to validate our idea using simulation.

Our main idea was to capture live production network traffic originating from our prediction microservice to our Redis cluster. We instrumented SPS in a nonintrusive way to get an idea of what requests are sent to Redis. We used the well-known pcap-based tool tcpdump to capture network packets and save them to a file, and then reassembled these packets in the correct order with a tool called tcpflow.

These TCP packets looked like this:

HMGET st_1426727 2496559762 1170383926 3383287760 1479457783 40892719 60829695 2912304797 1843971484

In this example, HMGET requests eight different feature values from the hashmap corresponding to the st_1426727 entity_id. The keys are feature names hashed with xxHash into 32-bit integers.

Once we had a large enough sample of these HMGET commands, we ran a custom Python script to parse the requests and simulate various caching scenarios with the goal of estimating cache hit rate.

import sys
from functools import lru_cache

LRU_SIZE = 100000  # capacity of the simulated LRU cache, in keys

hits = misses = 0
cache = set()  # unbounded cache: every key seen so far


@lru_cache(maxsize=LRU_SIZE)
def lru_get(x):
    # The return value is irrelevant; we only use lru_cache's hit/miss counters.
    pass


def hit_rate(hit_count, miss_count):
    # Hit rate as a percentage, or 0.0 if nothing was requested.
    total = hit_count + miss_count
    if not total:
        return 0.0
    return hit_count / total * 100.0


with open(sys.argv[1], encoding="utf8") as f:
    for x in f:
        parts = x.strip().split()
        if not parts or parts[0] != "HMGET":
            continue
        # parts[1] is the entity ID; parts[2:] are the hashed feature names.
        for i in range(2, len(parts)):
            key = (parts[0], parts[1], parts[i])

            lru_get(key)

            if key in cache:
                hits += 1
            else:
                cache.add(key)
                misses += 1

lru_info = lru_get.cache_info()
lru_hitrate = hit_rate(lru_info.hits, lru_info.misses)
print(f"Infinite memory: Hits: {hits}, Misses: {misses}, Cumulative Hit Rate: {hit_rate(hits, misses)}")
print(f"LRU with size {LRU_SIZE}: Hits: {lru_info.hits}, Misses: {lru_info.misses}, Cumulative Hit Rate: {lru_hitrate}")

With this framework, we were able to run various simulations offline to find out how the cache would behave.

Figure 2: Simulated partial cache hit rate for the last 10,000 requests over five minutes in a single pod for varying sizes of LRU

This experiment provided some valuable insights. In our benchmarks, we’ve seen that we can achieve close to 70% cache hit rate with a cache capacity of 1,000,000. We also discovered that the cache gets saturated after 15 minutes of production traffic.

To summarize, our simulation methodology was as follows:

  1. Run tcpdump on production hosts to gather outgoing traffic to Redis for 15 minutes.
  2. Parse Redis requests data to get the list of keys retrieved.
  3. Simulate caching by looking at repeated keys.
  4. Plot graphs that show the results of caching over time.
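
Steps 2 and 3 above can be sketched with an explicit LRU built on `OrderedDict`, which makes it easy to compare several capacities on the same trace. This is a simplified illustration under assumptions: the helper names `parse_hmget` and `simulate_lru` and the tiny `captured` trace are made up for the example.

```python
from collections import OrderedDict


def parse_hmget(line):
    """Yield (entity_id, hashed_feature_name) pairs from one captured HMGET line."""
    parts = line.split()
    if len(parts) < 3 or parts[0] != "HMGET":
        return
    for field in parts[2:]:
        yield (parts[1], field)


def simulate_lru(keys, capacity):
    """Replay keys through an LRU cache of the given capacity; return hit rate in [0, 1]."""
    cache = OrderedDict()
    hits = total = 0
    for key in keys:
        total += 1
        if key in cache:
            hits += 1
            cache.move_to_end(key)          # mark as most recently used
        else:
            cache[key] = True
            if len(cache) > capacity:
                cache.popitem(last=False)   # evict the least recently used key
    return hits / total if total else 0.0


captured = [
    "HMGET st_1 111 222 333",
    "HMGET st_2 111",
    "HMGET st_1 111 222",
    "HMGET st_1 333",
]
keys = [key for line in captured for key in parse_hmget(line)]
for capacity in (1, 2, 100):
    print(capacity, round(simulate_lru(keys, capacity), 3))
```

Running the same trace through increasing capacities shows where the hit rate stops improving, which is the kind of curve Figure 2 summarizes.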

How we implemented and measured the effects of pod-local caching on prediction performance

As the first layer of caching, we set out to implement an in-memory, pod-local cache for our SPS microservice. The idea was to implement the cache logic around the existing feature retrieval logic, returning early if there is a value stored in the local cache.

Since we have billions of distinct feature values being processed through SPS, we cannot afford to store all of those values in each of our pods. We have to limit the size of our cache and, when it becomes full, evict some values (preferably the ones we don’t need). Therefore, we needed to come up with a cache eviction scheme. There are many different cache eviction schemes; for our use case, we chose least recently used (LRU) eviction.

SPS is implemented in Kotlin and so we sought out a way to integrate an LRU cache with our microservice. Since SPS is a multithreaded application serving a huge throughput of requests, we needed to have an implementation that can operate with high concurrency. Therefore, the cache implementation had to be thread-safe. There are different strategies to achieve thread safety, but implementations typically use read-write locks.

Another consideration was observability: We needed to know how our cache performs in a real-world environment and whether it matches our expectations set in the offline experiments. At DoorDash, we use Prometheus to log metrics, which can be viewed on Grafana graphs. For the LRU cache, we wanted to log latency, hit rate, cache size, memory usage, request count, and cache correctness.
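
To make the hit rate, request count, and cache size metrics concrete, here is a minimal sketch of a cache that counts its own lookups. It uses plain counters rather than a real metrics client, and the names `CacheStats` and `InstrumentedCache` are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class CacheStats:
    hits: int = 0
    misses: int = 0

    @property
    def requests(self):
        return self.hits + self.misses

    @property
    def hit_rate(self):
        return self.hits / self.requests if self.requests else 0.0


class InstrumentedCache:
    """Dictionary-backed cache that records hit/miss counts on every lookup."""

    def __init__(self):
        self._store = {}
        self.stats = CacheStats()

    def get(self, key):
        if key in self._store:
            self.stats.hits += 1
            return self._store[key]
        self.stats.misses += 1
        return None

    def put(self, key, value):
        self._store[key] = value

    def size(self):
        return len(self._store)
```

In a real service these counters would be exported to the metrics system periodically instead of being read directly.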

Efficient caching solutions in Kotlin

We set up a base interface for our cache in Kotlin:

interface Cache<K, V> {
    // Returns the value associated with the key, or null if the value is not in the cache.
    operator fun get(key: K): V?

    // Inserts a key/value pair into the cache, overwriting any existing values
    fun put(key: K, value: V)

    // Deletes a specific key in the cache
    fun delete(key: K)

    // Deletes all keys in the cache
    fun clear()

    // Returns the estimated size of the cache
    fun size(): Int
}

A basic, inefficient, and unbounded implementation of a cache might look as follows:

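import java.util.Collections

class UnboundedCache<K, V> : Cache<K, V> {
    // The synchronized wrapper makes individual map operations thread-safe.
    private val store: MutableMap<K, V> = Collections.synchronizedMap(mutableMapOf())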
    override fun get(key: K): V? {
        return store[key]
    }
    override fun put(key: K, value: V) {
        assert(value != null)
        store[key] = value
    }
    override fun delete(key: K) {
        store.remove(key)
    }
    override fun clear() {
        store.clear()
    }
    override fun size(): Int {
        return store.size
    }
}

We can add LRU caching to the class by using LinkedHashMap with a custom removeEldestEntry:

class LRUCache<K, V> (private val name: String, private val capacity: Int, private val loadFactor: Float = 0.75f) : Cache<K, V> {
    private var store: MutableMap<K, V>

    override fun size(): Int {
        return store.size
    }

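    init {
        assert(capacity > 0)
        // accessOrder = true keeps entries ordered from least to most recently accessed.
        store = object : LinkedHashMap<K, V>(capacity, loadFactor, true) {
            // Called after each insertion; returning true evicts the least recently used entry.
            override fun removeEldestEntry(eldest: MutableMap.MutableEntry<K, V>): Boolean {
                return size > capacity
            }
        }
        store = Collections.synchronizedMap(store)
    }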

    override fun get(key: K): V? {
        val value = store[key]
        return value
    }

    override fun put(key: K, value: V) {
        assert(value != null)
        store[key] = value
    }

    override fun delete(key: K) {
        store.remove(key)
    }

    override fun clear() {
        store.clear()
    }
}

In the above code, the Collections.synchronizedMap(store) clause makes the class thread-safe. However, it will not perform well in a high concurrency environment, because we lock the whole internal linked list on each lookup and insertion. Therefore, this code is purely for illustration purposes and should not be used in a production application.

For production use cases, we experimented with multiple open-source libraries and found the Caffeine library (written in Java) to work best. It provides a clean API, is highly performant, and benchmarks well against other open-source caching libraries.

Integrating the cache library with SPS

Once we had the caching library set up, the next step was to modify the SPS feature retrieval logic. The idea was to intercept the feature retrieval call and check the cache for feature values before sending out the requests to Redis. We then retrieved the missing values from Redis and populated our cache before returning them to the caller.

Figure 3: Redis feature retrieval with caching.
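
The retrieval flow described above can be sketched as a read-through lookup. This is an illustration only: `get_features` and `fake_store` are hypothetical names, and a plain dictionary stands in for the bounded cache used in production.

```python
def get_features(cache, fetch_from_store, entity_id, feature_names):
    """Return {feature_name: value}, calling the store only for cache misses."""
    result = {}
    missing = []
    for name in feature_names:
        key = (entity_id, name)
        if key in cache:
            result[name] = cache[key]
        else:
            missing.append(name)

    if missing:
        # One batched request for everything the cache could not answer.
        fetched = fetch_from_store(entity_id, missing)
        for name, value in fetched.items():
            cache[(entity_id, name)] = value
            result[name] = value
    return result


calls = []


def fake_store(entity_id, names):
    # Stand-in for the Redis HMGET call; records what was actually requested.
    calls.append((entity_id, tuple(names)))
    return {name: f"{entity_id}:{name}" for name in names}


cache = {}
get_features(cache, fake_store, "st_1", ["f1", "f2"])
get_features(cache, fake_store, "st_1", ["f2", "f3"])
print(calls)
```

The second call only asks the store for `f3`, since `f2` was already cached by the first call.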

While the majority of feature values are unchanged over short periods of time, there are some real-time features that get updated more frequently. We had to implement a cache eviction strategy to maintain prediction integrity. For batch features, we mark the features that get uploaded to Redis by their upload time and poll this information periodically in SPS. When a feature is uploaded, we evict all feature values for that particular feature. This eviction process can be wasteful in some cases; however, we found that it strikes a good tradeoff to maintain integrity.
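
One possible shape for this per-feature eviction is sketched below. The class name `FeatureAwareCache`, the `refresh` method, and the convention that the first poll only records a baseline are assumptions for illustration, not a description of the SPS code.

```python
from collections import defaultdict


class FeatureAwareCache:
    """Cache that can drop every entry belonging to one feature name."""

    def __init__(self):
        self._values = {}                         # (entity_id, feature) -> value
        self._keys_by_feature = defaultdict(set)  # feature -> cached keys
        self._last_upload = {}                    # feature -> last seen upload time

    def put(self, entity_id, feature, value):
        self._values[(entity_id, feature)] = value
        self._keys_by_feature[feature].add((entity_id, feature))

    def get(self, entity_id, feature):
        return self._values.get((entity_id, feature))

    def refresh(self, upload_times):
        """Evict features whose upload time advanced since the previous poll."""
        evicted = []
        for feature, uploaded_at in upload_times.items():
            previous = self._last_upload.get(feature)
            self._last_upload[feature] = uploaded_at
            # Assumption: the first poll only establishes a baseline.
            if previous is not None and uploaded_at > previous:
                for key in self._keys_by_feature.pop(feature, set()):
                    self._values.pop(key, None)
                evicted.append(feature)
        return evicted
```

Keeping an index from feature name to cached keys avoids scanning the whole cache when a single feature is re-uploaded.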

Safely rolling out the live caching abstraction in a business-critical environment

Since SPS is a business-critical service, we had to pay special attention to making sure that the complexity introduced by caching does not interfere with regular production requests. We had to be very careful not to increase latency or use incorrect features when making predictions, as an increase in load can lead to a severely degraded user experience. Therefore, we thoroughly instrumented our service with metrics and used a dry-run rollout process. Internally, we track various metrics such as availability, latency, and prediction accuracy, which we extended with cache characteristics such as hit rate, cache size, and correctness.

As the first step of the caching experiment, we marked only a certain set of features as cacheable. This allow list allowed us to roll caching out feature by feature and not have to worry about real-time features initially.

We then added a code path parallel to the regular Redis requests that simulates cache population and retrieval and also compares the final result with what the production Redis request would yield. This parallel process was run in an asynchronous thread to not cause a latency overhead in the critical flow of requests. In our production setting, we found that Caffeine greatly outperformed Java’s LinkedHashMap implementation and Guava, consistent with our earlier benchmarks.
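
A dry-run comparison of this kind might look like the sketch below. It is synchronous for brevity, whereas the parallel path described above ran asynchronously; `DryRunCache` and its counters are hypothetical names.

```python
class DryRunCache:
    """Shadow cache: never serves traffic, only records what it would have returned."""

    def __init__(self, allow_list):
        self.allow_list = set(allow_list)   # features considered safe to cache
        self._store = {}
        self.hits = self.misses = self.mismatches = 0

    def observe(self, entity_id, feature, store_value):
        """Compare the would-be cached value with the value just read from the store."""
        if feature not in self.allow_list:
            return
        key = (entity_id, feature)
        if key in self._store:
            self.hits += 1
            if self._store[key] != store_value:
                self.mismatches += 1        # a live cache would have served a stale value
        else:
            self.misses += 1
        self._store[key] = store_value      # keep the shadow copy up to date


shadow = DryRunCache(allow_list=["f_batch"])
shadow.observe("e1", "f_batch", 1.0)   # miss
shadow.observe("e1", "f_batch", 1.0)   # hit, values agree
shadow.observe("e1", "f_batch", 2.0)   # hit, but the store value changed
shadow.observe("e1", "f_realtime", 5)  # ignored: not on the allow list
print(shadow.hits, shadow.misses, shadow.mismatches)
```

A nonzero mismatch count points at gaps in the refresh logic before any cached value is ever served to a real prediction.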

During this dry-run process, we observed how the cache size grew as we were receiving requests. The dry run was also helpful in determining what cache hit rate we could expect. Initially the cache hit rate was very low due to various implementation bugs, so our rollout precautions already paid off.

We compared expected and cached feature values over a longer period of time to make sure the feature values returned were as expected. These checks were immensely helpful in identifying gaps in feature value refreshing logic.

When we were confident enough that we could serve cached requests live, we still controlled the percentage of requests sent to the cache and limited the size of the in-memory cache. We followed this process so that we did not overload the production environment.
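
A simple way to control the share of requests served from the cache is a per-request random draw, as in this sketch. The `CacheRollout` class and its `fraction` parameter are assumptions for illustration; the actual rollout mechanism may differ.

```python
import random


class CacheRollout:
    """Decide per request whether the live cache may be used."""

    def __init__(self, fraction, rng=None):
        if not 0.0 <= fraction <= 1.0:
            raise ValueError("fraction must be between 0 and 1")
        self.fraction = fraction
        self._rng = rng or random.Random()

    def use_cache(self):
        # random() returns a float in [0.0, 1.0), so fraction=0 never enables
        # the cache and fraction=1 always does.
        return self._rng.random() < self.fraction
```

Requests for which `use_cache()` returns `False` simply follow the original Redis path, so the fraction can be raised gradually while watching the metrics.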

After this series of tests, we launched the caching live and observed an improvement in latency and reliability. It’s also worth noting that the cache hit rate characteristics closely matched what we saw in our simulation.

Figure 4: Cache hit rate for one of our SPS clusters. The sharp drop represents a deployment, while the smaller drops are due to feature refreshing. Steady cache hit rate is at around 78%.

Further improving overall feature retrieval performance in SPS

Implementing the local cache is the first step in a comprehensive caching strategy. To maximize the caching performance and correctness, we plan to implement additional improvements to address some inefficiencies:

Cold start problem: In a production environment, the microservice processes often get restarted. This can be due to a resource allocation or an ordinary service deployment. When the process restarts, it starts with an empty cache, which can severely affect performance. Shadowing production traffic to a newly started process and “warming up” its cache can solve the cold start problem.

Deserialization overhead: We are repeatedly decompressing and deserializing the raw Redis responses in our cache. While the cost is generally small, it can still add up, especially for large objects such as embeddings. We can modify our workflow to instead store deserialized objects in memory:

Figure 5: Optimized caching workflow with deserialized objects stored in memory. The major difference is the elimination of the decompression step and the type of the saved object in the cache.
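
The difference between the two workflows can be illustrated by counting decode operations. In this sketch, JSON and zlib are only stand-ins for protocol buffers and Snappy, and `RawCache` and `DecodedCache` are hypothetical names.

```python
import json
import zlib

decode_counter = {"calls": 0}


def encode(value):
    """Stand-in for protobuf serialization followed by Snappy compression."""
    return zlib.compress(json.dumps(value).encode("utf-8"))


def decode(blob):
    decode_counter["calls"] += 1
    return json.loads(zlib.decompress(blob).decode("utf-8"))


class RawCache:
    """Stores the compressed bytes, so every read pays for decoding."""

    def __init__(self):
        self._store = {}

    def put(self, key, blob):
        self._store[key] = blob

    def get(self, key):
        return decode(self._store[key])


class DecodedCache:
    """Decodes once on insert and stores the resulting object."""

    def __init__(self):
        self._store = {}

    def put(self, key, blob):
        self._store[key] = decode(blob)

    def get(self, key):
        return self._store[key]
```

One tradeoff to keep in mind: the decoded cache hands out the same object on every read, so callers must treat cached values as read-only.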

Cache sharding: Another potential optimization is to improve the cache hit rate by sharding the caches instead of storing data independently on each pod, effectively as replicas. A useful heuristic can be to shard requests by an application-specific ID. The intuition is that similar requests will overwhelmingly reuse the same features. For example, we can shard ETA (estimated time of arrival) prediction requests by the location of the merchant and expect a lot of the features for that merchant to be likely present in the cache associated with the shard of that merchant, as compared to a generic cache. This method, if implemented correctly, can effectively multiply the cache capacity by the number of pods we have.
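
A minimal sketch of such routing, assuming requests carry a string ID such as a merchant ID, could hash that ID to pick a pod. The function name `shard_for` and the example IDs are hypothetical.

```python
import zlib


def shard_for(shard_key, num_pods):
    """Map an application-specific ID to a pod index in a stable way."""
    # zlib.crc32 is deterministic across processes, unlike Python's built-in
    # hash() for strings, which is randomized per process by default.
    return zlib.crc32(shard_key.encode("utf-8")) % num_pods


for merchant_id in ("merchant_42", "merchant_43", "merchant_42"):
    print(merchant_id, "-> pod", shard_for(merchant_id, 8))
```

Plain modulo routing remaps most keys whenever the number of pods changes, so a scheme such as consistent hashing would be a natural refinement.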

Real-time features: Real-time features require special attention, since they are not uploaded in one big batch but rather gradually over time. To track updates of real-time features, we can have Riviera write the updated features to both our feature store and a separate Kafka queue. With that in place, SPS can read from that queue and update its cache.

Figure 6: Real-time feature cache refreshing with Kafka queues
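
The consumer side of this idea can be sketched with an in-process queue standing in for the Kafka topic. The `apply_updates` helper, the message shape `(entity_id, feature, value)`, and the feature names are assumptions made for the example.

```python
import queue


def apply_updates(cache, updates, max_messages=1000):
    """Drain pending feature updates into the cache; return how many were applied."""
    applied = 0
    while applied < max_messages:
        try:
            entity_id, feature, value = updates.get_nowait()
        except queue.Empty:
            break
        cache[(entity_id, feature)] = value   # overwrite the stale entry in place
        applied += 1
    return applied


updates = queue.Queue()                        # stand-in for the Kafka topic
cache = {("dasher_7", "rt_active_orders"): 1}
updates.put(("dasher_7", "rt_active_orders", 2))
updates.put(("dasher_9", "rt_active_orders", 0))
print(apply_updates(cache, updates), cache)
```

Updating entries in place keeps the hit rate high for real-time features, at the cost of caching values that may never be requested on a given pod.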

Conclusion

At DoorDash, we successfully rolled out a caching layer for our gigascale prediction and feature store infrastructure. Caching methods have been successfully deployed in low-level (such as CPUs) and high-level (network request) environments for a variety of applications. The concept is simple, but implementing it requires domain knowledge and experimentation. We demonstrated that with the right abstractions and benchmarks, it can be gradually and safely rolled out in a large-scale production environment.

Feature stores are commonly used in machine learning operations and many teams are now facing scalability challenges. Caching is an effective way of improving scalability, latency, and reliability while maintaining integrity. By adding caching layers, the ground truth data can be stored at a more cost-effective and scalable level without sacrificing latency.

Acknowledgments

I would like to give special thanks to

  • Brian Seo for adding Fabricator upload status marking,
  • Kunal Shah for driving the development of Riviera and Fabricator,
  • Arbaz Khan for the development of our feature store and guidance on SPS, and
  • Hien Luu for general advice and support on our feature caching initiative.
