Blog Arolla

AMQP 101 ~ Part 1

AMQP

A l'heure des architectures élastiques, des clusters de serveurs, de la répartitions de charges, de l'intégration de système tiers et de l'asynchronisme, il existe un composant essentiel: le bus de messages. Véritable coordinateur de l'infrastructure et des différents systèmes, il permet aux différents composants de communiquer entre eux de manière totalement transparente et sans contrainte des technologies sous-jacentes de chacun d'eux. Il facilite l'interopérabilité et le découplage de systèmes hétérogènes.

On retrouve ainsi généralement (dans les grosses structures!) des files WebsphereMQ/MQseries (IBM) ou des broker Tibco, ... Des solutions relativement lourdes à mettre en place et très... propriétaires! On pourrait alors rétorquer la présence du standard JMS (Java Message Service), sauf que celui-ci ne définit pas un protocole d'échange mais un ensemble d'API ce qui ne facilite pas vraiment l'interopérabilité avec un autre langage.

Alors qu'un mouvement de fond pousse à la simplicité, des solutions alternatives se mettent en place comme le pubsub de Redis ou encore ZeroMq.

Bien que ces technologies soient utilisées par les plus grands du web, certains projets restent encore frileux à utiliser des protocoles spécifiques/propriétaires et non normés. Il existe cependant une alternative "industrielle", qui paradoxalement est très simple d'utilisation et à mettre en place: AMQP, hummmm encore un acronyme douteux? Jamais entendu parler? Et bien regardez de plus près le portefeuille d'applications de VMWare (SpringSource): RabbitMQ. Il s'agit d'une implémentation du broker de ce protocole.

Advanced Message Queueing Protocol (AMQP) a été créé (par iMatix à l'initiative de JP Morgan en 2003-6) comme un standard ouvert (open standard) définissant un protocole "wire-level". Autrement dit il s'agit d'un protocole d'échange de données, il décrit un format de données permettant d'envoyer des données et des commandes à n'importe quel Broker implémentant ce protocole, quel que soit le langage utilisé par le client ou le broker.

Avant de voir quelques exemples d'utilisation, présentons les principaux concepts/composants d'AMQP.

AMQP en 10min

S'agissant d'un protocole d'échange de message on retrouve naturellement la notion de ... file ou Queue! Il s'agit grossièrement d'une liste FIFO (First In First Out) dans laquelle sont ajoutés les messages qui seront ensuite redistribués aux différents consommateurs connectés à cette file.

Une différence fondamentale d'AMQP avec les autres protocoles est qu'un message n'est jamais envoyé directement à une file! Les producteurs publient (publish) les messages dans des zones d'échanges ou Exchange. L'Exchange joue le rôle d'un aiguilleur, mais contrairement à la SNCF, il est possible d'aiguiller le même message sur plusieurs Queue. Les consommateurs s'inscrivent (subscribe) aux files pour recevoir les messages.

Mais alors comment passe-t-on de l'Exchange à la Queue? Et bien en définissant une liaison ou Binding entre la Queue et l'Exchange. Humpffff ?!? Autrement dit chaque file s'enregistre (declare_binding) sur une zone d'échange en définissant un motif de routage (pattern_key généralement appellé routing_key) qui permettra à la zone d'échange de savoir si les messages qu'elle reçoit intéressent ou non la file. Il s'agit d'une chaîne de caractères qui sera utilisée par l'Exchange pour filtrer les messages qu'il reçoit.

Un message est constitué d'une enveloppe, d'attributs et d'un contenu (payload). Les attributs contiendront par exemple le type de contenu (ContentType, ex. "application/json", "application/xml", "plain/text" ...) et la clé de routage que nous verrons un peu plus loin.

Note: parmi les attributs certains sont optionnels et sont souvent appelés "en-tête" (headers).

Aperçu général

En pseudo-code cela pourrait s'illustrer ainsi:

/**
 * A source produces something.
 * A sink consumes something.
 */
interface Sink {
    def consume(message:Message)
}

class Queue extends Sink { ... }

class Exchange extends Sink {

    val connectedSinks = new ListBuffer[(RoutingKey,Sink)]

    def declareBinding(patternKey:RoutingKey, sink:Sink) {
        connectedSinks.add((patternKey, sink))
    }

    def consume(Message message) {
        connectedSinks.foreach( (patternKey,sink) =>
            if(accepts(patternKey, message))
                sink.consume(message)
        )
    }

    def accepts(patternKey:RoutingKey,
                message:Message):Boolean = ...
}

Quels sont les critères qui font qu'un Exchange considère qu'une règle de routage est satisfaite par un message?

Et bien cela dépend du type de l'Exchange. Il faut tout d'abord que vous sachiez qu'il est possible d'associer à chaque message une clé de routage (routing_key). Il s'agit d'une chaîne de caractères que l'on pourrait assimiler au TO d'un mail. Il existe trois types d'Exchange principaux: direct, fanout et topic. Selon le broker utilisé (RabbitMQ par exemple), il est possible de développer sa propre implémentation (nous verrons cela dans un prochain article).

Tenez, Prenez tout! - Le type Fanout

Il s'agit certainement du type le plus simple à comprendre:

Un message envoyé à un Exchange de type fanout sera redistribué à toutes les files qui se seront enregistrées. Le motif de routage est tout simplement ignoré.

def accepts(patternKey:RoutingKey,
            message:Message) = true

Fanout Exchange

Si t'as la même clé je te le donne - Le type direct

Un message envoyé à un Exchange de type direct sera redistribué aux files dont le motif de routage correspond exactement à la clé de routage.

def accepts(patternKey:RoutingKey,
            message:Message) =
                patternKey.equalTo(message.routingKey)

Note: il est possible de déclarer plusieurs règles de routage pour une Queue sur le même Exchange.

Direct Exchange

Tu veux ou tu veux pas ? - type topic

Il s'agit du type d'Exchange le plus évolué.

Le motif de routage utilisé par la file pour s'enregistrer sur la zone d'échange est transformé en une sorte Glob (un expression régulière simplifiée). Un message sera donc transféré à la file si sa clé de routage satisfait le motif de routage.

def accepts(patternKey:RoutingKey, message:Message):Boolean = patternKey.isSatisfiedBy(message.routingKey) 

Les clés et motifs de routages doivent alors satisfaire des règles plus strictes:

Une clé de routage doit être constituée d'un ou plusieurs mots délimités par un point (.). Chaque mot est constitué des lettres de A à Z et de a à Z ainsi que des chiffres de 0 à 9: 'hal.shutdown', kitt.feature.turboBoost, ...

Le motif de routage doit obéir aux mêmes règles à ceci près que * correspondra à n'importe quel mot et # correspondra à n'importe quelle séquence de mots.

En pseudo-bnf:

    routingKey: <word> ('.' <word>)*
    word: ('a'..'z' | 'A'..'Z' | '0'..'9')*

    patternKey: <part> ('.' <part>)*
    part: <word> | '*' | '#'
Routing key\Pattern # * users.*.login users.# users.* #.alert
users.bob.login X X X
users.alert.login X X X
users.dos.alert X X X
payment.alert X X
users.alert X X X X
users.updated X X X
shutdown X X

Topic Exchange

Routage des messages un par un:

Topic Exchange

Remarque: il est possible de voir les types fanout et direct comme des spécialisations du type topic:

  • Si toutes les Queue sont liées avec le motif #, l'Exchange aura le même comportement qu'un Exchange de type fanout.
  • Si les motifs n'utilisent pas les caractères spéciaux * et #, l'Exchange aura le même comportement qu'un Exchange de type direct.

Bon, à moins de lire lentement, vous ne devriez pas être à beaucoup plus de 10min de lecture pour connaitre les principaux concepts de AMQP. Voyons désormais des cas d'utilisations.

Exemples d’utilisation

Centralisation des logs et Monitoring

Logging et Monitoring

Un Loadbalancer

Considérons que nous disposons d'un ensemble de machines identiques capables d’exécuter un certain nombre de commande. Le temps de traitement de chaque commande est évidement suffisamment long pour qu'il y ait un intérêt à vouloir le répartir sur plusieurs machines.

Load balancing: avant distribution

Remplissage pas à pas des différentes files:

Load balancing: traitement en cours distribution

Le type de l'Exchange importe peu tant que l'Exchange redirige les messages à la Queue. Une notion importante est qu'un message est consommé d'une file. Un message consommé n'est donc plus disponible et ne sera pas distribué aux autres consommateurs. Une fois qu'un consommateur a fini de traiter le message, il demandera le prochain message à traiter à la Queue. Les messages sont donc plus ou moins (en fonction des temps de traitement) redistribués en round-robin sur l'ensemble des consommateurs.

RPC (Remote Procedure Call)

Maintenant que nous avons distribué nos tâches sur différentes machines comment obtenir le résultat? Eh bien les gars d'AMQP ont tous prévu, parmi les entêtes disponibles il existe un champ reply-to. Ce champs contient le nom de la Queue sur laquelle le résultat devra être envoyé. Cette Queue peut être une file existante ou une file créée spécialement pour cette occasion. Il est en effet possible de créer une Queue dont le nom sera généré aléatoirement par le Broker. Une fois cette Queue créée, son nom est récupérée et renseignée dans le champs reply-to du message qui sera soumis. Le producteur du message n'aura plus qu'à attendre (il pourra néanmoins continuer son petit train train) jusqu'à ce qu'il reçoive le résultat de son appel.

Remote Procedure Call: entête reply-to

Persistance et durabilité, il est passé où mon message?

Un message qui survit à un crash du broker AMQP est appelé message persistant. Pour qu'un message soit persistant il faut:

  • qu'il soit marqué comme persistant
  • qu'il soit envoyé à un Exchange marqué comme durable (c'est à dire qui sera recréé au redémarrage du broker)
  • et qu'il arrive dans une Queue marquée comme durable

Il est important de savoir que par défaut les Exchanges et les Queues ne sont pas marquées comme durable.

Mettons de côté les éventuels crashs, et voyons maintenant un cas fréquent de déploiement:

Il n'y a pas de Queue liée à l'Exchange. Les messages envoyés à l'Exchange sont ... perdus! En effet, l'Exchange n'a qu'un rôle d'aiguilleur, aucun message n'est persisté, c'est à la charge des Queues de conserver les messages jusqu'à ce qu'un éventuel consommateur commence à vider la file.

Il est donc important:

  • soit d'être sûr que dans la séquence de mise en route de l'application, les Queues soient bien créées et associées avant que les messages ne commencent à être publiés.
  • soit de configurer les Exchanges et les Queues qui sont au cœur de l'application comme durable.

On retrouve aussi souvent dans les (exemples de) codes des producteurs de messages: la création aussi de la Queue, même si celle-ci n'est pas utilisée par le producteur. Le protocole prévoit en effet que si une Queue existe déjà l'appel à la création n'aura aucun effet si la Queue créée la même configuration. Sinon, une erreur est retournée au dernier qui essaie de créer la Queue.

Afin de "contrôler" encore plus ces créations, il est même possible de définir sur le broker la configuration de chaque Exchange et chaque Queue afin de s'assurer qu'elle soit créées dans le respect de ces paramètres.

SEDA

SEDA (Staged Event-Driven Architecture) est une proposition d'architecture qui a émergé de la thèse de Matt Welsh. Grossièrement, il s'agit de décomposer un traitement en une succession d'étapes. Ces étapes peuvent alors être distribuées sur plusieurs unités de traitements permettant une plus grande montée en charge. Chaque étape communique avec la suivante par l'intermédiaire de file en y soumettant la prochaine requête à effectuer. On retrouve dans cette approche les concepts de l'architecture par évènement (EDA)

SEDA

Photo de groupe

Quelques explications?

  1. Le client envoie une requête d'authentification.
  2. Le message est redistribué à un second Exchange qui est lié au premier avec le motif '#': tous les messages lui sont donc redistribués. La possibilité de lier un Exchange à un autre Exchange est une possibilité offerte (extension) par RabbitMQ qui n'est pas proposé dans AMQP.
  3. Le second Exchange a ainsi pour rôle de tracer les appels entrants: logger, monitorer, etc...
  4. Le message est aussi redistribué dans une file dédiée auth.# qui sera consommée par notre service d'authentification. Ce service peut lui aussi envoyer des informations/logs aux services de logging et monitoring en publiant sur l'Exchange correspondant.
  5. Une fois authentifié, le client peux consulter ses informations clientes en effectuant cette fois une requête (ie un message) avec une clé de routage correspondante, par exemple: users.458e92ab3.infos où 458e92ab3 correspond à son identifiant. Il est ainsi possible de rediriger la demande en fonction du serveur qui a en charge ce numéro de client (sharding), ou en loadbalacing sur les 3 machines qui ont été déployées pour faire face à cette charge.
  6. Les informations demandées sont alors retournées via la Queue qui aura été définie en reply-to. Les informations vérifiées, notre client va commencer à effectuer ses achats. Comme il s'agit du service qui sera le plus sollicité dans notre application, nous décidons de démarrer 6 machines qui consommeront la file correspondante (liée avec le motif orders.*).
  7. La réponse est ainsi retournée au client via websocket par exemple.
  8. Le traitement de la commande étant généralement asynchrone, lorsque celle-ci est acceptée nous décidons d'en informer le client en lui envoyant une notification: un mail par exemple et/ou...
  9. ...un message s'il est toujours en ligne.

Si la charge continue d'augmenter, il nous suffit de rajouter de nouvelles machines qui consommeront nos différentes Queues.

Webographie

Perf.

SEDA

NodeJS

Custom Exchange

Corrections

08/11/2012 - @hintjens: C'est iMatix qui a implémenter le premier Broker suite à la demande de JP Morgan Chase'n Co.

Plus de publications

Comments are closed.