Reviers, le 26 février 2024
Depuis sa version 10, PostgreSQL propose de la réplication logique. Cette
réplication se base sur un module de décodage logique appelé pgoutput
mais
ce n’est pas le seul module de décodage logique. Il en existe plus d’une dizaine
actuellement, et il est possible de coder le sien. Dans cet article, nous allons
faire un retour rapide sur la réplication logique, le décodage logique pour
aborder à la fin la création d’un module de décodage logique et le client qui
utilisera ce module.
Réplication logique
La réplication existe dans PostgreSQL depuis la version 9.0 (donc depuis 2010, ce qui commence à être vieux). Cette réplication est excellente pour avoir un PRA mais elle souffre de limitations intrinsèques difficiles à corriger.
L’une des plus importantes est que le système de réplication ne sait pas ce qu’il réplique. Il sait qu’il doit écrire tel et tel octet à tel emplacement dans tel fichier, mais il ne sait pas à quelle opération cela correspond :
- est-ce une écriture dans une table ? un index ? une vue matérialisée ?
- est-ce le résultat d’une commande DDL ? ou d’un INSERT ? d’un DELETE ?
- quelle base cela concerne-t-il ?
La conséquence directe est que cette réplication a une granularité inexistante : c’est toute l’instance ou rien.
Les données qui voyagent entre le primaire et le secondaire sont en binaire, et donc leur codage dépend fortement du serveur primaire. En conséquence, les serveurs secondaires doivent tous avoir la même version majeure, les mêmes options de compilation, être installés sur la même architecture matérielle (par exemple tous sur x86_64, ou tous sur ARMv8) et la même architecture logicielle (notamment la même librairie C).
Enfin, autre point limitant et c’est un peu une lapalissade, le secondaire est en lecture seule donc nous ne pouvons rien écrire dessus. Pas d’index supplémentaire pour l’utilisation spécifique du secondaire, même pas de tables temporaires, etc.
La réplication logique répond à ces besoins. Les données envoyées ne sont pas les requêtes mais le résultat des requêtes, dans un format texte décodable partout. Des informations supplémentaires sont envoyées comme le nom des objets SQL et celui des opérations SQL concernées.
Les deux serveurs sont disponibles en lecture/écriture. Ce sont deux serveurs autonomes. Ils peuvent être de versions différentes, installées sur des systèmes différents. Un serveur va proposer une « publication » des données, dans laquelle des tables seront sélectionnées, voire juste certaines lignes ou colonnes. Le serveur qui veut recevoir ces données va s’abonner à la publication.
Voici un petit exemple de mise en place sur deux instances, s1 (port 5432) et s2 (port 5433). Ils sont sur la même machine, et ils sont tous les deux en version
- Voici la configuration spécifique de s1 :
port = 5432
wal_level = logical
Et voici la configuration spécifique de s2 :
port = 5433
Sur s1, nous allons créer une base, y ajouter une table et la remplir. Voici le
script SQL pour le faire à partir de psql
:
CREATE DATABASE b1;
\c b1
CREATE TABLE t1 (c1 integer, c2 text);
ALTER TABLE t1 ADD PRIMARY KEY (c1);
INSERT INTO t1 SELECT i, 'Ligne '||i FROM generate_series(1, 1000) i;
Nous pouvons maintenant créer une publication sur la base b1. Nous allons le faire ici pour toutes les tables mais nous aurions pu n’en sélectionner que quelques-unes. La création de la publication se fait ainsi :
CREATE PUBLICATION pub_s1 FOR ALL TABLES;
Passons maintenant au serveur s2. Pour s’abonner, il faut déjà disposer des objets SQL. Nous allons créer une base b1, et sa table t1 mais sans les données. Le plus simple est d’utiliser la commande ci-dessous :
pg_dump --port 5432 --schema-only b1 | psql --port 5433 b1
Nous pouvons maintenant créer la souscription à l’abonnement pub_s1
. Cela se
fait ainsi :
CREATE SUBSCRIPTION sub_pub_s1
CONNECTION 'port=5432 dbname=b1'
PUBLICATION pub_s1;
À partir de là, une synchronisation initiale des données est effectuée pour peupler la table. Toute modification survenant sur cette table du serveur s1 sera automatiquement reproduite sur la table t1 du serveur s2. Le serveur s2 est disponible en écriture : nous pouvons y ajouter des objets et des données.
Les données de réplication passent toujours par les journaux de transactions
disponibles sur le primaire. Un slot de réplication créé automatiquement permet
de conserver les journaux jusqu’à consommation par le serveur s2. Voici ce
qu’indique la vue pg_replication_slots
sur le serveur s1 :
b1=# SELECT * FROM pg_replication_slots \gx
-[ RECORD 1 ]-------+-----------
slot_name | sub_pub_s1
plugin | pgoutput
slot_type | logical
datoid | 16388
database | b1
temporary | f
active | t
active_pid | 95464
xmin |
catalog_xmin | 746
restart_lsn | 0/1C70DB8
confirmed_flush_lsn | 0/1C72E70
wal_status | reserved
safe_wal_size |
two_phase | f
conflicting | f
Nous avons donc bien un slot de réplication logique pour l’abonnement du serveur
s2 à la publication pub_s1. Ce slot est actif et permanent (non temporaire). Il
utilise le plugin pgoutput
. Reste à savoir ce qu’est un plugin…
Plugin ou module de décodage logique
Un plugin (en VO) ou module de décodage logique (en VF) est une librairie C qui va récupérer les enregistrements des journaux de transactions, les décoder et traiter les informations utiles pour lui.
Plusieurs modules existent déjà. Le module pgoutput
est celui conçu et
utilisé pour la réplication logique native de PostgreSQL. Le module
test_decoding
est fourni avec le code de PostgreSQL comme exemple de module
pour les développeurs. Et enfin, il en existe une dizaine listés sur la page
« Logical Decoding Plugins » du wiki
PostgreSQL,
pour transformer les informations des journaux en un format cible :
JSON, ordres Mongo, et même SQL.
Il est possible d’utiliser un module logique via l’interface SQL grâce à quelques fonctions :
pg_create_logical_replication_slot()
pour créer un slot de réplication logique ;pg_logical_slot_get_changes()
etpg_logical_slot_peek_changes()
pour récupérer les changements à partir de ce slot ;pg_drop_replication_slot()
pour supprimer le slot.
Voici une petite démonstration avec le module test_decoding
qui fait partie
des modules contrib de la distribution PostgreSQL :
-- Ces ordres peuvent être exécutés dans des sessions différentes sur la base b1
-- Création du slot de réplication logique
b1=# SELECT pg_create_logical_replication_slot('testslot', 'test_decoding');
pg_create_logical_replication_slot
------------------------------------
(testslot,0/1C73000)
(1 row)
-- Récupération des changements
-- (aucun car aucun INSERT/UPDATE/DELETE/TRUNCATE entre temps)
b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);
lsn | xid | data
-----+-----+------
(0 rows)
-- Insertion de deux lignes
b1=# INSERT INTO t1 VALUES (-1, 'Ligne -1'), (-2, 'Ligne -2');
INSERT 0 2
-- Récupération des changements
-- on voit bien les 2 lignes avec les valeurs des deux colonnes
b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);
lsn | xid | data
-----------+-----+-------------------------------------------------------------
0/1C73000 | 746 | BEGIN 746
0/1C73000 | 746 | table public.t1: INSERT: c1[integer]:-1 c2[text]:'Ligne -1'
0/1C73D78 | 746 | table public.t1: INSERT: c1[integer]:-2 c2[text]:'Ligne -2'
0/1C73DF0 | 746 | COMMIT 746
(4 rows)
-- Insertion d'une ligne
b1=# INSERT INTO t1 VALUES (0, 'pgsession');
INSERT 0 1
-- Récupération des changements
-- on voit bien la nouvelle ligne avec les valeurs des deux colonnes
b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);
lsn | xid | data
-----------+-----+-------------------------------------------------------------
0/1C73E28 | 747 | BEGIN 747
0/1C73E28 | 747 | table public.t1: INSERT: c1[integer]:0 c2[text]:'pgsession'
0/1C73EA0 | 747 | COMMIT 747
(3 rows)
-- Modification d'une ligne
b1=# UPDATE t1 SET c2=upper(c2) WHERE c1<0;
UPDATE 2
-- Récupération des changements
-- on voit bien 2 lignes (celles modifiées par l'ordre)
-- avec les nouvelles valeurs calculées
b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);
lsn | xid | data
-----------+-----+-------------------------------------------------------------
0/1C97180 | 748 | BEGIN 748
0/1C97180 | 748 | table public.t1: UPDATE: c1[integer]:-2 c2[text]:'LIGNE -2'
0/1C971D0 | 748 | table public.t1: UPDATE: c1[integer]:-1 c2[text]:'LIGNE -1'
0/1C97250 | 748 | COMMIT 748
(6 rows)
-- Suppression du slot
b1=# SELECT pg_drop_replication_slot('testslot');
pg_drop_replication_slot
--------------------------
(1 row)
Le module décode bien toutes les informations logiques.
Codage de notre module
Si les modules actuellement disponibles ne satisfont pas notre besoin, il est tout à fait possible de créer son propre module. Le document sur le décodage logique intègre un chapitre sur les fonctions callback utilisables pour créer un module de décodage logique.
Cette documentation indique que quatre fonctions sont requises :
_PG_output_plugin_init
pour initialiser les pointeurs vers les fonctions callback ;begin_cb()
pour traiter un début de transaction ;change_cb()
pour traiter un changement (donc une instruction DML en écriture) ;commit_cb()
pour traiter une fin de transaction.
Comme d’habitude, nous avons placé le code sur notre dépôt GitHub et mis en place des tags pour suivre plus facilement.
Le tag j5e0 correspond à notre squelette de module de décodage logique. Il contient un fichier Makefile identique aux autres modules que nous avons déjà codés et un fichier plugin_audit.c contenant le code de base de notre plugin.
En ligne 55 se trouve le code d’initialisation des pointeurs vers les fonctions callback. Cela ressemble là aussi beaucoup au code de nos autres modules.
En ligne
65,
nous avons le code de la fonction callback startup_cb()
. Cette fonction va
allouer de la mémoire pour une structure qui contiendra notre contexte mémoire
pour le jour où on en aura besoin. Et en ligne
94,
la fonction callback shutdown_cb()
désalloue cette mémoire.
Les fonctions callback begin_cb()
et commit_cb
sont requises donc elles
existent, mais sont vides.
La fonction qui nous intéresse principalement pour ce module est la fonction
change_cb()
. Cette fonction est appelée pour chaque ligne insérée, supprimée
ou modifiée. La structure LogicalDecodingContext nous permet de renvoyer un
texte qui sera affiché par la fonction SQL pg_logical_slot_get_changes
. Dans
ce squelette, le texte est une constante mais nous allons y apporter des
changements dans les exemples suivants. Son contenu est disponible en ligne
107.
La compilation se fait comme d’habitude :
$ cd journee5/audit
$ make
$ make install
Avec cette dernière commande, la bibliothèque partagée est copiée dans le
répertoire lib
de PostgreSQL. L’emplacement exact peut se retrouver en
utilisant la commande suivante :
$ pg_config --libdir
Pour tester l’utilisation de notre module de décodage logique, nous allons reprendre l’exemple précédent :
b1=# SELECT pg_create_logical_replication_slot('testslot', 'plugin_audit');
pg_create_logical_replication_slot
------------------------------------
(testslot,0/1C9C640)
(1 row)
b1=# INSERT INTO t1 VALUES (-3, 'Ligne -3'), (-4, 'Ligne -4');
INSERT 0 2
b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);
lsn | xid | data
-----------+-----+-------
0/1C9C640 | 755 | yeah!
0/1C9C6C8 | 755 | yeah!
(2 rows)
b1=# SELECT pg_drop_replication_slot('testslot');
pg_drop_replication_slot
--------------------------
(1 row)
Nous avons donc bien deux lignes affichées correspondant à l’action pour chaque
ligne insérée, et la colonne data
contient bien notre texte.
Notre module fonctionne mais il n’est pas très informatif. Essayons maintenant de récupérer le nom de la table visée par l’opération DML.
Le code se trouve sur le tag
j5e1.
L’identifiant de la table fait partie du troisième argument de la fonction
callback change_cb
. Récupérer son nom qualifié du nom du schéma demande à lire
le contenu de la table pg_class
au moyen de quelques fonctions.
Après compilation et tests, voici le résultat suite à une écriture dans la table t1 :
b1=# INSERT INTO t1 VALUES (-5, 'Ligne -5');
INSERT 0 1
b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);
lsn | xid | data
-----------+-----+-----------
0/1C9C8D8 | 756 | public.t1
(1 row)
La colonne data
indique bien maintenant le nom de la table.
On pourrait aussi vouloir récupérer l’opération réalisée. Le code résultant
fait partie du tag
j5e2. Cette
fois, nous utilisons l’argument change
pour récupération l’opération
effectuée. Et voici le résultat :
b1=# INSERT INTO t1 VALUES (-6, 'Ligne -6');
INSERT 0 1
b1=# SELECT * FROM pg_logical_slot_get_changes('testslot', NULL, NULL);
lsn | xid | data
-----------+-----+------------------
0/1C9F6F0 | 757 | public.t1 INSERT
(1 row)
Plein d’autres informations sont récupérables, comme notamment les noms des
colonnes et leurs nouvelles valeurs. Le code du module d’exemple
test_decoding
est très intéressant à ce regard.
Maintenant que nous avons un module de décodage logique capable de reconnaître
les tables ciblées par des opérations DML, plutôt que de passer par psql
,
nous allons coder un client qui va l’utiliser.
Codage d’un client qui utilise notre module
Le tag j5e3 ajoute le code squelette d’un client ressemblant beaucoup à ce que vous avions écrit dans Hack’PG #4.
Le fichier Makefile est un peu modifié pour générer l’application cliente et le module de décodage logique. Le code du client se trouve dans le fichier audit.c. Ce code ne fait que se connecter à une base, indique le nom de la table donnée en argument de la commande, puis se déconnecte.
Le code précédent n’est qu’un squelette de client. Maintenant, il nous faut ajouter notre code spécifique pour la gestion du module de décodage logique. En effet, cet outil doit tout d’abord créer le slot de réplication. Puis, une boucle va permettre de récupérer les changements au fur et à mesure. Une fois la boucle terminée, le slot est supprimé. Le code correspondant à cette description est disponible sur le tag j5e4.
Pour tester l’application, il faut deux terminaux : un avec psql
pour lancer
des requêtes DML et l’autre avec l’application pour voir si elle récupère bien
le nom des tables impactées. Par exemple, avec l’option -e
qui permet
d’afficher les requêtes exécutées par l’outil, et donc de suivre son exécution.
$ ./audit --echo -d b1 -p5432 t1
SELECT pg_catalog.set_config('search_path', '', false);
audit: Auditing table "t1"...
SELECT * FROM pg_create_logical_replication_slot('audit_235578', 'plugin_audit', false, true);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
[...]
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
public.t1 INSERT
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
[...]
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
public.t1 INSERT
public.t1 INSERT
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
[...]
SELECT * FROM pg_logical_slot_get_changes('audit_235578', NULL, NULL);
SELECT * FROM pg_drop_replication_slot('audit_235578');
Faire une boucle de 60 secondes, c’est un peu bête. Mieux vaudrait une boucle infinie qu’un Control-C arrêterait. Par contre, il faut récupérer l’interruption pour nettoyer le slot de réplication créé au début. Une solution facile et intelligente serait d’utiliser un slot temporaire, mais il est intéressant de voir la gestion d’une interruption. Le tag j5e5 montre justement comment intercepter l’interruption pour exécuter son propre code.
Cette application demande la table à tracer en paramètre, mais pour l’instant, elle trace les requêtes DML pour toutes les tables. Il reste donc à ajouter le code qui vérifie que la requête concerne bien la table ciblée. Cela correspond au code disponible sur le tag j5e6.
Un tel outil peut servir de base à un outil d’audit des changements sur une base.
Conclusion
Voici notre tour sur la réplication et le décodage logique terminé. La création de modules de décodage logique ouvre la porte à de nombreuses nouvelles applications, que ce soit pour de la réplication, de l’audit ou du CDC.
À noter que notre VIP tech de la pgsession de cette année a été Bertrand Drouvot, qui nous a justement parlé des dernières nouveautés sur la réplication logique dans PostgreSQL.