Here are my review comments for v40-0001. ======
src/backend/replication/logical/worker.c 1. should_apply_changes_for_rel + else if (am_parallel_apply_worker()) + { + if (rel->state != SUBREL_STATE_READY) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication parallel apply worker for subscription \"%s\" will stop", + MySubscription->name), + errdetail("Cannot handle streamed replication transaction using parallel " + "apply workers until all tables are synchronized."))); 1a. "transaction" -> "transactions" 1b. "are synchronized" -> "have been synchronized." e.g. "Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized." ~~~ 2. maybe_reread_subscription + if (am_parallel_apply_worker()) + ereport(LOG, + (errmsg("logical replication parallel apply worker for subscription \"%s\" will " + "stop because the subscription was removed", + MySubscription->name))); + else + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will " + "stop because the subscription was removed", + MySubscription->name))); Maybe there is an easier way to code this instead of if/else and cut/paste message text: SUGGESTION ereport(LOG, (errmsg("logical replication %s for subscription \"%s\" will stop because the subscription was removed", am_parallel_apply_worker() ? "parallel apply worker" : "apply worker", MySubscription->name))); ~~~ 3. 
+ if (am_parallel_apply_worker()) + ereport(LOG, + (errmsg("logical replication parallel apply worker for subscription \"%s\" will " + "stop because the subscription was disabled", + MySubscription->name))); + else + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will " + "stop because the subscription was disabled", + MySubscription->name))); These can be combined like comment #2 above. SUGGESTION ereport(LOG, (errmsg("logical replication %s for subscription \"%s\" will stop because the subscription was disabled", am_parallel_apply_worker() ? "parallel apply worker" : "apply worker", MySubscription->name))); ~~~ 4. + if (am_parallel_apply_worker()) + ereport(LOG, + (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change", + MySubscription->name))); + else + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change", + MySubscription->name))); These can be combined like comment #2 above (note that the parallel apply worker message currently says "will stop" whereas the apply worker message says "will restart", so the wording would need to be unified first). SUGGESTION ereport(LOG, (errmsg("logical replication %s for subscription \"%s\" will restart because of a parameter change", am_parallel_apply_worker() ? "parallel apply worker" : "apply worker", MySubscription->name))); ~~~ 5. InitializeApplyWorker + if (am_parallel_apply_worker()) + ereport(LOG, + (errmsg("logical replication parallel apply worker for subscription %u will not " + "start because the subscription was removed during startup", + MyLogicalRepWorker->subid))); + else + ereport(LOG, + (errmsg("logical replication apply worker for subscription %u will not " + "start because the subscription was removed during startup", + MyLogicalRepWorker->subid))); These can be combined like comment #2 above. SUGGESTION ereport(LOG, (errmsg("logical replication %s for subscription %u will not start because the subscription was removed during startup", am_parallel_apply_worker() ? 
"parallel apply worker" : "apply worker", MyLogicalRepWorker->subid))); ~~~ 6. + else if (am_parallel_apply_worker()) + ereport(LOG, + (errmsg("logical replication parallel apply worker for subscription \"%s\" has started", + MySubscription->name))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", MySubscription->name))); The last if/else can be combined in the same way as comment #2 above. SUGGESTION else ereport(LOG, (errmsg("logical replication %s for subscription \"%s\" has started", am_parallel_apply_worker() ? "parallel apply worker" : "apply worker", MySubscription->name))); ~~~ 7. IsLogicalParallelApplyWorker +bool +IsLogicalParallelApplyWorker(void) +{ + return IsLogicalWorker() && am_parallel_apply_worker(); +} Patch v40 added the IsLogicalWorker() check to the condition, but why is that extra check necessary? ====== 8. src/include/replication/worker_internal.h +typedef struct ParallelApplyWorkerInfo +{ + shm_mq_handle *mq_handle; + + /* + * The queue used to transfer messages from the parallel apply worker to + * the leader apply worker. + */ + shm_mq_handle *error_mq_handle; In patch v40 the comment about the NULL error_mq_handle is removed, but since the code still explicitly sets/checks NULL in different places, isn't it still better to have some comment here to describe what NULL means? ------ Kind Regards, Peter Smith. Fujitsu Australia