ロジカルレプリケーションの紹介 (2)

ロジカルレプリケーション紹介の 2 回目となります。1 回目では、ロジカルレプリケーションの概要や基本的な使い方を紹介しました。今回はロジカルレプリケーションでトリガを利用する方法や、ロジカルレプリケーションの構成、運用時の注意点などについて紹介します。

検証環境

対象とする PostgreSQL のバージョンは12、サーバやユーザの名前などについては、1 回目と同様に下記の通りです。

  • パブリッシャとなる PostgreSQL のサーバ名を first_server、データベース名を first
  • サブスクライバとなる PostgreSQL のデータベース名を second
  • ロジカルレプリケーション用のユーザ名を logi_user

そのため、

  • first=# で始まる SQL はパブリッシャで実行するコマンド
  • second=# で始まる SQL はサブスクライバで実行するコマンド

となります。

サブスクライバでトリガ起動

パブリッシャよりデータ操作がレプリケーションされたことを契機にトリガを起動する方法について紹介します。

以下では

  • トリガ関数によって情報を書き込まれるテーブル
  • データ変更トリガに登録するトリガ関数
  • サブスクライバがデータを受け取ったときに起動するデータ変更トリガ

の 3 つを作成して、サブスクライバがデータを受け取った際に、トリガ起動する挙動を確認します。

時刻を記録するテーブルの作成

まずはトリガ発火時に、その時刻を記録するテーブルをサブスクライバに作成します。

second=# CREATE TABLE logical_table_history (id integer PRIMARY KEY, ope_type char(1), ope_timestamp timestamp);
CREATE TABLE

トリガ関数の作成

次に、トリガ発火時に実行する FUNCTION をサブスクライバに作成します。
戻り値を TRIGGER 型にすることで、データ変更トリガの発火時に実行されるトリガ関数として作成できます。

second=# CREATE OR REPLACE FUNCTION register_logical_table_history()
second=# RETURNS trigger AS $$
second=# BEGIN
second=#     IF NEW.id IS NULL THEN
second=#         -- DELETE 操作
second=#         UPDATE public.logical_table_history SET ope_type = substr(TG_OP, 1, 1), ope_timestamp = now() WHERE id = OLD.id;
second=#     ELSE
second=#         -- INSERT または UPDATE 操作
second=#         INSERT INTO public.logical_table_history VALUES (NEW.id, substr(TG_OP, 1, 1), now())
second=#         ON CONFLICT ON CONSTRAINT logical_table_history_pkey
second=#         DO UPDATE SET ope_type = substr(TG_OP, 1, 1), ope_timestamp = now();
second=#     END IF;
second=#     RETURN NULL;
second=# END;
second=# $$ LANGUAGE plpgsql;
CREATE FUNCTION

データ変更トリガの作成

続けて、サブスクライバに作成したトリガ関数を実行するためのデータ変更トリガを作成します。

ロジカルレプリケーションでは行レベル(FOR EACH ROW)のトリガのみサポートされていますので、契機として設定できる操作に TRUNCATE を含めることはできません。
関数呼び出しを行うタイミングを指定する BEFORE / AFTER については、制約はなく、どちらもサポートされています。

今回作成するトリガは、AFTER を指定し、契機となる操作を INSERT、UPDATE、DELETE としています。

second=# CREATE TRIGGER logical_table_trigger AFTER INSERT OR UPDATE OR DELETE ON logical_table FOR EACH ROW EXECUTE PROCEDURE register_logical_table_history();
CREATE TRIGGER

ちなみに、TRUNCATE を含めると、下記のようなエラーが発生します。

second=# CREATE TRIGGER logical_table_trigger AFTER TRUNCATE ON logical_table FOR EACH ROW EXECUTE PROCEDURE register_logical_table_history();
ERROR:  TRUNCATE FOR EACH ROW triggers are not supported

サブスクライバのテーブルに対して、ENABLE REPLICA TRIGGER の指定

ロジカルレプリケーションがデータ操作を適用するセッションは session_replication_role が replica となります。
そのため、ロジカルレプリケーションによるデータ操作時にトリガを発火させたい場合には、サブスクライバのテーブルに対して ENABLE REPLICA TRIGGER の指定を行う必要があります。

