Enabling Efficient Machine Learning Model Serving by Minimizing Network Overheads with gRPC

July 20, 2020

Arbaz Khan

The challenge of building machine learning (ML)-powered applications is running inferences on large volumes of data and returning a prediction over the network within milliseconds, which can’t be done without minimizing network overheads. Model serving that is not fast enough will slow down your applications, jam up requests upstream, degrade user experience, and hamper operational efficiency. 

At DoorDash, we confronted similar issues in our gRPC-based model serving setup for search ranking, with network overheads taking up to 50 percent of service response time. After investigating the issue, we were able to reduce our overall response time by 50 percent and our network overheads by 33 percent. In this article, we discuss how we minimized the network overheads of model serving by enabling client-side load balancing, payload compression, and transport-level request tracing under gRPC’s Kotlin framework.

Background

Before we dive into how we reduced latency and network overheads, let's quickly review some key context, including a short primer on gRPC and why it's important to us. We will also briefly discuss our model serving setup at DoorDash before defining network overheads.

What is gRPC and why do we use it?

gRPC is a modern-era server-client communication framework built over the remote procedure call (RPC) paradigm. To get a quick understanding of gRPC, we’d recommend checking out its FAQ section (also look at its design principles if you want to take a deeper dive).

DoorDash migrated from REST to gRPC as its standard for communication between microservices. The major benefits of gRPC in the context of efficient network communication are:

What exactly are network overheads?

The difference between the time taken on the client side to get a response (response time) and the time taken on the server side to process a request (processing time) can be defined as network overhead. Factors that contribute to network overhead include message serialization, deserialization, and time spent in the transport layer waiting to be handed to the application layer (request/response queuing). This metric was critical because it was the primary one we were aiming to improve.

Figure 1. Serialization, deserialization, and network queuing are all factors that add to network overhead.
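The definition above can be written down as a small helper. This is a minimal sketch rather than code from our service: the function names are hypothetical, and the numbers in main are made up for illustration.

```kotlin
// Network overhead as defined above: client-observed response time minus
// server-side processing time. Both inputs are in milliseconds.
fun networkOverheadMs(responseTimeMs: Double, processingTimeMs: Double): Double {
    require(responseTimeMs >= processingTimeMs) {
        "response time should not be smaller than processing time"
    }
    return responseTimeMs - processingTimeMs
}

// Share of the response time that is spent outside request processing.
fun overheadShare(responseTimeMs: Double, processingTimeMs: Double): Double =
    networkOverheadMs(responseTimeMs, processingTimeMs) / responseTimeMs

fun main() {
    // Hypothetical numbers, not measurements from this article.
    println(networkOverheadMs(40.0, 25.0)) // 15.0
    println(overheadShare(40.0, 20.0))     // 0.5
}
```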

How do we realize model serving at DoorDash?

All of the machine learning use cases at DoorDash, such as search and recommendation, fraud detection, optimizing Dasher dispatch, and predicting delivery times, are served through Sibyl, our prediction service cluster. The cluster is deployed as a Kubernetes service in AWS, with each pod in the cluster composed of a Kotlin gRPC server.

A majority of model serving use cases are an essential part of the DoorDash user experience. To support all of them, our system must handle approximately one million predictions a second with high reliability. In our recent blog article, Meet Sibyl, we talked about the various optimizations that went into building the service. Overall, we optimized for performance over design complexity. These optimizations were:

  • Running models in their native format using a C++ integration to mitigate computational overheads
  • Batching prediction requests in pre-tuned batch sizes for optimal throughput
  • Partially passing feature data with the requests to save time fetching it at the server
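As an illustration of the batching idea in the list above, the sketch below splits a list of requests into fixed-size batches. It is not Sibyl's implementation: toBatches and the batch size of 4 are hypothetical, and a real batch size would come from tuning against the model and hardware.

```kotlin
// Split a list of prediction requests into fixed-size batches.
// The last batch may be smaller than batchSize.
fun <T> toBatches(requests: List<T>, batchSize: Int): List<List<T>> {
    require(batchSize > 0) { "batchSize must be positive" }
    return requests.chunked(batchSize)
}

fun main() {
    val storeIds = (1..10).toList()   // hypothetical entities to score
    println(toBatches(storeIds, 4))   // [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
}
```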

Why is it hard to make model serving efficient at scale?

Next, we will highlight some of the key challenges specific to model serving. We will also explain why network overheads need to be addressed by elaborating on their negative impact. Lastly, we will talk about scale in the context of DoorDash to get a sense of the prevailing challenges.

Characteristic challenges of model serving

Now we will go over some of the key challenges of model serving, including how large payloads can slow down model serving, how heavy computations increase response time, and why a faster response time is expected from an ML service in comparison to an independent service.

Large payloads slow down model serving

Model services characteristically work with large request/response sizes. The size of a request sent to a model service is determined by the number of entities involved and features associated with each entity.

For example, in the restaurant ranking use case, hundreds of stores are included in a single request to be ranked by a search query (search ranking). Furthermore, each store can have several dozen features passed in with the request, such as ratings, has_photos, and consumer_store_category_match. With this, the raw payload size can be in the range of megabytes (a 100 Mbps connection takes 80 ms to transfer a 1 MB file).
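The transfer-time figure above can be checked with a few lines of arithmetic. The sketch below is an idealized estimate that ignores latency, protocol framing, and congestion; transferTimeMs is a hypothetical helper and uses decimal units (1 MB = 1,000,000 bytes).

```kotlin
// Idealized time to push a payload through a link of the given bandwidth.
fun transferTimeMs(payloadBytes: Long, linkMbps: Double): Double {
    val payloadBits = payloadBytes * 8.0
    val bitsPerSecond = linkMbps * 1_000_000.0
    return payloadBits * 1000.0 / bitsPerSecond
}

fun main() {
    // 1 MB over a 100 Mbps link
    println(transferTimeMs(1_000_000L, 100.0)) // 80.0
}
```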

Heavy computations increase response time 

The size of the feature matrix and the prediction algorithm govern the complexity of a single model computation. For example, our order recommendation use case runs a PyTorch model over embeddings of menu items, each of which has more than 100 dimensions.

Additionally, there can be multiple model computations within a single request. For example, to optimize Dasher dispatch, we need multiple predictions such as estimating the time for a Dasher to reach a restaurant, the time to prepare food by the restaurant, and making corrections on times quoted by merchants.

Expectation of a faster response time versus an independent service

As you would expect, a slow service degrades the end user experience. More often than not, ML services are not the only services involved in meeting a business use case. For example, to render DoorDash’s homepage, a request flows from your device to a server component at DoorDash, which then makes a call to the feed-service, which fetches content such as a list of stores, carousels, and offers. To get a list of restaurants, the feed-service calls the search-service, which gets the list and ranks it using the sibyl-prediction-service.

Figure 2. Meeting a machine learning business case might involve multiple service calls before getting results back to the user. Here, we show a call stack for a restaurant ranking model.

As a result, we require a faster response time for these services (for optimal user experience) in comparison to an independent service, such as the menu-service, returning menu items for a given restaurant. 

Note: The expectation of a faster response time holds true for any service which gets called by another service in serving a business use case.

Pseudo-traffic from experiments

Analytics and Data Science teams conduct shadowing and A/B experiments in production to improve an existing model or explore new AI use cases. These experiments increase traffic and resource use by a factor equal to the number of live experiments and the number of queries sent by these experiments.

Network overheads are not nominal

Typically, application services work with small request payloads and don’t feel the effect of network overheads. But as payload size increases, requests become increasingly slower to transport, serialize, and deserialize. In Figure 3, below, we share one of our use cases in which network overheads exceeded request processing time.

Figure 3. Application services with large payload sizes can cause situations where network overheads (response time - processing time) are larger than service processing times.

Furthermore, a service whose throughput falls short of its rate of incoming requests will suffer from request queuing. In such an event, processing of requests will begin much later than the time when requests were originally sent, thereby contributing to network overheads. The request queue consumes service resources and bottlenecks the throughput of upstream services.

For example, in Figure 2, referenced earlier, if the search-service has a request queue, calls made to its parent, the feed-service, will suffer significant delays and will, in turn, affect the feed-service’s throughput. Thus, request queuing may impede the request throughput of the overall system in a superlinear fashion and should not be ignored.
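To make the queuing effect concrete, here is a deliberately simple sketch. It assumes constant arrival and service rates (a fluid model, not a measurement of our system), and the function names and rates are hypothetical.

```kotlin
// Backlog of a service that receives arrivalsPerSec requests but can only
// complete servicePerSec of them, after running for `seconds` seconds.
fun backlogAfter(arrivalsPerSec: Double, servicePerSec: Double, seconds: Double): Double =
    maxOf(0.0, (arrivalsPerSec - servicePerSec) * seconds)

// Approximate wait of the request that arrives last: the backlog ahead of it
// divided by the rate at which the service drains it.
fun queueWaitSec(arrivalsPerSec: Double, servicePerSec: Double, seconds: Double): Double =
    backlogAfter(arrivalsPerSec, servicePerSec, seconds) / servicePerSec

fun main() {
    // 1,200 requests/sec arriving at a service that can handle 1,000 requests/sec
    println(backlogAfter(1200.0, 1000.0, 10.0)) // 2000.0
    println(queueWaitSec(1200.0, 1000.0, 10.0)) // 2.0
}
```

Under these assumptions, ten seconds of a 20 percent overload already adds about two seconds of waiting before processing even starts, and that wait is visible to every caller upstream.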

Challenges of scale at DoorDash

At DoorDash, all ML use cases together pose high-volume challenges for model serving. To give a sense of the volumes, our search ranking use case alone requires us to handle 600k predictions/sec, and we currently serve over a dozen use cases.

If the rest of our application services operate at a request rate of X, model serving needs to be able to operate at 10X to 20X because for each user request, there are multiple ML use cases that need to be served. Our overall handling-of-scale expectations currently fall in the range of 1 to 10 million predictions/second. With the increase in use cases and consumer growth, these requirements keep rising.

How do we optimize our gRPC setup for model serving at DoorDash?

Here, we will talk about how we use gRPC to minimize network overheads, helping to limit the impact of the problems we talked about above. We will explain why you would need client-side load-balancing and how it can be implemented with a simple server configuration. Next, we will cover the importance of payload compression and how it improved our setup. Finally, we will elaborate upon how you can use gRPC to study your network overheads using transport-level tracing. Since most of this is not covered as developer-ready documentation in gRPC, we will include code-snippets to make our suggestions concrete.

Using client-side load-balancing to evenly distribute request-load

A service should use load-balancing to distribute request-load evenly across its available resources. This is even more important with gRPC because a connection is reused for multiple calls, so without load-balancing all calls on a connection land on the same server. The heavy compute nature of making predictions could quickly overwhelm that server. One possible implementation is to employ an external load-balancer to route traffic evenly. We instead use Kubernetes cloud-native service discovery, DNS, and client-side load balancing to achieve the same thing with the benefits outlined below.

Why use client-side for load-balancing?

  1. It prevents a request hop to the external load-balancer.
  2. Your load-balancer may not support gRPC traffic because of its HTTP/2 protocol. For instance, the AWS network load-balancer doesn’t support HTTP/2 traffic.

The cost of using client-side load-balancing is added code complexity at the client-side. Next, we illustrate how we avoid that cost by using a simple implementation. 

An easy method to enable client-side load-balancing

We use short-lived connections to the server that reset a client connection every 10 seconds to realize load-balancing. This can be implemented using a server configuration that can be set using gRPC’s NettyServerBuilder and maxConnectionAge as below:

val server = NettyServerBuilder
   .forPort(ServiceConfig.PORT)
   ...
   .maxConnectionAge(ServiceConfig.MAX_CONNECTION_AGE, TimeUnit.SECONDS)
   .build()

With this configuration and MAX_CONNECTION_AGE set to 10 seconds, we get a decent resource distribution. Figure 4, below, shows how the 30 pods in our cluster use CPU almost homogeneously over time under (periodically) varying request loads:

Figure 4. Our server configuration results in nearly collinear CPU utilization over time for 30 pods in a cluster.
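On the client side, the channel has to be able to discover more than one server address for the reconnects to spread load. This article does not show our client configuration, so the following is only a sketch of one possible setup: the target name, the port, the round_robin policy, and the plaintext setting are all assumptions.

```kotlin
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder

// Hypothetical target: a DNS name that resolves to the individual pod IPs
// (for example, a Kubernetes headless service).
val channel: ManagedChannel = ManagedChannelBuilder
    .forTarget("dns:///sibyl-prediction-service.example.svc.cluster.local:50051")
    // Assumption: spread calls across all resolved addresses instead of
    // pinning every call to the first address.
    .defaultLoadBalancingPolicy("round_robin")
    // Assumption: plaintext traffic inside the cluster; drop this line if you use TLS.
    .usePlaintext()
    .build()
```

When the server closes a connection after maxConnectionAge, the client reconnects, which gives it a chance to pick up pods that were added or removed in the meantime.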

What are alternative ways to implement client-side load-balancing?

If the above configuration doesn’t work for you, the ideal solution involves setting up a service mesh by using a sidecar container on the client-side. The sidecar container is responsible for routing traffic emanating from a client to a service in a load-balanced manner. Linkerd is a Kubernetes-specific solution, while Istio is a platform-independent option designed to achieve load-balancing.

Using payload compression to reduce network packet sizes 

As payload sizes grow, compression is the natural way to reduce packet sizes. Smaller packets are faster to transport, but compression adds overhead to serialization and deserialization. That’s why the applicability and choice of a compression algorithm need to be evaluated before putting it to use.

We did not settle for gRPC’s default compression

After running benchmark tests using a few algorithms on one of our characteristic requests, we found that the Zstandard (zstd) algorithm performs the best in terms of end-to-end service latency. The reduction in latency from zstd over gzip (the default compression option from gRPC) was over 12 percent. Below is a snippet from our benchmark test comparing raw latency (in milliseconds) for LZ4, Snappy, gzip, zstd, and no compression.

Figure 5a. In a comparison of four compression algorithms and no compression, zstd showed the lowest end-to-end latency.
Figure 5b. After instituting zstd (green line), we see significantly reduced latency in our production cluster.

How to enable compression with gRPC?

Since the gRPC framework itself provides very little documentation, here are a few of our code snippets for enabling zstd compression to help you get started:

import com.github.luben.zstd.ZstdInputStream
import com.github.luben.zstd.ZstdOutputStream
import io.grpc.Codec
import io.grpc.CompressorRegistry
import io.grpc.DecompressorRegistry
import java.io.InputStream
import java.io.OutputStream

// Implement io.grpc.Codec interface for zstd compression using zstd library
class ZStdGrpcCodec : Codec {
   // Name under which the codec is registered; stubs select the codec by this name
   override fun getMessageEncoding(): String {
       return "zstd"
   }

   override fun decompress(inputStream: InputStream): InputStream {
       return ZstdInputStream(inputStream)
   }

   override fun compress(outputStream: OutputStream): OutputStream {
       return ZstdOutputStream(outputStream)
   }
}

// Register zstd codec to a factory enabling gRPC to automatically use it for compression/decompression
object CompressorRegistryFactory {

   // Note: register() adds the codec to gRPC's shared default compressor registry
   fun newCompressorRegistry(): CompressorRegistry {
       val registry = CompressorRegistry.getDefaultInstance()
       registry.register(ZStdGrpcCodec())
       return registry
   }

   // with() returns a new registry that also advertises zstd to the peer
   fun newDecompressorRegistry(): DecompressorRegistry {
       return DecompressorRegistry.getDefaultInstance()
               .with(ZStdGrpcCodec(), true)
   }
}

At the client-side, to be able to send compressed requests, declare CompressorRegistryFactory in a similar manner and then initialize the stub using it:

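// Register the zstd codec so that it can be looked up by name
CompressorRegistryFactory.newCompressorRegistry()

// withCompression returns a new stub, so the stub is built in a single expression.
// The compressor name must match the codec's getMessageEncoding() value ("zstd" above).
private val stub: GreeterCoroutineStub = GreeterCoroutineStub(channel)
   .withCompression(environment.sibylPredictionCompressorName())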

How to find the right algorithm for your needs?

Since effects of compression are dependent on the input, you should run benchmarks on your requests to determine the best algorithm. One can quickly test performance on a given file using lzbench to shortlist algorithms for benchmark tests. Then, you should run benchmark tests on your services to get accurate comparison measures.
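As a starting point for such a benchmark, the sketch below measures the compressed size and the average compress/decompress time of a payload. It only covers gzip, because that ships with the JDK; other algorithms would be plugged in the same way through their own libraries. The payload and helper names are hypothetical, and a real benchmark should use recorded requests and proper warm-up.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

fun gzip(bytes: ByteArray): ByteArray {
    val out = ByteArrayOutputStream()
    // Closing the stream flushes the gzip trailer into `out`.
    GZIPOutputStream(out).use { it.write(bytes) }
    return out.toByteArray()
}

fun gunzip(bytes: ByteArray): ByteArray =
    GZIPInputStream(ByteArrayInputStream(bytes)).use { it.readBytes() }

// Average wall-clock time of `block` in microseconds over `runs` executions.
fun averageMicros(runs: Int, block: () -> Unit): Double {
    val start = System.nanoTime()
    repeat(runs) { block() }
    return (System.nanoTime() - start) / 1000.0 / runs
}

fun main() {
    // Hypothetical payload: repeated feature rows stand in for a real request.
    val payload = "store_id=1,rating=4.5,has_photos=1;".repeat(20_000).toByteArray()
    val compressed = gzip(payload)
    println("raw bytes: ${payload.size}, gzip bytes: ${compressed.size}")
    println("compress: ${averageMicros(20) { gzip(payload) }} us")
    println("decompress: ${averageMicros(20) { gunzip(compressed) }} us")
}
```

Synthetic, highly repetitive data like this compresses far better than real feature payloads, which is exactly why the measurements should be repeated on your own requests.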

Using transport-level tracing to investigate network overheads

gRPC’s tracing capabilities aim to bring service developers closer to the transport layer. It is possible to intercept a message and know exactly when a client call entered the application layer or when a response was sent to the transport layer.

Why should a service developer worry about transport?

Network overheads can add significant delays to service latency. Instrumentation at the application layer is not sufficient to investigate these delays. For instance, a response may be generated by the service in 20ms but sent on the wire after 100ms. Or it could be sent on the wire almost immediately but received by the client after 100ms. If you rely only on time measurements inside your method call, you cannot tell which service (caller or callee) to investigate to mitigate latency slowdowns.

How do we use gRPC tracing for visibility into a request flow?

We use all the available hooks in gRPC tracing to get visibility into transport at client-side and server-side for our production traffic. These tracing measurements are then reported to Wavefront, our observability tool. Figure 6, below, illustrates these hooks in action at server-side reporting the cumulative latency breakdown at a high level:

Figure 6. Using Wavefront, we get a snapshot of cumulative latency

With this information, we are able to get a rough estimate of different components of the network overhead:

  • Network delay + request queuing ~ inboundMessage - requestCreated
  • Request deserialization ~ inboundMessageRead - inboundMessage
  • Response serialization ~ outboundMessageSent - predictionsMade
  • Network delay + response queuing ~ clientReceivedResponse - outboundMessageSent

Caution: For request deserialization, gRPC's documentation says that the event inboundMessage is fired as soon as the stream knows about the message, but doesn’t have a further guarantee as to whether the message is deserialized or not.
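The estimates in the list above can be expressed as a small calculation over the event timestamps. This is a sketch, not our reporting code: the class and function names are hypothetical, the timestamps in main are made up, and it assumes all timestamps are in milliseconds on a common time base (in practice, client and server clocks can drift).

```kotlin
// Timestamps (in milliseconds) of the events listed above for one request.
data class RequestTimeline(
    val requestCreated: Double,
    val inboundMessage: Double,
    val inboundMessageRead: Double,
    val predictionsMade: Double,
    val outboundMessageSent: Double,
    val clientReceivedResponse: Double
)

data class OverheadBreakdown(
    val requestTransport: Double,       // network delay + request queuing
    val requestDeserialization: Double,
    val responseSerialization: Double,
    val responseTransport: Double       // network delay + response queuing
) {
    val total: Double
        get() = requestTransport + requestDeserialization +
            responseSerialization + responseTransport
}

fun breakdown(t: RequestTimeline) = OverheadBreakdown(
    requestTransport = t.inboundMessage - t.requestCreated,
    requestDeserialization = t.inboundMessageRead - t.inboundMessage,
    responseSerialization = t.outboundMessageSent - t.predictionsMade,
    responseTransport = t.clientReceivedResponse - t.outboundMessageSent
)

fun main() {
    // Hypothetical timeline, not taken from Figure 6.
    val timeline = RequestTimeline(0.0, 1.0, 2.0, 8.0, 9.0, 10.0)
    println(breakdown(timeline).total) // 4.0
}
```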

From the metrics shown in Figure 6, for example, we can tell that transporting and deserializing a request (inboundMessageRead - serverCallStarted) took ~2ms and another ~2ms to serialize response and send over wire (outboundMessageSent - predictionsMade). In all, it took us 5.6ms to make predictions, while the client sees the response 11ms after making the request. Thus, our network overhead is ~4.4ms.

Some sample code-snippets below can help you get started:

import io.grpc.Metadata
import io.grpc.ServerStreamTracer
import io.grpc.netty.NettyServerBuilder
import java.util.concurrent.TimeUnit
import kotlin.math.max
// Clock is assumed to be com.codahale.metrics.Clock, whose tick is in nanoseconds

// define a factory to help create request tracers for each request
class ServerRequestTracerFactory() : ServerStreamTracer.Factory() {
   override fun newServerStreamTracer(fullMethodName: String?, headers: Metadata?): ServerStreamTracer {
       return ServerRequestTracer(fullMethodName, headers)
   }
}

// override ServerStreamTracer to add hooks to gRPC transport-level events
class ServerRequestTracer(
   private val fullMethodName: String?,
   private val headers: Metadata?
) : ServerStreamTracer() {
   // tick (in nanoseconds) at which the client created the request
   private val startTime = Clock.defaultClock().tick - requestInitializedTime()

   // nanoseconds elapsed since the client created the request, or 0 if the header is missing or malformed
   private fun requestInitializedTime(): Long {
       // communicate the time when request was initialized using headers and read it here at the server-side
       val requestInitializedTime = headers?.get(RequestInitializedTimeHeader)?.toLongOrNull()
           ?: return 0L
       val timeDiffMs = System.currentTimeMillis() - requestInitializedTime
       // factor in inaccuracies of currentTimeMillis or time-sync issues
       return max(TimeUnit.MILLISECONDS.toNanos(timeDiffMs), 0L)
   }

   // report event to real-time monitoring tool using startTime and current timestamp
   private fun reportEvent(eventName: String) { /* .. */ }

   override fun serverCallStarted(callInfo: ServerCallInfo<*, *>) {
       reportEvent("serverCallStarted")
   }

   override fun outboundMessage(seqNo: Int) {
       reportEvent("outboundMessage")
   }

   override fun inboundMessage(seqNo: Int) {
       reportEvent("inboundMessage")
   }

   // similarly implement other gRPC tracing hooks such as outboundMessageSent, etc.

   companion object {
       // header definition
       private val RequestInitializedTimeHeader: Metadata.Key<String> = Metadata.Key.of(
               "x-request-start",
               Metadata.ASCII_STRING_MARSHALLER
       )
   }
}

// Initialize the server with the trace factory defined above
val server = NettyServerBuilder
   .forPort(ServiceConfig.PORT)
   ...
   .addStreamTracerFactory(ServerRequestTracerFactory())
   .build()

At the client-side, you only need to pass the header x-request-start with its value set to the time (in epoch milliseconds) when the request was created.
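One way to attach that header is a gRPC ClientInterceptor that stamps every outgoing call. The sketch below is an illustration under assumptions rather than our production code: the class name RequestStartHeaderInterceptor is hypothetical, and it uses the moment the call starts as the request creation time.

```kotlin
import io.grpc.CallOptions
import io.grpc.Channel
import io.grpc.ClientCall
import io.grpc.ClientInterceptor
import io.grpc.ForwardingClientCall
import io.grpc.Metadata
import io.grpc.MethodDescriptor

// Adds x-request-start (epoch milliseconds) to the headers of every call.
class RequestStartHeaderInterceptor : ClientInterceptor {
    override fun <ReqT, RespT> interceptCall(
        method: MethodDescriptor<ReqT, RespT>,
        callOptions: CallOptions,
        next: Channel
    ): ClientCall<ReqT, RespT> =
        object : ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
            next.newCall(method, callOptions)
        ) {
            override fun start(responseListener: ClientCall.Listener<RespT>, headers: Metadata) {
                // Assumption: the call start is close enough to request creation.
                headers.put(REQUEST_START, System.currentTimeMillis().toString())
                super.start(responseListener, headers)
            }
        }

    companion object {
        // Must match the header key read by the server-side tracer.
        private val REQUEST_START: Metadata.Key<String> =
            Metadata.Key.of("x-request-start", Metadata.ASCII_STRING_MARSHALLER)
    }
}
```

The interceptor can be applied to a stub with withInterceptors(RequestStartHeaderInterceptor()) or registered on the channel builder with intercept(...).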

There are similar gRPC hooks available at the client-side to infer time spent in request serialization and response deserialization. These hooks can be enabled via a ClientInterceptor as illustrated in this gRPC CensusTracingModule.java documentation.

gRPC tracing led us to reduce our overall response time by 50 percent

When we initially rolled out our sibyl-prediction-service, we saw large unexplained network overheads. There was a sizable gap (~100ms) between the latency reported at client-side and the latency reported at server-side. Using gRPC tracing, we noticed that about half of this time (~50ms) was spent between predictionsMade and outboundMessageSent.

This observation made it evident that the issue was server-side, and upon closer investigation we discovered one of our logging calls outside predictionsMade scope was slowing us down. After removing the logging call, we cut the gap between predictionsMade and outboundMessageSent by 94 percent. This result led us to restructure our logging throughout the code-base, which caused our overall response time to drop by 50 percent.

Additionally, gRPC tracing helped us in latency optimization experiments, such as comparing compression algorithms at a finer granularity, since it can report the time taken during serialization/deserialization.

Conclusion 

Through the testing and experiences described above, we learnt that network overheads can significantly slow down services and that it’s important to address them. The impact is even more profound when working with the large payloads typical of ML model serving, handling large traffic volumes, and expecting a response within milliseconds.

We used model serving at DoorDash as a case study to show how network overheads can be minimized with the gRPC framework. We discussed how to efficiently and evenly distribute load in a service cluster. We also discussed the impact of compression in reducing network delays.

Last but not least, we elaborated upon gRPC tracing and its benefits in providing a fine understanding of the time spent in a service call and evaluating latency optimization experiments. We also learnt that gRPC empowers application developers by bringing them closer to the transport layer, giving them novel opportunities for optimization.

As you might have noticed, these learnings extend beyond model serving and can be applied to any general setup of internal services. Setting a small connection age is sufficient to get you started on load-balancing. Using the right algorithm for compression can help you work with large request sizes or remove overheads when batching requests. gRPC’s transport-level tracing abilities can help you identify request queuing, serialization, and deserialization overheads.

Future work

In the near future, as we expand our scale of operations and work with multiple other machine learning use cases, we will encounter new challenges related to performance and we hope to explore other avenues for optimization.

Acknowledgements:

Thanks to Param Reddy and Hien Luu for your involvement in discovering these findings.
