Retour au blog

Traiter de grands volumes de données Doctrine grâce à Messenger

Comme promis dans mon précédent article "Rex : RabbitMQ et Supervisor avec Docker et le composant Messenger de Symfony", je reviens vous parler de grands volumes de données avec Doctrine.

Grands volumes & Doctrine ? Ce mec ne sait pas de quoi il parle, je m’en vais.

Et à priori je ne peux pas t’en vouloir, c’est même marqué dans la doc que c’est une mauvaise idée.

Sauf que moi, j’aime quand même bien Doctrine, surtout quand je dois créer une hiérarchie un peu complexe, avec des comportements du style de Gedmo. Bref, le genre de trucs pas fun à simuler en SQL pur.

Messenger à la rescousse

Deux des gros problèmes des longs scripts Doctrine, qui expliquent leur lenteur, sont :

  • d’une part : tous les imports se font séquentiellement.
  • d’autre part : l’EntityManager de Doctrine a tendance à ralentir plus il tourne longtemps, et malgré mes efforts, je n’ai pas réussi à contrer ce phénomène.

Traiter paralèllement toutes les pages de l'ensemble

Pour limiter le temps de chaque script, la solution qui m’est naturellement apparue était la pagination. Si on on considère que chaque page consiste à créer X objets aléatoires, chaque script ne s’occuperait que d’une partie bien délimitée de la masse de données à traiter, et le script suivant d’une autre partie.

Ainsi l’EntityManager est rechargé à chaque page, et ne tourne pas trop longtemps.

De plus, en utilisant les propriétés de la congruence, il est possible de paralléliser le traitement de plusieurs pages (tu vas voir, c’est plus simple que ce jargon mathématique ne le laisse penser) :

Pour paralléliser n pages il suffit de :

  • Pour chaque i entre 0 et n-1, on lance le traitement de la page i
  • S'il n'y a plus de résultats, on s'arrête
  • Sinon, à la fin de ce traitement on lance le traitement de la page i+n

On a donc récursivement le traitement parallèle de toutes les pages de l’ensemble.

Mais comment lancer le traitement de la page suivante à la fin d’un script ?

C’est la que Messenger entre en scène.

En effet, en partant du principe que chaque message contient les informations nécessaires au filtrage (l’entité concernée, etc.), la page concernée (le i) ainsi que le nombre de traitement simultanés (le n), on peut considérer le traitement de chaque page comme le simple traitement d’un message, et à la fin du traitement, renvoyer le même message pour la page i+n.

En combinant cette logique avec Supervisor pour pouvoir limiter le temps de vie de chaque consumer, on peut simplement traiter des scripts de plusieurs heures.

Et on peut utiliser cette simple logique pour également traiter de gros imports, dans le cadre d’une reprise d’existant par exemple.

Pour aller plus loin : et si je veux enchaîner plusieurs types de scripts ?

C’est souvent le cas si quand on fait des imports / des fixtures : on a besoin de faire un autre traitement, mais seulement une fois que le premier script est terminé avec succès. Du fait de la parallélisation des traitements, il devient complexe de savoir si cette condition est remplie.

Complexe, certes. Impossible, sûrement pas.

L’approche que j’ai adoptée repose sur plusieurs prérequis :

  • On parlera de plusieurs imports successifs, chacun composé de plusieurs séries, chacune composée de pages déclenchées par un message.
  • Une date de création de la première série de messages connue et identique. C’est le paramètre qui servira d’identifiant à l’import, et on le passera à chaque message.
  • Un moyen d’accès à une ressource partagée. J’ai utilisé une combinaison du Cache Component pour la ressource partagée, et du Lock Component pour la gestion concurrente.
  • Une valeur de retour du traitement d’une page, qui permet de savoir si une page contenait des résultats (et donc si l’import doit continuer). J’ai utilisé un booléen.
  • Une séquence des différents traitements à faire, afin de savoir quel traitement un message doit réaliser, et quel import réaliser une fois que l’import courant sera terminé.
public function __invoke(Message $message): void
{
   try {
       // fait les traitements nécessaires, et retourne vrai si on considère que cette page est une des pages de fin
       $over = $this->handleExecution($message,$debut, $fin);
       if (!$over) {
           // On a pas terminé la série en cours, on va donc traiter la page n + step
           $message->setPage($message->getPage() + $message->getStep());
           $this->bus->dispatch($message);
           return;
       }

       $importKey = $message->getImportKey();
       // On utilise la date de fin, qui est toujours définie, et la même pour tous les messages concurrents dans tous les cas
       $count = $this->updateCount($importKey, $fin);

       if ($count < $message->getPageStep()) {
           // On a pas encore terminé l'import, il reste des séries 
           return;
       }       

       // toutes les séries de l'import sont arrivées à complétion, et on doit lancer le prochain import
       $key = \array_search($importKey, $this->importSequence, true);
       if ($key === false) {
           // on a pas trouvé la clef, on ne pourra pas trouver de suivant, donc on sort
           return;
       }
       if (\array_key_exists($key + 1, $this->importSequence)) {
           $nextKey = $this->importSequence[$key + 1]; 
           // on a trouvé la clef, on lance donc les séries de l'import suivant
           for ($startingPage = 1; $startingPage < 1 + $message->getPageStep(); $startingPage++) {
               $this->bus->dispatch(new Message(
                   $nextKey,
                   $fin,
                   null,
                   $startingPage, 
                   $message->getPageStep()               
               ));
           }
       } else {
           // il n'y a pas de clef suivante : on est donc à la fin de tous les imports
           // on peut éventuellement faire des traitements de fin
       }
}

Et la partie sur le cache et le lock que j’ai teasée juste avant me diras-tu ? Et bien, c’est utilisé pour la gestion du compteur de pages terminées, dans le méthode updateCount :

public function updateCount($importKey, $fin): int
{
     $dateAsString = $fin->format("U.u");
      // on doit mettre un lock ici, pour s'assurer que deux threads concurrents d'import ne vont pas incrémenter le count n'importe comment
      // @var Symfony\Component\Lock\LockInterface $lock
      $lock = $this->lockFactory->createLock($importKey);
      $lock->acquire(true);
      // on recupère la valeur stockée, avec null par défaut
      $item = $this->cache->get($importKey, static function () {
         return null;
      });
      // puis on la supprime
      // @var Symfony\Contracts\Cache\CacheInterface $this->cache
      $this->cache->delete($importKey);
      if (!isset($item[$dateAsString])) {
          // la date a changé, on est donc sur un autre import, il faut repartir a 1
          $item = $this->cache->get($importKey, static function () use ($dateAsString) {
          return [$dateAsString => 1];
       });
      } else {
          // la date est la même, on incrémente donc de 1
          $item = $this->cache->get($importKey, static function () use ($dateAsString, $item) {
          return [$dateAsString => $item[$dateAsString] + 1];
      });
    }
    // on relache le lock pour pouvoir 
    $lock->release();
    return $item[$dateAsString];
}

 

On peut voir dans cette méthode l’utilisation du Cache Component et du Lock Component  sans avoir besoin d’expliciter l’implémentation du cache (en l’occurence j’aime bien l’implémentation doctrine dans la mesure où elle permet de vérifier les valeurs utilisées, mais ça fonctionne exactement de la même façon avec n’importe quelle implémentation).

Conclusion

Grâce à ces quelques astuces, on a vu que malgré les mises en garde, il était finalement possible de traiter de grands volumes de données en tirant partie de la parallélisation.