Ir al contenido
7737

Blog


Functional Core, Imperative Shell - Uso de la concurrencia estructurada para escribir puntos finales gRPC mantenibles en Kotlin

26 de julio de 2022

|
James Lamine

James Lamine

En el proceso de migración de un monolito python a una arquitectura orientada a servicios utilizando microservicios gRPC en Kotlin, DoorDash Engineering tuvo la oportunidad de definir mejor los patrones de cómo implementamos los puntos finales gRPC.

En una entrada anterior del blog, discutimos los patrones de programación funcional en Kotlin y cómo ayudaron a nuestro equipo a escribir código más limpio y eficaz. En esta entrada, ampliaremos esas ideas para mostrar cómo escribimos endpoints gRPC utilizando el patrón funcional-core, imperative-shell. Combinado con las características de concurrencia estructurada de Kotlin, podemos evitar muchas de las dificultades comunes del manejo de E/S, permitiéndonos centrarnos en la lógica de negocio.

En particular, nuestro enfoque nos permite implementar APIs que llaman a muchos servicios externos en paralelo sin crear un lío imposible de mantener de callbacks o futuros. Al utilizar el patrón de núcleo funcional y shell imperativo, podemos escribir la mayoría de nuestros puntos finales gRPC en un estilo de programación funcional puro, separando la lógica de negocio de la E/S de red.

Para ilustrar estos patrones, recorreremos el código de un endpoint gRPC que llama a tres servicios utilizando el patrón funcional-core, imperative-shell. A continuación, nos basaremos en ese ejemplo para mostrar cómo utilizar la concurrencia estructurada para realizar solicitudes en paralelo y lazy-load de datos.

En aras de la simplicidad, esta entrada de blog sólo cubre las API de sólo lectura, pero puede ampliar estos patrones para escribir APIs también.

Resumen

En esta entrada del blog lo haremos:

  • Introducir el patrón de núcleo funcional y shell imperativo
  • Utilice este patrón para escribir un endpoint gRPC que llame a tres servicios descendentes
  • Utilizar la concurrencia estructurada para realizar llamadas a servicios en paralelo y cargar datos de forma perezosa.

Introducción al patrón de núcleo funcional y shell imperativo

Una de las ideas clave de la programación funcional es utilizar funciones puras. Una función es pura si, dada la misma entrada, siempre devuelve la misma salida y no tiene efectos secundarios. Las peticiones de red no son funciones puras porque el resultado depende de algo más que de la entrada - por ejemplo, puede haber un resultado diferente basado en errores de red. Además, las peticiones de red pueden causar efectos secundarios como escribir en una base de datos.

Queremos escribir código con un estilo de programación funcional porque así es más fácil evitar errores, pero nuestro servicio gRPC necesita hacer muchas peticiones de red, y éstas no son funciones puras. ¿Cómo podemos utilizar la programación funcional sin dejar de hacer E/S de red?

Nuestra solución es utilizar el patrón núcleo funcional, shell imperativo. La idea básica del patrón es poner toda nuestra E/S en un solo lugar (el shell imperativo), separándola limpiamente del resto de nuestro programa (el núcleo funcional). El shell obtiene los datos y devuelve un DataModel que contiene el resultado de cada solicitud de red. Entonces podemos escribir nuestra lógica de negocio como una función pura que toma el DataModel como entrada y devuelve la respuesta. El beneficio de este patrón es que al separar la E/S, nuestra lógica de negocio puede ser implementada usando programación funcional.

Hay una serie de excelentes entradas de blog y vídeos en la web que explican el patrón con más detalle. En esta entrada del blog, voy a demostrar cómo usamos el patrón en la práctica y cómo aprovechamos las características de concurrencia estructurada de Kotlin para hacer nuestra implementación más fácil.

Manténgase informado con las actualizaciones semanales

Suscríbase a nuestro blog de ingeniería para estar al día de los proyectos más interesantes en los que trabaja nuestro equipo.

Diferencia entre lógica empresarial y E/S

Una de las principales ventajas de este patrón es que mantiene la lógica de negocio separada de la lógica de E/S.

La lógica empresarial incluye:

  • Decidir qué datos se necesitan
  • Transformar los datos en una respuesta
  • Devolución de respuestas diferentes en función de los errores de red que se hayan producido
  • Si se trata de una API de escritura, decidir qué datos escribir

La lógica de E/S de red incluye:

  • Llamadas RPC para obtener datos
  • Construir un objeto para describir lo que ocurrió como resultado de la solicitud de red (ya sea la respuesta o el error).
  • Configuración de reintentos y tiempos de espera
  • Lógica para realizar varias peticiones RPC en paralelo

Al aislar los dos, podemos escribir lógica de negocio en un estilo de programación funcional y, al mismo tiempo, gestionar con solidez los errores de red.

Anatomía de un método gRPC

A continuación, vamos a ver el código de un método gRPC de ejemplo. A alto nivel, nuestro método gRPC obtiene el modelo de datos y luego lo transforma en una respuesta. Conceptualmente, pensamos en ello como una composición de dos funciones:

getResponse( getDataModel( request ) ) -> response

En el contexto del patrón funcional-núcleo, imperativo-cáscara, getDataModel es el shell imperativo que realiza la E/S, y getResponse es el núcleo funcional que mira los resultados de esa E/S y devuelve la respuesta. A continuación, vamos a ver un ejemplo práctico de cómo implementar cada uno de estos métodos.

Ejemplo de código: obtención de datos para una entrega

Como ejemplo concreto, escribamos un método gRPC que obtenga información sobre una entrega. Este método toma un ID de entrega y devuelve el nombre del cliente y Dasher, nuestro nombre para un conductor de entrega. Hará tres llamadas al servicio:

  • getDelivery - busca una entrega por ID de entrega, devuelve el ID de la entrega y el ID del cliente
  • getCustomer - busca un cliente por su ID de cliente
  • getDasher - busca un dasher por su ID

Definición del contrato API mediante Protobuf

Dados los requisitos anteriores, podemos escribir un protobuf especificando el formato de solicitud y respuesta:

service ExampleService {
  rpc GetDeliverySummary(GetDeliverySummaryRequest) returns (GetDeliverySummaryResponse);
}

message GetDeliverySummaryRequest {
  google.protobuf.StringValue delivery_uuid = 1;
}

message GetDeliverySummaryResponse {
  google.protobuf.StringValue delivery_uuid = 1;
  google.protobuf.StringValue customer_name = 2;
  google.protobuf.StringValue dasher_name = 3;
}

Definir un esqueleto de código

Para implementar nuestra API, definimos cuatro métodos y una clase de modelo de datos.

public suspend fun getDeliverySummary(request: GetDeliverySummaryRequest): GetDeliverySummaryResponse {
    try {
        validateRequest(request)
        val dataModel = getDataModel(request)

        return getResponse(dataModel)
    } catch (e: Throwable) {
        throw getErrorStatus(e).asException()
    }
}

El modelo de datos describe lo que ocurrió durante la IO: contiene la respuesta o el error de cada llamada RPC. Nuestra lógica de negocio toma esas respuestas o errores y decide qué hacer con ellos. Encapsulamos toda la E/S en la función getDataModel - el resto son funciones puras que manejan la lógica de negocio.

A continuación, vamos a echar un vistazo a las firmas vacías de cada método para que podamos tener una mejor idea de cómo todo encaja antes de implementar cada método. Necesitamos definir cuatro métodos más una clase modelo:

1. Validar la solicitud

/** Validate a request, for example checking that all required fields are present
 *  and in the expected format.
 *  
 *  Throws an InvalidRequestException if the request is invalid.
 */
fun validateRequest(request: GetDeliverySummaryRequest) {

}

2. Obtener el modelo de datos

/** Fetches the data needed for this RPC method. All I/O happens in this method. */
suspend fun getDataModel(request: GetDeliverySummaryRequest): DataModel {

}

