Functional Core, Imperative Shell – Using Structured Concurrency to Write Maintainable gRPC Endpoints in Kotlin

July 26, 2022

James Lamine

In the process of migrating from a Python monolith to a service-oriented architecture using gRPC microservices in Kotlin, DoorDash Engineering had the opportunity to better define patterns for how we implement gRPC endpoints.

In a previous blog post, we discussed functional programming patterns in Kotlin and how they helped our team write cleaner and more effective code. In this post, we will expand on those ideas to show how we write gRPC endpoints using the functional-core, imperative-shell pattern. Combined with Kotlin's structured concurrency features, we can avoid many of the common difficulties of I/O handling, allowing us to focus on business logic.

In particular, our approach allows us to implement APIs which call many external services in parallel without creating an unmaintainable mess of callbacks or futures. By using the functional-core, imperative-shell pattern, we can write most of our gRPC endpoints in pure functional programming style by separating business logic from network I/O.

To illustrate these patterns, we'll walk through the code for a gRPC endpoint that calls three services using the functional-core, imperative-shell pattern. Then we'll build on that example to show how to use structured concurrency to make requests in parallel and lazy-load data.

For the sake of simplicity, this blog post only covers read-only APIs, but you can extend these patterns for write APIs as well.

Summary

In this blog post we will:

  • Introduce the functional-core, imperative-shell pattern
  • Use that pattern to write a gRPC endpoint that calls three downstream services
  • Use structured concurrency to make service calls in parallel and lazy-load data

Introducing the functional-core, imperative-shell pattern

One of the key ideas of functional programming is to use pure functions. A function is pure if, given the same input, it always returns the same output and has no side effects. Network requests aren't pure functions because the result depends on something other than the input - for example, there might be a different result based on network errors. Additionally, network requests can cause side effects such as writing to a database.
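
To make the distinction concrete, here is a small sketch of a pure and an impure function side by side. The names are invented for illustration; the mutable counter simply stands in for any hidden dependency such as a network, a clock, or a database.

```kotlin
// Pure: the result depends only on the arguments, and nothing outside is touched.
fun formatName(first: String, last: String): String = "$first $last".trim()

// Hypothetical stand-in for external state (network, clock, database).
var requestCount = 0

// Impure: the result depends on hidden mutable state, and calling it changes that state.
fun nextRequestLabel(prefix: String): String {
    requestCount += 1               // side effect
    return "$prefix-$requestCount"  // same input, different output on each call
}
```

Calling formatName twice with the same arguments always yields the same string, while two calls to nextRequestLabel("req") yield two different labels.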

We want to write code in a functional programming style because it makes it easier to avoid bugs, but our gRPC service needs to make many network requests, and those aren't pure functions. How do we use functional programming while still doing network I/O?

Our solution is to use the functional-core, imperative-shell pattern. The basic idea of the pattern is to put all of our I/O in one place (the imperative shell), cleanly separating it from the rest of our program (the functional core). The shell fetches data and returns a DataModel that holds the result of each network request. Then we can write our business logic as a pure function that takes the DataModel as input and returns the response. The benefit of this pattern is that by separating the I/O, our business logic can be implemented using functional programming.

There are a number of excellent blog posts and videos around the web which explain the pattern in more detail. In this blog post, I'll demonstrate how we use the pattern in practice and how we leverage Kotlin's structured concurrency features to make our implementation easier.

The difference between business logic and I/O

One of the key advantages of this pattern is that it keeps business logic separate from I/O logic.

Business logic includes:

  • Deciding which pieces of data are needed
  • Transforming the data into a response
  • Returning different responses depending on what networking errors happened
  • If this is a write-API, deciding what data to write

Network I/O logic includes:

  • Making RPC calls to fetch data
  • Building an object to describe what happened as a result of the network request (either the response or error)
  • Configuring retries and timeouts
  • Logic to make multiple RPC requests in parallel

By isolating the two we can write business logic in a functional programming style while still robustly handling network errors.
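
As a rough illustration of what belongs on the I/O side, the sketch below shows a retry helper that reports what happened as a Result instead of deciding what to do about it. The helper name and its parameters are assumptions made for this example, not part of our codebase; a real shell would likely add backoff and timeouts as well.

```kotlin
/**
 * Runs [call] up to [maxAttempts] times and reports the outcome as a Result.
 * Hypothetical helper for illustration only.
 */
fun <T> fetchWithRetry(maxAttempts: Int, call: () -> T): Result<T> {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var lastError: Throwable? = null
    repeat(maxAttempts) {
        try {
            return Result.success(call())
        } catch (e: Exception) {
            lastError = e // remember the failure and try again
        }
    }
    return Result.failure(lastError ?: IllegalStateException("no attempts were made"))
}
```

The helper never decides whether a failure matters. It only describes the outcome, which leaves that decision to the business logic.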

Anatomy of a gRPC method

Next, let's walk through the code for an example gRPC method. At a high-level, our gRPC method fetches the data model and then transforms it into a response. Conceptually, we think of it as a composition of two functions:

getResponse( getDataModel( request ) ) -> response

In the context of the functional-core, imperative-shell pattern, getDataModel is the imperative shell which performs I/O, and getResponse is the functional core which looks at the results of that I/O and returns the response. Next, let's look at a practical example of how to implement each of these methods.
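
Before the real endpoint, here is a stripped-down sketch of that composition. All types and the lookUpName function are hypothetical placeholders for the generated gRPC classes and a remote call.

```kotlin
// Hypothetical, simplified types standing in for the generated gRPC classes.
data class Request(val id: String)
data class Model(val name: Result<String>)
data class Response(val greeting: String)

// Stand-in for a remote call; fails for unknown IDs.
fun lookUpName(id: String): String =
    if (id == "42") "Ada" else throw NoSuchElementException("unknown id $id")

// Imperative shell: in a real endpoint this is where network calls would happen.
fun getDataModel(request: Request): Model =
    Model(name = runCatching { lookUpName(request.id) })

// Functional core: a pure function of the data model.
fun getResponse(model: Model): Response =
    Response(greeting = "Hello, ${model.name.getOrDefault("guest")}")

// The endpoint is the composition of the two.
fun handle(request: Request): Response = getResponse(getDataModel(request))
```

Only getDataModel touches the outside world, so getResponse can be exercised with any hand-built Model.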

Code example: fetching data for a delivery

As a concrete example, let's write a gRPC method that fetches some information about a delivery. This method takes a delivery ID and returns the names of the customer and the Dasher (our name for a delivery driver). It will make three service calls:

  • getDelivery - looks up a delivery by delivery ID, returns the dasher ID and customer ID
  • getCustomer - looks up a customer by customer ID
  • getDasher - looks up a dasher by dasher ID

Defining the API contract using Protobuf

Given the above requirements, we can write a protobuf specifying the request and response format:

syntax = "proto3";

import "google/protobuf/wrappers.proto";

service ExampleService {
  rpc GetDeliverySummary(GetDeliverySummaryRequest) returns (GetDeliverySummaryResponse);
}

message GetDeliverySummaryRequest {
  google.protobuf.StringValue delivery_uuid = 1;
}

message GetDeliverySummaryResponse {
  google.protobuf.StringValue delivery_uuid = 1;
  google.protobuf.StringValue customer_name = 2;
  google.protobuf.StringValue dasher_name = 3;
}

Defining a code skeleton

To implement our API, we define four methods and a data model class.

public suspend fun getDeliverySummary(request: GetDeliverySummaryRequest): GetDeliverySummaryResponse {
    try {
        validateRequest(request)
        val dataModel = getDataModel(request)

        return getResponse(dataModel)
    } catch (e: Throwable) {
        throw getErrorStatus(e).asException()
    }
}

The data model describes what happened during I/O - it holds the response or error for each RPC call. Our business logic takes those responses or errors and decides what to do with them. We encapsulate all I/O in the getDataModel function - the rest are pure functions that handle the business logic.

Next, let's take a look at the empty signatures of each method so that we can get a better idea of how everything fits together before we implement each method. We need to define four methods plus one model class:

1. Validate the request

/** Validate a request, for example checking that all required fields are present
 *  and in the expected format.
 *  
 *  Throws an InvalidRequestException if the request is invalid.
 */
fun validateRequest(request: GetDeliverySummaryRequest) {

}

2. Get the data model

/** Fetches the data needed for this RPC method. All I/O happens in this method. */
suspend fun getDataModel(request: GetDeliverySummaryRequest): DataModel {
    TODO("Implemented in the next section")
}

3. Convert the data model to a response

/** Converts the DataModel into the grpc response.
 *
 * Throws an exception if the DataModel is missing required data. */
fun getResponse(dataModel: DataModel): GetDeliverySummaryResponse {
    TODO("Implemented in the next section")
}

4. Handle errors

/** Converts an exception into the appropriate gRPC response status. */
fun getErrorStatus(e: Throwable): Status {
    TODO("Implemented in the next section")
}

5. Define the DataModel class

/** Holds the responses (or errors) from any network requests that this RPC method makes. */
class DataModel // Placeholder: becomes a data class once its fields are defined in the next section.

Initial implementation of our API

Next, we'll go over the initial implementation of each method.

For the sake of simplicity, we don't use Kotlin's structured concurrency features in this initial example. Later we will expand the example to show how to use structured concurrency to make all our downstream RPC requests in parallel.

The DataModel class

The data model describes what happened during I/O. Specifically, it holds the response or error for each network request.

This allows us to separate the description of how to fetch data from the business logic of how to handle errors. For example, we may want to return an error response if some required data fails to load, but fall back on some default value if optional data fails to load.

We use Kotlin's Result<T> class to hold the response or error.

/** Holds the response (or error) from any network requests that this RPC method makes. */
data class DataModel(
    val delivery: Result<DeliveryResponse>,
    val customer: Result<CustomerResponse>,
    val dasher: Result<DasherResponse>
)
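
For readers less familiar with Result<T>, the following short sketch shows the standard-library accessors that the rest of this post relies on. The values are made up for illustration.

```kotlin
// Result<T> holds either a value or the exception that prevented one.
val loaded: Result<String> = Result.success("Ada")
val failed: Result<String> = Result.failure(IllegalStateException("service unavailable"))

// Reading a Result: each accessor encodes a different error-handling decision.
val requiredName: String = loaded.getOrThrow()            // rethrows the stored exception on failure
val optionalName: String? = failed.getOrNull()            // null on failure
val nameOrDefault: String = failed.getOrDefault("guest")  // fallback value on failure
val failureReason: String? = failed.exceptionOrNull()?.message
```

Because the choice of accessor is made where the data is read, the data model itself stays neutral about which failures are fatal.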

Validating the request

The code for request validation ensures that the request contains a non-empty delivery UUID.

/** Throws an InvalidRequestException if the request is invalid. */
fun validateRequest(request: GetDeliverySummaryRequest) {
    if (!request.hasDeliveryUuid() || request.deliveryUuid.value.isEmpty()) {
        throw InvalidRequestException("delivery_uuid is required")
    }
}

Getting the data model

Now for our I/O code. We get the delivery, then use the customerId and dasherId from its response to get the customer and Dasher. Even if our service calls result in errors, getDataModel won't throw an exception because we're using the Result.runCatching and Result.mapCatching functions to catch any exceptions and wrap them in a Result object.

The getDataModel function is in charge of making service calls but not filtering or transforming those responses - that counts as business logic and we want to keep business logic out of getDataModel. So even though we only need the name field from the Dasher, the data model holds the entire Dasher RPC response.

/** Fetches the data needed for this RPC method. All I/O happens in this method. */
suspend fun getDataModel(request: GetDeliverySummaryRequest): DataModel {
    val delivery = Result.runCatching {
        DeliveryService.getDelivery(request.deliveryUuid.value)
    }

    val customer = delivery.mapCatching {
        if (!it.hasCustomerId()) {
            throw IllegalStateException("Delivery does not have a customer id")
        }

        CustomerService.getCustomer(it.customerId.value)
    }

    val dasher = delivery.mapCatching {
        if (!it.hasDasherId()) {
            throw IllegalStateException("Delivery does not have a dasher id")
        }

        DasherService.getDasher(it.dasherId.value)
    }

    return DataModel(
        delivery = delivery,
        customer = customer,
        dasher = dasher
    )
}
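
One property of this chaining is worth spelling out: when the first call fails, mapCatching skips its block and passes the original failure along, so the dependent call is never made. The sketch below demonstrates this with hypothetical in-memory maps standing in for the delivery and customer services.

```kotlin
// Hypothetical in-memory stand-ins for the delivery and customer services.
val customerIdByDelivery = mapOf("d1" to "c1")
val customerNameById = mapOf("c1" to "Ada")
var customerLookups = 0

fun fetchCustomerId(deliveryId: String): String =
    customerIdByDelivery[deliveryId]
        ?: throw NoSuchElementException("delivery $deliveryId not found")

fun fetchCustomerName(customerId: String): String {
    customerLookups += 1 // counts how often the dependent call is made
    return customerNameById.getValue(customerId)
}

// Mirrors the chaining in getDataModel: the second call only runs if the first succeeded.
fun loadCustomerName(deliveryId: String): Result<String> =
    runCatching { fetchCustomerId(deliveryId) }
        .mapCatching { fetchCustomerName(it) }
```

Looking up an unknown delivery returns a failed Result carrying the NoSuchElementException from the first call, and customerLookups is not incremented.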

Converting the data model to a response

Now that we've handled I/O, the business logic for transforming the data model into a response is straightforward. Since all of the data is required, we're using Result.getOrThrow to throw an exception for failed RPC requests.


/** Converts the DataModel into the grpc response.
 *
 * Throws an exception if the DataModel is missing required data. */
fun getResponse(dataModel: DataModel): GetDeliverySummaryResponse {
    val response = GetDeliverySummaryResponse.newBuilder()

    response.deliveryUuid = dataModel.delivery.getOrThrow().deliveryUuid
    response.customerName = dataModel.customer.getOrThrow().name
    response.dasherName = dataModel.dasher.getOrThrow().name

    return response.build()
}

One might ask why we need a separate Result<> object for each DataModel field. If we're just throwing an exception every time there's an error result, why not have a single Result<DataModel> object instead of multiple Result<> fields?

For this simple example, separate Result<> fields aren't necessary since each RPC response is required. But what happens if the product requirements change and we want to fail gracefully by making the dasherName field optional? With a single Result<DataModel>, we'd have to refactor the DataModel class as well as the getDataModel and getResponse methods.

But with separate Result<> fields for each RPC call, we only need to change one line:

fun getResponse(dataModel: DataModel): GetDeliverySummaryResponse {
    val response = GetDeliverySummaryResponse.newBuilder()

    response.deliveryUuid = dataModel.delivery.getOrThrow().deliveryUuid
    response.customerName = dataModel.customer.getOrThrow().name
    dataModel.dasher.getOrNull()?.let { response.dasherName = it.name }

    return response.build()
}

The key here is that deciding how to handle different errors is business logic. By keeping that business logic in the getResponse method, we can implement new product requirements with minimal code changes.
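
A side benefit is that this logic can be checked without any network or mocks, simply by constructing data models by hand. The sketch below uses hypothetical, simplified types (plain strings instead of the RPC responses) to show the idea.

```kotlin
// Hypothetical, simplified model: plain strings instead of the RPC response types.
data class SummaryModel(val customer: Result<String>, val dasher: Result<String>)
data class Summary(val customerName: String, val dasherName: String?)

// Functional core: the customer is required, the Dasher is optional.
fun summarize(model: SummaryModel): Summary =
    Summary(
        customerName = model.customer.getOrThrow(),
        dasherName = model.dasher.getOrNull()
    )

// Each case is built by hand: no mocks, no network.
val allLoaded = summarize(SummaryModel(Result.success("Ada"), Result.success("Bob")))
val dasherFailed = summarize(
    SummaryModel(Result.success("Ada"), Result.failure(RuntimeException("timeout")))
)
val customerFailed = runCatching {
    summarize(SummaryModel(Result.failure(RuntimeException("down")), Result.success("Bob")))
}
```

Here dasherFailed still produces a summary with a null Dasher name, while customerFailed captures the exception because the customer is required.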

Mapping exceptions to the appropriate gRPC response

Our RPC method can throw exceptions in two places - validateRequest and getResponse. The getErrorStatus method returns the appropriate gRPC response code depending on the exception.

Note that in order for this to work well, we need to be sure to define custom exception classes for each of our RPC calls.

/** Converts an exception into the appropriate gRPC response status. */
fun getErrorStatus(e: Throwable): Status {
    return when(e) {
        is DeliveryNotFoundException -> {
            Status.NOT_FOUND.withDescription("Delivery not found").withCause(e)
        }
        is CustomerNotFoundException -> {
            Status.NOT_FOUND.withDescription("Customer not found").withCause(e)
        }
        is DasherNotFoundException -> {
            Status.NOT_FOUND.withDescription("Dasher not found").withCause(e)
        }
        is GetDeliveryException -> {
            Status.INTERNAL.withDescription("Error getting delivery").withCause(e)
        }
        is GetCustomerException -> {
            Status.INTERNAL.withDescription("Error getting customer").withCause(e)
        }
        is GetDasherException -> {
            Status.INTERNAL.withDescription("Error getting dasher").withCause(e)
        }
        is InvalidRequestException -> {
            Status.INVALID_ARGUMENT.withDescription(e.message).withCause(e)
        }
        else -> {
            Status.INTERNAL
                .withDescription("Error: ${e.message}")
                .withCause(e)
        }
    }
}
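
The exception classes themselves are not shown in this post. As a sketch of one possible shape, the example below defines two of them and a client wrapper that translates low-level failures into those types. The constructors, the wrapper, and its lookup parameter are all assumptions for illustration.

```kotlin
// Hypothetical exception types for one downstream call; the code above assumes
// similar classes exist for each service.
class DeliveryNotFoundException(id: String) : RuntimeException("Delivery $id not found")
class GetDeliveryException(cause: Throwable) : RuntimeException("Error getting delivery", cause)

// Hypothetical client wrapper: translates low-level failures into the typed exceptions
// so that the error-mapping function can tell them apart.
fun getDeliveryOrThrow(id: String, lookup: (String) -> String?): String =
    try {
        lookup(id) ?: throw DeliveryNotFoundException(id)
    } catch (e: DeliveryNotFoundException) {
        throw e // already typed, pass through unchanged
    } catch (e: Exception) {
        throw GetDeliveryException(e)
    }
```

With wrappers like this around each client, the when expression in getErrorStatus can match on the exception type alone.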

Using structured concurrency to improve our gRPC method

Now, let's walk through how we can improve our initial example using structured concurrency. One of Kotlin's key features is support for coroutines, which are similar to lightweight threads. Structured concurrency is a technique of arranging coroutines using a parent/child relationship. This gives us the benefit of asynchronous I/O without the readability problems of futures and callbacks.

To showcase when to use structured concurrency, let's extend the initial example with some additional requirements:

  1. Fetch data in parallel to reduce latency
  2. Don't fetch the Dasher for pickup orders

Requirement 1: Fetching data in parallel

Fetching data in parallel decreases latency but it complicates the code, especially for RPCs that call many services. For example, you might need the result of one service call to make another service call, but those calls depend on other calls as well. How can we control which RPC calls to make in parallel without our code becoming a mess?

Structured concurrency makes this easy. We use supervisorScope, async() and await() to make requests in parallel using coroutines. We don't need to worry about what order to make requests in - just chain together coroutines and Kotlin will make requests in parallel when possible.
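
As a minimal sketch of this chaining, the example below records the order in which three simulated calls start and finish. It assumes the kotlinx.coroutines library is available; the call names and delays are made up for illustration.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// "b" and "c" both depend on "a", so they wait for it and then run concurrently.
fun runChainedCalls(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    supervisorScope {
        val a = async {
            events.add("a start")
            delay(50) // simulated network latency
            events.add("a done")
            "a"
        }
        val b = async {
            a.await() // suspends until "a" has a result
            events.add("b start")
            delay(50)
            events.add("b done")
        }
        val c = async {
            a.await()
            events.add("c start")
            delay(50)
            events.add("c done")
        }
        b.await()
        c.await()
    }
    events
}
```

In the returned list, both "b start" and "c start" appear before either "b done" or "c done", even though the code never states explicitly which calls may overlap.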

In this blog post, we want to focus on practical examples of how we use structured concurrency as opposed to a general introduction to the concept. If you're not familiar with coroutines, supervisorScope, async() or await(), there are some great guides that cover the topic. For the sake of brevity, I'll defer to those guides to introduce these topics.

First, we make a slight tweak to the DataModel so that each field is a Deferred<Result<T>> instead of just Result<T>.

/** Holds the response (or error) from any network requests that this RPC method makes. */
data class DataModel(
    val delivery: Deferred<Result<DeliveryResponse>>,
    val customer: Deferred<Result<CustomerResponse>>,
    val dasher: Deferred<Result<DasherResponse>>
)

Now we can modify getDataModel to chain together coroutines:

/** Fetches the data needed for this RPC method. All I/O happens in this method. */
suspend fun getDataModel(request: GetDeliverySummaryRequest): DataModel {
    return supervisorScope {
        val delivery = async {
            Result.runCatching {
                DeliveryService.getDelivery(request.deliveryUuid.value)
            }
        }

        val customer = async {
            delivery.awaitResult().mapCatching {
                if (!it.hasCustomerId()) {
                    throw IllegalStateException("Delivery does not have a customer id")
                }

                CustomerService.getCustomer(it.customerId.value)
            }
        }

        val dasher = async {
            delivery.awaitResult().mapCatching {
                if (!it.hasDasherId()) {
                    throw IllegalStateException("Delivery does not have a dasher id")
                }

                DasherService.getDasher(it.dasherId.value)
            }
        }

        DataModel(
            delivery = delivery,
            customer = customer,
            dasher = dasher
        )
    }
}

/**
 * Convenience function to await a Deferred<Result>.
 * If the call to await() throws an error, it wraps the error as a failed Result.
 */
suspend fun <T> Deferred<Result<T>>.awaitResult(): Result<T> {
    return try {
        this.await()
    } catch (e: Throwable) {
        Result.failure(e)
    }
}

Requirement 2: Don't fetch the Dasher for pickup orders

Where should we add the logic to not fetch the Dasher for pickup orders? It's tempting to put it in the getDataModel function, but that's not quite right. The decision of whether or not to use data from the Dasher is business logic. How can we keep this business logic out of the getDataModel function?

The answer is to use lazy evaluation, only making network calls at the point where we try to read the result from the DataModel. Instead of starting our coroutines immediately, we mark them as lazy. That way Kotlin won't execute the coroutines until you call .await() on the Deferred. If we move the calls to .await() to the getResponse function, then we won't make any network calls unless our business logic uses the data.

One caveat is that a coroutine scope (supervisorScope in our case) won't exit until all of its child coroutines have completed. With lazy loading, some coroutines might never be started, so the scope would wait on them forever and the request would hang. The solution is to cancel any unused coroutines after calling getResponse.
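
Before the full endpoint, here is a minimal sketch of lazy start and cleanup in isolation. It assumes the kotlinx.coroutines library is available; the coroutine names are invented for illustration.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Returns the names of the lazy coroutines that actually ran.
fun runLazyDemo(): List<String> = runBlocking {
    val started = mutableListOf<String>()
    coroutineScope {
        try {
            val used = async(start = CoroutineStart.LAZY) { started.add("used") }
            // Never awaited, so this coroutine is never started.
            async(start = CoroutineStart.LAZY) { started.add("unused") }
            used.await() // starts only this coroutine
        } finally {
            // Without this, the scope would wait forever on the unstarted coroutine.
            this.coroutineContext.cancelChildren()
        }
    }
    started
}
```

Only the awaited coroutine runs, and the scope still exits because the unstarted child is cancelled in the finally block.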

Here's the code:

public suspend fun getDeliverySummary(request: GetDeliverySummaryRequest): GetDeliverySummaryResponse {
    try {
        validateRequest(request)

        return supervisorScope {
            try {
                // All network calls happen here
                // Pass this scope so the lazy coroutines become its children
                val dataModel = getDataModel(request, this)

                // All business logic and transformation of data happens here
                getResponse(dataModel)
            } finally {
                // If any of the lazy coroutines are not started or completed, cancel them
                // Without this, the supervisorScope will never exit if one of the coroutines was never started.
                this.coroutineContext.cancelChildren()
            }
        }
    } catch (e: Throwable) {
        throw getErrorStatus(e).asException()
    }
}

/** Fetches the data needed for this RPC method. All I/O happens in this method. */
suspend fun getDataModel(request: GetDeliverySummaryRequest, coroutineScope: CoroutineScope): DataModel {
    return coroutineScope.run {
        val delivery = async(start = CoroutineStart.LAZY) {
            Result.runCatching {
                DeliveryService.getDelivery(request.deliveryUuid.value)
            }
        }

        val customer = async(start = CoroutineStart.LAZY) {
            delivery.awaitResult().mapCatching {
                if (!it.hasCustomerId()) {
                    throw IllegalStateException("Delivery does not have a customer id")
                }

                CustomerService.getCustomer(it.customerId.value)
            }
        }

        val dasher = async(start = CoroutineStart.LAZY) {
            delivery.awaitResult().mapCatching {
                if (!it.hasDasherId()) {
                    throw IllegalStateException("Delivery does not have a dasher id")
                }

                DasherService.getDasher(it.dasherId.value)
            }
        }

        DataModel(
            delivery = delivery,
            customer = customer,
            dasher = dasher
        )
    }
}

/** Converts the DataModel into the grpc response.
 *
 * Throws an exception if the DataModel is missing required data. */
suspend fun getResponse(dataModel: DataModel): GetDeliverySummaryResponse {
    val response = GetDeliverySummaryResponse.newBuilder()

    // Calculate each response field as a separate coroutine.
    // Since we're using lazy evaluation in the DataModel, data isn't fetched until we call awaitResult() here.
    // Calculating each response field as a separate coroutine ensures that we fetch data with maximum concurrency.
    // If we were to await() each Deferred serially, the requests would be triggered serially instead of in parallel.
    coroutineScope {
        launch {
            response.deliveryUuid = dataModel.delivery.awaitResult().getOrThrow().deliveryUuid
        }

        launch {
            if (!dataModel.delivery.awaitResult().getOrThrow().isPickupOrder()) {
                response.dasherName = dataModel.dasher.awaitResult().getOrThrow().name
            }
        }

        launch {
            response.customerName = dataModel.customer.awaitResult().getOrThrow().name
        }
    }

    return response.build()
}
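
To see why the separate launch blocks matter, the sketch below compares awaiting two lazy coroutines one after the other with awaiting them from separate coroutines. It assumes the kotlinx.coroutines library is available; lazyCall and the event names are hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Creates a lazy coroutine that logs when its simulated call starts and finishes.
fun CoroutineScope.lazyCall(name: String, events: MutableList<String>): Deferred<Boolean> =
    async(start = CoroutineStart.LAZY) {
        events.add("$name start")
        delay(20) // simulated network latency
        events.add("$name done")
    }

// Awaiting one after the other: "b" is not started until "a" has finished.
fun awaitSerially(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    coroutineScope {
        val a = lazyCall("a", events)
        val b = lazyCall("b", events)
        a.await()
        b.await()
    }
    events
}

// Awaiting from separate coroutines: both calls are started before either finishes.
fun awaitConcurrently(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    coroutineScope {
        val a = lazyCall("a", events)
        val b = lazyCall("b", events)
        launch { a.await() }
        launch { b.await() }
    }
    events
}
```

In the serial version the log reads a start, a done, b start, b done. In the concurrent version "b start" is recorded before "a done", which is the behavior getResponse relies on.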

Conclusion

By separating business logic from I/O, it's easy to quickly understand what an RPC method is doing. The DataModel will tell us what services our RPC method calls. The accessors of each data model property show us how the data is used.

We're also able to perform I/O efficiently, parallelizing requests while taking advantage of lazy loading to avoid unnecessary network calls. By adopting the functional-core, imperative-shell pattern and using Kotlin's structured concurrency features, you can write gRPC methods that are efficient, error-free, and easy to modify in response to changing product requirements.
