Hi, here are some review comments for patch v16-0002.

======
Commit message
1.
This commit allows reusing tablesync workers for syncing more than one table sequentially during their lifetime, instead of exiting after only syncing one table.

Before this commit, tablesync workers were capable of syncing only one table. For each table, a new sync worker was launched and that worker would exit when done processing the table.

Now, tablesync workers are not limited to processing only one table. When done, they can move to processing another table in the same subscription.

~

IMO that first paragraph can be removed because AFAIK the other paragraphs are saying exactly the same thing but worded differently.

======
src/backend/replication/logical/tablesync.c

2. General -- for clean_sync_worker and finish_sync_worker

TBH, I found the separation of clean_sync_worker() and finish_sync_worker() to be confusing. Can't it be rearranged to keep the same function but just pass a boolean to tell it to exit or not exit?

e.g.

finish_sync_worker(bool reuse_worker) { ... }

~~~

3. clean_sync_worker

 /*
- * Commit any outstanding transaction. This is the usual case, unless
- * there was nothing to do for the table.
+ * Commit any outstanding transaction. This is the usual case, unless there
+ * was nothing to do for the table.
  */

The word wrap seems OK, except the change seemed unrelated to this patch (??)

~~~

4.
+ /*
+ * Disconnect from publisher. Otherwise reused sync workers causes
+ * exceeding max_wal_senders
+ */

Missing period, and not an English sentence.

SUGGESTION (??)
Disconnect from the publisher, otherwise reusing the sync worker can error due to exceeding max_wal_senders.

~~~

5. finish_sync_worker

+/*
+ * Exit routine for synchronization worker.
+ */
+void
+pg_attribute_noreturn()
+finish_sync_worker(void)
+{
+ clean_sync_worker();
+
  /* And flush all writes. */
  XLogFlush(GetXLogWriteRecPtr());

  StartTransactionCommand();
  ereport(LOG,
- (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
- MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid))));
+ (errmsg("logical replication table synchronization worker for subscription \"%s\" has finished",
+ MySubscription->name)));
  CommitTransactionCommand();

In the original code, the XLogFlush was in a slightly different order than in this refactored code. E.g. it came before signalling the apply worker. Is it OK for that order to change?

Keeping one function (suggested in #2) can maybe remove this potential issue.

======
src/backend/replication/logical/worker.c

6. LogicalRepApplyLoop

+ /*
+ * apply_dispatch() may have gone into apply_handle_commit()
+ * which can call process_syncing_tables_for_sync.
+ *
+ * process_syncing_tables_for_sync decides whether the sync of
+ * the current table is completed. If it is completed,
+ * streaming must be already ended. So, we can break the loop.
+ */
+ if (MyLogicalRepWorker->is_sync_completed)
+ {
+     endofstream = true;
+     break;
+ }
+

and

+ /*
+ * If is_sync_completed is true, this means that the tablesync
+ * worker is done with synchronization. Streaming has already been
+ * ended by process_syncing_tables_for_sync. We should move to the
+ * next table if needed, or exit.
+ */
+ if (MyLogicalRepWorker->is_sync_completed)
+     endofstream = true;

~

Instead of those code fragments above assigning 'endofstream' as a side-effect, would it be the same (but tidier) to just modify the other "breaking" condition below:

BEFORE:
/* Check if we need to exit the streaming loop. */
if (endofstream)
    break;

AFTER:
/* Check if we need to exit the streaming loop. */
if (endofstream || MyLogicalRepWorker->is_sync_completed)
    break;

~~~

7. LogicalRepApplyLoop

+ /*
+ * Tablesync workers should end streaming before exiting the main loop to
+ * drop replication slot.
+ * Only end streaming here for apply workers.
+ */
+ if (!am_tablesync_worker())
+     walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);

This comment does not seem very clear. Maybe it can be reworded:

SUGGESTION
End streaming here only for apply workers. Ending streaming for tablesync workers is deferred until ... because ...

~~~

8. TablesyncWorkerMain

+ StartTransactionCommand();
+ ereport(LOG,
+ (errmsg("%s for subscription \"%s\" has moved to sync table \"%s\" with relid %u.",
+ get_worker_name(),
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid),
+ MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();

The "has moved to..." terminology is unusual. If you say something "will be reused to..." then it better matches the commit message etc.

~~~

9.
+ if (!is_table_found)
+     break;

Instead of an infinite loop that is exited by this 'break', it might be better to rearrange the logic slightly so the 'for' loop can exit normally:

BEFORE:
for (;;)

AFTER:
for (; !done;)

======
src/include/replication/worker_internal.h

10.
  XLogRecPtr relstate_lsn;
  slock_t relmutex;

+ /*
+ * Indicates whether tablesync worker has completed sycning its assigned
+ * table. If true, no need to continue with that table.
+ */
+ bool is_sync_completed;
+

10a.
Typo /sycning/syncing/

~

10b.
All the other tablesync-related fields of this struct are named as relXXX, so I wonder if it is better for this to follow the same pattern. e.g. 'relsync_completed'

~

10c.
"If true, no need to continue with that table.". I am not sure if this sentence is adding anything useful.

------
Kind Regards,
Peter Smith.
Fujitsu Australia
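PS. To show how suggestions #2, #6 and #9 might fit together, here is a minimal standalone sketch. It is NOT PostgreSQL code and not how the patch is written: get_next_table(), apply_loop(), run_sync_worker(), pending_relids and worker_exited are hypothetical stand-ins, 'relsync_completed' is just the name suggested in #10b standing in for MyLogicalRepWorker->is_sync_completed, and transactions, WAL flushing, publisher disconnect and proc_exit() are all replaced by plain variables.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical worker state; none of these are real PostgreSQL APIs. */
static const unsigned pending_relids[] = {16384, 16390, 16402};
static size_t next_pending = 0;
static bool relsync_completed = false;	/* models is_sync_completed (#10b) */
static bool worker_exited = false;
static int	tables_synced = 0;

/* Hypothetical: claim the next table needing sync; false if none is left. */
static bool
get_next_table(unsigned *relid)
{
	if (next_pending >= sizeof(pending_relids) / sizeof(pending_relids[0]))
		return false;
	*relid = pending_relids[next_pending++];
	return true;
}

/* Simplified streaming loop for one table, with a single exit test (#6). */
static void
apply_loop(unsigned relid)
{
	bool		endofstream = false;	/* would be set if the stream ended */

	relsync_completed = false;
	for (int msg = 0;; msg++)
	{
		/* Pretend the commit handled at message 3 completes this table. */
		if (msg == 3)
			relsync_completed = true;

		/* Check if we need to exit the streaming loop. */
		if (endofstream || relsync_completed)
			break;
	}
	printf("synced table with relid %u\n", relid);
	tables_synced++;
}

/* One routine with a flag instead of two separate functions (#2). */
static void
finish_sync_worker(bool reuse_worker)
{
	/* Shared cleanup (commit, disconnect, flush, log) would go here. */
	if (!reuse_worker)
		worker_exited = true;	/* the real routine would proc_exit() */
}

/* Main loop that ends via its own condition, not a 'break' (#9). */
static int
run_sync_worker(void)
{
	bool		done = false;

	for (; !done;)
	{
		unsigned	relid;

		assert(!worker_exited);
		if (get_next_table(&relid))
		{
			apply_loop(relid);
			finish_sync_worker(true);	/* clean up but keep the worker */
		}
		else
			done = true;		/* no table left, so the loop ends normally */
	}

	finish_sync_worker(false);	/* the real worker would exit here */
	return tables_synced;
}
```

The point is only that one exit condition per loop, plus one cleanup routine with a reuse flag, keeps the control flow easier to follow than setting 'endofstream' as a side-effect.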