second=# ALTER TABLE logical_table ENABLE REPLICA TRIGGER logical_table_trigger;
ALTER TABLE

パブリッシャでデータ挿入

以上の対応を行うことで、サブスクライバがデータを受け取った際に、トリガを発火させて、トリガ関数を実行できるようになりました。

準備ができましたので、実際にパブリッシャ側でデータを挿入して動作を確認します。

first=# INSERT INTO logical_table VALUES (1, 1, 'trigger_test');
INSERT 0 1

サブスクライバ側でデータの状態を確認します。

second=# SELECT * FROM logical_table;
 id | no |     data
----+----+--------------
  1 |  1 | trigger_test
(1 行)

second=# SELECT * FROM logical_table_history;
 id | ope_type |       ope_timestamp
----+----------+----------------------------
  1 | I        | 2022-03-31 15:13:57.637665
(1 行)

データ操作を契機としたトリガ関数の実行により、logical_table_history テーブルにデータ操作の履歴が記録されたことを確認できました。

カスケードレプリケーションの設定

ロジカルレプリケーションは、サブスクライバから更に別のサブスクライバへレプリケーションさせることも可能です。

first → second → third という構成でロジカルレプリケーションを構成します。

first データベースにパブリケーションを作成します

すべてのテーブルを対象としたパブリケーションを作成します、

first=# CREATE PUBLICATION pb_logical_first FOR ALL TABLES;
CREATE PUBLICATION

second データベースにパブリケーションとサブスクリプションを作成します

second データベースでも同様にすべてのテーブルを対象としたパブリケーションを作成します。

second=# CREATE PUBLICATION pb_logical_second FOR ALL TABLES;
CREATE PUBLICATION

次に、first からデータ操作をサブスクライブするためのサブスクリプションを作成します。

second=# CREATE SUBSCRIPTION sb_logical_second CONNECTION 'host=first_server port=5432 user=logi_user dbname=first' PUBLICATION pb_logical_first;
CREATE SUBSCRIPTION

third データベースにサブスクリプションを作成します

second からデータ操作をサブスクライブするためのサブスクリプションを作成します。

third=# CREATE SUBSCRIPTION sb_logical_third CONNECTION 'host=second_server port=5432 user=logi_user dbname=second' PUBLICATION pb_logical_second;
CREATE SUBSCRIPTION

first データベースのテーブルにデータを追加します

パブリケーション、サブスクリプションをそれぞれ作成できましたので、first データベースの logical_table テーブルにデータを追加します。

first=# INSERT INTO logical_table VALUES (1, 1, 'from_first');
INSERT 0 1

first=# SELECT * FROM logical_table;
 id | no |   data
----+----+-----------
  1 |  1 | from_first
(1 行)

second データベースにデータ操作がレプリケーションされているかを確認します。

second=# SELECT * FROM logical_table;
 id | no |   data
----+----+-----------
  1 |  1 | from_first
(1 行)

first データベースでのデータ操作が、second データベースにレプリケーションされたことを確認できました。

次に、third データベースにデータがレプリケーションされているかを確認します。

third=# SELECT * FROM logical_table;
 id | no |   data
----+----+-----------
  1 |  1 | from_first
(1 行)

first で行ったデータ操作が、second を通じて third にレプリケーションされたことを確認できました。

カスケードレプリケーション時の注意点

どこかのデータベースが機能停止になった場合、そのデータベースからカスケードされているデータベース以降に、データ操作がレプリケーションされなくなります。

たとえば、今回の例でいいますと、second データベースが停止した場合、third データベースにもデータ操作がレプリケーションされなくなることにご注意ください。
その場合には、third データベースに作成した second データベースからデータ操作を受け取るサブスクリプションを削除し、新しく first データベースからデータ操作を受け取るサブスクリプションを作成する、などの対応が必要となります。

双方向レプリケーション

今度は、双方向のレプリケーションについて確認します。
first から second へのレプリケーションと、second から first へのレプリケーションを行う環境を構築します。

