On Thu, Aug 18, 2022 at 6:57 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Thu, Aug 18, 2022 at 11:59 AM Peter Smith <smithpb2...@gmail.com> wrote:
> >
> > Here are my review comments for patch v21-0001:
> >
> > 4. Commit message
> >
> > In addition, the patch extends the logical replication STREAM_ABORT message so
> > that abort_time and abort_lsn can also be sent which can be used to update the
> > replication origin in apply background worker when the streaming transaction is
> > aborted.
> >
> > 4a.
> > Should this para also mention something about the introduction of
> > protocol version 4?
> >
> > 4b.
> > Should this para also mention that these extensions are not strictly
> > mandatory for the parallel streaming to still work?
> >
>
> Without parallel streaming/apply, we don't need to send this extra
> message. So, I don't think it will be correct to say that.

See my reply to 47a below.

>
> >
> > 46. src/backend/replication/logical/worker.c - apply_error_callback
> >
> > +     if (errarg->remote_attnum < 0)
> > +     {
> > +         if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> > +             errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
> > +                        errarg->origin_name,
> > +                        logicalrep_message_type(errarg->command),
> > +                        errarg->rel->remoterel.nspname,
> > +                        errarg->rel->remoterel.relname,
> > +                        errarg->remote_xid);
> > +         else
> > +             errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
> > +                        errarg->origin_name,
> > +                        logicalrep_message_type(errarg->command),
> > +                        errarg->rel->remoterel.nspname,
> > +                        errarg->rel->remoterel.relname,
> > +                        errarg->remote_xid,
> > +                        LSN_FORMAT_ARGS(errarg->finish_lsn));
> > +     }
> > +     else
> > +     {
> > +         if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> > +             errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
> > +                        errarg->origin_name,
> > +                        logicalrep_message_type(errarg->command),
> > +                        errarg->rel->remoterel.nspname,
> > +                        errarg->rel->remoterel.relname,
> > +                        errarg->rel->remoterel.attnames[errarg->remote_attnum],
> > +                        errarg->remote_xid);
> > +         else
> > +             errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
> > +                        errarg->origin_name,
> > +                        logicalrep_message_type(errarg->command),
> > +                        errarg->rel->remoterel.nspname,
> > +                        errarg->rel->remoterel.relname,
> > +                        errarg->rel->remoterel.attnames[errarg->remote_attnum],
> > +                        errarg->remote_xid,
> > +                        LSN_FORMAT_ARGS(errarg->finish_lsn));
> > +     }
> > + }
> >
> > Hou-san had asked [3](comment #14) me how the above code can be
> > shortened. Below is one idea, but maybe you won't like it ;-)
> >
> > #define MSG_O_T_S_R "processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" "
> > #define O_T_S_R\
> > errarg->origin_name,\
> > logicalrep_message_type(errarg->command),\
> > errarg->rel->remoterel.nspname,\
> > errarg->rel->remoterel.relname
> >
> > if (errarg->remote_attnum < 0)
> > {
> >     if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> >         errcontext(MSG_O_T_S_R "in transaction %u",
> >                    O_T_S_R,
> >                    errarg->remote_xid);
> >     else
> >         errcontext(MSG_O_T_S_R "in transaction %u finished at %X/%X",
> >                    O_T_S_R,
> >                    errarg->remote_xid,
> >                    LSN_FORMAT_ARGS(errarg->finish_lsn));
> > }
> > else
> > {
> >     if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> >         errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u",
> >                    O_T_S_R,
> >                    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> >                    errarg->remote_xid);
> >     else
> >         errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u finished at %X/%X",
> >                    O_T_S_R,
> >                    errarg->rel->remoterel.attnames[errarg->remote_attnum],
> >                    errarg->remote_xid,
> >                    LSN_FORMAT_ARGS(errarg->finish_lsn));
> > }
> > #undef O_T_S_R
> > #undef MSG_O_T_S_R
> >
> > ======
> >
>
> I don't like this much. I think this reduces readability.

I agree. That wasn't a very serious suggestion :-)

>
> > 47. src/include/replication/logicalproto.h
> >
> > @@ -32,12 +32,17 @@
> >   *
> >   * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
> >   * support for two-phase commit decoding (at prepare time). Introduced in PG15.
> > + *
> > + * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
> > + * with support for streaming large transactions using apply background
> > + * workers. Introduced in PG16.
> >   */
> >  #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
> >  #define LOGICALREP_PROTO_VERSION_NUM 1
> >  #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
> >  #define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
> > -#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
> > +#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
> > +#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
> >
> > 47a.
> > I don't think that comment is strictly true. IIUC the new protocol
> > version 4 is currently only affecting the *extra* STREAM_ABORT members
> > – but in fact streaming=parallel is still functional without using
> > those extra members, isn't it? So maybe this description needed to be
> > modified a bit to be more accurate?
> >
>
> The reason for sending this extra abort members is to ensure that
> after aborting the transaction, if the subscriber/apply worker
> restarts, it doesn't need to request the transaction again. Do you
> have suggestions for improving this comment?
>

I gave three review comments for v21-0001 that were all related to
this same point:
i- #4b (commit message)
ii- #7 (protocol pgdocs)
iii- #47a (code comment)

The point was:
AFAIK protocol 4 is only to let the parallel streaming logic behave
*better* in how it can handle restarts after aborts. But that does not
mean that protocol 4 is a *pre-requisite* for "allowing"
streaming=parallel to work in the first place. I thought that a PG15
publisher and PG16 subscriber can still work using streaming=parallel
even with protocol 3, but it just won't be quite as clever for
handling restarts after abort as protocol 4 (PG16 -> PG16) would be.

If the above is correct, then the code comment can be changed to
something like this:

BEFORE
LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol
version with support for streaming large transactions using apply
background workers. Introduced in PG16.

AFTER
LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM improves how subscription
parameter streaming=parallel (introduced in PG16) will handle restarts
after aborts. Introduced in PG16.

~

The protocol pgdocs might be changed similarly...

BEFORE
Version <literal>4</literal> is supported only for server version 16
and above, and it allows applying stream of large in-progress
transactions in parallel.

AFTER
Version <literal>4</literal> is supported only for server version 16
and above, and it improves how subscription parameter
streaming=parallel (introduced in PG16) will handle restarts after
aborts.

~~

And similar text again for the commit message...

------
Kind Regards,
Peter Smith.
Fujitsu Australia.
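
PS. To make the version question above a bit more concrete, here is a
rough sketch. It is not code from the patch. It only assumes the usual
PG_VERSION_NUM-style server version numbers (e.g. 150000 for PG15), and
that the extra STREAM_ABORT members (abort_lsn, abort_time) are sent
only from protocol version 4 onwards. The two function names are made
up for illustration; only the macro names and values come from
logicalproto.h.

```c
#include <stdbool.h>

/* Values as they appear in the quoted logicalproto.h hunk. */
#define LOGICALREP_PROTO_VERSION_NUM 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4

/*
 * Hypothetical helper (not in the patch): pick the highest protocol
 * version that a publisher of the given server version understands.
 */
int
choose_proto_version(int publisher_server_version)
{
	if (publisher_server_version >= 160000)
		return LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM;
	if (publisher_server_version >= 150000)
		return LOGICALREP_PROTO_TWOPHASE_VERSION_NUM;
	if (publisher_server_version >= 140000)
		return LOGICALREP_PROTO_STREAM_VERSION_NUM;
	return LOGICALREP_PROTO_VERSION_NUM;
}

/*
 * Hypothetical helper (not in the patch): does STREAM_ABORT carry the
 * extra abort_lsn/abort_time members at this protocol version?
 */
bool
stream_abort_has_lsn_and_time(int proto_version)
{
	return proto_version >= LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM;
}
```

So for a PG15 publisher the sketch picks protocol 3 and the extra abort
members are simply absent, which is the case my question in 47a is
about.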