Skip to content
3666

Blog


Comment rendre Kafka Consumer compatible avec Gevent en Python

17 février 2021

|

Jessica Zhao

La gestion des tâches asynchrones à l'aide de Gevent améliore l'évolutivité et l'efficacité des ressources pour les systèmes distribués. Cependant, l'utilisation de cet outil avec Kafka peut s'avérer difficile. 

Chez DoorDash, de nombreux services sont basés sur Python, notamment les technologies RabbitMQ et Celery, qui ont joué un rôle central dans le système de file d'attente asynchrone de notre plateforme. Nous nous appuyons également sur Gevent, une bibliothèque de concurrence basée sur les coroutines, pour améliorer encore l'efficacité de nos opérations de traitement des tâches asynchrones. Au fur et à mesure de la croissance de DoorDash, nous avons été confrontés à des problèmes d'évolutivité et à des incidents qui nous ont incités à remplacer RabbitMQ et Celery par Apache Kafka, une plateforme open source de streaming d'événements distribués qui offre une fiabilité et une évolutivité supérieures. 

Cependant, lors de la migration vers Kafka, nous avons découvert que Gevent, l'outil que nous utilisons pour le traitement asynchrone des tâches dans notre système de point de vente (POS), n'est pas compatible avec Kafka. Cette incompatibilité est due au fait que nous utilisons Gevent pour corriger nos bibliothèques de code Python afin d'effectuer des E/S asynchrones, alors que Kafka est basé sur librdkafka, une bibliothèque C. Le consommateur Kafka bloque les E/S dans la bibliothèque Python. Le consommateur Kafka bloque les E/S de la bibliothèque C et ne peut pas être corrigé par Gevent de la manière asynchrone que nous recherchons.

Nous avons résolu ce problème d'incompatibilité en allouant manuellement Gevent à un thread greenlet, en exécutant le traitement des tâches du consommateur Kafka dans le thread Gevent, et en remplaçant le blocage des E/S de la tâche du consommateur par la version de Gevent de l'appel "bloquant" pour obtenir l'asynchronisme. Les tests de performance et les résultats de production réels ont montré que notre consommateur Kafka fonctionnait sans problème avec Gevent, et qu'il était plus performant que le gestionnaire de tâches Celery/Gevent que nous avions auparavant, en particulier lorsqu'il s'agissait de traiter des temps d'E/S importants, ce qui ralentissait les services en aval. 

Pourquoi abandonner RabbitMQ/Celery au profit de Kafka avec Gevent ?

Afin d'éviter une série de pannes provenant de notre logique de traitement des tâches, plusieurs équipes d'ingénieurs de DoorDash ont migré de RabbitMQ et Celery vers une solution Kafka personnalisée. Bien que les détails puissent être trouvés dans cet article, voici un bref résumé des avantages de la migration vers Kafka :  

Depuis la migration vers Kafka, de nombreuses équipes DoorDash ont constaté des améliorations en termes de fiabilité et d'évolutivité. Afin de bénéficier d'avantages similaires, notre équipe de commerçants s'est préparée à migrer son service POS vers Kafka. Cette migration est compliquée par le fait que les services de notre équipe utilisent également Gevent :

  • Gevent est une bibliothèque Python basée sur les coroutines et les E/S non bloquantes. Avec Gevent, nous pouvons traiter des tâches d'E/S réseau lourdes de manière asynchrone sans qu'elles soient bloquées par l'attente d'E/S, tout en continuant à écrire du code de manière synchrone. Pour en savoir plus sur notre implémentation originale de Gevent, lisez cet article de blog.
  • Gevent peut facilement adapter le code d'une application existante ou une bibliothèque tierce pour les E/S asynchrones, ce qui le rend facile à utiliser pour nos services basés sur Python.
  • Gevent a une exécution légère via des greenlets, et fonctionne bien lorsqu'il s'adapte à l'application. 
  • Nos services ont des opérations d'E/S réseau lourdes avec des parties externes comme les marchands, dont les API peuvent avoir une latence longue et irrégulière, que nous ne contrôlons pas. Nous avons donc besoin d'un traitement asynchrone des tâches pour améliorer l'utilisation des ressources et l'efficacité des services.
  • Avant de mettre en œuvre Gevent, nous souffrions lorsqu'un partenaire important avait une panne, ce qui pouvait avoir un impact sur la performance de notre propre service. 

Gevent étant un composant essentiel pour nous aider à atteindre un débit élevé de traitement des tâches, nous voulions bénéficier des avantages de la migration vers Kafka tout en conservant les avantages de l'utilisation de Gevent.

Les nouveaux défis de la migration vers Kafka 

Lorsque nous avons commencé à migrer de Celery vers Kafka, nous avons été confrontés à de nouveaux défis en essayant de conserver Gevent intact. Tout d'abord, nous voulions maintenir le haut débit de traitement des tâches existant qui était permis par Gevent, mais nous n'avons pas pu trouver de bibliothèque Kafka Gevent prête à l'emploi, ni de ressources en ligne pour combiner Kafka et Gevent. 

Nous avons étudié la façon dont l'application monolithique de DoorDash a migré de Celery à Kafka, et nous avons constaté que ces cas d'utilisation utilisaient un processus dédié pour chaque tâche. Dans nos services et avec nos cas d'utilisation, dédier un processus par tâche entraînerait une consommation excessive de ressources par rapport à l'utilisation des threads Gevent. Nous ne pouvions tout simplement pas reproduire le travail de migration qui avait été effectué auparavant chez DoorDash, et nous avons dû élaborer une nouvelle implémentation pour nos cas d'utilisation, qui impliquait de fonctionner avec Gevent sans perte d'efficacité.

Lorsque nous avons examiné notre propre implémentation du consommateur Kafka avec Gevent, nous avons identifié un problème d'incompatibilité : comme la bibliothèque confluent-kafka-python que nous utilisons est basée sur la bibliothèque C librdkafka, ses appels de blocage ne peuvent pas être corrigés par Gevent parce que Gevent ne fonctionne que sur le code et les bibliothèques Python. Si nous remplaçons naïvement le travailleur Celery par un consommateur Kafka pour interroger les messages des tâches, nos threads Gevent de traitement des tâches existants seront bloqués par l'appel à l'interrogation du consommateur Kafka, et nous perdrons tous les avantages de l'utilisation de Gevent.

Schéma démontrant l'incompatibilité entre Gevent et Kafka
Figure 1 : Le Task worker est patché par Gevent pour traiter les tâches de manière asynchrone, mais il est bloqué par le consommateur Kafka à cause de librdkafka.

while True:
   message = consumer.poll(timeout=TIME_OUT)
   if not message:
       continue

Figure 2 : Cet extrait de code est une implémentation typique d'un consommateur Kafka avec un délai d'attente défini pour l'interrogation des messages. Cependant, il bloque les threads Gevent car le timeout est effectué par librdkafka.

Remplacer l'appel bloquant de Kafka par un appel asynchrone de Gevent

En étudiant des articles en ligne sur un problème similaire avec Kafka producer et Gevent, nous avons trouvé une solution pour résoudre le problème d'incompatibilité entre Kafka consumer et Gevent : lorsque Kafka consumer interroge les messages, nous réglons le timeout de blocage de Kafka sur zéro, ce qui ne bloque plus nos threads Gevent. 

Dans le cas où il n'y a pas de message disponible à interroger, afin d'économiser le cycle de l'unité centrale dans la boucle d'interrogation des messages, nous ajoutons une fonction gevent.sleep(timeout) call. De cette manière, nous pouvons changer de contexte pour effectuer le travail d'autres threads pendant que le thread du consommateur Kafka est en veille. Comme la mise en veille est effectuée par Gevent, les autres threads Gevent ne seront pas bloqués pendant que nous attendons la prochaine interrogation du message du consommateur.

while True:
   message = consumer.poll(timeout=0)
   if not message:
       gevent.sleep(TIME_OUT)
       continue

Figure 3 : Le fait de fixer à zéro le délai d'interrogation des messages du consommateur Kafka ne bloque plus les threads Gevent.

Un compromis possible pour effectuer ce changement manuel de contexte de thread Gevent est que si nous interférons avec le cycle de consommation des messages Kafka, nous pourrions sacrifier toutes les optimisations provenant de la bibliothèque Kafka. Cependant, grâce aux tests de performance, nous n'avons pas constaté de dégradation après avoir effectué ce type de changements, et nous pourrions même constater des améliorations de performance en utilisant Kafka par rapport à Celery.

Comparaison de débit : Kafka vs Celery

Le graphique ci-dessous présente la comparaison du débit, en temps d'exécution, entre Kafka et Celery. Celery et Kafka obtiennent des résultats similaires sur de petites charges, mais Celery est relativement sensible au nombre de tâches simultanées qu'il exécute, alors que Kafka maintient un temps de traitement presque identique quelle que soit la charge. Le nombre maximum de tâches exécutées simultanément dans les tests est de 6 000 et Kafka montre un excellent débit même avec des retards d'E/S dans les tâches, alors que le temps d'exécution des tâches de Celery augmente sensiblement jusqu'à 140 secondes. Alors que Celery est compétitif pour de petites quantités de tâches sans temps d'E/S, Kafka surpasse Celery pour de grandes quantités de tâches concurrentes, en particulier lorsqu'il y a des retards d'E/S.

ParamètresTemps d'exécution de KafkaTemps d'exécution du céleri
100 travaux par demande, 5 demandes
pas de délai d'attente E/S
256 ms153 ms
200 travaux par demande, 5 demandes
pas de délai d'attente E/S
222 ms257 ms
200 travaux par demande, 10 demandes
pas de délai d'attente E/S
251 - 263 ms400 ms - 2 secondes
200 travaux par demande, 20 demandes
pas de délai d'attente E/S
255 ms650 ms
300 travaux par demande, 10 demandes
pas de délai d'attente E/S
256 - 261 ms443 ms
300 travaux par demande, 10 demandes
5 secs Délai d'attente E/S
5,3 secondes10 - 61 secondes
300 travaux par demande, 20 demandes
5 secs Délai d'attente E/S
5,25 secondes10 - 140 secondes
Figure 4 : Kafka est nettement plus performant que Celery pour les charges d'E/S importantes.

Résultats

La migration de Celery vers Kafka tout en continuant à utiliser Gevent nous permet d'avoir une solution de mise en file d'attente des tâches plus fiable tout en maintenant un débit élevé. Les expériences de performance ci-dessus montrent des résultats prometteurs pour des situations de volume et de latence d'E/S élevés. Jusqu'à présent, nous avons utilisé Kafka consumer avec Gevent pendant quelques mois en production, et nous avons constaté un débit élevé et fiable sans la récurrence des problèmes que nous avions vus auparavant lorsque nous utilisions Celery. 

Conclusion

L'utilisation de Kafka avec Gevent est une combinaison puissante. Kafka a fait ses preuves et a gagné en popularité en tant que bus de messagerie et solution de mise en file d'attente, tandis que Gevent est un outil puissant pour améliorer le débit des services Python lourds en entrées/sorties. Malheureusement, nous n'avons pas trouvé de bibliothèque disponible pour combiner Kafka et Gevent, probablement parce que Gevent ne fonctionne pas avec la bibliothèque C librdkafka sur laquelle Kafka est basé. Dans notre cas, nous avons dû nous battre, mais nous avons été heureux de trouver une solution efficace pour combiner les deux. Pour d'autres entreprises, si un débit élevé, l'évolutivité et la fiabilité sont les propriétés souhaitées pour leurs applications Python qui nécessitent un bus de messagerie, Kafka avec Gevent pourrait être la solution. 

Remerciements

Les auteurs souhaitent remercier Mansur Fattakhov, Hui Luan, Patrick Rogers, Simone Restelli et Adi Sethupat pour leurs contributions et leurs conseils au cours de ce projet.

About the Author

Emplois connexes

Localisation
Oakland, CA ; San Francisco, CA
Département
Ingénierie
Localisation
Oakland, CA ; San Francisco, CA
Département
Ingénierie
Localisation
New York, NY
Département
Ingénierie
Localisation
Sunnyvale, CA ; San Francisco, CA
Département
Ingénierie
Localisation
Toronto, ON
Département
Ingénierie