first データベースの環境構築

first データベースに、ロジカルレプリケーションを行うためのテーブル、パブリケーション、サブスクリプションをそれぞれ作成します。

first=# CREATE TABLE first_to_second (id INTEGER PRIMARY KEY, val TEXT);
CREATE TABLE

first=# CREATE TABLE second_to_first (id INTEGER PRIMARY KEY, val TEXT);
CREATE TABLE

first=# CREATE PUBLICATION pb_first_to_second FOR TABLE first_to_second;
CREATE PUBLICATION

first=# CREATE SUBSCRIPTION sb_second_to_first CONNECTION 'host=second_server port=5432 user=logi_user dbname=second' PUBLICATION pb_second_to_first;
CREATE SUBSCRIPTION

second データベースの環境構築

second データベースにも同様にテーブル、パブリケーション、サブスクリプションをそれぞれ作成します。

second=# CREATE TABLE first_to_second (id INTEGER PRIMARY KEY, val TEXT);
CREATE TABLE

second=# CREATE TABLE second_to_first (id INTEGER PRIMARY KEY, val TEXT);
CREATE TABLE

second=# CREATE PUBLICATION pb_second_to_first FOR TABLE second_to_first;
CREATE PUBLICATION

second=# CREATE SUBSCRIPTION sb_first_to_second CONNECTION 'host=first_server port=5432 user=logi_user dbname=first' PUBLICATION pb_first_to_second;
CREATE SUBSCRIPTION

レプリケーションの確認

first データベースおよび second データベースで、ロジカルレプリケーションを行う準備ができました。

first データベース側でデータを追加して挙動を確認します。

first=# INSERT INTO first_to_second VALUES (1, 'from_first');
INSERT 0 1

second データベースにレプリケーションされているかを確認します。

second=# SELECT * FROM first_to_second;
 id |    val
----+------------
  1 | from_first
(1 行)

無事にレプリケーションされました。
今度は、second データベース側でデータを追加します。

second=# INSERT INTO second_to_first VALUES (1, 'from_second');
INSERT 0 1

first データベースにレプリケーションされているかを確認します。

first=# SELECT * FROM second_to_first;
 id |     val
----+-------------
  1 | from_second
(1 行)

無事にレプリケーションされました。

ストリーミングレプリケーションでは、一方向のレプリケーションしかできませんでしたが、ロジカルレプリケーションでは双方向のレプリケーションを実現できます。
ただし、同じテーブルに対して(マルチマスタのような)双方向のレプリケーションは行えませんので、ご注意ください。

ロジカルレプリケーションが正常に動作しているかの監視方法

ロジカルレプリケーションが正常に動作しているかを確認する方法をご紹介します。

ロジカルレプリケーションが正常に動作しているかは、パブリッシャの「現在の WAL 書き込み位置」と、サブスクライバの「適用済み WAL の最終位置」を比較することで確認できます。

パブリッシャの現在の WAL 書き込み位置の確認

現在の WAL 書き込み位置は pg_current_wal_lsn 関数を実行することで確認できます。

first=# SELECT pg_current_wal_lsn();
 pg_current_wal_lsn
--------------------
 6/CD052A10
(1 行)

サブスクライバの「適用済み WAL の最終位置」の確認

サブスクライバの「適用済み WAL の最終位置」は

  • パブリッシャの pg_stat_replication.replay_lsn
  • サブスクライバの pg_stat_wal_receiver.received_lsn

のいずれかで確認できます。

今回はパブリッシャの pg_stat_replication.replay_lsn を確認します。

pg_stat_replication.replay_lsn が、サブスクライバに適用済み WAL の最終位置となります。

遅延が発生している場合には、replay_lag や flush_lag に遅延の時間が表示されます。replay_lag や flush_lag に値が表示されていない(NULL)なら、遅延は発生していないと判断できます。

pg_stat_replication にサブスクライバの情報が存在しない場合は、そもそもロジカルレプリケーションは正常に動作していません。

first=# SELECT * FROM pg_stat_replication;
-[ RECORD 1 ]----+------------------------------
pid              | 7400
usesysid         | 10
usename          | logi_user
application_name | sb_logical
client_addr      | 192.168.33.11
client_hostname  |
client_port      | 34358
backend_start    | 2022-03-31 16:27:44.879862+09
backend_xmin     |
state            | streaming
sent_lsn         | 6/CD052A10
write_lsn        | 6/CD052A10
flush_lsn        | 6/CD052A10
replay_lsn       | 6/CD052A10
write_lag        |
flush_lag        |
replay_lag       |
sync_priority    | 0
sync_state       | async
reply_time       | 2022-03-31 17:42:11.128553+09

今回の実行では、pg_stat_replication.replay_lsn と pg_current_wal_lsn 関数の結果が同一でしたので、遅延なくロジカルレプリケーションが行われていることが分かります。

ロジカルレプリケーションを運用する上での注意点

最後に、ロジカルレプリケーションを運用する上での注意点を紹介いたします。

pg_dump 時の注意点

pg_dump を行うユーザがスーパーユーザ権限を持っていない場合、サブスクリプションは pg_dump の対象となりません。
ロジカルレプリケーションが機能しているデータベースに対して pg_dump を行う場合、スーパーユーザ権限を持ったユーザで行う必要がありますので、ご注意ください。
以下で、実際にどのような挙動となるのかを紹介します。

非スーパーユーザで pg_dump 実行

非スーパーユーザで pg_dump を実行してみます。

$ pg_dump -d second -U logi_user > pg_dump_by_logi_user.dmp
pg_dump: 警告: 現在のユーザがスーパユーザではないため、サブスクリプションはダンプされません

非スーパーユーザでは、上記のような警告メッセージが表示され、出力されたダンプファイル内には、サブスクリプションが含まれません。

スーパーユーザで pg_dump 実行

今度はスーパーユーザで pg_dump を実行します。

-bash-4.2$ pg_dump -d second -U postgres > pg_dump_by_postgres.dmp
-bash-4.2$

スーパーユーザで実行すると警告なく pg_dump が完了し、ダンプファイルにサブスクリプションが含まれます。

パブリッシャにテーブルを追加した際の注意点

パブリッシャとサブスクライバがロジカルレプリケーションを機能している状態で、パブリッシャに新たにテーブルを追加しても、そのテーブルに対して行ったデータ操作はレプリケーションされないことにご注意ください。

サブスクライバ側でサブスクリプションのリフレッシュを行うことで、データ操作がレプリケーションされるようになります。

具体的な挙動を確認します。

まずパブリッシャで、すべてのテーブルを対象としたパブリケーションを作成します。

first=# CREATE PUBLICATION pb_all_table FOR ALL TABLES;
CREATE PUBLICATION

サブスクライバで、そのパブリケーションをサブスクライブするサブスクリプションを作成します。

second=# CREATE SUBSCRIPTION sb_all_table CONNECTION 'host=first_server port=5432 user=logi_user dbname=first' PUBLICATION pb_all_table;

これでロジカルレプリケーションが機能する状態となりました。

この状態で、パブリッシャで新たにテーブルを作成してデータを追加します。
ここでは add_table という名前のテーブルを作成しました。

first=# CREATE TABLE add_table (id INTEGER PRIMARY KEY, val TEXT);
CREATE TABLE

first=# INSERT INTO add_table VALUES (1, 'add');
INSERT 0 1

サブスクライバでも、同名のテーブルを作成し、データ操作がレプリケーションされるか確認します。

second=# CREATE TABLE add_table (id INTEGER PRIMARY KEY, val TEXT);
CREATE TABLE

second=# SELECT * FROM add_table;
 id | val
----+-----
(0 行)

予想どおり、新たに追加したテーブルに対しては、データ操作がレプリケーションされません。

この後、サブスクリプションをリフレッシュすることで、新たに追加したテーブルに対してもデータ操作がレプリケーションされるようになります。

second=# ALTER SUBSCRIPTION sb_all_table REFRESH PUBLICATION;
ALTER SUBSCRIPTION

second=# SELECT * FROM add_table;
 id | val
----+-----
  1 | add
(1 行)

無事に初期コピーが実行(※)され、データが同期されました。パブリッシャで行ったデータ操作も、レプリケーションされるようになっています。
このリフレッシュ操作は、CREATE SUBSCRIPTION または、最後にリフレッシュした後に、パブリケーションに追加されたテーブルが対象となります。
すでにレプリケーションが行われているテーブルに関しては、リフレッシュの対象とはならず、初期コピーは実行されません。

※オプションで、リフレッシュ時の初期コピーを行わないことも可能です。

外部キーで依存関係のあるテーブルに対する TRUNCATE を実施する場合の注意点

外部キーで依存関係があるテーブルに対して、TRUNCATE を実施した場合の挙動についても確認します。

パブリッシャのテーブルに対して TRUNCATE を実行した場合、サブスクライバにも TRUNCATE がレプリケーションされてデータが消去されます。
サブスクライバ側のテーブルに外部キーで依存関係があり、同一のサブスクリプションにその依存関係のテーブルが含まれていない場合、TRUNCATE は失敗します。

上記の説明を理解するために、実際に category(カテゴリ)、products(特産品) という 2 つのテーブルを用意して、外部キーで依存関係のあるテーブルに対する TRUNCATE の挙動を確認します。

パブリケーションの作成

products テーブルの category_no にはcategory テーブルの category_no を参照先とした外部キー制約が付与されていますが、category テーブルのみを対象としたパブリケーションを作成します。

first=# CREATE PUBLICATION pb_category FOR TABLE category;
CREATE PUBLICATION

サブスクリプションの作成

サブスクライバで、category テーブルのみをサブスクライブするサブスクリプションを作成します。

second=# CREATE SUBSCRIPTION sb_category CONNECTION 'host=first_server port=5432 user=logi_user dbname=first' PUBLICATION pb_category;
CREATE SUBSCRIPTION

TRUNCATE して挙動を確認

この状態でパブリッシャで category テーブルに対して TRUNCATE を行って挙動を検証します。

※category.category_no は products.category_no の依存先となっていますので、category テーブルに対する TRUNCATE は CASCADE をつけて実行します。

first=# TRUNCATE TABLE category CASCADE;
NOTICE: テーブル"products"へのカスケードを削除します
TRUNCATE TABLE

TRUNCATE が実行されましたので、データの内容を確認します。

first=# SELECT * FROM category;
 category_no | category_name
-------------+---------------
(0 行)

first=# SELECT * FROM products;
 products_no | products_name | category_no
-------------+---------------+-------------
(0 行)

パブリッシャでは問題なく category テーブルと、products テーブルが TRUNCATE されたことを確認できました。

次に、サブスクライバのテーブルの内容を確認します。

second=# SELECT * FROM category;
 category_no | category_name
-------------+---------------
           1 | 果物野菜
           2 | 魚介類海産物
           3 | 肉類乳製品
           4 | 郷土料理など
           5 | 工芸品民芸品
           6 | その他
(6 行)

second=# SELECT * FROM products;
 products_no | products_name | category_no
-------------+---------------+-------------
           1 | 夕張メロン    |           1
           2 | ジャガイモ    |           1
:
:
(128 行)

上記のように、サブスクライバ側では category および products テーブルは TRUNCATE されずにデータが存在しています。

これは、products テーブルがサブスクリプションに含まれていないので、category テーブルに連動して products テーブルを TRUNCATE できないことから、category テーブルへの TRUNCATE 自体が行われなかったためです。
この場合、products テーブルを category テーブルと同一のサブスクリプションに含めることで、パブリッシャで行った TRUNCATE がサブスクライバに正しくレプリケーションされるようになります。

おわりに

今回は、ロジカルレプリケーションの構成や注意点、ロジカルレプリケーションが機能しているかの確認方法を紹介いたしました。
パブリッシャとサブスクライバ間で単純にテーブルをレプリケーションするだけでなく、(ストリーミングレプリケーションと比べて)柔軟に構成できるのがロジカルレプリケーションの特徴です。
次回は、ロジカルレプリケーションで発生するコンフリクトやエラーについて紹介します。