3. Convertir el modelo de datos en una respuesta

/** Converts the DataModel into the grpc response.
 *
 * Throws an exception if the DataModel is missing required data. */
fun getResponse(dataModel: DataModel): GetDeliverySummaryResponse {

}

4. Gestión de errores

/** Converts an exception into the appropriate gRPC response status. */
fun getErrorStatus(e: Throwable): Status {

}

5. Definir la clase DataModel

/** Holds the responses (or errors) from any network requests that this RPC method makes. */
data class DataModel()

Implementación inicial de nuestra API

A continuación, repasaremos la implementación inicial de cada método.

Por simplicidad, no usamos las características de concurrencia estructurada de Kotlin en este ejemplo inicial. Más adelante ampliaremos el ejemplo para mostrar cómo utilizar la concurrencia estructurada para realizar todas nuestras peticiones RPC descendentes en paralelo.

La clase DataModel

El modelo de datos describe lo ocurrido durante la E/S. En concreto, contiene la respuesta o el error de cada solicitud de red.

Esto nos permite separar la descripción de cómo obtener los datos de la lógica de negocio de cómo manejar los errores. Por ejemplo, es posible que queramos devolver una respuesta de error si algunos datos necesarios no se cargan, pero recurrir a algún valor predeterminado si los datos opcionales no se cargan.

We use Kotlin's Result<T> class to hold the response or error.

/** Holds the response (or error) from any network requests that this RPC method makes. */
data class DataModel(
    val delivery: Result<DeliveryResponse>,
    val customer: Result<CustomerResponse>,
    val dasher: Result<DasherResponse>
)

Validación de la solicitud

El código para la validación de solicitudes garantiza que la solicitud contiene un uuid de entrega válido.

/** Throws an InvalidRequestException if the request is invalid. */
fun validateRequest(request: GetDeliverySummaryRequest) {
    if (!request.hasDeliveryUuid() || request.deliveryUuid.value.isEmpty()) {
        throw InvalidRequestException("delivery_uuid is required")
    }
}

Obtener el modelo de datos

Ahora para nuestro código de E/S. Obtenemos la entrega, utilizando el customerId y dasherId de la respuesta para obtener el cliente y Dasher. Incluso si nuestras llamadas de servicio resultan en errores, getDataModel no lanzará una excepción porque estamos utilizando la función Result.runCatching y Result.mapCatching para capturar cualquier excepción y envolverla en una función Response objeto.

En getDataModel se encarga de hacer llamadas al servicio pero no de filtrar o transformar esas respuestas - eso cuenta como lógica de negocio y queremos mantener la lógica de negocio fuera de getDataModel. Así, aunque sólo necesitemos el campo nombre del Dasher, el modelo de datos contiene toda la respuesta RPC del Dasher.

/** Fetches the data needed for this RPC method. All I/O happens in this method. */
suspend fun getDataModel(request: GetDeliverySummaryRequest): DataModel {
    val delivery = Result.runCatching {
        DeliveryService.getDelivery(request.deliveryUuid.value)
    }

    val customer = delivery.mapCatching {
        if (!it.hasCustomerId()) {
            throw IllegalStateException("Delivery does not have a customer id")
        }

        CustomerService.getCustomer(it.customerId.value)
    }

    val dasher = delivery.mapCatching {
        if (!it.hasDasherId()) {
            throw IllegalStateException("Delivery does not have a dasher id")
        }

        DasherService.getDasher(it.dasherId.value)
    }

    return DataModel(
        delivery = delivery,
        customer = customer,
        dasher = dasher
    )
}

Conversión del modelo de datos en una respuesta

Ahora que hemos gestionado la E/S, la lógica de negocio para transformar el modelo de datos en una respuesta es sencilla. Dado que todos los datos son necesarios, estamos utilizando Result.getOrThrow para lanzar una excepción en caso de peticiones RPC fallidas.


/** Converts the DataModel into the grpc response.
 *
 * Throws an exception if the DataModel is missing required data. */
fun getResponse(dataModel: DataModel): GetDeliverySummaryResponse {
    val response = GetDeliverySummaryResponse.newBuilder()

    response.deliveryUuid = dataModel.delivery.getOrThrow().deliveryUuid
    response.customerName = dataModel.customer.getOrThrow().name
    response.dasherName = dataModel.dasher.getOrThrow().name

    return response.build()
}

Cabe preguntarse por qué necesitamos un Result<> object para cada DataModel campo. Si sólo estamos lanzando una excepción cada vez que hay un resultado de error, ¿por qué no tener un único Result<DataModel> en lugar de varios objetos Result<> fields?

Para este sencillo ejemplo, separe Result<> fields no son necesarios ya que cada respuesta RPC es requerida. Pero, ¿qué pasa si los requisitos del producto cambian y queremos fallar con elegancia haciendo que el campo dasherName sea opcional? Con un único Result<DataModel>tendríamos que refactorizar el DataModel, getDataModel y getResponse métodos.

Pero con Result<> fields para cada llamada RPC, sólo tenemos que cambiar una línea:

fun getResponse(dataModel: DataModel): GetDeliverySummaryResponse {
    val response = GetDeliverySummaryResponse.newBuilder()

    response.deliveryUuid = dataModel.delivery.getOrThrow().deliveryUuid
    response.customerName = dataModel.customer.getOrThrow().name
    response.dasherName = dataModel.dasher.getOrNull()?.map { it.name }

    return response.build()
}

La clave aquí es que decidir cómo manejar los diferentes errores es lógica de negocio. Al mantener esa lógica de negocio en el método getResponse, podemos implementar nuevos requisitos del producto con cambios mínimos en el código.

Asignación de excepciones a la respuesta gRPC adecuada

Nuestro método RPC puede lanzar excepciones en dos lugares - validateRequest y getResponse. En getErrorStatus devuelve el código de respuesta gRPC apropiado en función de la excepción.

Tenga en cuenta que para que esto funcione bien, tenemos que asegurarnos de definir clases de excepción personalizadas para cada una de nuestras llamadas RPC.

/** Converts an exception into the appropriate gRPC response status. */
fun getErrorStatus(e: Throwable): Status {
    return when(e) {
        is DeliveryNotFoundException -> {
            Status.NOT_FOUND.withDescription("Delivery not found").withCause(e)
        }
        is CustomerNotFoundException -> {
            Status.NOT_FOUND.withDescription("Customer not found").withCause(e)
        }
        is DasherNotFoundException -> {
            Status.NOT_FOUND.withDescription("Dasher not found").withCause(e)
        }
        is GetDeliveryException -> {
            Status.INTERNAL.withDescription("Error getting delivery").withCause(e)
        }
        is GetCustomerException -> {
            Status.INTERNAL.withDescription("Error getting customer").withCause(e)
        }
        is GetDasherException -> {
            Status.INTERNAL.withDescription("Error getting dasher").withCause(e)
        }
        is InvalidRequestException -> {
            Status.INVALID_ARGUMENT.withDescription(e.message).withCause(e)
        }
        else -> {
            Status.INTERNAL
                .withDescription("Error: ${e.message}")
                .withCause(e)
        }
    }
}

Uso de la concurrencia estructurada para mejorar nuestro método gRPC

Ahora, vamos a ver cómo podemos mejorar nuestro ejemplo inicial utilizando concurrencia estructurada. Una de las características clave de Kotlin es el soporte para coroutines, que son similares a los hilos ligeros. La concurrencia estructurada es una técnica para organizar coroutines usando una relación padre/hijo. Esto nos da el beneficio de la E/S asíncrona sin los problemas de legibilidad de futuros y callbacks.

Para mostrar cuándo utilizar la concurrencia estructurada, ampliemos el ejemplo inicial con algunos requisitos adicionales:

  1. Obtención de datos en paralelo para reducir la latencia
  2. No busques el Dasher para recoger pedidos

Requisito 1: Obtención de datos en paralelo

La obtención de datos en paralelo disminuye la latencia, pero complica el código, especialmente en el caso de las RPC que llaman a muchos servicios. Por ejemplo, puede que necesites el resultado de una llamada a un servicio para hacer otra llamada a otro servicio, pero esas llamadas dependen también de otras llamadas. ¿Cómo podemos controlar qué llamadas RPC hacer en paralelo sin que nuestro código se convierta en un caos?

La concurrencia estructurada facilita esta tarea. Utilizamos supervisorScope, async() and await() para hacer peticiones en paralelo usando coroutines. No tenemos que preocuparnos de en qué orden hacer las peticiones - simplemente encadena coroutines y Kotlin hará las peticiones en paralelo cuando sea posible.

En esta entrada del blog, queremos centrarnos en ejemplos prácticos de cómo utilizamos la concurrencia estructurada en lugar de una introducción general al concepto. Si no está familiarizado con coroutines, supervisorScope, async() or await()hay algunos grandes guías que cubren el tema. En aras de la brevedad, me remitiré a esas guías para presentar estos temas.
En primer lugar, hacemos un pequeño ajuste en el DataModel para que cada campo sea un Deferred<Result<T>> en lugar de Result<T>.

/** Holds the response (or error) from any network requests that this RPC method makes. */
data class DataModel(
    val delivery: Deferred<Result<DeliveryResponse>>,
    val customer: Deferred<Result<CustomerResponse>>,
    val dasher: Deferred<Result<DasherResponse>>
)

Ahora podemos modificar getDataModel para encadenar coroutines:

/** Fetches the data needed for this RPC method. All I/O happens in this method. */
suspend fun getDataModel(request: GetDeliverySummaryRequest): DataModel {
    return supervisorScope {
        val delivery = async {
            Result.runCatching {
                DeliveryService.getDelivery(request.deliveryUuid.value)
            }
        }

        val customer = async {
            delivery.awaitResult().mapCatching {
                if (!it.hasCustomerId()) {
                    throw IllegalStateException("Delivery does not have a customer id")
                }

                CustomerService.getCustomer(it.customerId.value)
            }
        }

        val dasher = async {
            delivery.awaitResult().mapCatching {
                if (!it.hasDasherId()) {
                    throw IllegalStateException("Delivery does not have a dasher id")
                }

                DasherService.getDasher(it.dasherId.value)
            }
        }

        DataModel(
            delivery = delivery,
            customer = customer,
            dasher = dasher
        )
    }
}

/**
 * Convenience function to await a Deferred<Result>.
 * If the call to await() throws an error, it wraps the error as a failed Result.
 */
suspend fun <T> Deferred<Result<T>>.awaitResult(): Result<T> {
    return try {
        this.await()
    } catch (e: Throwable) {
        Result.failure(e)
    }
}

Requisito 2: No busques el Dasher para recoger pedidos

¿Dónde deberíamos añadir la lógica para no obtener el Dasher para los pedidos de recogida? Es tentador ponerlo en la función getDataModel, pero eso no es del todo correcto. La decisión de utilizar o no los datos del Dasher es lógica de negocio. ¿Cómo podemos mantener esta lógica de negocio fuera de la función getDataModel?

La respuesta es utilizar la evaluación perezosa, realizando sólo llamadas a la red en el punto en el que intentamos leer el resultado de la función DataModel. En lugar de iniciar nuestras coroutines inmediatamente, las marcamos como perezosas. De esta forma Kotlin no ejecutará el comando coroutines hasta que llame .await() en el Deferred. Si trasladamos las llamadas a .await() a la getResponse entonces no haremos ninguna llamada a la red a menos que nuestra lógica de negocio utilice los datos.

Una advertencia es que coroutineScope no saldrá hasta que todas las coroutines se hayan completado, pero con la carga perezosa algunas coroutines podrían no iniciarse nunca, por lo que nuestra aplicación se colgaría indefinidamente. La solución es cancelar las coroutines no utilizadas después de llamar a getResponse.

Aquí está el código:

public suspend fun getDeliverySummary(request: GetDeliverySummaryRequest): GetDeliverySummaryResponse {
    try {
        validateRequest(request)

        return supervisorScope {
            try {
                // All network calls happen here
                val dataModel = getDataModel(request)

                // All business logic and transformation of data happens here
                getResponse(dataModel)
            } finally {
                // If any of the lazy coroutines are not started or completed, cancel them
                // Without this, the supervisorScope will never exit if one of the coroutines was never started.
                this.coroutineContext.cancelChildren()
            }
        }
    } catch (e: Throwable) {
        throw getErrorStatus(e).asException()
    }
}

/** Fetches the data needed for this RPC method. All I/O happens in this method. */
suspend fun getDataModel(request: GetDeliverySummaryRequest, coroutineScope: CoroutineScope): DataModel {
    return coroutineScope.run {
        val delivery = async(start = CoroutineStart.LAZY) {
            Result.runCatching {
                DeliveryService.getDelivery(request.deliveryUuid.value)
            }
        }

        val customer = async(start = CoroutineStart.LAZY) {
            delivery.awaitResult().mapCatching {
                if (!it.hasCustomerId()) {
                    throw IllegalStateException("Delivery does not have a customer id")
                }

                CustomerService.getCustomer(it.customerId.value)
            }
        }

        val dasher = async(start = CoroutineStart.LAZY) {
            delivery.awaitResult().mapCatching {
                if (!it.hasDasherId()) {
                    throw IllegalStateException("Delivery does not have a dasher id")
                }

                DasherService.getDasher(it.dasherId.value)
            }
        }

        DataModel(
            delivery = delivery,
            customer = customer,
            dasher = dasher
        )
    }
}

/** Converts the DataModel into the grpc response.
 *
 * Throws an exception if the DataModel is missing required data. */
suspend fun getResponse(dataModel: DataModel): GetDeliverySummaryResponse {
    val response = GetDeliverySummaryResponse.newBuilder()

    // Calculate each response field as a separate coroutine.
    // Since we're using lazy evaluation in the DataModel, data isn't fetched until we call awaitResult() here.
    // Calculating each response field as a separate coroutine ensures that we fetch data with maximum concurrency.
    // If we were to await() each Deferred serially, the requests would be triggered serially instead of in parallel.
    coroutineScope {
        launch {
            response.deliveryUuid = dataModel.delivery.awaitResult().getOrThrow().deliveryUuid
        }

        launch {
            if (!dataModel.deliveryResponse.awaitResult().getOrThrow().isPickupOrder()) {
                response.dasherName = dataModel.dasher.awaitResult().getOrThrow().name
            }
        }

        launch {
            response.customerName = dataModel.customer.awaitResult().getOrThrow().name
        }
    }

    return response.build()
}

Conclusión

Al separar la lógica de negocio de la E/S, es fácil entender rápidamente qué está haciendo un método RPC. El DataModel nos dirá a qué servicios llama nuestro método RPC. Los evaluadores de cada propiedad del modelo de datos nos muestran cómo se utilizan los datos.

También somos capaces de realizar I/O eficientemente, paralelizando peticiones mientras aprovechamos la carga perezosa para evitar llamadas de red innecesarias. Adoptando el patrón de núcleo funcional y shell imperativo y utilizando las características de concurrencia estructurada de Kotlin, puedes escribir métodos gRPC eficientes, sin errores y fáciles de modificar en respuesta a los requisitos cambiantes del producto.

About the Author

Trabajos relacionados

Ubicación
Nueva York, NY
Departamento
Ingeniería
Ubicación
Sunnyvale, CA; San Francisco, CA
Departamento
Ingeniería
Ubicación
Toronto, ON
Departamento
Ingeniería
Ubicación
New York, NY; San Francisco, CA; Sunnyvale, CA; Los Angeles, CA; Seattle, WA
Departamento
Ingeniería
Ubicación
San Francisco, CA; Sunnyvale, CA
Departamento
Ingeniería