On Fri, 25 Aug 2023 at 17:40, vignesh C <vignes...@gmail.com> wrote:
>
> On Sat, 19 Aug 2023 at 11:53, Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > It's entirely possible for a logical slot to have a confirmed_flush
> > LSN higher than the last value saved on disk while not being marked as
> > dirty. It's currently not a problem to lose that value during a clean
> > shutdown / restart cycle but to support the upgrade of logical slots
> > [1] (see latest patch at [2]), we seem to rely on that value being
> > properly persisted to disk. During the upgrade, we need to verify that
> > all the data prior to shutdown_checkpoint for the logical slots has
> > been consumed, otherwise, the downstream may miss some data. Now, to
> > ensure the same, we are planning to compare the confirm_flush LSN
> > location with the latest shutdown_checkpoint location which means that
> > the confirm_flush LSN should be updated after restart.
> >
> > I think this is inefficient even without an upgrade because, after the
> > restart, this may lead to decoding some data again. Say, we process
> > some transactions for which we didn't send anything downstream (the
> > changes got filtered) but the confirm_flush LSN is updated due to
> > keepalives. As we don't flush the latest value of confirm_flush LSN,
> > it may lead to processing the same changes again.
>
> I was able to test and verify that we were not processing the same
> changes again.
> Note: The 0001-Add-logs-to-skip-transaction-filter-insert-operation.patch
> has logs to print if a decode transaction is skipped and also a log to
> mention if any operation is filtered.
> The test.sh script has the steps for:
> a) setting up logical replication for a table
> b) performing an insert on a table that needs to be published (this
> will be replicated to the subscriber)
> c) performing an insert on a table that will not be published (this
> insert will be filtered, it will not be replicated)
> d) sleeping for 5 seconds
> e) stopping the server
> f) starting the server
>
> I used the following steps in HEAD:
> a) Apply 0001-Add-logs-to-skip-transaction-filter-insert-operation.patch
> on HEAD and build the binaries
> b) execute test.sh
> c) view the N1.log file to see that the insert operations were filtered
> again, as shown by the following logs:
> LOG: Filter insert for table tbl2
> ...
> ===restart===
> ...
> LOG: Skipping transaction 0/156AD10 as start decode at is greater 0/156AE40
> ...
> LOG: Filter insert for table tbl2
>
> We can see that the insert operations on tbl2 which were filtered
> before the server was stopped are filtered again after restart in HEAD.
>
> Let's see that the same changes were not processed again with the patch:
> a) Apply v4-0001-Persist-to-disk-logical-slots-during-a-shutdown-c.patch
> from [1], also apply
> 0001-Add-logs-to-skip-transaction-filter-insert-operation.patch
> and build the binaries
> b) execute test.sh
> c) view the N1.log file to see that the insert operations were skipped
> after restart of the server, as shown by the following logs:
> LOG: Filter insert for table tbl2
> ...
> ===restart===
> ...
> Skipping transaction 0/156AD10 as start decode at is greater 0/156AFB0
> ...
> Skipping transaction 0/156AE80 as start decode at is greater 0/156AFB0
>
> We can see that the insert operations on tbl2 are not processed again
> after restart with the patch.
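To make the LSN comparison in the logs quoted above concrete, here is a
minimal standalone sketch. It is not part of either patch. The helper
names parse_lsn() and is_skipped() are made up for illustration. The
sketch assumes only that an LSN is printed as two 32-bit hex halves
separated by "/", and that a transaction is skipped when its LSN is below
the "start decode at" point. It also assumes that the second tbl2
transaction sits at 0/156AE80 in both runs, which the HEAD log does not
show explicitly.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper (not from either patch): parse an LSN printed as
 * "hi/lo", two 32-bit hex halves, into one 64-bit value.
 * Returns 0 on success, -1 on malformed input.
 */
int
parse_lsn(const char *str, uint64_t *lsn)
{
	unsigned int hi;
	unsigned int lo;
	char		junk;

	/* exactly two hex fields must match, with nothing trailing */
	if (sscanf(str, "%X/%X%c", &hi, &lo, &junk) != 2)
		return -1;

	*lsn = ((uint64_t) hi << 32) | lo;
	return 0;
}

/*
 * Hypothetical helper: returns 1 if a transaction at txn_lsn lies before
 * the point where decoding starts (so it would be skipped), 0 if it would
 * be decoded, and -1 if either string is not a valid LSN.
 */
int
is_skipped(const char *txn_lsn, const char *start_decoding_at)
{
	uint64_t	txn;
	uint64_t	start;

	assert(txn_lsn != NULL && start_decoding_at != NULL);

	if (parse_lsn(txn_lsn, &txn) != 0 ||
		parse_lsn(start_decoding_at, &start) != 0)
		return -1;

	return txn < start;
}
```

With the HEAD value 0/156AE40, is_skipped("0/156AE80", "0/156AE40") is 0,
so that transaction is decoded (and filtered) again after restart. With
the patched value 0/156AFB0, both 0/156AD10 and 0/156AE80 are below the
start point and are skipped.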
Here is another way to test using the pg_replslotdata approach that was
proposed earlier at [1].
I have rebased this on top of HEAD and the v5 version for the same is attached.

We can use the same test as test.sh shared at [2].
When executed with HEAD, it was noticed that confirmed_flush points to a
WAL location before both transactions:

slot_name  slot_type  datoid  persistency  xmin  catalog_xmin  restart_lsn  confirmed_flush  two_phase_at  two_phase  plugin
---------  ---------  ------  -----------  ----  ------------  -----------  ---------------  ------------  ---------  ------
sub        logical    5       persistent   0     735           0/1531E28    0/1531E60        0/0           0          pgoutput

WAL record information generated using pg_walinspect for various
records at and after confirmed_flush WAL 0/1531E60:

 row_number | start_lsn | end_lsn   | prev_lsn  | xid | resource_manager | record_type         | record_length | main_data_length | fpi_length | description | block_ref
------------+-----------+-----------+-----------+-----+------------------+---------------------+---------------+------------------+------------+-------------+-----------
          1 | 0/1531E60 | 0/1531EA0 | 0/1531E28 |   0 | Heap2            | PRUNE               |            57 |                9 |          0 | snapshotConflictHorizon: 0, nredirected: 0, ndead: 1, nunused: 0, redirected: [], dead: [1], unused: [] | blkref #0: rel 1663/5/1255 fork main blk 58
          2 | 0/1531EA0 | 0/1531EE0 | 0/1531E60 | 735 | Heap             | INSERT+INIT         |            59 |                3 |          0 | off: 1, flags: 0x08 | blkref #0: rel 1663/5/16384 fork main blk 0
          3 | 0/1531EE0 | 0/1531F20 | 0/1531EA0 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 2, flags: 0x08 | blkref #0: rel 1663/5/16384 fork main blk 0
          4 | 0/1531F20 | 0/1531F60 | 0/1531EE0 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 3, flags: 0x08 | blkref #0: rel 1663/5/16384 fork main blk 0
          5 | 0/1531F60 | 0/1531FA0 | 0/1531F20 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 4, flags: 0x08 | blkref #0: rel 1663/5/16384 fork main blk 0
          6 | 0/1531FA0 | 0/1531FE0 | 0/1531F60 | 735 | Heap             | INSERT              |            59 |                3 |          0 | off: 5, flags: 0x08 | blkref #0: rel 1663/5/16384 fork main blk 0
          7 | 0/1531FE0 | 0/1532028 | 0/1531FA0 | 735 | Transaction      | COMMIT              |            46 |               20 |          0 | 2023-08-27 23:22:17.161215+05:30 |
          8 | 0/1532028 | 0/1532068 | 0/1531FE0 | 736 | Heap             | INSERT+INIT         |            59 |                3 |          0 | off: 1, flags: 0x08 | blkref #0: rel 1663/5/16387 fork main blk 0
          9 | 0/1532068 | 0/15320A8 | 0/1532028 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 2, flags: 0x08 | blkref #0: rel 1663/5/16387 fork main blk 0
         10 | 0/15320A8 | 0/15320E8 | 0/1532068 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 3, flags: 0x08 | blkref #0: rel 1663/5/16387 fork main blk 0
         11 | 0/15320E8 | 0/1532128 | 0/15320A8 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 4, flags: 0x08 | blkref #0: rel 1663/5/16387 fork main blk 0
         12 | 0/1532128 | 0/1532168 | 0/15320E8 | 736 | Heap             | INSERT              |            59 |                3 |          0 | off: 5, flags: 0x08 | blkref #0: rel 1663/5/16387 fork main blk 0
         13 | 0/1532168 | 0/1532198 | 0/1532128 | 736 | Transaction      | COMMIT              |            46 |               20 |          0 | 2023-08-27 23:22:17.174756+05:30 |
         14 | 0/1532198 | 0/1532210 | 0/1532168 |   0 | XLOG             | CHECKPOINT_SHUTDOWN |           114 |               88 |          0 | redo 0/1532198; tli 1; prev tli 1; fpw true; xid 0:737; oid 16399; multi 1; offset 0; oldest xid 723 in DB 1; oldest multi 1 in DB 1; oldest/newest commit timestamp xid: 0/0; oldest running xid 0; shutdown |

Whereas the same test executed with the patch applied shows that
confirmed_flush points to the CHECKPOINT_SHUTDOWN record:

slot_name  slot_type  datoid  persistency  xmin  catalog_xmin  restart_lsn  confirmed_flush  two_phase_at  two_phase  plugin
---------  ---------  ------  -----------  ----  ------------  -----------  ---------------  ------------  ---------  ------
sub        logical    5       persistent   0     735           0/1531E28    0/1532198        0/0           0          pgoutput

WAL record information generated using pg_walinspect for various
records at and after confirmed_flush WAL 0/1532198:

 row_number | start_lsn | end_lsn   | prev_lsn  | xid | resource_manager | record_type         | record_length | main_data_length | fpi_length | description | block_ref
------------+-----------+-----------+-----------+-----+------------------+---------------------+---------------+------------------+------------+-------------+-----------
          1 | 0/1532198 | 0/1532210 | 0/1532168 |   0 | XLOG             | CHECKPOINT_SHUTDOWN |           114 |               88 |          0 | redo 0/1532198; tli 1; prev tli 1; fpw true; xid 0:737; oid 16399; multi 1; offset 0; oldest xid 723 in DB 1; oldest multi 1 in DB 1; oldest/newest commit timestamp xid: 0/0; oldest running xid 0; shutdown |
(1 row)

[1] - https://www.postgresql.org/message-id/flat/CALj2ACW0rV5gWK8A3m6_X62qH%2BVfaq5hznC%3Di0R5Wojt5%2Byhyw%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/CALDaNm2BboFuFVYxyzP4wkv7%3D8%2B_TwsD%2BugyGhtibTSF4m4XRg%40mail.gmail.com

Regards,
Vignesh
From 934f2d32bf6d3b3bb8ae0fcf334aa371fc95de19 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Sun, 27 Aug 2023 22:03:27 +0530
Subject: [PATCH v5] pg_replslotdata

TODO: Display invalidation information for replication slot.
---
 src/backend/replication/slot.c            |  39 ---
 src/bin/Makefile                          |   1 +
 src/bin/pg_replslotdata/.gitignore        |   2 +
 src/bin/pg_replslotdata/Makefile          |  44 +++
 src/bin/pg_replslotdata/nls.mk            |   6 +
 src/bin/pg_replslotdata/pg_replslotdata.c | 362 ++++++++++++++++++++++
 src/bin/pg_replslotdata/t/001_basic.pl    |  11 +
 src/include/replication/slot.h            |  97 +-----
 src/include/replication/slot_common.h     | 147 +++++++++
 9 files changed, 574 insertions(+), 135 deletions(-)
 create mode 100644 src/bin/pg_replslotdata/.gitignore
 create mode 100644 src/bin/pg_replslotdata/Makefile
 create mode 100644 src/bin/pg_replslotdata/nls.mk
 create mode 100644 src/bin/pg_replslotdata/pg_replslotdata.c
 create mode 100644 src/bin/pg_replslotdata/t/001_basic.pl
 create mode 100644 src/include/replication/slot_common.h

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index bb09c4010f..567d61540a 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -53,45 +53,6 @@
 #include "storage/procarray.h"
 #include "utils/builtins.h"
 
-/*
- * Replication slot on-disk data structure.
- */
-typedef struct ReplicationSlotOnDisk
-{
-	/* first part of this struct needs to be version independent */
-
-	/* data not covered by checksum */
-	uint32 magic;
-	pg_crc32c checksum;
-
-	/* data covered by checksum */
-	uint32 version;
-	uint32 length;
-
-	/*
-	 * The actual data in the slot that follows can differ based on the above
-	 * 'version'.
-	 */
-
-	ReplicationSlotPersistentData slotdata;
-} ReplicationSlotOnDisk;
-
-/* size of version independent data */
-#define ReplicationSlotOnDiskConstantSize \
-	offsetof(ReplicationSlotOnDisk, slotdata)
-/* size of the part of the slot not covered by the checksum */
-#define ReplicationSlotOnDiskNotChecksummedSize \
-	offsetof(ReplicationSlotOnDisk, version)
-/* size of the part covered by the checksum */
-#define ReplicationSlotOnDiskChecksummedSize \
-	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
-/* size of the slot data that is version dependent */
-#define ReplicationSlotOnDiskV2Size \
-	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
-
-#define SLOT_MAGIC 0x1051CA1 /* format identifier */
-#define SLOT_VERSION 3 /* version for new files */
-
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
 
diff --git a/src/bin/Makefile b/src/bin/Makefile
index 373077bf52..1db7dd00d4 100644
--- a/src/bin/Makefile
+++ b/src/bin/Makefile
@@ -23,6 +23,7 @@ SUBDIRS = \
 	pg_controldata \
 	pg_ctl \
 	pg_dump \
+	pg_replslotdata \
 	pg_resetwal \
 	pg_rewind \
 	pg_test_fsync \
diff --git a/src/bin/pg_replslotdata/.gitignore b/src/bin/pg_replslotdata/.gitignore
new file mode 100644
index 0000000000..13a4afb8ef
--- /dev/null
+++ b/src/bin/pg_replslotdata/.gitignore
@@ -0,0 +1,2 @@
+/pg_replslotdata
+/tmp_check/
diff --git a/src/bin/pg_replslotdata/Makefile b/src/bin/pg_replslotdata/Makefile
new file mode 100644
index 0000000000..69518ee53b
--- /dev/null
+++ b/src/bin/pg_replslotdata/Makefile
@@ -0,0 +1,44 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/bin/pg_replslotdata
+#
+# Copyright (c) 1998-2021, PostgreSQL Global Development Group
+#
+# src/bin/pg_replslotdata/Makefile
+#
+#-------------------------------------------------------------------------
+
+PGFILEDESC = "pg_replslotdata - provides information about the replication slots from $PGDATA/pg_replslot/<slot_name> $PGDATA/pg_replslot/<slot_name>"
+PGAPPICON=win32
+
+subdir = src/bin/pg_replslotdata
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+	$(WIN32RES) \
+	pg_replslotdata.o
+
+all: pg_replslotdata
+
+pg_replslotdata: $(OBJS) | submake-libpgport
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+install: all installdirs
+	$(INSTALL_PROGRAM) pg_replslotdata$(X) '$(DESTDIR)$(bindir)/pg_replslotdata$(X)'
+
+installdirs:
+	$(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+	rm -f '$(DESTDIR)$(bindir)/pg_replslotdata$(X)'
+
+clean distclean maintainer-clean:
+	rm -f pg_replslotdata$(X) $(OBJS)
+	rm -rf tmp_check
+
+check:
+	$(prove_check)
+
+installcheck:
+	$(prove_installcheck)
diff --git a/src/bin/pg_replslotdata/nls.mk b/src/bin/pg_replslotdata/nls.mk
new file mode 100644
index 0000000000..74bee593c9
--- /dev/null
+++ b/src/bin/pg_replslotdata/nls.mk
@@ -0,0 +1,6 @@
+# src/bin/pg_replslotdata/nls.mk
+CATALOG_NAME = pg_replslotdata
+AVAIL_LANGUAGES = cs de el es fr ja ko pl ru sv tr uk vi zh_CN
+GETTEXT_FILES = $(FRONTEND_COMMON_GETTEXT_FILES) pg_replslotdata.c
+GETTEXT_TRIGGERS = $(FRONTEND_COMMON_GETTEXT_TRIGGERS)
+GETTEXT_FLAGS = $(FRONTEND_COMMON_GETTEXT_FLAGS)
diff --git a/src/bin/pg_replslotdata/pg_replslotdata.c b/src/bin/pg_replslotdata/pg_replslotdata.c
new file mode 100644
index 0000000000..aed5d6750c
--- /dev/null
+++ b/src/bin/pg_replslotdata/pg_replslotdata.c
@@ -0,0 +1,362 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_replslotdata.c - provides information about the replication slots
+ * from $PGDATA/pg_replslot/<slot_name>.
+ *
+ * Copyright (c) 2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/bin/pg_replslotdata/pg_replslotdata.c
+ *-------------------------------------------------------------------------
+ */
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+
+#include "postgres.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "common/logging.h"
+#include "common/string.h"
+#include "getopt_long.h"
+#include "pg_getopt.h"
+#include "replication/slot_common.h"
+
+static bool verbose = false;
+
+static void process_replslots(void);
+static void read_and_display_repl_slot(const char *name);
+
+static void
+usage(const char *progname)
+{
+	printf(_("%s displays information about the replication slots from $PGDATA/pg_replslot/<slot_name>.\n\n"), progname);
+	printf(_("Usage:\n"));
+	printf(_(" %s [OPTION] [DATADIR]\n"), progname);
+	printf(_("\nOptions:\n"));
+	printf(_(" [-D, --pgdata=]DATADIR data directory\n"));
+	printf(_(" -V, --version output version information, then exit\n"));
+	printf(_(" -v, --verbose write a lot of output\n"));
+	printf(_(" -?, --help show this help, then exit\n"));
+	printf(_("\nIf no data directory (DATADIR) is specified, "
+			 "the environment variable PGDATA\nis used.\n\n"));
+	printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
+	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
+}
+
+static void
+process_replslots(void)
+{
+	DIR *rsdir;
+	struct dirent *rsde;
+	uint32 cnt = 0;
+
+	rsdir = opendir("pg_replslot");
+	if (rsdir == NULL)
+	{
+		pg_log_error("could not open directory \"%s\": %m", "pg_replslot");
+		exit(1);
+	}
+
+	/* XXX: comment here about the format specifiers */
+	printf("%-64s %9s %10s %11s %10s %12s %21s %21s %21s %21s %10s %20s\n"
+		   "%-64s %9s %10s %11s %10s %12s %21s %21s %21s %21s %10s %20s\n",
+		   "slot_name", "slot_type", "datoid", "persistency", "xmin", "catalog_xmin", "restart_lsn", "invalidated_at", "confirmed_flush", "two_phase_at", "two_phase", "plugin",
+		   "---------", "---------", "------", "-----------", "----", "------------", "-----------", "--------------", "---------------", "------------", "---------", "------");
+
+	while (errno = 0, (rsde = readdir(rsdir)) != NULL)
+	{
+		struct stat statbuf;
+		char path[MAXPGPATH];
+
+		if (strcmp(rsde->d_name, ".") == 0 ||
+			strcmp(rsde->d_name, "..") == 0)
+			continue;
+
+		snprintf(path, sizeof(path), "pg_replslot/%s", rsde->d_name);
+
+		/* we're only creating directories here, skip if it's not our's */
+		if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+			continue;
+
+		/* we crashed while a slot was being setup or deleted, clean up */
+		if (pg_str_endswith(rsde->d_name, ".tmp"))
+		{
+			pg_log_warning("server was crashed while the slot \"%s\" was being setup or deleted",
+						   rsde->d_name);
+			continue;
+		}
+
+		/* looks like a slot in a normal state, restore */
+		read_and_display_repl_slot(rsde->d_name);
+		cnt++;
+	}
+
+	if (errno)
+	{
+		pg_log_error("could not read directory \"%s\": %m", "pg_replslot");
+		exit(1);
+	}
+
+	if (cnt == 0)
+	{
+		pg_log_info("no replication slots were found");
+		exit(0);
+	}
+
+	if (closedir(rsdir))
+	{
+		pg_log_error("could not close directory \"%s\": %m", "pg_replslot");
+		exit(1);
+	}
+}
+
+static void
+read_and_display_repl_slot(const char *name)
+{
+	ReplicationSlotOnDisk cp;
+	char slotdir[MAXPGPATH];
+	char path[MAXPGPATH];
+	char restart_lsn[NAMEDATALEN];
+	char confirmed_flush[NAMEDATALEN];
+	char two_phase_at[NAMEDATALEN];
+	char persistency[NAMEDATALEN];
+	int fd;
+	int readBytes;
+	pg_crc32c checksum;
+
+	/* delete temp file if it exists */
+	sprintf(slotdir, "pg_replslot/%s", name);
+	sprintf(path, "%s/state.tmp", slotdir);
+
+	fd = open(path, O_RDONLY | PG_BINARY, 0);
+
+	if (fd > 0)
+	{
+		pg_log_error("found temporary state file \"%s\": %m", path);
+		exit(1);
+	}
+
+	sprintf(path, "%s/state", slotdir);
+
+	if (verbose)
+		pg_log_info("reading replication slot from \"%s\"", path);
+
+	fd = open(path, O_RDONLY | PG_BINARY, 0);
+
+	/*
+	 * We do not need to handle this as we are rename()ing the directory into
+	 * place only after we fsync()ed the state file.
+	 */
+	if (fd < 0)
+	{
+		pg_log_error("could not open file \"%s\": %m", path);
+		exit(1);
+	}
+
+	if (verbose)
+		pg_log_info("reading version independent replication slot state file");
+
+	/* read part of statefile that's guaranteed to be version independent */
+	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
+	if (readBytes != ReplicationSlotOnDiskConstantSize)
+	{
+		if (readBytes < 0)
+		{
+			pg_log_error("could not read file \"%s\": %m", path);
+			exit(1);
+		}
+		else
+		{
+			pg_log_error("could not read file \"%s\": read %d of %zu",
+						 path, readBytes,
+						 (Size) ReplicationSlotOnDiskConstantSize);
+			exit(1);
+		}
+	}
+
+	/* verify magic */
+	if (cp.magic != SLOT_MAGIC)
+	{
+		pg_log_error("replication slot file \"%s\" has wrong magic number: %u instead of %u",
+					 path, cp.magic, SLOT_MAGIC);
+		exit(1);
+	}
+
+	/* verify version */
+	if (cp.version != SLOT_VERSION)
+	{
+		pg_log_error("replication slot file \"%s\" has unsupported version %u",
+					 path, cp.version);
+		exit(1);
+	}
+
+	/* boundary check on length */
+	if (cp.length != ReplicationSlotOnDiskV2Size)
+	{
+		pg_log_error("replication slot file \"%s\" has corrupted length %u",
+					 path, cp.length);
+		exit(1);
+	}
+
+	if (verbose)
+		pg_log_info("reading the entire replication slot state file");
+
+	/* now that we know the size, read the entire file */
+	readBytes = read(fd,
+					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
+					 cp.length);
+	if (readBytes != cp.length)
+	{
+		if (readBytes < 0)
+		{
+			pg_log_error("could not read file \"%s\": %m", path);
+			exit(1);
+		}
+		else
+		{
+			pg_log_error("could not read file \"%s\": read %d of %zu",
+						 path, readBytes, (Size) cp.length);
+			exit(1);
+		}
+	}
+
+	if (close(fd) != 0)
+	{
+		pg_log_error("could not close file \"%s\": %m", path);
+		exit(1);
+	}
+
+	/* now verify the CRC */
+	INIT_CRC32C(checksum);
+	COMP_CRC32C(checksum,
+				(char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
+				ReplicationSlotOnDiskChecksummedSize);
+	FIN_CRC32C(checksum);
+
+	if (!EQ_CRC32C(checksum, cp.checksum))
+	{
+		pg_log_error("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
+					 path, checksum, cp.checksum);
+		exit(1);
+	}
+
+	sprintf(restart_lsn, "%X/%X", LSN_FORMAT_ARGS(cp.slotdata.restart_lsn));
+	sprintf(confirmed_flush, "%X/%X", LSN_FORMAT_ARGS(cp.slotdata.confirmed_flush));
+	sprintf(two_phase_at, "%X/%X", LSN_FORMAT_ARGS(cp.slotdata.two_phase_at));
+
+	if (cp.slotdata.persistency == RS_PERSISTENT)
+		sprintf(persistency, "persistent");
+	else if (cp.slotdata.persistency == RS_EPHEMERAL)
+		sprintf(persistency, "ephemeral");
+	else if (cp.slotdata.persistency == RS_TEMPORARY)
+		sprintf(persistency, "temporary");
+
+	/* display the slot information */
+	printf("%-64s %9s %10u %11s %10u %12u %21s %21s %21s %10d %20s\n",
+		   NameStr(cp.slotdata.name),
+		   cp.slotdata.database == InvalidOid ? "physical" : "logical",
+		   cp.slotdata.database,
+		   persistency,
+		   cp.slotdata.xmin,
+		   cp.slotdata.catalog_xmin,
+		   restart_lsn,
+		   confirmed_flush,
+		   two_phase_at,
+		   cp.slotdata.two_phase,
+		   NameStr(cp.slotdata.plugin));
+}
+
+int
+main(int argc, char *argv[])
+{
+	static struct option long_options[] = {
+		{"pgdata", required_argument, NULL, 'D'},
+		{"verbose", no_argument, NULL, 'v'},
+		{NULL, 0, NULL, 0}
+	};
+
+	char *DataDir = NULL;
+	const char *progname;
+	int c;
+
+	pg_logging_init(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_replslotdata"));
+	progname = get_progname(argv[0]);
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage(progname);
+			exit(0);
+		}
+		if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
+		{
+			puts("pg_replslotdata (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	while ((c = getopt_long(argc, argv, "D:v", long_options, NULL)) != -1)
+	{
+		switch (c)
+		{
+			case 'D':
+				DataDir = optarg;
+				break;
+			case 'v':
+				verbose = true;
+				break;
+			default:
+				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+				exit(1);
+		}
+	}
+
+	if (DataDir == NULL)
+	{
+		if (optind < argc)
+			DataDir = argv[optind++];
+		else
+			DataDir = getenv("PGDATA");
+	}
+
+	/* complain if any arguments remain */
+	if (optind < argc)
+	{
+		pg_log_error("too many command-line arguments (first is \"%s\")",
+					 argv[optind]);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (DataDir == NULL)
+	{
+		pg_log_error("no data directory specified");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+		exit(1);
+	}
+
+	if (verbose)
+		pg_log_info("data directory: \"%s\"", DataDir);
+
+	if (chdir(DataDir) < 0)
+	{
+		pg_log_error("could not change directory to \"%s\": %m",
+					 DataDir);
+		exit(1);
+	}
+
+	process_replslots();
+
+	return 0;
+}
diff --git a/src/bin/pg_replslotdata/t/001_basic.pl b/src/bin/pg_replslotdata/t/001_basic.pl
new file mode 100644
index 0000000000..d6830dc2ac
--- /dev/null
+++ b/src/bin/pg_replslotdata/t/001_basic.pl
@@ -0,0 +1,11 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 8;
+
+program_help_ok('pg_replslotdata');
+program_version_ok('pg_replslotdata');
+program_options_handling_ok('pg_replslotdata');
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..a7d16a37a3 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,104 +15,9 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
+#include "replication/slot_common.h"
 #include "replication/walreceiver.h"
 
-/*
- * Behaviour of replication slots, upon release or crash.
- *
- * Slots marked as PERSISTENT are crash-safe and will not be dropped when
- * released. Slots marked as EPHEMERAL will be dropped when released or after
- * restarts. Slots marked TEMPORARY will be dropped at the end of a session
- * or on error.
- *
- * EPHEMERAL is used as a not-quite-ready state when creating persistent
- * slots. EPHEMERAL slots can be made PERSISTENT by calling
- * ReplicationSlotPersist(). For a slot that goes away at the end of a
- * session, TEMPORARY is the appropriate choice.
- */
-typedef enum ReplicationSlotPersistency
-{
-	RS_PERSISTENT,
-	RS_EPHEMERAL,
-	RS_TEMPORARY
-} ReplicationSlotPersistency;
-
-/*
- * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
- * 'invalidated' field is set to a value other than _NONE.
- */
-typedef enum ReplicationSlotInvalidationCause
-{
-	RS_INVAL_NONE,
-	/* required WAL has been removed */
-	RS_INVAL_WAL_REMOVED,
-	/* required rows have been removed */
-	RS_INVAL_HORIZON,
-	/* wal_level insufficient for slot */
-	RS_INVAL_WAL_LEVEL,
-} ReplicationSlotInvalidationCause;
-
-/*
- * On-Disk data of a replication slot, preserved across restarts.
- */
-typedef struct ReplicationSlotPersistentData
-{
-	/* The slot's identifier */
-	NameData name;
-
-	/* database the slot is active on */
-	Oid database;
-
-	/*
-	 * The slot's behaviour when being dropped (or restored after a crash).
-	 */
-	ReplicationSlotPersistency persistency;
-
-	/*
-	 * xmin horizon for data
-	 *
-	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
-	 */
-	TransactionId xmin;
-
-	/*
-	 * xmin horizon for catalog tuples
-	 *
-	 * NB: This may represent a value that hasn't been written to disk yet;
-	 * see notes for effective_xmin, below.
-	 */
-	TransactionId catalog_xmin;
-
-	/* oldest LSN that might be required by this replication slot */
-	XLogRecPtr restart_lsn;
-
-	/* RS_INVAL_NONE if valid, or the reason for having been invalidated */
-	ReplicationSlotInvalidationCause invalidated;
-
-	/*
-	 * Oldest LSN that the client has acked receipt for. This is used as the
-	 * start_lsn point in case the client doesn't specify one, and also as a
-	 * safety measure to jump forwards in case the client specifies a
-	 * start_lsn that's further in the past than this value.
-	 */
-	XLogRecPtr confirmed_flush;
-
-	/*
-	 * LSN at which we enabled two_phase commit for this slot or LSN at which
-	 * we found a consistent point at the time of slot creation.
-	 */
-	XLogRecPtr two_phase_at;
-
-	/*
-	 * Allow decoding of prepared transactions?
-	 */
-	bool two_phase;
-
-	/* plugin name */
-	NameData plugin;
-} ReplicationSlotPersistentData;
-
 /*
  * Shared memory state of a single replication slot.
  *
diff --git a/src/include/replication/slot_common.h b/src/include/replication/slot_common.h
new file mode 100644
index 0000000000..ff4556ff22
--- /dev/null
+++ b/src/include/replication/slot_common.h
@@ -0,0 +1,147 @@
+/*-------------------------------------------------------------------------
+ * slot_common.h
+ *	  Replication slot management.
+ *
+ * Copyright (c) 2021, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_COMMON_H
+#define SLOT_COMMON_H
+
+/*
+ * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
+ * 'invalidated' field is set to a value other than _NONE.
+ */
+typedef enum ReplicationSlotInvalidationCause
+{
+	RS_INVAL_NONE,
+	/* required WAL has been removed */
+	RS_INVAL_WAL_REMOVED,
+	/* required rows have been removed */
+	RS_INVAL_HORIZON,
+	/* wal_level insufficient for slot */
+	RS_INVAL_WAL_LEVEL,
+} ReplicationSlotInvalidationCause;
+
+/*
+ * Behaviour of replication slots, upon release or crash.
+ *
+ * Slots marked as PERSISTENT are crash-safe and will not be dropped when
+ * released. Slots marked as EPHEMERAL will be dropped when released or after
+ * restarts. Slots marked TEMPORARY will be dropped at the end of a session
+ * or on error.
+ *
+ * EPHEMERAL is used as a not-quite-ready state when creating persistent
+ * slots. EPHEMERAL slots can be made PERSISTENT by calling
+ * ReplicationSlotPersist(). For a slot that goes away at the end of a
+ * session, TEMPORARY is the appropriate choice.
+ */
+typedef enum ReplicationSlotPersistency
+{
+	RS_PERSISTENT,
+	RS_EPHEMERAL,
+	RS_TEMPORARY
+} ReplicationSlotPersistency;
+
+/*
+ * On-Disk data of a replication slot, preserved across restarts.
+ */
+typedef struct ReplicationSlotPersistentData
+{
+	/* The slot's identifier */
+	NameData name;
+
+	/* database the slot is active on */
+	Oid database;
+
+	/*
+	 * The slot's behaviour when being dropped (or restored after a crash).
+	 */
+	ReplicationSlotPersistency persistency;
+
+	/*
+	 * xmin horizon for data
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin, below.
+	 */
+	TransactionId xmin;
+
+	/*
+	 * xmin horizon for catalog tuples
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin, below.
+	 */
+	TransactionId catalog_xmin;
+
+	/* oldest LSN that might be required by this replication slot */
+	XLogRecPtr restart_lsn;
+
+	/* RS_INVAL_NONE if valid, or the reason for having been invalidated */
+	ReplicationSlotInvalidationCause invalidated;
+
+	/*
+	 * Oldest LSN that the client has acked receipt for. This is used as the
+	 * start_lsn point in case the client doesn't specify one, and also as a
+	 * safety measure to jump forwards in case the client specifies a
+	 * start_lsn that's further in the past than this value.
+	 */
+	XLogRecPtr confirmed_flush;
+
+	/*
+	 * LSN at which we enabled two_phase commit for this slot or LSN at which
+	 * we found a consistent point at the time of slot creation.
+	 */
+	XLogRecPtr two_phase_at;
+
+	/*
+	 * Allow decoding of prepared transactions?
+	 */
+	bool two_phase;
+
+	/* plugin name */
+	NameData plugin;
+} ReplicationSlotPersistentData;
+
+/*
+ * Replication slot on-disk data structure.
+ */
+typedef struct ReplicationSlotOnDisk
+{
+	/* first part of this struct needs to be version independent */
+
+	/* data not covered by checksum */
+	uint32 magic;
+	pg_crc32c checksum;
+
+	/* data covered by checksum */
+	uint32 version;
+	uint32 length;
+
+	/*
+	 * The actual data in the slot that follows can differ based on the above
+	 * 'version'.
+	 */
+
+	ReplicationSlotPersistentData slotdata;
+} ReplicationSlotOnDisk;
+
+/* size of version independent data */
+#define ReplicationSlotOnDiskConstantSize \
+	offsetof(ReplicationSlotOnDisk, slotdata)
+/* size of the part of the slot not covered by the checksum */
+#define ReplicationSlotOnDiskNotChecksummedSize \
+	offsetof(ReplicationSlotOnDisk, version)
+/* size of the part covered by the checksum */
+#define ReplicationSlotOnDiskChecksummedSize \
+	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
+/* size of the slot data that is version dependent */
+#define ReplicationSlotOnDiskV2Size \
+	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
+
+#define SLOT_MAGIC 0x1051CA1 /* format identifier */
+#define SLOT_VERSION 3 /* version for new files */
+
+#endif /* SLOT_COMMON_H */
-- 
2.34.1