Hive est une technologie Big Data capable de manipuler de très important volume de données. Pour ce faire, il existe plusieurs fonctionnalités sur hive pour découper la données en ensemble plus petit.
Comme la plupart des technologies liées à la data, il existe une possibilité de partitionner la donnée. L'avantage de Hive par rapport à d'autres technologies est qu'il peut utiliser des clés composites comme clé de partitionnement. Et le partitionnement n'est pas le seul moyen de découper une table en plus petits ensemble, il existe le bucketing et les skewed tables.

La contrainte du bucketing sur la taille des fichiers

Le bucketing est une technique très intéressante, elle permet de clusteriser les données dans un nombre de fichier maitrisé et de trier physiquement les données en fonction d'une ou plusieurs clés. C'est un levier de performance notamment pour les indicateurs de type distinct count sur la clé de bucketing.

Chaque insertion (transaction) insère un groupe de nouveaux fichiers, répartis dans les dossiers en fonction du partitionnement et répartis en fonction du nombre de bucket définis au niveau de la table.

Avoir une multitude de petits fichiers impacte notablement les performances. Il est préférable d'avoir sur Hive et tout particulièrement en ORC des fichiers supérieur à 1Gb.
Il existe une commande pour merger les petits fichiers pour les tables partitionnées ou non mais sans bucketing.

ALTER TABLE table_name [PARTITION (partition_key = 'partition_value' [, ...])] CONCATENATE

Le problème est que cette commande ne fonctione pas pour les tables bucketisées. Elle n'est pas capable de réunir les différents fichiers clusterisées.

Il existe néanmoins une manière pour regrouper les petits fichiers issus de multiples chargements, la compaction.

Une solution, la compaction ?

Le prérequis pour utiliser la compaction est d'utiliser ACID et d'activer la transaction sur la table.

La compaction peut être déclenchée automatiquement ou bien en tâche de fond selon un paramétrage spécifier au serveur.

Pour plus de détails sur la compaction :
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_data-access/content/understanding-administering-compactions.html

Cette compaction utilise des jobs Map Reduce pour le faire et dans le cadre d'un cluster Interactive Query (comme c'est le cas pour moi) il est important de vérifier que le paramétrage du service Map Reduce est compatible avec le paramétrage des daemons LLAP.
Lorsque l'on active la transactionnalité sur une table, des répertoires dit "delta" sont créés à chaque chargement.


Dans le répertoire 1 on peut y voir le fruit d'une grosse transaction.

Dans le répertoire 2, le fruit d'une toute petite transaction a créé 13 petits fichiers d'une centaine de kilo octet (bucket 13).

Ce sont ces fichiers que nous allons vouloir merger afin d'avoir des plus gros fichiers plus en phase avec le stripe size de 64Mb définit pour nos tables stockées en ORC.

Nous rencontrons de graves problèmes de performance avec cette méthode de merging "automatique" de fichier. En effet pour activer la transaction il faut retirer le tri sur le bucket. C'est déjà une énorme concession à faire que de perdre le tri sur nos buckets. Mais nous sommes prêts à faire ce sacrifice.

En poussant encore plus loin le test nous nous apercevons que ACID nous fait perdre le cache de nos tables stockées en ORC, cache qui est la raison d'être de LLAP sans cache, les performances sont tellement dégradées que cette possibilité de merging de fichier devient inenvisageable.

Pour information nous sommes en HDP 2.6 sur HDInsight et le problème est connu de Hortonworks :

https://community.hortonworks.com/articles/149894/llap-a-one-page-architecture-overview.html

Une solution à base de permutation de partition ?

Issu d'une carrière de Data Warehousing, déformation professionnelle oblige, je pense directement, pour contourner ce problème à une solution autour d'un switch de partition. Parfait sur Hive, il existe la commande EXCHANGE PARTITION qui permet de permuter les partitions (https://cwiki.apache.org/confluence/display/Hive/Exchange+Partition).

Sur la base de cette commande nous allons pouvoir définir un workflow pour "défragmenter" les partitions les plus fragmenter lors d'un plan de maintenance.

    L'idée du workflow est :

  1. d'insérer le contenu d'une partition dans une table clone de la table à défragmenter
  2. de lancer un ANALYZE TABLE sur la partition
  3. de DROP la partition fragmentée
  4. de faire un EXCHANGE de la partition défragmenté vers la partition détruite précédemment
Catégories : BigData

Arnaud Voisin

Data Engineer chez DCube

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *