Here are some review comments for v36-0001.

======

1. GENERAL

Houzj wrote ([1] #3a):
The word 'streaming' is the same as the actual option name, so personally I think it's fine. But if others also agreed that the name can be improved, I can change it.

~

Sure, I was not really complaining that the name is "wrong". I just did not think it was a good idea to have multiple struct members called 'streaming' when they don't have the same meaning. e.g. one is the internal character mode equivalent of the parameter, and one is the parameter value as a string. That's why I thought they should have different names. e.g. Make the 2nd one 'streaming_valstr' or something.

======

2. doc/src/sgml/config.sgml

Previously I suggested there should be xrefs to the "Configuration Settings" page but Houzj wrote ([1] #4):
Not sure about this as we don't have similar thing in the document of max_logical_replication_workers and max_sync_workers_per_subscription.

~

Fair enough, but IMO perhaps all those others should also xref to the "Configuration Settings" chapter. So if such a change does not belong in this patch, then how about I start another independent thread to post this suggestion?

======

.../replication/logical/applyparallelworker.c

3. parallel_apply_find_worker

+parallel_apply_find_worker(TransactionId xid)
+{
+    bool found;
+    ParallelApplyWorkerEntry *entry = NULL;
+
+    if (!TransactionIdIsValid(xid))
+        return NULL;
+
+    if (ParallelApplyWorkersHash == NULL)
+        return NULL;
+
+    /* Return the cached parallel apply worker if valid. */
+    if (stream_apply_worker != NULL)
+        return stream_apply_worker;
+
+    /*
+     * Find entry for requested transaction.
+     */
+    entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_FIND, &found);

In function parallel_apply_start_worker() you removed the entry assignment to NULL because it is never needed. You can do the same here too.

~~~

4. parallel_apply_free_worker

+/*
+ * Remove the parallel apply worker entry from the hash table. And stop the
+ * worker if there are enough workers in the pool. For more information about
+ * the worker pool, see comments atop worker.c.
+ */
+void
+parallel_apply_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid)

"And stop" -> "Stop"

~~~

5. parallel_apply_free_worker

+     * Although some error messages may be lost in rare scenarios, but
+     * since the parallel apply worker has finished processing the
+     * transaction, and error messages may be lost even if we detach the
+     * error queue after terminating the process. So it should be ok.
+     */

SUGGESTION (minor rewording)
Some error messages may be lost in rare scenarios, but it should be OK because the parallel apply worker has finished processing the transaction, and error messages may be lost even if we detached the error queue after terminating the process.

~~~

6. LogicalParallelApplyLoop

+    for (;;)
+    {
+        void *data;
+        Size len;
+        int c;
+        StringInfoData s;
+        MemoryContext oldctx;
+
+        CHECK_FOR_INTERRUPTS();
+
+        /* Ensure we are reading the data into our memory context. */
+        oldctx = MemoryContextSwitchTo(ApplyMessageContext);
+
...
+
+        MemoryContextSwitchTo(oldctx);
+        MemoryContextReset(ApplyMessageContext);
+    }

Do those memory context switches need to happen inside the for(;;) loop like that? I thought perhaps they could be done *outside* of the loop instead of always switching and then switching back on every iteration.

~~~

7. LogicalParallelApplyLoop

Previously I suggested maybe the name (e.g. the 2nd param) should be changed to "ParallelApplyMessageContext"? Houzj wrote ([1] #13):
Not sure about this, because ApplyMessageContext is used in both worker.c and applyparallelworker.c.

~

But I thought those are completely independent ApplyMessageContext's in different processes that just happen to have the same name. Shouldn't they have a name appropriate to who owns them?

~~~

8. ParallelApplyWorkerMain

+    /*
+     * Allocate the origin name in a long-lived context for error context
+     * message.
+     */
+    snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);

Now that the ReplicationOriginNameForLogicalRep patch is pushed [2], please make use of this common function.

~~~

9. HandleParallelApplyMessage

+        case 'X':    /* Terminate, indicating clean exit */
+            {
+                shm_mq_detach(winfo->error_mq_handle);
+                winfo->error_mq_handle = NULL;
+                break;
+            }
+
+            /*
+             * Don't need to do anything about NoticeResponse and
+             * NotifyResponse as the logical replication worker doesn't need
+             * to send messages to the client.
+             */
+        case 'N':
+        case 'A':
+            break;
+        default:
+            {
+                elog(ERROR, "unrecognized message type received from parallel apply worker: %c (message length %d bytes)",
+                     msgtype, msg->len);
+            }

9a. case 'X':
There are no variable declarations here so the statement block {} is not needed.

~

9b. default:
There are no variable declarations here so the statement block {} is not needed.

~~~

10. parallel_apply_stream_abort

+    int i;
+    bool found = false;
+    char spname[MAXPGPATH];
+
+    parallel_apply_savepoint_name(MySubscription->oid, subxid, spname,
+                                  sizeof(spname));

I posted about using NAMEDATALEN in a previous review ([3] #21) but I think only one place was fixed and this one was missed.

~~~

11. parallel_apply_replorigin_setup

+    snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
+    originid = replorigin_by_name(originname, false);
+    replorigin_session_setup(originid);
+    replorigin_session_origin = originid;

Same as #8. Please call the new ReplicationOriginNameForLogicalRep function.

======

src/backend/replication/logical/launcher.c

12. logicalrep_worker_launch

Previously I suggested maybe the apply process name should change

FROM
"logical replication worker for subscription %u"
TO
"logical replication apply worker for subscription %u"

and Houzj wrote ([1] #13):
I am not sure if it's a good idea to change existing process description.

~

But that seems inconsistent to me because elsewhere this patch is already exposing the name to the user (like when it says "logical replication apply worker for subscription \"%s\" has started"). Shouldn't the process name match these logs?

======

src/backend/replication/logical/worker.c

13. apply_handle_stream_start

+ *
+ * XXX We can avoid sending pairs of the START messages to the parallel worker
+ * because unlike apply worker it will process only one transaction-at-a-time.
+ * However, it is not clear whether that is worth the effort because it is sent
+ * after logical_decoding_work_mem changes.
  */
 static void
 apply_handle_stream_start(StringInfo s)

13a.
"transaction-at-a-time." -> "transaction at a time."

~

13b.
I was not sure what that last sentence means. Does it mean something like:
"However, it is not clear whether doing this is worth the effort because pairs of START messages occur only after logical_decoding_work_mem changes."

~~~

14. apply_handle_stream_start

+    ParallelApplyWorkerInfo *winfo = NULL;

The assignment of NULL in the *winfo declaration is not needed because get_transaction_apply_action will always do this anyway.

~~~

15. apply_handle_stream_start

+
+        case TRANS_PARALLEL_APPLY:
+            break;

I had previously suggested this should include a comment explaining why there is nothing to do ([3] #44), but I think there was no reply.

~~~

16. apply_handle_stream_stop

 apply_handle_stream_stop(StringInfo s)
 {
+    ParallelApplyWorkerInfo *winfo = NULL;
+    TransApplyAction apply_action

The assignment of NULL in the *winfo declaration is not needed because get_transaction_apply_action will always do this anyway.

~~~

17. serialize_stream_abort

+    ParallelApplyWorkerInfo *winfo = NULL;
+    TransApplyAction apply_action;

The assignment of NULL in the *winfo declaration is not needed because get_transaction_apply_action will always do this anyway.

~~~

18. apply_handle_stream_commit

     LogicalRepCommitData commit_data;
+    ParallelApplyWorkerInfo *winfo = NULL;
+    TransApplyAction apply_action;

The assignment of NULL in the *winfo declaration is not needed because get_transaction_apply_action will always do this anyway.

~~~

19. ApplyWorkerMain

+
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)

Previously I suggested changing "Apply worker" to "apply worker", and Houzj ([1] #48) replied:
Since it's the existing comment, I feel we can leave this.

~

Normally I agree that a patch should not change original code unrelated to it, but in practice, I think no patch would be accepted that changes just "A" to "a", so if you don't change it here in this patch to be consistent then it will never happen. That's why I think it should be part of this patch.

~~~

20. ApplyWorkerMain

+    /*
+     * We don't currently need any ResourceOwner in a walreceiver process, but
+     * if we did, we could call CreateAuxProcessResourceOwner here.
+     */

Previously I suggested prefixing this as "XXX" and Houzj replied ([1] #48):
I am not sure as this comment is just a reminder.

~

OK, then maybe since it is a reminder "Note" it should be changed:
"We don't currently..." -> "Note: We don't currently..."

~~~

21. ApplyWorkerMain

+    if (server_version >= 160000 &&
+        MySubscription->stream == SUBSTREAM_PARALLEL)
+    {
+        options.proto.logical.streaming = pstrdup("parallel");
+        MyLogicalRepWorker->parallel_apply = true;
+    }
+    else if (server_version >= 140000 &&
+             MySubscription->stream != SUBSTREAM_OFF)
+    {
+        options.proto.logical.streaming = pstrdup("on");
+        MyLogicalRepWorker->parallel_apply = false;
+    }
+    else
+    {
+        options.proto.logical.streaming = NULL;
+        MyLogicalRepWorker->parallel_apply = false;
+    }

I think this if/else block is only for assigning the streaming/parallel members, so it should have a comment to say that:

SUGGESTION
Assign the appropriate streaming flag according to the 'streaming' mode and the publisher's ability to support that mode.

~~~

22. get_transaction_apply_action

+static TransApplyAction
+get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
+{
+    *winfo = NULL;
+
+    if (am_parallel_apply_worker())
+    {
+        return TRANS_PARALLEL_APPLY;
+    }
+    else if (in_remote_transaction)
+    {
+        return TRANS_LEADER_APPLY;
+    }
+
+    /*
+     * Check if we are processing this transaction using a parallel apply
+     * worker and if so, send the changes to that worker.
+     */
+    else if ((*winfo = parallel_apply_find_worker(xid)))
+    {
+        return TRANS_LEADER_SEND_TO_PARALLEL;
+    }
+    else
+    {
+        return TRANS_LEADER_SERIALIZE;
+    }
+}

22a.

Previously I suggested the statement blocks are overkill and all the {} should be removed, and Houzj ([1] #52a) wrote:
I feel this style is fine.

~

Sure, it is fine, but FWIW I thought it is not the normal PG coding convention to use unnecessary {} unless it would seem strange to omit them.

~~

22b.
Also previously I had suggested

> Can a tablesync worker ever get here? It might be better to
> Assert(!am_tablesync_worker()); at top of this function?

and Houzj ([1] #52b) replied:
Not sure if it's necessary or not.

~

OTOH you could say no Assert is ever really necessary, but IMO adding one here would at least be a sanity check and help to document the function better.

======

23. src/test/regress/sql/subscription.sql

Previously I mentioned testing the 'streaming' option with no value. Houzj replied ([1]):
I didn't find similar tests for no value explicitly specified cases, so I didn't add this for now.

But as I also responded ([4] #58) already to Amit:
IMO this one is a bit different because it's not really a boolean option anymore - it's a kind of hybrid boolean/enum. That's why I thought this ought to be tested regardless of whether there are existing tests for the (normal) boolean options.

Anyway, you can decide what you want.

------
[1] Houzj replies to my v35 review
https://www.postgresql.org/message-id/OS0PR01MB5716B400CD81565E868616DB945F9%40OS0PR01MB5716.jpnprd01.prod.outlook.com
[2] ReplicationOriginNameForLogicalRep
https://github.com/postgres/postgres/commit/776e1c8a5d1494e345e5e1b16a5eba5e98aaddca
[3] My review v35
https://www.postgresql.org/message-id/CAHut%2BPvFENKb5fcMko5HHtNEAaZyNwGhu3PASrcBt%2BHFoFL%3DFw%40mail.gmail.com
[4] Explaining some v35 review comments
https://www.postgresql.org/message-id/CAHut%2BPscac%2BipFSFx89ACmacjPe4Dn%3DqVq8T0V%3DnQkv38QgnBw%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia