Here are some review comments for v47-0001.

(This review is a WIP. I will post more comments for this patch next week.)
======
.../replication/logical/applyparallelworker.c

1.
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION src/backend/replication/logical/applyparallelworker.c
+ *

This IDENTIFICATION should be on 2 lines like it previously was
instead of wrapped into one line. For consistency with all other file
headers.

~~~

2. File header comment

+ * Since the database structure (schema of subscription tables, etc.) of
+ * publisher and subscriber may be different.

Incomplete sentence?

~~~

3.
+ * When the following two scenarios occur, a deadlock occurs.

Actually, you described three scenarios in this comment. Not two.

SUGGESTION
The following scenarios can cause a deadlock.

~~~

4.
+ * LA (waiting to acquire the local transaction lock) -> PA1 (waiting to
+ * acquire the lock on the unique index) -> PA2 (waiting to acquire the lock on
+ * the remote transaction) -> LA

"PA1" -> "PA-1"
"PA2" -> "PA-2"

~~~

5.
+ * To resolve this issue, we use non-blocking write and wait with a timeout. If
+ * timeout is exceeded, the LA report an error and restart logical replication.

"report" -> "reports"
"restart" -> "restarts"

OR

"LA report" -> "LA will report"

~~~

6. pa_wait_for_xact_state

+/*
+ * Wait until the parallel apply worker's transaction state reach or exceed the
+ * given xact_state.
+ */
+static void
+pa_wait_for_xact_state(ParallelApplyWorkerShared *wshared,
+                       ParallelTransState xact_state)

"reach or exceed" -> "reaches or exceeds"

~~~

7. pa_stream_abort

+ /*
+  * Although the lock can be automatically released during transaction
+  * rollback, but we still release the lock here as we may not in a
+  * transaction.
+  */
+ pa_unlock_transaction(xid, AccessShareLock);

"but we still" -> "we still"
"we may not in a" -> "we may not be in a"

~~~

8.
+ pa_savepoint_name(MySubscription->oid, subxid, spname,
+                   sizeof(spname));
+

Unnecessary wrapping

~~~

9.
+ for (i = list_length(subxactlist) - 1; i >= 0; i--)
+ {
+     TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
+
+     if (xid_tmp == subxid)
+     {
+         found = true;
+         break;
+     }
+ }
+
+ if (found)
+ {
+     RollbackToSavepoint(spname);
+     CommitTransactionCommand();
+     subxactlist = list_truncate(subxactlist, i + 1);
+ }

This code logic does not seem to require the 'found' flag. You can do
the RollbackToSavepoint/CommitTransactionCommand/list_truncate before
the break.

~~~

10. pa_lock/unlock _stream/_transaction

+/*
+ * Helper functions to acquire and release a lock for each stream block.
+ *
+ * Set locktag_field4 to 0 to indicate that it's a stream lock.
+ */

+/*
+ * Helper functions to acquire and release a lock for each local transaction.
+ *
+ * Set locktag_field4 to 1 to indicate that it's a transaction lock.

Should constants/defines/enums replace those magic numbers 0 and 1?

~~~

11. pa_lock_transaction

+ * Note that all the callers are passing remote transaction ID instead of local
+ * transaction ID as xid. This is because the local transaction ID will only be
+ * assigned while applying the first change in the parallel apply, but it's
+ * possible that the first change in parallel apply worker is blocked by a
+ * concurrently executing transaction in another parallel apply worker causing
+ * the leader cannot get local transaction ID.

"causing the leader cannot" -> "which means the leader cannot" (??)

======
src/backend/replication/logical/worker.c

12. TransApplyAction

+/*
+ * What action to take for the transaction.
+ *
+ * TRANS_LEADER_APPLY:
+ * The action means that we are in the leader apply worker and changes of the
+ * transaction are applied directly in the worker.
+ *
+ * TRANS_LEADER_SERIALIZE:
+ * It means that we are in the leader apply worker or table sync worker.
+ * Changes are written to temporary files and then applied when the final
+ * commit arrives.
+ *
+ * TRANS_LEADER_SEND_TO_PARALLEL:
+ * The action means that we are in the leader apply worker and need to send the
+ * changes to the parallel apply worker.
+ *
+ * TRANS_PARALLEL_APPLY:
+ * The action that we are in the parallel apply worker and changes of the
+ * transaction are applied directly in the worker.
+ */
+typedef enum

12a.
Too many various ways of saying the same thing:

"The action means that we..."
"It means that we..."
"The action that we..." (typo?)

Please word all these comments consistently

~

12b.
"directly in the worker" -> "directly by the worker" (??) 2x

~~~

13. get_worker_name

+/*
+ * Return the name of the logical replication worker.
+ */
+static const char *
+get_worker_name(void)
+{
+    if (am_tablesync_worker())
+        return _("logical replication table synchronization worker");
+    else if (am_parallel_apply_worker())
+        return _("logical replication parallel apply worker");
+    else
+        return _("logical replication apply worker");
+}

This function belongs nearer the top of the module (above all the
error messages that are using it).

------
Kind Regards,
Peter Smith.
Fujitsu Australia