Here are my review comments for v5-0001. I will take a look at the v5-0002 (TAP) patch another time.

======

1. Commit message

The message still refers to "apply background". Should that say "apply background worker"?

Other parts just call this the "worker".

Personally, I think it might be better to coin some new term for this thing (e.g. "apply-bgworker" or something like that of your choosing) so then you can just concisely *always* refer to that everywhere without any ambiguity. e.g. the same applies to every comment and every message in this patch. They should all use identical terminology (e.g. "apply-bgworker").

~~~

2. Commit message

"We also need to allow stream_stop to complete by the apply background to finish it to..."

Wording: ???

~~~

3. Commit message

This patch also extends the subscription streaming option so that user can control whether apply the streaming transaction in a apply background or spill the change to disk.

Wording: "user" -> "the user"
Typo: "whether apply" -> "whether to apply"
Typo: "a apply" -> "an apply"

~~~

4. Commit message

User can set the streaming option to 'on/off', 'apply'. For now, 'apply' means the streaming will be applied via a apply background if available. 'on' means the streaming transaction will be spilled to disk.

I think "apply" might not be the best choice of values for this meaning, but I think Hou-san already said [1] that this was being reconsidered.

~~~

5. doc/src/sgml/catalogs.sgml - formatting

@@ -7863,11 +7863,15 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l
 <row>
 <entry role="catalog_table_entry"><para role="column_definition">
- <structfield>substream</structfield> <type>bool</type>
+ <structfield>substream</structfield> <type>char</type>
 </para>
 <para>
- If true, the subscription will allow streaming of in-progress
- transactions
+ Controls how to handle the streaming of in-progress transactions.
+ <literal>f</literal> = disallow streaming of in-progress transactions
+ <literal>o</literal> = spill the changes of in-progress transactions to
+ disk and apply at once after the transaction is committed on the
+ publisher.
+ <literal>a</literal> = apply changes directly using a background worker
 </para></entry>
 </row>

Needs to be consistent with other value lists on this page.

5a. The first sentence to end with ":"

5b. List items to end with ","

~~~

6. doc/src/sgml/ref/create_subscription.sgml

+ <para>
+ If set to <literal>apply</literal> incoming
+ changes are directly applied via one of the background worker, if
+ available. If no background worker is free to handle streaming
+ transaction then the changes are written to a file and applied after
+ the transaction is committed. Note that if error happen when applying
+ changes in background worker, it might not report the finish LSN of
+ the remote transaction in server log.
 </para>

6a. Typo: "one of the background worker," -> "one of the background workers,"

6b. Wording

BEFORE
Note that if error happen when applying changes in background worker, it might not report the finish LSN of the remote transaction in server log.

SUGGESTION
Note that if an error happens when applying changes in a background worker, it might not report the finish LSN of the remote transaction in the server log.

~~~

7. src/backend/commands/subscriptioncmds.c - defGetStreamingMode

+static char
+defGetStreamingMode(DefElem *def)
+{
+    /*
+     * If no parameter given, assume "true" is meant.
+     */
+    if (def->arg == NULL)
+        return SUBSTREAM_ON;

But is that right? IIUC all the docs said that the default is OFF.

~~~

8. src/backend/commands/subscriptioncmds.c - defGetStreamingMode

+    /*
+     * The set of strings accepted here should match up with the
+     * grammar's opt_boolean_or_string production.
+     */
+    if (pg_strcasecmp(sval, "true") == 0 ||
+        pg_strcasecmp(sval, "on") == 0)
+        return SUBSTREAM_ON;
+    if (pg_strcasecmp(sval, "apply") == 0)
+        return SUBSTREAM_APPLY;
+    if (pg_strcasecmp(sval, "false") == 0 ||
+        pg_strcasecmp(sval, "off") == 0)
+        return SUBSTREAM_OFF;

Perhaps these should be re-ordered as OFF/ON/APPLY to be consistent with the T_Integer case above here.

~~~

9. src/backend/replication/logical/launcher.c - logicalrep_worker_launch

The "start new apply background worker ..." function comment feels a bit misleading now that this seems to be what you are calling this new kind of worker. E.g. this is also called to start the sync worker. And also for the apply worker (which we are not really calling a "background worker" in other places).

This comment is the same as [PSv4] #19.

~~~

10. src/backend/replication/logical/launcher.c - logicalrep_worker_launch

@@ -275,6 +280,9 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
     int nsyncworkers;
     TimestampTz now;
+    /* We don't support table sync in subworker */
+    Assert(!((subworker_dsm != DSM_HANDLE_INVALID) && OidIsValid(relid)));

I think you should declare a new variable like:
bool is_subworker = subworker_dsm != DSM_HANDLE_INVALID;

Then this Assert can be simplified, and you can also re-use 'is_subworker' multiple times later in this same function to simplify lots of other code.

~~~

11. src/backend/replication/logical/launcher.c - logicalrep_worker_stop_internal

+/*
+ * Workhorse for logicalrep_worker_stop() and logicalrep_worker_detach(). Stop
+ * the worker and wait for wait for it to die.
+ */
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker)

Typo: "wait for" is repeated 2x.

~~~

12. src/backend/replication/logical/origin.c - replorigin_session_setup

@@ -1110,7 +1110,11 @@ replorigin_session_setup(RepOriginId node)
     if (curstate->roident != node)
         continue;
-    else if (curstate->acquired_by != 0)
+    /*
+     * We allow the apply worker to get the slot which is acquired by its
+     * leader process.
+     */
+    else if (curstate->acquired_by != 0 && acquire)

I still feel this is overly confusing. Shouldn't the comment say "Allow the apply bgworker to get the slot..."?

Also the parameter name 'acquire' is hard to reconcile with the comment. E.g. I feel all this would be easier to understand if the param was refactored with a name like 'bgworker' and the code was changed to:
else if (curstate->acquired_by != 0 && !bgworker)

Of course, the value true/false would need to be flipped on calls too.

This is the same as my previous comment [PSv4] #26.

~~~

13. src/backend/replication/logical/proto.c

@@ -1138,14 +1138,11 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
 /*
  * Read STREAM COMMIT from the output stream.
  */
-TransactionId
+void
 logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
 {
-    TransactionId xid;
     uint8 flags;
-    xid = pq_getmsgint(in, 4);
-
     /* read flags (unused for now) */
     flags = pq_getmsgbyte(in);

There is something incompatible with the read/write functions here. The write writes the txid before the flags, but the read_commit does not read it at all; it only reads the flags (???). If this is really correct then I think there needs to be some comments to explain WHY it is correct.

NOTE: See also review comment 28 where I proposed another way to write this code.

~~~

14. src/backend/replication/logical/worker.c - comment

The whole comment is similar to the commit message so any changes there should be made here also.

~~~

15. src/backend/replication/logical/worker.c - ParallelState

+/*
+ * Shared information among apply workers.
+ */
+typedef struct ParallelState

It looks like there is already another typedef called "ParallelState" because it is already in the typedefs.list. Maybe this name should be changed or maybe make it static or something?

~~~

16. src/backend/replication/logical/worker.c - defines

+/*
+ * States for apply background worker.
+ */
+#define APPLY_BGWORKER_ATTACHED 'a'
+#define APPLY_BGWORKER_READY 'r'
+#define APPLY_BGWORKER_BUSY 'b'
+#define APPLY_BGWORKER_FINISHED 'f'
+#define APPLY_BGWORKER_EXIT 'e'

Those char states all look independent. So wouldn’t this be represented better as an enum to reinforce that fact?

~~~

17. src/backend/replication/logical/worker.c - functions

+/* Worker setup and interactions */
+static WorkerState *apply_bgworker_setup(void);
+static WorkerState *find_or_start_apply_bgworker(TransactionId xid,
+                                                 bool start);

Maybe rename to apply_bgworker_find_or_start() to match the pattern of the others?

~~~

18. src/backend/replication/logical/worker.c - macros

+#define am_apply_bgworker() (MyLogicalRepWorker->subworker)
+#define applying_changes_in_bgworker() (in_streamed_transaction && stream_apply_worker != NULL)

18a. Somehow I felt these are not in the best place.
- Maybe am_apply_bgworker() should be in worker_internal.h?
- Maybe the applying_changes_in_bgworker() should be nearby the stream_apply_worker declaration

18b. Maybe applying_changes_in_bgworker should be renamed to something else to match the pattern of the others (e.g. "apply_bgworker_active" or something)

~~~

19. src/backend/replication/logical/worker.c - handle_streamed_transaction

+    /*
+     * If we decided to apply the changes of this transaction in a apply
+     * background worker, pass the data to the worker.
+     */

Typo: "in a apply" -> "in an apply"

~~~

20. src/backend/replication/logical/worker.c - handle_streamed_transaction

+    /*
+     * XXX The publisher side doesn't always send relation update message
+     * after the streaming transaction, so update the relation in main
+     * apply worker here.
+     */

Wording: "doesn't always send relation update message" -> "doesn't always send relation update messages" ??

~~~

21. src/backend/replication/logical/worker.c - apply_handle_commit_prepared

+    apply_bgworker_set_state(APPLY_BGWORKER_FINISHED);

It seems somewhat confusing to see calls to apply_bgworker_set_state() when we may or may not even be an apply bgworker.

I know it adds more code, but I somehow feel it would be more readable if all these calls were changed to look like below. Please consider it.

SUGGESTION
if (am_apply_bgworker())
    apply_bgworker_set_state(XXX);

Then you can also change the apply_bgworker_set_state to Assert(am_apply_bgworker());

~~~

22. src/backend/replication/logical/worker.c - find_or_start_apply_bgworker

+
+    if (!start && ApplyWorkersHash == NULL)
+        return NULL;
+

IIUC maybe this extra check is not really necessary. I see no harm in creating the HashTable even if it was called in this state. If the 'start' flag is false then nothing is going to be found anyway, so it will return NULL. e.g. Might as well make the code a few lines shorter/simpler by removing this check.

~~~

23. src/backend/replication/logical/worker.c - apply_bgworker_free

+/*
+ * Add the worker to the freelist and remove the entry from hash table.
+ */
+static void
+apply_bgworker_free(WorkerState *wstate)
+{
+    bool found;
+    MemoryContext oldctx;
+    TransactionId xid = wstate->pstate->stream_xid;

If you are not going to check the value of 'found' then why bother to pass this param at all? Can't you just pass NULL?

~~~

24. src/backend/replication/logical/worker.c - apply_bgworker_free

Should there be an Assert that the bgworker state really was FINISHED?

I think I asked this already [PSv4] #48.

~~~

24. src/backend/replication/logical/worker.c - apply_handle_stream_prepare

@@ -1088,24 +1416,71 @@ apply_handle_stream_prepare(StringInfo s)
     logicalrep_read_stream_prepare(s, &prepare_data);
     set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
-    elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
+    /*
+     * If we are in a bgworker, just prepare the transaction.
+     */
+    if (am_apply_bgworker())

Don’t need to say "If we are..." because the am_apply_bgworker() condition makes it clear this is true.

~~~

25. src/backend/replication/logical/worker.c - apply_handle_stream_start

-    if (MyLogicalRepWorker->stream_fileset == NULL)
+    stream_apply_worker = find_or_start_apply_bgworker(stream_xid, first_segment);
+
+    if (applying_changes_in_bgworker())
     {

IIUC this condition seems overkill. I think you can just say
if (stream_apply_worker)

~~~

26. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+    if (found)
+    {
+        elog(LOG, "rolled back to savepoint %s", spname);
+        RollbackToSavepoint(spname);
+        CommitTransactionCommand();
+        subxactlist = list_truncate(subxactlist, i + 1);
+    }

Should that elog use the "[Apply BGW #%u]" format like the others for BGW?

~~~

27. src/backend/replication/logical/worker.c - apply_handle_stream_abort

Should this function be setting stream_apply_worker = NULL somewhere when all is done?

~~~

28. src/backend/replication/logical/worker.c - apply_handle_stream_commit

+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+    LogicalRepCommitData commit_data;
+    TransactionId xid;
+
+    if (in_streamed_transaction)
+        ereport(ERROR,
+                (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
+
+    xid = pq_getmsgint(s, 4);
+    logicalrep_read_stream_commit(s, &commit_data);
+    set_apply_error_context_xact(xid, commit_data.commit_lsn);

There is something a bit odd about this code.
I think the logicalrep_read_stream_commit() should take another param and the Txid be extracted/read only INSIDE that logicalrep_read_stream_commit function. See also review comment #13.

~~~

29. src/backend/replication/logical/worker.c - apply_handle_stream_commit

I am unsure, but should something be setting the stream_apply_worker = NULL somewhere when all is done?

~~~

30. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop

30a.
+    if (shmq_res != SHM_MQ_SUCCESS)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("lost connection to the main apply worker")));

30b.
+        default:
+            elog(ERROR, "unexpected message");
+            break;

Should both those error messages have the "[Apply BGW #%u]" prefix like the other BGW messages?

~~~

31. src/backend/replication/logical/worker.c - ApplyBgwShutdown

+/*
+ * Set the failed flag so that the main apply worker can realize we have
+ * shutdown.
+ */
+static void
+ApplyBgwShutdown(int code, Datum arg)

The comment does not seem to be in sync with the code. E.g.
Wording: "failed flag" -> "exit state" ??

~~~

32. src/backend/replication/logical/worker.c - ApplyBgwShutdown

+/*
+ * Set the failed flag so that the main apply worker can realize we have
+ * shutdown.
+ */
+static void
+ApplyBgwShutdown(int code, Datum arg)

If the 'code' param is deliberately unused it might be better to say so in the comment...

~~~

33. src/backend/replication/logical/worker.c - LogicalApplyBgwMain

33a.
+    ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+             errmsg("unable to map dynamic shared memory segment")));

33b.
+    ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+             errmsg("bad magic number in dynamic shared memory segment")));
+

33c.
+    ereport(LOG,
+            (errmsg("logical replication apply worker for subscription %u will not "
+                    "start because the subscription was removed during startup",
+                    MyLogicalRepWorker->subid)));

Should all these messages have "[Apply BGW ?]" prefix even though they are not yet attached?

~~~

34. src/backend/replication/logical/worker.c - setup_dsm

+ * We need one key to register the location of the header, and we need
+ * nworkers keys to track the locations of the message queues.
+ */

This comment about 'nworkers' seems stale because that variable no longer exists.

~~~

35. src/backend/replication/logical/worker.c - apply_bgworker_setup

+/*
+ * Start apply worker background worker process and allocat shared memory for
+ * it.
+ */
+static WorkerState *
+apply_bgworker_setup(void)

Typo: "allocat" -> "allocate"

~~~

36. src/backend/replication/logical/worker.c - apply_bgworker_setup

+    elog(LOG, "setting up apply worker #%u", list_length(ApplyWorkersList) + 1)

Should this message have the standard "[Apply BGW %u]" pattern?

~~~

37. src/backend/replication/logical/worker.c - apply_bgworker_setup

+    if (launched)
+    {
+        /* Wait for worker to become ready. */
+        apply_bgworker_wait_for(wstate, APPLY_BGWORKER_ATTACHED);
+
+        ApplyWorkersList = lappend(ApplyWorkersList, wstate);
+    }

Since there is a state APPLY_BGWORKER_READY I think either this comment is wrong or this passed parameter ATTACHED must be wrong.

~~~

38. src/backend/replication/logical/worker.c - apply_bgworker_send_data

+    if (result != SHM_MQ_SUCCESS)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("could not send tuples to shared-memory queue")));
+}

Wording: Is it right to call these "tuples" or better just say "data"? I am not sure. Already asked this in [PSv4] #68

~~~

39. src/backend/replication/logical/worker.c - apply_bgworker_wait_for

+/*
+ * Wait until the state of apply background worker reach the 'wait_for_state'
+ */
+static void
+apply_bgworker_wait_for(WorkerState *wstate, char wait_for_state)

Typo: "reach" -> "reaches"

~~~

40. src/backend/replication/logical/worker.c - apply_bgworker_wait_for

+    /* If the worker is ready, we have succeeded. */
+    SpinLockAcquire(&wstate->pstate->mutex);
+    status = wstate->pstate->state;
+    SpinLockRelease(&wstate->pstate->mutex);
+
+    if (status == wait_for_state)
+        break;

40a. Why does this mention "ready"? This function might be waiting for a different state than that.

40b. Anyway, I think this comment should be a few lines lower, above the if (status == wait_for_state)

~~~

41. src/backend/replication/logical/worker.c - apply_bgworker_wait_for

+    ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+             errmsg("Background worker %u failed to apply transaction %u",
+                    wstate->pstate->n, wstate->pstate->stream_xid)));

Should this message have the standard "[Apply BGW %u]" pattern?

~~~

42. src/backend/replication/logical/worker.c - check_workers_status

+    ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+             errmsg("Background worker %u exited unexpectedly",
+                    wstate->pstate->n)));

Should this message have the standard "[Apply BGW %u]" pattern?

Or if this is just from the apply worker maybe it should be clearer like "Apply worker detected apply bgworker %u exited unexpectedly".

~~~

43. src/backend/replication/logical/worker.c - check_workers_status

+    ereport(LOG,
+            (errmsg("logical replication apply workers for subscription \"%s\" will restart",
+                    MySubscription->name),
+             errdetail("Cannot start table synchronization while bgworkers are "
+                       "handling streamed replication transaction")));

I am not sure, but isn't the message backwards? e.g. Should it say more like:
"Cannot handle streamed transactions using bgworkers while table synchronization is still in progress".
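
To illustrate what I mean by one standard pattern in #41 and #42 (and earlier in #26, #30, #33, #36), here is a small standalone sketch. It is only an illustration under my own assumptions: the macro APPLY_BGW_PREFIX_FMT and the helper apply_bgw_format_msg() are hypothetical names that do not exist in the patch, and it uses plain snprintf instead of elog/ereport so that it can be compiled on its own. In the patch itself the same effect would presumably come from using one shared prefix in every format string.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>

/* Hypothetical: one shared prefix so every bgworker message looks the same. */
#define APPLY_BGW_PREFIX_FMT "[Apply BGW #%u] "

/*
 * Hypothetical helper (not part of the patch): write the common prefix
 * followed by the formatted message into buf, truncating if needed.
 *
 * Returns the length the complete message needs (excluding the
 * terminating NUL), or -1 on a formatting error.
 */
static int
apply_bgw_format_msg(char *buf, size_t buflen, unsigned int worker_no,
                     const char *fmt, ...)
{
    va_list     args;
    int         prefix_len;
    int         body_len;

    assert(buf != NULL && buflen > 0);

    prefix_len = snprintf(buf, buflen, APPLY_BGW_PREFIX_FMT, worker_no);
    if (prefix_len < 0)
        return -1;

    va_start(args, fmt);
    if ((size_t) prefix_len < buflen)
        body_len = vsnprintf(buf + prefix_len, buflen - (size_t) prefix_len,
                             fmt, args);
    else
        body_len = vsnprintf(NULL, 0, fmt, args);   /* measure only */
    va_end(args);

    return (body_len < 0) ? -1 : prefix_len + body_len;
}
```

With something like this, the message in #41 for worker 3 and xid 731 would come out as "[Apply BGW #3] failed to apply transaction 731", which matches the style already used by the elog in apply_bgworker_set_state.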

~~~

44. src/backend/replication/logical/worker.c - apply_bgworker_set_state

+    elog(LOG, "[Apply BGW #%u] set state to %c",
+         MyParallelState->n, state);

The line wrapping seemed overkill here.

~~~

45. src/backend/utils/activity/wait_event.c

@@ -388,6 +388,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
         case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT:
             event_name = "HashGrowBucketsReinsert";
             break;
+        case WAIT_EVENT_LOGICAL_APPLY_WORKER_READY:
+            event_name = "LogicalApplyWorkerReady";
+            break;

I am not sure this is the best name for this event since the only place it is used (in apply_bgworker_wait_for) is not only waiting for READY state. Maybe a name like WAIT_EVENT_LOGICAL_APPLY_BGWORKER or WAIT_EVENT_LOGICAL_APPLY_WORKER_SYNC would be more appropriate? Need to change the wait_event.h also.

~~~

46. src/include/catalog/pg_subscription.h

+/* Disallow streaming in-progress transactions */
+#define SUBSTREAM_OFF 'f'
+
+/*
+ * Streaming transactions are written to a temporary file and applied only
+ * after the transaction is committed on upstream.
+ */
+#define SUBSTREAM_ON 'o'
+
+/* Streaming transactions are appied immediately via a background worker */
+#define SUBSTREAM_APPLY 'a'

46a. There is not really any overarching comment that associates these #defines back to the new 'stream' field so you are just supposed to guess that's what they are for?

46b. I also feel that using 'o' for ON is not consistent with the 'f' of OFF. IMO better to use 't/f' for true/false instead of 'o/f'. Also don't forget to update the docs, pg_dump.c etc.

46c. Typo: "appied" -> "applied"

~~~

47. src/test/regress/expected/subscription.out - missing test

Missing some test cases for all new option values? E.g. where is the test where the streaming value is set to 'apply'?
Same comment as [PSv4] #81

------
[1] https://www.postgresql.org/message-id/OS0PR01MB5716E8D536552467EFB512EF94FC9%40OS0PR01MB5716.jpnprd01.prod.outlook.com
[PSv4] https://www.postgresql.org/message-id/CAHut%2BPuqYP5eD5wcSCtk%3Da6KuMjat2UCzqyGoE7sieCaBsVskQ%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia