On Tue, 1 Aug 2023 at 09:44, Peter Smith <smithpb2...@gmail.com> wrote: > > On Fri, Jul 28, 2023 at 5:22 PM Peter Smith <smithpb2...@gmail.com> wrote: > > > > Hi Melih, > > > > BACKGROUND > > ---------- > > > > We wanted to compare performance for the 2 different reuse-worker > > designs, when the apply worker is already busy handling other > > replications, and then simultaneously the test table tablesyncs are > > occurring. > > > > To test this scenario, some test scripts were written (described > > below). For comparisons, the scripts were then run using a build of > > HEAD; design #1 (v21); design #2 (0718). > > > > HOW THE TEST WORKS > > ------------------ > > > > Overview: > > 1. The apply worker is made to subscribe to a 'busy_tbl'. > > 2. After the SUBSCRIPTION is created, the publisher-side then loops > > (forever) doing INSERTS into that busy_tbl. > > 3. While the apply worker is now busy, the subscriber does an ALTER > > SUBSCRIPTION REFRESH PUBLICATION to subscribe to all the other test > > tables. > > 4. We time how long it takes for all tablsyncs to complete > > 5. Repeat above for different numbers of empty tables (10, 100, 1000, > > 2000) and different numbers of sync workers (2, 4, 8, 16) > > > > Scripts > > ------- > > > > (PSA 4 scripts to implement this logic) > > > > testrun script > > - this does common setup (do_one_test_setup) and then the pub/sub > > scripts (do_one_test_PUB and do_one_test_SUB -- see below) are run in > > parallel > > - repeat 10 times > > > > do_one_test_setup script > > - init and start instances > > - ipc setup tables and procedures > > > > do_one_test_PUB script > > - ipc setup pub/sub > > - table setup > > - publishes the "busy_tbl", but then waits for the subscriber to > > subscribe to only this one > > - alters the publication to include all other tables (so subscriber > > will see these only after the ALTER SUBSCRIPTION PUBLICATION REFRESH) > > - enter a busy INSERT loop until it informed by the subscriber that > > the test is finished > > > > do_one_test_SUB script > > - ipc setup pub/sub > > - table setup > > - subscribes only to "busy_tbl", then informs the publisher when that > > is done (this will cause the publisher to commence the stay_busy loop) > > - after it knows the publishing busy loop has started it does > > - ALTER SUBSCRIPTION REFRESH PUBLICATION > > - wait until all the tablesyncs are ready <=== This is the part that > > is timed for the test RESULT > > > > PROBLEM > > ------- > > > > Looking at the output files (e.g. *.dat_PUB and *.dat_SUB) they seem > > to confirm the tests are working how we wanted. > > > > Unfortunately, there is some slot problem for the patched builds (both > > designs #1 and #2). e.g. Search "ERROR" in the *.log files and see > > many slot-related errors. > > > > Please note - running these same scripts with HEAD build gave no such > > errors. So it appears to be a patch problem. > > > > Hi > > FYI, here is some more information about ERRORs seen. > > The patches were re-tested -- applied in stages (and also against the > different scripts) to identify where the problem was introduced. Below > are the observations: > > ~~~ > > Using original test scripts > > 1. Using only patch v21-0001 > - no errors > > 2. Using only patch v21-0001+0002 > - no errors > > 3. Using patch v21-0001+0002+0003 > - no errors > > ~~~ > > Using the "busy loop" test scripts for long transactions > > 1. Using only patch v21-0001 > - no errors > > 2. Using only patch v21-0001+0002 > - gives errors for "no copy in progress issue" > e.g. ERROR: could not send data to WAL stream: no COPY in progress > > 3. Using patch v21-0001+0002+0003 > - gives the same "no copy in progress issue" errors as above > e.g. ERROR: could not send data to WAL stream: no COPY in progress > - and also gives slot consistency point errors > e.g. ERROR: could not create replication slot > "pg_16700_sync_16514_7261998170966054867": ERROR: could not find > logical decoding starting point > e.g. LOG: could not drop replication slot > "pg_16700_sync_16454_7261998170966054867" on publisher: ERROR: > replication slot "pg_16700_sync_16454_7261998170966054867" does not > exist
I agree that "no copy in progress issue" issue has nothing to do with 0001 patch. This issue is present with the 0002 patch. In the case when the tablesync worker has to apply the transactions after the table is synced, the tablesync worker sends the feedback of writepos, applypos and flushpos which results in "No copy in progress" error as the stream has ended already. Fixed it by exiting the streaming loop if the tablesync worker is done with the synchronization. The attached 0004 patch has the changes for the same. The rest of v22 patches are the same patch that were posted by Melih in the earlier mail. Regards, Vignesh
From bd18bd59be0a263cb3385353e73ec25542bdeff2 Mon Sep 17 00:00:00 2001 From: Melih Mutlu <m.melihmu...@gmail.com> Date: Tue, 4 Jul 2023 22:04:46 +0300 Subject: [PATCH v22 2/3] Reuse Tablesync Workers Before this patch, tablesync workers were capable of syncing only one table. For each table, a new sync worker was launched and that worker would exit when done processing the table. Now, tablesync workers are not limited to processing only one table. When done, they can move to processing another table in the same subscription. If there is a table that needs to be synced, an available tablesync worker picks up that table and syncs it. Each tablesync worker continues to pick new tables to sync until there are no tables left requiring synchronization. If there was no available worker to process the table, then a new tablesync worker will be launched, provided the number of tablesync workers for the subscription does not exceed max_sync_workers_per_subscription. Discussion: http://postgr.es/m/CAGPVpCTq=rudd4judarc1xuwf4brh2gdsnf3rtomugj9rpp...@mail.gmail.com --- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/tablesync.c | 121 ++++++++++++++++++-- src/backend/replication/logical/worker.c | 40 ++++++- src/include/replication/worker_internal.h | 2 + 4 files changed, 149 insertions(+), 15 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index e231fa7f95..25dd06b8af 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -440,6 +440,7 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; + worker->relsync_completed = false; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index ff859e0910..63b5bed88a 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -134,10 +134,12 @@ static StringInfo copybuf = NULL; /* * Exit routine for synchronization worker. + * + * If reuse_worker is false, at the conclusion of this function the worker + * process will exit. */ static void -pg_attribute_noreturn() -finish_sync_worker(void) +finish_sync_worker(bool reuse_worker) { /* * Commit any outstanding transaction. This is the usual case, unless @@ -149,21 +151,42 @@ finish_sync_worker(void) pgstat_report_stat(true); } + /* + * Disconnect from the publisher otherwise reusing the sync worker can + * error due to exceeding max_wal_senders. + */ + if (LogRepWorkerWalRcvConn != NULL) + { + walrcv_disconnect(LogRepWorkerWalRcvConn); + LogRepWorkerWalRcvConn = NULL; + } + /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + if (reuse_worker) + { + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\" will be reused to sync table \"%s\" with relid %u.", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + } + else + { + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\" has finished", + MySubscription->name))); + } CommitTransactionCommand(); /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); /* Stop gracefully */ - proc_exit(0); + if (!reuse_worker) + proc_exit(0); } /* @@ -383,7 +406,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - finish_sync_worker(); + /* Sync worker has completed synchronization of the current table. */ + MyLogicalRepWorker->relsync_completed = true; + + ereport(LOG, + (errmsg("logical replication table synchronization for subscription \"%s\", relation \"%s\" with relid %u has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); + CommitTransactionCommand(); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -1288,7 +1319,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - finish_sync_worker(); /* doesn't return */ + finish_sync_worker(false); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1646,6 +1677,8 @@ run_tablesync_worker() char *slotname = NULL; WalRcvStreamOptions options; + MyLogicalRepWorker->relsync_completed = false; + start_table_sync(&origin_startpos, &slotname); ReplicationOriginNameForLogicalRep(MySubscription->oid, @@ -1668,12 +1701,78 @@ void TablesyncWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); + bool done = false; SetupApplyOrSyncWorker(worker_slot); - run_tablesync_worker(); + /* + * The loop where worker does its job. It loops until there is no relation + * left to sync. + */ + for (;!done;) + { + List *rstates; + ListCell *lc; + + run_tablesync_worker(); + + if (IsTransactionState()) + CommitTransactionCommand(); + + if (MyLogicalRepWorker->relsync_completed) + { + /* + * This tablesync worker is 'done' unless another table that needs + * syncing is found. + */ + done = true; + + /* This transaction will be committed by finish_sync_worker. */ + StartTransactionCommand(); + + /* + * Check if there is any table whose relation state is still INIT. + * If a table in INIT state is found, the worker will not be + * finished, it will be reused instead. + */ + rstates = GetSubscriptionRelations(MySubscription->oid, true); + + foreach(lc, rstates) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + + if (rstate->state == SUBREL_STATE_SYNCDONE) + continue; + + /* + * Take exclusive lock to prevent any other sync worker from + * picking the same table. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + + /* + * Pick the table for the next run if it is not already picked + * up by another worker. + */ + if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false)) + { + /* Update worker state for the next table */ + MyLogicalRepWorker->relid = rstate->relid; + MyLogicalRepWorker->relstate = rstate->state; + MyLogicalRepWorker->relstate_lsn = rstate->lsn; + LWLockRelease(LogicalRepWorkerLock); + + /* Found a table for next iteration */ + finish_sync_worker(true); + done = false; + break; + } + LWLockRelease(LogicalRepWorkerLock); + } + } + } - finish_sync_worker(); + finish_sync_worker(false); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 8ed6fa7acf..d25bf5bea2 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3621,6 +3621,23 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextReset(ApplyMessageContext); } + if (am_tablesync_worker()) + { + /* + * apply_dispatch() may have gone into apply_handle_commit() + * which can call process_syncing_tables_for_sync. + * + * process_syncing_tables_for_sync decides whether the sync of + * the current table is completed. If it is completed, + * streaming must be already ended. So, we can break the loop. + */ + if (MyLogicalRepWorker->relsync_completed) + { + endofstream = true; + break; + } + } + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); } } @@ -3640,6 +3657,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* Process any table synchronization changes. */ process_syncing_tables(last_received); + + if (am_tablesync_worker()) + /* + * If relsync_completed is true, this means that the tablesync + * worker is done with synchronization. Streaming has already been + * ended by process_syncing_tables_for_sync. We should move to the + * next table if needed, or exit. + */ + if (MyLogicalRepWorker->relsync_completed) + endofstream = true; } /* Cleanup the memory. */ @@ -3742,8 +3769,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received) error_context_stack = errcallback.previous; apply_error_context_stack = error_context_stack; - /* All done */ - walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + /* + * End streaming here for only apply workers. Ending streaming for + * tablesync workers is deferred until the worker exits its main loop. + */ + if (!am_tablesync_worker()) + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); } /* @@ -4617,9 +4648,10 @@ InitializeLogRepWorker(void) if (am_tablesync_worker()) ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", + (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" with relid %u has started", MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + get_rel_name(MyLogicalRepWorker->relid), + MyLogicalRepWorker->relid))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index fdbc1183f2..39b1721dee 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -56,6 +56,8 @@ typedef struct LogicalRepWorker char relstate; XLogRecPtr relstate_lsn; slock_t relmutex; + bool relsync_completed; /* has tablesync finished syncing + * the assigned table? */ /* * Used to create the changes and subxact files for the streaming -- 2.25.1
From d15ae6faa94033a4fa66f9d29c524d459d0f7f71 Mon Sep 17 00:00:00 2001 From: Melih Mutlu <m.melihmu...@gmail.com> Date: Tue, 4 Jul 2023 22:13:52 +0300 Subject: [PATCH v22 3/3] Reuse connection when tablesync workers change the target Previously tablesync workers establish new connections when it changes the syncing table, but this might have additional overhead. This patch allows the existing connection to be reused. As for the publisher node, this patch allows to reuse logical walsender processes after the streaming is done once. --- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/tablesync.c | 60 ++++++++++++++------- src/backend/replication/logical/worker.c | 18 ++++--- src/backend/replication/walsender.c | 7 +++ src/include/replication/worker_internal.h | 3 ++ 5 files changed, 62 insertions(+), 27 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 25dd06b8af..657e446eaf 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -441,6 +441,7 @@ retry: worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; worker->relsync_completed = false; + worker->slotnum = slot; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 63b5bed88a..45e753b189 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -151,16 +151,6 @@ finish_sync_worker(bool reuse_worker) pgstat_report_stat(true); } - /* - * Disconnect from the publisher otherwise reusing the sync worker can - * error due to exceeding max_wal_senders. - */ - if (LogRepWorkerWalRcvConn != NULL) - { - walrcv_disconnect(LogRepWorkerWalRcvConn); - LogRepWorkerWalRcvConn = NULL; - } - /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); @@ -1268,6 +1258,27 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid, relid, GetSystemIdentifier()); } +/* + * Determine the application_name for tablesync workers. + * + * Previously, the replication slot name was used as application_name. Since + * it's possible to reuse tablesync workers now, a tablesync worker can handle + * several different replication slots during its lifetime. Therefore, we + * cannot use the slot name as application_name anymore. Instead, the slot + * number of the tablesync worker is used as a part of the application_name. + * + * FIXME: if the tablesync worker starts to reuse the replication slot during + * synchronization, we should again use the replication slot name as + * application_name. + */ +static void +ApplicationNameForTablesync(Oid suboid, int worker_slot, + char *application_name, Size szapp) +{ + snprintf(application_name, szapp, "pg_%u_sync_%i_" UINT64_FORMAT, suboid, + worker_slot, GetSystemIdentifier()); +} + /* * Start syncing the table in the sync worker. * @@ -1329,15 +1340,26 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) slotname, NAMEDATALEN); - /* - * Here we use the slot name instead of the subscription name as the - * application_name, so that it is different from the leader apply worker, - * so that synchronous replication can distinguish them. - */ - LogRepWorkerWalRcvConn = - walrcv_connect(MySubscription->conninfo, true, - must_use_password, - slotname, &err); + /* Connect to the publisher if haven't done so already. */ + if (LogRepWorkerWalRcvConn == NULL) + { + char application_name[NAMEDATALEN]; + + /* + * The application_name must differ from the subscription name (used by + * the leader apply worker) because synchronous replication has to be + * able to distinguish this worker from the leader apply worker. + */ + ApplicationNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->slotnum, + application_name, + NAMEDATALEN); + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, + must_use_password, + application_name, &err); + } + if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d25bf5bea2..c482f707dc 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3494,20 +3494,22 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ErrorContextCallback errcallback; /* - * Init the ApplyMessageContext which we clean up after each replication - * protocol message. + * Init the ApplyMessageContext if needed. This context is cleaned up + * after each replication protocol message. */ - ApplyMessageContext = AllocSetContextCreate(ApplyContext, - "ApplyMessageContext", - ALLOCSET_DEFAULT_SIZES); + if (!ApplyMessageContext) + ApplyMessageContext = AllocSetContextCreate(ApplyContext, + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); /* * This memory context is used for per-stream data when the streaming mode * is enabled. This context is reset on each stream stop. */ - LogicalStreamingContext = AllocSetContextCreate(ApplyContext, - "LogicalStreamingContext", - ALLOCSET_DEFAULT_SIZES); + if (!LogicalStreamingContext) + LogicalStreamingContext = AllocSetContextCreate(ApplyContext, + "LogicalStreamingContext", + ALLOCSET_DEFAULT_SIZES); /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d27ef2985d..2f3e93cc40 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1830,7 +1830,14 @@ exec_replication_command(const char *cmd_string) if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else + { + /* + * Reset flags because reusing tablesync workers can mean + * this is the second time here. + */ + streamingDoneSending = streamingDoneReceiving = false; StartLogicalReplication(cmd); + } /* dupe, but necessary per libpqrcv_endstreaming */ EndReplicationCommand(cmdtag); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 39b1721dee..b88ff6a646 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -39,6 +39,9 @@ typedef struct LogicalRepWorker /* Increased every time the slot is taken by new worker. */ uint16 generation; + /* Slot number of this worker. */ + int slotnum; + /* Pointer to proc array. NULL if not running. */ PGPROC *proc; -- 2.25.1
From 86a02f4fd54d5d5db89df21e327777eaf2aeaed3 Mon Sep 17 00:00:00 2001 From: Melih Mutlu <m.melihmu...@gmail.com> Date: Mon, 5 Jun 2023 15:04:41 +0300 Subject: [PATCH v22 1/3] Refactor to split Apply and Tablesync Workers Both apply and tablesync workers were using ApplyWorkerMain() as entry point. As the name implies, ApplyWorkerMain() should be considered as the main function for apply workers. Tablesync worker's path was hidden and does not have enough in common to share the same main function with apply worker. Also, most of the code shared by both worker types is already combined in LogicalRepApplyLoop(). There is no need to combine the rest in ApplyWorkerMain() anymore. This patch introduces TablesyncWorkerMain() as a new entry point for tablesync workers. This aims to increase code readability and help to the upcoming reuse tablesync worker improvements. Discussion: http://postgr.es/m/CAGPVpCTq=rudd4judarc1xuwf4brh2gdsnf3rtomugj9rpp...@mail.gmail.com --- src/backend/postmaster/bgworker.c | 3 + .../replication/logical/applyparallelworker.c | 2 +- src/backend/replication/logical/launcher.c | 32 +- src/backend/replication/logical/tablesync.c | 94 ++++- src/backend/replication/logical/worker.c | 380 ++++++++---------- src/include/replication/logicalworker.h | 1 + src/include/replication/worker_internal.h | 15 +- 7 files changed, 303 insertions(+), 224 deletions(-) diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 5b4bd71694..505e38376c 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -131,6 +131,9 @@ static const struct }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain + }, + { + "TablesyncWorkerMain", TablesyncWorkerMain } }; diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 6fb96148f4..1d4e83c4c1 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -942,7 +942,7 @@ ParallelApplyWorkerMain(Datum main_arg) MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = MyLogicalRepWorker->reply_time = 0; - InitializeApplyWorker(); + InitializeLogRepWorker(); InitializingApplyWorker = false; diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 542af7d863..e231fa7f95 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -459,24 +459,30 @@ retry: snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); if (is_parallel_apply_worker) + { snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain"); - else - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); - - if (OidIsValid(relid)) snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication worker for subscription %u sync %u", subid, relid); - else if (is_parallel_apply_worker) + "logical replication parallel apply worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); + } + else if (OidIsValid(relid)) + { + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication parallel apply worker for subscription %u", subid); + "logical replication tablesync worker for subscription %u sync %u", + subid, + relid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker"); + } else + { + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication apply worker for subscription %u", subid); - - if (is_parallel_apply_worker) - snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); - else - snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker"); + "logical replication apply worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker"); + } bgw.bgw_restart_time = BGW_NEVER_RESTART; bgw.bgw_notify_pid = MyProcPid; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6d461654ab..ff859e0910 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -104,17 +104,21 @@ #include "nodes/makefuncs.h" #include "parser/parse_relation.h" #include "pgstat.h" +#include "postmaster/interrupt.h" #include "replication/logicallauncher.h" #include "replication/logicalrelation.h" +#include "replication/logicalworker.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "replication/slot.h" #include "replication/origin.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" +#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rls.h" @@ -1241,7 +1245,7 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid, * * The returned slot name is palloc'ed in current memory context. */ -char * +static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) { char *slotname; @@ -1584,6 +1588,94 @@ FetchTableStates(bool *started_tx) return has_subrels; } +/* + * Execute the initial sync with error handling. Disable the subscription, + * if it's required. + * + * Allocate the slot name in long-lived context on return. Note that we don't + * handle FATAL errors which are probably because of system resource error and + * are not repeatable. + */ +static void +start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) +{ + char *syncslotname = NULL; + + Assert(am_tablesync_worker()); + + PG_TRY(); + { + /* Call initial sync. */ + syncslotname = LogicalRepSyncTableStart(origin_startpos); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report the worker failed during table synchronization. Abort + * the current transaction so that the stats message is sent in an + * idle state. + */ + AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, false); + + PG_RE_THROW(); + } + } + PG_END_TRY(); + + /* allocate slot name in long-lived context */ + *myslotname = MemoryContextStrdup(ApplyContext, syncslotname); + pfree(syncslotname); +} + +/* + * Runs the tablesync worker. + * + * It starts syncing tables. After a successful sync, sets streaming options + * and starts streaming to catchup. + */ +static void +run_tablesync_worker() +{ + char originname[NAMEDATALEN]; + XLogRecPtr origin_startpos = InvalidXLogRecPtr; + char *slotname = NULL; + WalRcvStreamOptions options; + + start_table_sync(&origin_startpos, &slotname); + + ReplicationOriginNameForLogicalRep(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + + set_apply_error_context_origin(originname); + + set_stream_options(&options, slotname, &origin_startpos); + + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + + /* Start applying changes to catchup. */ + start_apply(origin_startpos); +} + +/* Logical Replication Tablesync worker entry point */ +void +TablesyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + SetupApplyOrSyncWorker(worker_slot); + + run_tablesync_worker(); + + finish_sync_worker(); +} + /* * If the subscription has no tables then return false. * diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 832b1cf764..8ed6fa7acf 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -396,8 +396,6 @@ static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); -static void DisableSubscriptionAndExit(void); - static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, @@ -4327,6 +4325,57 @@ stream_open_and_write_change(TransactionId xid, char action, StringInfo s) stream_stop_internal(xid); } +/* + * Sets streaming options including replication slot name and origin start + * position. Workers need these options for logical replication. + */ +void +set_stream_options(WalRcvStreamOptions *options, + char *slotname, + XLogRecPtr *origin_startpos) +{ + int server_version; + + options->logical = true; + options->startpoint = *origin_startpos; + options->slotname = slotname; + + server_version = walrcv_server_version(LogRepWorkerWalRcvConn); + options->proto.logical.proto_version = + server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM : + server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : + server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : + LOGICALREP_PROTO_VERSION_NUM; + + options->proto.logical.publication_names = MySubscription->publications; + options->proto.logical.binary = MySubscription->binary; + + /* + * Assign the appropriate option value for streaming option according to + * the 'streaming' mode and the publisher's ability to support that mode. + */ + if (server_version >= 160000 && + MySubscription->stream == LOGICALREP_STREAM_PARALLEL) + { + options->proto.logical.streaming_str = "parallel"; + MyLogicalRepWorker->parallel_apply = true; + } + else if (server_version >= 140000 && + MySubscription->stream != LOGICALREP_STREAM_OFF) + { + options->proto.logical.streaming_str = "on"; + MyLogicalRepWorker->parallel_apply = false; + } + else + { + options->proto.logical.streaming_str = NULL; + MyLogicalRepWorker->parallel_apply = false; + } + + options->proto.logical.twophase = false; + options->proto.logical.origin = pstrdup(MySubscription->origin); +} + /* * Cleanup the memory for subxacts and reset the related variables. */ @@ -4361,24 +4410,18 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) } /* - * Execute the initial sync with error handling. Disable the subscription, - * if it's required. + * Common function to run the apply loop with error handling. Disable the + * subscription, if necessary. * - * Allocate the slot name in long-lived context on return. Note that we don't - * handle FATAL errors which are probably because of system resource error and - * are not repeatable. + * Note that we don't handle FATAL errors which are probably because + * of system resource error and are not repeatable. */ -static void -start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) +void +start_apply(XLogRecPtr origin_startpos) { - char *syncslotname = NULL; - - Assert(am_tablesync_worker()); - PG_TRY(); { - /* Call initial sync. */ - syncslotname = LogicalRepSyncTableStart(origin_startpos); + LogicalRepApplyLoop(origin_startpos); } PG_CATCH(); { @@ -4387,65 +4430,132 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) else { /* - * Report the worker failed during table synchronization. Abort - * the current transaction so that the stats message is sent in an + * Report the worker failed while applying changes. Abort the + * current transaction so that the stats message is sent in an * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); PG_RE_THROW(); } } PG_END_TRY(); - - /* allocate slot name in long-lived context */ - *myslotname = MemoryContextStrdup(ApplyContext, syncslotname); - pfree(syncslotname); } /* - * Run the apply loop with error handling. Disable the subscription, - * if necessary. + * Runs the leader apply worker. * - * Note that we don't handle FATAL errors which are probably because - * of system resource error and are not repeatable. + * It sets up replication origin, streaming options and then starts streaming. */ static void -start_apply(XLogRecPtr origin_startpos) +run_apply_worker() { - PG_TRY(); + char originname[NAMEDATALEN]; + XLogRecPtr origin_startpos = InvalidXLogRecPtr; + char *slotname = NULL; + WalRcvStreamOptions options; + RepOriginId originid; + TimeLineID startpointTLI; + char *err; + bool must_use_password; + + slotname = MySubscription->slotname; + + /* + * This shouldn't happen if the subscription is enabled, but guard + * against DDL bugs or manual catalog changes. (libpqwalreceiver will + * crash if slot is NULL.) + */ + if (!slotname) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("subscription has no replication slot set"))); + + /* Setup replication origin tracking. */ + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, + originname, sizeof(originname)); + StartTransactionCommand(); + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + originid = replorigin_create(originname); + replorigin_session_setup(originid, 0); + replorigin_session_origin = originid; + origin_startpos = replorigin_session_get_progress(false); + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !superuser_arg(MySubscription->owner); + + /* Note that the superuser_arg call can access the DB */ + CommitTransactionCommand(); + + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + must_use_password, + MySubscription->name, &err); + + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + /* + * We don't really use the output identify_system for anything but it + * does some initializations on the upstream so let's still call it. + */ + (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + + set_apply_error_context_origin(originname); + + set_stream_options(&options, slotname, &origin_startpos); + + /* + * Even when the two_phase mode is requested by the user, it remains + * as the tri-state PENDING until all tablesyncs have reached READY + * state. Only then, can it become ENABLED. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) { - LogicalRepApplyLoop(origin_startpos); + /* Start streaming with two_phase enabled */ + options.proto.logical.twophase = true; + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + + StartTransactionCommand(); + UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + CommitTransactionCommand(); } - PG_CATCH(); + else { - if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed while applying changes. Abort the - * current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); - - PG_RE_THROW(); - } + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); } - PG_END_TRY(); + + ereport(DEBUG1, + (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", + MySubscription->name, + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : + MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?"))); + + /* Run the main loop. */ + start_apply(origin_startpos); } /* - * Common initialization for leader apply worker and parallel apply worker. + * Common initialization for leader apply worker, parallel apply worker and + * tablesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. */ void -InitializeApplyWorker(void) +InitializeLogRepWorker(void) { MemoryContext oldctx; @@ -4518,22 +4628,15 @@ InitializeApplyWorker(void) CommitTransactionCommand(); } -/* Logical Replication Apply worker entry point */ +/* Common function to setup the leader apply or tablesync worker. */ void -ApplyWorkerMain(Datum main_arg) +SetupApplyOrSyncWorker(int worker_slot) { - int worker_slot = DatumGetInt32(main_arg); - char originname[NAMEDATALEN]; - XLogRecPtr origin_startpos = InvalidXLogRecPtr; - char *myslotname = NULL; - WalRcvStreamOptions options; - int server_version; - - InitializingApplyWorker = true; - /* Attach to slot */ logicalrep_worker_attach(worker_slot); + Assert(am_tablesync_worker() || am_leader_apply_worker()); + /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGTERM, die); @@ -4551,79 +4654,12 @@ ApplyWorkerMain(Datum main_arg) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - InitializeApplyWorker(); - - InitializingApplyWorker = false; + InitializeLogRepWorker(); /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); - if (am_tablesync_worker()) - { - start_table_sync(&origin_startpos, &myslotname); - - ReplicationOriginNameForLogicalRep(MySubscription->oid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); - set_apply_error_context_origin(originname); - } - else - { - /* This is the leader apply worker */ - RepOriginId originid; - TimeLineID startpointTLI; - char *err; - bool must_use_password; - - myslotname = MySubscription->slotname; - - /* - * This shouldn't happen if the subscription is enabled, but guard - * against DDL bugs or manual catalog changes. (libpqwalreceiver will - * crash if slot is NULL.) - */ - if (!myslotname) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("subscription has no replication slot set"))); - - /* Setup replication origin tracking. */ - StartTransactionCommand(); - ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, - originname, sizeof(originname)); - originid = replorigin_by_name(originname, true); - if (!OidIsValid(originid)) - originid = replorigin_create(originname); - replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; - origin_startpos = replorigin_session_get_progress(false); - - /* Is the use of a password mandatory? */ - must_use_password = MySubscription->passwordrequired && - !superuser_arg(MySubscription->owner); - - /* Note that the superuser_arg call can access the DB */ - CommitTransactionCommand(); - - LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, - must_use_password, - MySubscription->name, &err); - if (LogRepWorkerWalRcvConn == NULL) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not connect to the publisher: %s", err))); - - /* - * We don't really use the output identify_system for anything but it - * does some initializations on the upstream so let's still call it. - */ - (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); - - set_apply_error_context_origin(originname); - } - /* * Setup callback for syscache so that we know when something changes in * the subscription relation state. @@ -4631,91 +4667,21 @@ ApplyWorkerMain(Datum main_arg) CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, invalidate_syncing_table_states, (Datum) 0); +} - /* Build logical replication streaming options. */ - options.logical = true; - options.startpoint = origin_startpos; - options.slotname = myslotname; - - server_version = walrcv_server_version(LogRepWorkerWalRcvConn); - options.proto.logical.proto_version = - server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM : - server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : - server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : - LOGICALREP_PROTO_VERSION_NUM; - - options.proto.logical.publication_names = MySubscription->publications; - options.proto.logical.binary = MySubscription->binary; - - /* - * Assign the appropriate option value for streaming option according to - * the 'streaming' mode and the publisher's ability to support that mode. - */ - if (server_version >= 160000 && - MySubscription->stream == LOGICALREP_STREAM_PARALLEL) - { - options.proto.logical.streaming_str = "parallel"; - MyLogicalRepWorker->parallel_apply = true; - } - else if (server_version >= 140000 && - MySubscription->stream != LOGICALREP_STREAM_OFF) - { - options.proto.logical.streaming_str = "on"; - MyLogicalRepWorker->parallel_apply = false; - } - else - { - options.proto.logical.streaming_str = NULL; - MyLogicalRepWorker->parallel_apply = false; - } - - options.proto.logical.twophase = false; - options.proto.logical.origin = pstrdup(MySubscription->origin); +/* Logical Replication Apply worker entry point */ +void +ApplyWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); - if (!am_tablesync_worker()) - { - /* - * Even when the two_phase mode is requested by the user, it remains - * as the tri-state PENDING until all tablesyncs have reached READY - * state. Only then, can it become ENABLED. - * - * Note: If the subscription has no tables then leave the state as - * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to - * work. - */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) - { - /* Start streaming with two_phase enabled */ - options.proto.logical.twophase = true; - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); + InitializingApplyWorker = true; - StartTransactionCommand(); - UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); - MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; - CommitTransactionCommand(); - } - else - { - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); - } + SetupApplyOrSyncWorker(worker_slot); - ereport(DEBUG1, - (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", - MySubscription->name, - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : - MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : - "?"))); - } - else - { - /* Start normal logical streaming replication. */ - walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); - } + InitializingApplyWorker = false; - /* Run the main loop. */ - start_apply(origin_startpos); + run_apply_worker(); proc_exit(0); } @@ -4724,7 +4690,7 @@ ApplyWorkerMain(Datum main_arg) * After error recovery, disable the subscription in a new transaction * and exit cleanly. */ -static void +void DisableSubscriptionAndExit(void) { /* diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 39588da79f..bbd71d0b42 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -18,6 +18,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); +extern void TablesyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 343e781896..fdbc1183f2 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -19,6 +19,7 @@ #include "datatype/timestamp.h" #include "miscadmin.h" #include "replication/logicalrelation.h" +#include "replication/walreceiver.h" #include "storage/buffile.h" #include "storage/fileset.h" #include "storage/lock.h" @@ -243,7 +244,6 @@ extern int logicalrep_sync_worker_count(Oid subid); extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); -extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); extern bool AllTablesyncsReady(void); extern void UpdateTwoPhaseState(Oid suboid, char new_state); @@ -265,7 +265,17 @@ extern void maybe_reread_subscription(void); extern void stream_cleanup_files(Oid subid, TransactionId xid); -extern void InitializeApplyWorker(void); +extern void set_stream_options(WalRcvStreamOptions *options, + char *slotname, + XLogRecPtr *origin_startpos); + +extern void start_apply(XLogRecPtr origin_startpos); + +extern void InitializeLogRepWorker(void); + +extern void SetupApplyOrSyncWorker(int worker_slot); + +extern void DisableSubscriptionAndExit(void); extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn); @@ -305,6 +315,7 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); + #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid) static inline bool -- 2.25.1
From 2c5223f7eb24798b1da0f5a080fe9c9038a250b5 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Mon, 31 Jul 2023 22:05:34 +0530 Subject: [PATCH] Fix for Table sync worker sending the feedback even after the streaming is ended. In the case when the tablesync worker has to apply the transactions after the table is synced, the tablesync worker sends the feedback of writepos, applypos and flushpos which results in "No copy in progress" error as the stream was ended already. Fixed it by exiting the streaming loop if the tablesync worker is done with the synchronization. --- src/backend/replication/logical/worker.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d25bf5bea2..23ed4adfa4 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3634,7 +3634,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (MyLogicalRepWorker->relsync_completed) { endofstream = true; - break; + goto streaming_done; } } @@ -3669,6 +3669,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) endofstream = true; } +streaming_done: /* Cleanup the memory. */ MemoryContextResetAndDeleteChildren(ApplyMessageContext); MemoryContextSwitchTo(TopMemoryContext); -- 2.34.1