Blog ENI : Toute la veille numérique !
-25€ dès 75€ sur les livres en ligne, vidéos... avec le code FUSEE25. J'en profite !
Accès illimité 24h/24 à tous nos livres & vidéos ! 
Découvrez la Bibliothèque Numérique ENI. Cliquez ici
  1. Livres et vidéos
  2. Java Spring
  3. Reactor Core
Extrait - Java Spring Construisez vos applications réactives avec une architecture micro-services en environnement Jakarta EE (2e édition)
Extraits du livre
Java Spring Construisez vos applications réactives avec une architecture micro-services en environnement Jakarta EE (2e édition) Revenir à la page d'achat du livre

Reactor Core

Introduction

Le principal cas d’usage de l’utilisation de Reactor Core est la création d’une application web, intranet ou extranet, avec Spring WebFlux, le pendant de Spring MVC en mode réactif.

Cependant, il peut être intéressant dans certains cas d’utiliser directement Reactor Core pour :

  • programmer un serveur comme nous le ferions avec Vert.x

  • programmer plusieurs serveurs web dans une même application

  • programmer des serveurs qui ne servent pas des pages HTML via HTTP

  • avoir des serveurs de mocks pour les tests d’intégration

Nous allons aborder dans ce chapitre des éléments plutôt théoriques, car la librairie Spring Reactor Core est rarement utilisée seule pour plusieurs raisons :

  • Complexité : Spring Reactor Core est une bibliothèque puissante mais complexe qui implémente le paradigme de programmation réactive. Utiliser Reactor Core seule peut demander une courbe d’apprentissage assez raide, car elle nécessite une compréhension approfondie des concepts de flux, de mono et de l’opérateur de composition fonctionnelle.

  • Manque de facilité d’intégration : Spring Reactor Core fournit principalement une base pour la programmation réactive et ne propose pas directement des fonctionnalités spécifiques aux applications, telles que le traitement des requêtes...

Reactor Core

La programmation réactive est un concept spécial. Elle implique la propagation d’événements ou de signaux, comme des boules de flipper interagissant les unes avec les autres. En utilisant un style de programmation fonctionnelle, nous définissons les parcours possibles des boules, puis les lançons en parallèle pour qu’elles se déplacent simultanément. Chaque boule change de comportement lorsqu’elle interagit ou entre en collision avec d’autres éléments, ou lorsqu’elle atteint un point spécifique dans le temps. Le rôle du moteur est de gérer les interactions en appelant le code approprié pour chaque événement. Il les met en file d’attente dans une boucle d’événement et les exécute séquentiellement, ce qui donne l’impression d’une exécution parallèle depuis l’extérieur.

Les briques de base des Reactive Streams sont :

  • Subscriber : l’abonné (subscriber) est l’objet qui reçoit les éléments émis par l’éditeur (publisher). Il consomme les éléments et les traite selon la logique définie dans ses méthodes. Lorsqu’un Publisher émet un nouvel élément, il est transmis au Subscriber via la méthode onNext(). Le Subscriber peut également gérer les erreurs en utilisant la méthode onError() et indiquer la fin du flux de données avec la méthode onComplete().

  • Subscription : l’abonnement (subscription) entre le Publisher et le Subscriber. Lorsqu’un Subscriber s’abonne à un Publisher, le Publisher envoie une Subscription au Subscriber. Cette Subscription permet au Subscriber de demander un certain nombre d’éléments au Publisher en utilisant la méthode request(long n). Cette méthode spécifie combien d’éléments le Subscriber souhaite recevoir.

  • Processor : le processeur est une interface qui combine les rôles de Publisher et de Subscriber. Il peut être vu comme un élément intermédiaire dans le flux de données, où les éléments sont émis par un Publisher, traités par le Processor, puis consommés...

Méthode subscribe()

La méthode subscribe() est l’une des méthodes fondamentales en programmation réactive avec Reactor. Elle permet de déclencher l’exécution du flux réactif en s’abonnant à ce flux. Lorsqu’un flux est créé, il est inactif tant qu’aucune souscription n’est effectuée. La souscription déclenche l’émission des éléments du flux et l’exécution des traitements définis dans les différentes étapes du flux.

1. Surcharge

La méthode subscribe() a différentes surcharges qui permettent de spécifier comment le flux doit être consommé. En effet, il s’agit de définir comment traiter les éléments émis par le flux, comment gérer les erreurs éventuelles et comment réagir lorsque le flux est terminé.

  • subscribe() ne prend aucun argument et ne définit aucun traitement particulier pour les événements émis par le flux. Cela signifie que le flux sera simplement exécuté, mais les événements émis par le flux ne seront pas traités.

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);  
flux.subscribe(); 
  • subscribe(Consumer<? super T> consumer) prend en argument un consommateur (Consumer) qui définit le traitement à appliquer sur chaque élément émis par le flux. Le consommateur reçoit les éléments du flux un par un et peut effectuer des actions spécifiques sur chaque élément.

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);  
flux.subscribe(item -> System.out.println("Élément : " + item)); 
  • subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) permet de spécifier à la fois un consommateur pour traiter les éléments du flux et un consommateur pour traiter les erreurs éventuellement émises par le flux.

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);  
flux.subscribe(  
    item -> System.out.println("Élément : " + item),  
    error -> System.err.println("Erreur : " + error.getMessage())  
); 
  • subscribe(Consumer<?...

Utilisation du BaseSubscriber

En programmation réactive, nous utilisons généralement la programmation fonctionnelle avec les lambdas pour définir nos traitements. Cependant, dans certaines situations, il peut être utile de créer un Subscriber personnalisé pour gérer les événements d’un flux réactif de manière spécifique. C’est là que la classe utilitaire BaseSubscriber entre en jeu.

BaseSubscriber est une classe utilitaire fournie par Reactor qui simplifie la création de souscripteurs personnalisés pour les flux réactifs. Elle fournit une implémentation de base ainsi que des méthodes de gestion des événements-clés, ce qui facilite la personnalisation du comportement du souscripteur en fonction des besoins spécifiques de l’application. Pour créer un Subscriber personnalisé, il suffit d’étendre la classe BaseSubscriber et de substituer les méthodes appropriées en fonction de nos besoins. Par exemple, en implémentant la méthode hookOnNext(), nous pouvons spécifier l’action à effectuer lorsqu’un nouvel élément est émis par le flux. De même, en implémentant la méthode hookOnError(), nous pouvons gérer les erreurs qui surviennent pendant le traitement du flux.

Il est important de noter que les instances de BaseSubscriber (ou de ses sous-classes) sont à usage unique. Cela signifie qu’un BaseSubscriber annulera automatiquement son abonnement au premier Publisher s’il est abonné à un second Publisher. Cette gestion automatique garantit que le souscripteur est utilisé de manière appropriée dans le contexte réactif.

Le BaseSubscriber de Reactor propose plusieurs...

Générer une séquence manuellement

Nous pouvons générer les séquences de façon synchrone ou asynchrone.

1. Modèle synchrone

Les données asynchrones dans un flux proviennent généralement de sources externes telles que des connexions réseau ou des signaux déclenchés par des planificateurs. Cependant, il peut être utile de générer nous-mêmes une séquence de données. Pour cela, Reactor offre la méthode generate(), qui permet de créer un flux infini d’éléments en utilisant une logique de génération personnalisée. La méthode generate() prend deux paramètres : un fournisseur de l’état initial (stateSupplier) avec une fonction génératrice qui prend l’état courant et un SynchronousSink en tant que paramètres. Le fournisseur de l’état initial est utilisé pour initialiser l’état de la génération, et la fonction génératrice est appelée à chaque itération pour générer un nouvel élément à émettre dans le flux.

Par exemple, pour générer une séquence de nombres entiers de manière infinie, nous pouvons utiliser generate() avec un état initial de zéro et une fonction génératrice qui émet l’état courant et met à jour l’état en ajoutant 1 à chaque itération.

Flux<Integer> infiniteSequence = Flux.generate(  
    () -> 0, // État initial  
    (state, sink) -> {  
        sink.next(state); // Émet l'élément dans le flux  
        return state + 1; // Met à jour l'état  
    }  
); 

La méthode generate() de Reactor permet de générer des flux de données personnalisés de manière flexible. Nous pouvons l’utiliser pour générer des séquences numériques, des données aléatoires ou même récupérer des données à partir de sources externes....

Méthode handle()

La méthode handle() de la classe Flux est utilisée pour gérer de manière personnalisée les éléments émis par le flux. Elle permet de transformer les éléments du flux ou d’effectuer des traitements spécifiques sur chaque élément. La signature de la méthode handle() est la suivante :

Flux<T> handle(BiConsumer<T, SynchronousSink<R>> handler) 

Le paramètre handler est un BiConsumer qui prend deux arguments : l’élément émis par le flux (T) et un SynchronousSink<R> utilisé pour émettre les éléments résultants du traitement (R). Le SynchronousSink est un objet qui permet de générer de manière synchrone des éléments dans un flux. Il est utilisé pour émettre les éléments résultants du traitement à partir du handler. Dans le handler, vous pouvez effectuer différentes actions sur l’élément reçu, telles que le transformer en un nouvel élément (R) à émettre dans le flux, l’ignorer, le filtrer ou effectuer toute autre logique de traitement nécessaire.

Voici un exemple :

Flux<Integer> originalFlux = Flux.range(1, 10);  
  
Flux<String> processedFlux = originalFlux.handle((number...

Scheduler

Spring Reactor offre aux développeurs plusieurs possibilités pour gérer l’ordonnancement des tâches grâce à l’utilisation d’ordonnanceurs (schedulers), aussi appelés planificateurs. Ceux-ci permettent de contrôler l’exécution des tâches de manière asynchrone et efficace. Ils s’utilisent avec les méthodes publishOn() et subscribeOn() pour contrôler le contexte d’exécution dans lequel les opérations du flux sont effectuées. Ces méthodes permettent de spécifier les schedulers à utiliser pour différentes parties du flux, ce qui peut avoir un impact significatif sur la concurrence et les performances de l’application.

La méthode publishOn() permet de spécifier le scheduler sur lequel les opérations suivantes seront exécutées. Ainsi, toutes les opérations qui viennent après l’appel à publishOn() seront effectuées dans le contexte du scheduler spécifié. Cela peut être utile pour déléguer le traitement asynchrone à un autre thread, particulièrement pour les opérations coûteuses en temps ou bloquantes. 

Par exemple :

    Flux.range(1, 10)  
      .publishOn(Schedulers.boundedElastic()) // Utilise le 
scheduler boundedElastic() pour le traitement asynchrone   
      .map(value -> {  
        // Code de traitement asynchrone  ...

Traiter correctement les cas d’erreurs

Avec Spring Reactor Core, vous pouvez gérer les erreurs de différentes manières. Voici quelques approches courantes pour gérer les erreurs dans un flux.

1. Opérateur onErrorResume

L’opérateur onErrorResume() permet de capturer une erreur et de remplacer le flux qui l’a générée par un autre flux. Il est utilisé comme suit :

Flux<Integer> flux = Flux.just(1, 2, 3)  
    .map(i -> {  
        if (i == 2) {  
            throw new RuntimeException("Erreur !");  
        }  
        return i;  
    })  
    .onErrorResume(e -> Flux.just(10, 20, 30));  
  
flux.subscribe(  
    data -> System.out.println("Donnée : " + data),  
    error -> System.err.println("Erreur : " + error),  
    () -> System.out.println("Terminé")  
); 

2. Opérateur onErrorReturn

L’opérateur onErrorReturn() permet de capturer une erreur et de renvoyer une valeur de remplacement spécifiée....

Capture et renvoi d’exceptions

1. Gestion des exceptions

Lors de l’utilisation de Reactor, la gestion des exceptions dans les opérateurs ou les fonctions se présente ainsi :

  • Tous les opérateurs peuvent potentiellement déclencher des exceptions ou appeler des fonctions de rappel qui peuvent échouer, donc ils contiennent tous une forme de gestion d’erreur.

  • Les exceptions non vérifiées sont toujours propagées via le signal onError. Par exemple, une RuntimeException lancée dans une fonction map() se traduit par un événement onError.

  • Reactor définit certaines exceptions, telles que OutOfMemoryError, comme étant toujours considérées comme fatales et elles sont lancées plutôt que propagées.

  • Dans certains cas, une exception non vérifiée ne peut pas être propagée en raison de situations de concurrence pendant les phases d’abonnement et de demande, et, dans ces cas, l’erreur est abandonnée.

Pour gérer les exceptions vérifiées, vous pouvez utiliser try et catch, ou bien vous pouvez encapsuler l’exception vérifiée dans une exception non vérifiée en utilisant Exceptions.propagate().

  • Si vous devez renvoyer un Flux et traiter une exception vérifiée, vous pouvez utiliser Flux.error(exceptionVerifiee) pour créer un Flux produisant une erreur.

  • La classe utilitaire Exceptions de Reactor peut vous aider à encapsuler et à obtenir l’exception originale si nécessaire.

Exemple d’utilisation de Exceptions.propagate() pour traiter une IOException dans une fonction map() :

public String convert(int i) throws IOException {  
    if (i > 3) {  
        throw new IOException("boom " + i);  
    }  
    return "OK " + i;  
}  
  
Flux<String> converted = Flux  
    .range(1, 10)  
    .map(i -> {  
        try { return convert(i); }  
        catch (IOException e) { throw Exceptions.propagate(e); } 
    });  ...

Sinks

En Reactor, un sink offre la possibilité de déclencher manuellement des signaux de manière indépendante. Il crée ainsi une structure similaire à celle d’un éditeur de publication (Publisher) capable de gérer plusieurs abonnés (Subscriber), à l’exception des variantes unicast (communication de un vers un).

Les sinks de base exposés par Reactor Core garantissent que l’utilisation multithread est détectée et ne peut pas entraîner de violations de spécifications ou de comportements indéfinis du point de vue des abonnés en aval. Lorsque vous utilisez l’API tryEmit*, les appels parallèles échouent rapidement. Lorsque vous utilisez l’API emit*, l’EmissionFailureHandler fourni peut permettre de réessayer en cas de contentieux (par exemple, boucle occupée), sinon le sink se terminera avec une erreur. Cela représente une amélioration par rapport à Processor.onNext(), qui doit être synchronisé de manière externe ou peut entraîner un comportement indéfini du point de vue des abonnés en aval.

La classe Sinks est généralement privilégiée par rapport aux objets Processor, car elle offre une approche plus sûre et plus simple pour la gestion de signaux. Les constructeurs de sinks fournissent une API guidée pour les principaux types de producteurs pris en charge. Certains comportements trouvés dans Flux, tels que onBackpressureBuffer(), sont également présents dans dans la classe Sinks.

On trouve plusieurs catégories de sinks :...

Transformations personnalisées sur le flux

Il existe un certain nombre d’opérateurs qui sont utilisés pour effectuer des transformations personnalisées sur un flux :

  • transform() : cet opérateur permet de transformer un flux en utilisant une fonction qui prend en paramètre le flux d’entrée et renvoie un nouveau flux. La transformation est effectuée dès que l’opérateur est appelé, même avant que le flux ne soit souscrit.

  • transformDeferred() : cet opérateur est similaire à transform(), mais la transformation est différée jusqu’à ce que quelqu’un ait souscrit au flux. Cela permet de reporter la transformation et de l’appliquer uniquement lorsque des abonnés s’y intéressent.

  • transformDeferredContextual() : cet opérateur permet de transformer chaque élément du flux en utilisant une fonction qui prend en paramètres l’élément courant du flux et le ContextView. La transformation est différée jusqu’à ce que le flux possède un souscripteur, ce qui permet d’appliquer des transformations contextuelles spécifiques à chaque élément du flux.

Voici un exemple simple utilisant l’opérateur transform() pour transformer un flux de chaînes en un flux de leurs longueurs :

public class TransformExample {  
  public static void main(String[] args) {  
    // Créer un flux de chaînes  
    Flux<String> stringFlux = Flux.just("rouge", "vert", "orange"); 
  
    // Transformer le flux en un flux de longueurs de chaînes  
    Flux<Integer> lengthFlux = 
stringFlux.transform(stringFluxTransformer()); 
  
    // Souscrire au flux de longueurs  
    lengthFlux.subscribe(length -> 
System.out.println("Longueur : " + length));  
  }  
  
  private static Function<? super Flux<String>,? 
extends Publisher<Integer>> stringFluxTransformer() {  
    return flux -> flux.map(s -> s.length());  
  }  ...

Global hooks

Les global hooks (crochets globaux) sont des points d’extension qui permettent aux développeurs de modifier le comportement global de certains aspects-clés du framework. Autrement dit, ils centralisent la personnalisation de fonctionnalités qui s’appliquent à l’ensemble du flux réactif de l’application.

1. Hooks de suppression

Les hooks de suppression (dropping hooks) sont appelés lorsqu’une source d’opérateur ne se conforme pas à la spécification Reactive Streams. Ces erreurs sortent du chemin d’exécution habituel, c’est-à-dire qu’elles ne peuvent pas être transmises par le biais du signal onError. Cela se produit généralement quand un Publisher fait appel à onNext() sur l’opérateur, alors qu’il a précédemment appelé onCompleted() sur celui-ci. Dans cette situation, la valeur renvoyée par onNext() est abandonnée. Il en est de même pour un signal onError non nécessaire. Les hooks associés, onNextDropped() et onErrorDropped(), vous donnent la possibilité d’assigner un consumer global pour ces suppressions. Par exemple, vous pouvez l’utiliser pour enregistrer la suppression et libérer les ressources associées à une valeur si nécessaire (puisqu’elle n’atteint jamais le reste de la chaîne réactive). Si vous définissez les hooks à la suite, c’est un processus additif : chaque consumer que vous assignez est invoqué. Les hooks peuvent être complètement réinitialisés à leurs paramètres par défaut en utilisant les méthodes Hooks.resetOn*Dropped().

Par exemple :

  public void onNextDroppedFailReplaces() {  
    AtomicReference<Object> dropHook = new AtomicReference<>(); 
    Publisher<Integer> p = s -> {  
      s.onSubscribe(Operators.emptySubscription());  
      s.onNext(1);  
      s.onNext(2);  
      s.onNext(3);  
    };  
    List<Integer> seen = new ArrayList<>();  ...

API Context

L’API Context est utilisée pour propager des informations de manière réactive à travers les Flux et les Mono. Elle permet de partager des données entre les différentes étapes du traitement réactif sans avoir à utiliser des variables globales ou des références partagées. La classe Context représente un contexte associé à un flux ou un mono. Elle est utilisée pour stocker des données sous forme de paires clé-valeur. Chaque flux ou mono peut avoir son propre contexte, et lorsqu’un flux ou un mono émet une valeur, il peut également propager son contexte aux opérateurs suivants.

1. Structure du Context

Context est une interface qui rappelle Map. Elle stocke des paires clé-valeur et vous permet de récupérer une valeur stockée par sa clé.

Quelques remarques concernant le contexte :

  • Les clés et les valeurs sont de type Object, donc une instance de Context (et ContextView) peut contenir un nombre quelconque de valeurs très diverses provenant de différentes bibliothèques et sources.

  • Un Context est immuable. Il ne peut pas être modifié après sa création. Les méthodes d’écriture telles que put() et putAll() renvoient une nouvelle instance du contexte avec les modifications apportées.

L’interface Context possède, depuis la version 3.4.0, une version simplifiée destinée à la lecture seule, appelée ContextView. Celle-ci n’expose aucune méthode d’écriture. Autrement dit, ContextView ne sert qu’à lire dans un contexte.

  • Vous pouvez vérifier si une clé est présente avec hasKey(Object key).

  • Utilisez getOrDefault(Object key, T defaultValue) pour récupérer une valeur (convertie en T) ou pour utiliser une valeur par défaut si l’instance de Context ne contient pas cette clé.

  • Utilisez getOrEmpty(Object key) pour obtenir un Optional<T> (l’instance de Context tente de convertir la valeur stockée en T).

  • Utilisez put(Object key, Object value) pour stocker une paire clé-valeur, ce qui renvoie une nouvelle instance de contexte. Vous pouvez également fusionner deux contextes en un nouveau en utilisant putAll(ContextView).

  • Utilisez...

Tests

L’approche TDD (Test Driven Development, soit développement piloté par les tests) exige des tests unitaires et des tests d’intégration Spring.

Nous commençons par ajouter la dépendance reactor-test :

<dependency>  
    <groupId>io.projectreactor</groupId>  
    <artifactId>reactor-test</artifactId>  
    <scope>test</scope>  
    (1)  
</dependency> 

Cette dépendance offre des fonctionnalités supplémentaires pour faciliter le test des applications basées sur Spring Reactor. Les principaux apports de cette dépendance sont les suivants :

  • La mise en place du scheduler. La dépendance permet de simuler différents types de schedulers (comme Schedulers.parallel() ou Schedulers.single()) pour les tests. Cela facilite la vérification des comportements asynchrones sans avoir besoin d’attendre réellement les délais d’exécution.

  • La classe StepVerifier, qui permet de vérifier facilement les séquences d’événements émis par des Flux et des Mono. Avec StepVerifier, vous pouvez vérifier si les Flux ou Mono émettent les éléments attendus, s’ils se terminent correctement, et s’ils génèrent des erreurs lorsqu’elles sont attendues.

  • La classe VirtualTimeScheduler, qui offre la possibilité de contrôler le temps virtuel lors des tests pour les opérations de délai et de planification. Cela permet de simuler des délais sans devoir attendre réellement le temps écoulé.

  • La classe TestSubscriber, qui permet de tester un Flux ou un Mono en souscrivant avec un TestSubscriber, ce qui facilite l’inspection des événements émis, des erreurs et des signaux de complétion.

  • La classe StepVerifierOptions, qui permet de configurer les options du StepVerifier, telles que l’activation ou la désactivation du mode debug, qui affiche des informations de débogage supplémentaires lors de l’exécution du StepVerifier.

La dépendance reactor-test fournit des outils puissants pour tester et valider le comportement des Flux et des Mono dans le contexte de tests unitaires...

Réacteur de débogage

Le mode de débogage de Reactor (Reactor Debug) est une fonctionnalité de Spring Reactor Core qui permet de faciliter le débogage et la compréhension du flux des événements lors du développement d’applications réactives. Il offre des outils pour visualiser les interactions entre les différents opérateurs et souscripteurs, ainsi que pour surveiller l’état du flux à différents moments de son exécution.

Pour activer le mode de débogage, vous pouvez utiliser les méthodes doOnSubscribe(), doOnNext(), doOnError(), doOnComplete() et doOnTerminate() avec l’opérateur log() qui affiche des informations de débogage à chaque étape du flux.

Voici un exemple d’utilisation des opérations doOnSubscribe(), doOnNext(), doOnError(), doOnComplete(), doOnTerminate() et log() pour activer le débogage dans un projet Reactor.

Flux.just("A", "B", "C", "D")  
  .log() // Affiche les informations de log  
  // Déclenché à la souscription  
  .doOnSubscribe(subscription -> System.out.println("Flux  
subscribed!"))  
  // Déclenché à chaque nouvel élément  
  .doOnNext(element -> System.out.println("Next element: " + 
element))  
  // Déclenché en cas d'erreur  
  .doOnError(error -> System.out.println("An error has occurred: " 
+ error.getMessage()))  
  // Déclenché lorsque le flux se termine normalement  
  .doOnComplete(() -> System.out.println("Flux complete!"))  
  // Déclenché lorsque le flux est terminé pour une quelconque  
autre raison 
  .doOnTerminate(() -> System.out.println("Flux termination  
(either onComplete or onError)!"))  
  .subscribe(); 

Ce programme...

Conclusion

En conclusion, l’utilisation de Reactor Core offre une approche puissante et flexible pour la programmation réactive en Java. Grâce à sa mise en œuvre des spécifications Reactor et Reactive Streams, il permet de gérer efficacement des flux de données asynchrones, tout en fournissant un ensemble complet d’opérateurs pour la transformation, la combinaison et la gestion des flux.

Avec Reactor Core, les développeurs peuvent créer des pipelines de traitement de données réactives, où les traitements sont effectués de manière asynchrone et non bloquante, permettant ainsi une meilleure utilisation des ressources système et une extensibilité accrue. Les opérateurs fournis permettent de traiter des erreurs, de gérer la contre-pression et de contrôler l’ordonnancement des tâches, ce qui facilite le développement d’applications réactives robustes et performantes.

Les schedulers offrent également un contrôle précis sur l’exécution des tâches, permettant de spécifier sur quel thread ou scheduler les opérations doivent être effectuées, ce qui facilite la gestion de la concurrence et de la parallélisation.

En outre, Reactor Core fournit des mécanismes pour gérer les abonnements et la durée de vie des flux, notamment...