This patch is looking pretty good to me, modulo the failing pg_dump tests. Attached is a fixup patch. I have mainly updated some comments and variable naming for (my) clarity. No functional changes.
-- Peter Eisentraut http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 04f772f350773cce9890386c4a5924ee251ebbe6 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut <peter_e@gmx.net> Date: Tue, 21 Mar 2017 15:38:39 -0400 Subject: [PATCH] fixup! Logical replication support for initial data copy --- src/backend/catalog/pg_subscription.c | 6 +- src/backend/commands/subscriptioncmds.c | 2 +- .../libpqwalreceiver/libpqwalreceiver.c | 9 +- src/backend/replication/logical/tablesync.c | 190 +++++++++++---------- src/backend/replication/logical/worker.c | 22 +-- src/include/replication/logical.h | 2 - src/include/replication/worker_internal.h | 8 +- src/test/regress/expected/subscription.out | 2 - src/test/regress/sql/subscription.sql | 2 - src/test/subscription/t/001_rep_changes.pl | 6 +- src/test/subscription/t/004_sync.pl | 2 +- 11 files changed, 127 insertions(+), 124 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9b74892548..e420ec14d2 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -321,10 +321,8 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, return SUBREL_STATE_UNKNOWN; } - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("subscription table %u in subscription %u does not exist", - relid, subid))); + elog(ERROR, "subscription table %u in subscription %u does not exist", + relid, subid); } /* Get the state. */ diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index cba2d5c085..0784ca7951 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -992,7 +992,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) (errmsg("could not receive list of replicated tables from the publisher: %s", res->err))); - /* Proccess tables. */ + /* Process tables. 
*/ slot = MakeSingleTupleTableSlot(res->tupledesc); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 3176182523..4dd8eef1f9 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -792,13 +792,12 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, * Convert tuple query result to tuplestore. */ static void -libpqrcv_proccessTuples(PGresult *pgres, WalRcvExecResult *walres, +libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, const int nRetTypes, const Oid *retTypes) { int tupn; int coln; int nfields = PQnfields(pgres); - char *cstrs[MaxTupleAttributeNumber]; HeapTuple tuple; AttInMetadata *attinmeta; MemoryContext rowcontext; @@ -830,9 +829,11 @@ libpqrcv_proccessTuples(PGresult *pgres, WalRcvExecResult *walres, "libpqrcv query result context", ALLOCSET_DEFAULT_SIZES); - /* Proccess returned rows. */ + /* Process returned rows. */ for (tupn = 0; tupn < PQntuples(pgres); tupn++) { + char *cstrs[MaxTupleAttributeNumber]; + CHECK_FOR_INTERRUPTS(); /* Do the allocations in temporary context. 
*/ @@ -885,7 +886,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, case PGRES_SINGLE_TUPLE: case PGRES_TUPLES_OK: walres->status = WALRCV_OK_TUPLES; - libpqrcv_proccessTuples(pgres, walres, nRetTypes, retTypes); + libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes); break; case PGRES_COPY_IN: diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6c67a5ea9f..3e16b0d576 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -36,7 +36,7 @@ * - if the apply is in front of the sync in the wal stream the new * state is set to CATCHUP and apply loops until the sync process * catches up to the same LSN as apply - * - if the sync if in front of the apply in the wal stream the new + * - if the sync is in front of the apply in the wal stream the new * state is set to SYNCDONE * - if both apply and sync are at the same position in the wal stream * the state of the table is set to READY @@ -104,7 +104,6 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" -static List *table_states = NIL; static bool table_states_valid = false; StringInfo copybuf = NULL; @@ -112,10 +111,10 @@ StringInfo copybuf = NULL; /* * Exit routine for synchronization worker. */ -static void -finish_sync_worker(char *slotname) +static void pg_attribute_noreturn() +finish_sync_worker(void) { - /* Commit any outstanding trasnsaction, */ + /* Commit any outstanding transaction. */ if (IsTransactionState()) CommitTransactionCommand(); @@ -193,41 +192,37 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) * worker. * * If the sync worker is in catch up mode and reached the predetermined - * synchronization point in wal stream, it will mark the table as ready and - * finish. + * synchronization point in the WAL stream, mark the table as READY and + * finish. If it caught up too far, set to SYNCDONE and finish. 
Things will + * then proceed in the "sync in front" scenario. */ static void -process_syncing_tables_for_sync(char *slotname, XLogRecPtr end_lsn) +process_syncing_tables_for_sync(XLogRecPtr current_lsn) { - TimeLineID tli; - Assert(IsTransactionState()); - /* - * Synchronization worker has catched up with apply. Update the table - * state and finish. - */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && - end_lsn >= MyLogicalRepWorker->relstate_lsn) + current_lsn >= MyLogicalRepWorker->relstate_lsn) { + TimeLineID tli; + MyLogicalRepWorker->relstate = - (end_lsn == MyLogicalRepWorker->relstate_lsn) + (current_lsn == MyLogicalRepWorker->relstate_lsn) ? SUBREL_STATE_READY : SUBREL_STATE_SYNCDONE; - MyLogicalRepWorker->relstate_lsn = end_lsn; + MyLogicalRepWorker->relstate_lsn = current_lsn; SpinLockRelease(&MyLogicalRepWorker->relmutex); - /* The synchronization is done so write it into catalog. */ SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); walrcv_endstreaming(wrconn, &tli); - finish_sync_worker(slotname); + finish_sync_worker(); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -236,28 +231,31 @@ process_syncing_tables_for_sync(char *slotname, XLogRecPtr end_lsn) /* * Handle table synchronization cooperation from the apply worker. * - * Walk over all subscription tables that are individually tracked by apply - * process (currently all that have state other than SUBREL_STATE_READY) and - * manage synchronization for them. + * Walk over all subscription tables that are individually tracked by the + * apply process (currently, all that have state other than + * SUBREL_STATE_READY) and manage synchronization for them. * - * In case there are tables that need synchronized and are not being - * synchronized yet (and there are free slots for sync workers) it will start - * sync workers for them. 
+ * If there are tables that need synchronizing and are not being synchronized + * yet, start sync workers for them (if there are free slots for sync + * workers). * - * For tables that are being synchronized already, it will check if sync - * workers either need action from the apply worker or have finished. + * For tables that are being synchronized already, check if sync workers + * either need action from the apply worker or have finished. * - * The usual action needed by apply is to mark table for catchup and wait for - * the catchup to happen. In case that sync worker got in front of apply - * worker it will mark the table as synced but not ready yet as it needs to be - * tracked until apply reaches the same position to which it was synced. + * The usual scenario is that the apply got ahead of the sync while the sync + * ran, and then the action needed by apply is to mark a table for CATCHUP and + * wait for the catchup to happen. In the less common case that sync worker + * got in front of the apply worker, the table is marked as SYNCDONE but not + * ready yet, as it needs to be tracked until apply reaches the same position + * to which it was synced. * - * In case the synchronization position is reached the table can be marked - * as ready and no longer tracked. + * If the synchronization position is reached, then the table can be marked as + * READY and is no longer tracked. */ static void -process_syncing_tables_for_apply(char *slotname, XLogRecPtr end_lsn) +process_syncing_tables_for_apply(XLogRecPtr current_lsn) { + static List *table_states = NIL; ListCell *lc; Assert(!IsTransactionState()); @@ -303,82 +301,85 @@ process_syncing_tables_for_apply(char *slotname, XLogRecPtr end_lsn) { /* * Apply has caught up to the position where the table sync - * has finished, time to mark the table as ready so that + * has finished. Time to mark the table as ready so that * apply will just continue to replicate it normally. 
*/ - if (end_lsn >= rstate->lsn) + if (current_lsn >= rstate->lsn) { rstate->state = SUBREL_STATE_READY; - rstate->lsn = end_lsn; + rstate->lsn = current_lsn; StartTransactionCommand(); SetSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, rstate->lsn); CommitTransactionCommand(); } - - continue; } else { - LogicalRepWorker *worker; - int nworkers = 0; + LogicalRepWorker *syncworker; + int nsyncworkers = 0; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(MyLogicalRepWorker->subid, - rstate->relid, false); - if (worker) + syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, + rstate->relid, false); + if (syncworker) { - SpinLockAcquire(&worker->relmutex); - rstate->state = worker->relstate; - rstate->lsn = worker->relstate_lsn; - SpinLockRelease(&worker->relmutex); + SpinLockAcquire(&syncworker->relmutex); + rstate->state = syncworker->relstate; + rstate->lsn = syncworker->relstate_lsn; + SpinLockRelease(&syncworker->relmutex); } else - nworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + /* + * If no sync worker for this table yet, count running sync + * workers for this subscription, while we have the lock, for + * later. + */ + nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); LWLockRelease(LogicalRepWorkerLock); /* * There is a worker synchronizing the relation and waiting for * apply to do something. */ - if (worker && rstate->state == SUBREL_STATE_SYNCWAIT) + if (syncworker && rstate->state == SUBREL_STATE_SYNCWAIT) { /* - * When the synchronization process is at the catchup phase. - * * There are three possible synchronization situations here. - * a) Apply is infront of the table sync, in this case we - * tell the table sync to catch up. - * b) Apply is behind the table sync, in that case we tell - * the table sync to mark the table as syncdone and finish. 
- * c) Apply and table sync are at the same position, in which - * case we tell table sync to mark the table as ready and - * finish. * - * In any case we'll need to wait for tablesync to change + * a) Apply is in front of the table sync: We tell the table + * sync to CATCHUP. + * + * b) Apply is behind the table sync: We tell the table sync + * to mark the table as SYNCDONE and finish. + * + * c) Apply and table sync are at the same position: We tell + * table sync to mark the table as READY and finish. + * + * In any case we'll need to wait for table sync to change * the state in catalog and only then continue ourselves. */ - if (end_lsn > rstate->lsn) + if (current_lsn > rstate->lsn) { rstate->state = SUBREL_STATE_CATCHUP; - rstate->lsn = end_lsn; + rstate->lsn = current_lsn; } - else if (end_lsn == rstate->lsn) + else if (current_lsn == rstate->lsn) { rstate->state = SUBREL_STATE_READY; - rstate->lsn = end_lsn; + rstate->lsn = current_lsn; } else rstate->state = SUBREL_STATE_SYNCDONE; - SpinLockAcquire(&worker->relmutex); - worker->relstate = rstate->state; - worker->relstate_lsn = rstate->lsn; - SpinLockRelease(&worker->relmutex); + SpinLockAcquire(&syncworker->relmutex); + syncworker->relstate = rstate->state; + syncworker->relstate_lsn = rstate->lsn; + SpinLockRelease(&syncworker->relmutex); - /* Signal the worker as it may be waiting for us. */ - logicalrep_worker_wakeup_ptr(worker); + /* Signal the sync worker, as it may be waiting for us. */ + logicalrep_worker_wakeup_ptr(syncworker); /* * Enter busy loop and wait for synchronization status @@ -392,7 +393,7 @@ process_syncing_tables_for_apply(char *slotname, XLogRecPtr end_lsn) * there is some free sync worker slot, start new sync worker * for the table. 
*/ - else if (!worker && nworkers < max_sync_workers_per_subscription) + else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription) { logicalrep_worker_launch(MyLogicalRepWorker->dbid, MySubscription->oid, @@ -408,12 +409,12 @@ process_syncing_tables_for_apply(char *slotname, XLogRecPtr end_lsn) * Process state possible change(s) of tables that are being synchronized. */ void -process_syncing_tables(char *slotname, XLogRecPtr end_lsn) +process_syncing_tables(XLogRecPtr current_lsn) { if (am_tablesync_worker()) - process_syncing_tables_for_sync(slotname, end_lsn); + process_syncing_tables_for_sync(current_lsn); else - process_syncing_tables_for_apply(slotname, end_lsn); + process_syncing_tables_for_apply(current_lsn); } /* @@ -446,8 +447,8 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel) } /* - * Callback for the COPY FROM which reads from the remote connection - * and passes the data back to our local COPY. + * Data source callback for the COPY FROM, which reads from the remote + * connection and passes the data back to our local COPY. */ static int copy_read_data(void *outbuf, int minread, int maxread) @@ -482,13 +483,9 @@ copy_read_data(void *outbuf, int minread, int maxread) CHECK_FOR_INTERRUPTS(); if (len == 0) - { break; - } else if (len < 0) - { return bytesread; - } else { /* Process the data */ @@ -552,7 +549,7 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->nspname = nspname; lrel->relname = relname; - /* First fetch Oid and replication identity. */ + /* First fetch Oid and replica identity. */ initStringInfo(&cmd); appendStringInfo(&cmd, "SELECT c.oid, c.relreplident" " FROM pg_catalog.pg_class c," @@ -605,7 +602,7 @@ fetch_remote_table_info(char *nspname, char *relname, (errmsg("could not fetch table info for table \"%s.%s\": %s", nspname, relname, res->err))); - /* We don't know number of rows infront so allocate enough space. */ + /* We don't know number of rows coming, so allocate enough space. 
*/ lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; @@ -695,7 +692,7 @@ copy_table(Relation rel) char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) { - char slotname[NAMEDATALEN]; + char *slotname; char *err; /* Check the state of the table synchronization. */ @@ -710,14 +707,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CommitTransactionCommand(); /* - * We are limited to 63 characters of the name length so we cut the - * original slot name to 36 chars because the "_sync_" adds 6, each - * each unsigned integer (oid) has maximum of 10 characters and we have - * one additional "_" separator between slot name and subscription oid. + * To build a slot name for the sync work, we are limited to NAMEDATALEN - + * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars + * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the + * NAMEDATALEN on the remote that matters, but this scheme will also work + * reasonably if that is different.) 
*/ - snprintf(slotname, NAMEDATALEN, "%.36s_%u_sync_%u", - MySubscription->slotname, MySubscription->oid, - MyLogicalRepWorker->relid); + StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ + slotname = psprintf("%.*s_%u_sync_%u", + NAMEDATALEN - 28, + MySubscription->slotname, + MySubscription->oid, + MyLogicalRepWorker->relid); wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err); if (wrconn == NULL) @@ -787,7 +788,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) copy_table(rel); - res = walrcv_exec(wrconn, "ROLLBACK", 0, NULL); + res = walrcv_exec(wrconn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, (errmsg("table copy could not finish transaction on publisher"), @@ -821,18 +822,19 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); - finish_sync_worker(slotname); + finish_sync_worker(); } break; } case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: /* Nothing to do here but finish. 
*/ - finish_sync_worker(slotname); + finish_sync_worker(); + break; default: elog(ERROR, "unknown relation state \"%c\"", MyLogicalRepWorker->relstate); } - return pstrdup(slotname); + return slotname; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 2396d24b43..bbf3506be0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -109,7 +109,6 @@ WalReceiverConn *wrconn = NULL; Subscription *MySubscription = NULL; bool MySubscriptionValid = false; -static char *myslotname = NULL; bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; @@ -129,18 +128,18 @@ static void reread_subscription(void); * Note we need to do smaller or equals comparison for SYNCDONE state because * it might hold position of end of intitial slot consistent point WAL * record + 1 (ie start of next record) and next record can be COMMIT of - * transaction we are now proccessing (which is what we set remote_final_lsn + * transaction we are now processing (which is what we set remote_final_lsn * to in apply_handle_begin). */ static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) { - return (!am_tablesync_worker() && - (rel->state == SUBREL_STATE_READY || - (rel->state == SUBREL_STATE_SYNCDONE && - rel->statelsn <= remote_final_lsn))) || - (am_tablesync_worker() && - MyLogicalRepWorker->relid == rel->localreloid); + if (am_tablesync_worker()) + return MyLogicalRepWorker->relid == rel->localreloid; + else + return (rel->state == SUBREL_STATE_READY || + (rel->state == SUBREL_STATE_SYNCDONE && + rel->statelsn <= remote_final_lsn)); } /* @@ -459,8 +458,8 @@ apply_handle_commit(StringInfo s) in_remote_transaction = false; - /* Proccess any tables that are being synchronized in parallel. */ - process_syncing_tables(myslotname, commit_data.end_lsn); + /* Process any tables that are being synchronized in parallel. 
*/ + process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); } @@ -1110,7 +1109,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) reread_subscription(); /* Process any table synchronization changes. */ - process_syncing_tables(myslotname, last_received); + process_syncing_tables(last_received); } /* Cleanup the memory. */ @@ -1420,6 +1419,7 @@ ApplyWorkerMain(Datum main_arg) MemoryContext oldctx; char originname[NAMEDATALEN]; XLogRecPtr origin_startpos; + char *myslotname; WalRcvStreamOptions options; /* Attach to slot */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index a5088c4cba..d10dd2c90a 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -99,8 +99,6 @@ extern LogicalDecodingContext *CreateCopyDecodingContext( List *output_plugin_options, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write); -extern void DecodingContextProccessTuple(LogicalDecodingContext *ctx, - Relation rel, HeapTuple tup); extern List *DecodingContextGetTableList(LogicalDecodingContext *ctx); extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f642b1c6dc..bf96d340ca 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -71,10 +71,14 @@ extern int logicalrep_sync_worker_count(Oid subid); extern void logicalrep_worker_sigterm(SIGNAL_ARGS); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); -void process_syncing_tables(char *slotname, XLogRecPtr end_lsn); +void process_syncing_tables(XLogRecPtr current_lsn); void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue); -#define am_tablesync_worker() OidIsValid(MyLogicalRepWorker->relid) +static inline bool +am_tablesync_worker(void) +{ + return OidIsValid(MyLogicalRepWorker->relid); 
+} #endif /* WORKER_INTERNAL_H */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index d8dc55a129..0912bef657 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -14,7 +14,6 @@ CREATE SUBSCRIPTION testsub PUBLICATION foo; ERROR: syntax error at or near "PUBLICATION" LINE 1: CREATE SUBSCRIPTION testsub PUBLICATION foo; ^ -set client_min_messages to error; -- fail - cannot do CREATE SUBSCRIPTION CREATE SLOT inside transaction block BEGIN; CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub WITH (CREATE SLOT); @@ -23,7 +22,6 @@ COMMIT; CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub; ERROR: invalid connection string syntax: missing "=" after "testconn" in connection info string -reset client_min_messages; CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (NOCONNECT); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 62c99d8b20..c1199ee629 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -12,14 +12,12 @@ CREATE SUBSCRIPTION testsub CONNECTION 'foo'; -- fail - no connection CREATE SUBSCRIPTION testsub PUBLICATION foo; -set client_min_messages to error; -- fail - cannot do CREATE SUBSCRIPTION CREATE SLOT inside transaction block BEGIN; CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub WITH (CREATE SLOT); COMMIT; CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub; -reset client_min_messages; CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (NOCONNECT); diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 23eb39d666..d1817f57da 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -3,7 +3,7 @@ use warnings; use PostgresNode; use TestLib; -use Test::More tests => 13; +use Test::More tests => 14; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -209,5 +209,9 @@ $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); is($result, qq(0), 'check replication slot was dropped on publisher'); +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + $node_subscriber->stop('fast'); $node_publisher->stop('fast'); diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index 98cdf08b48..87541a0e6e 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -1,4 +1,4 @@ -# Basic logical replication test +# Tests for logical replication table syncing use strict; use warnings; use 
PostgresNode; -- 2.12.0
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers