On Thu, Aug 18, 2022 at 11:59 AM Peter Smith <smithpb2...@gmail.com> wrote:
>
> Here are my review comments for patch v21-0001:
>
> 4. Commit message
>
> In addition, the patch extends the logical replication STREAM_ABORT message so
> that abort_time and abort_lsn can also be sent which can be used to update the
> replication origin in apply background worker when the streaming transaction
> is aborted.
>
> 4a.
> Should this para also mention something about the introduction of
> protocol version 4?
>
> 4b.
> Should this para also mention that these extensions are not strictly
> mandatory for the parallel streaming to still work?
>
Without parallel streaming/apply, we don't need to send this extra
message. So, I don't think it will be correct to say that.

> 46. src/backend/replication/logical/worker.c - apply_error_callback
>
> +        if (errarg->remote_attnum < 0)
> +        {
> +            if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> +                errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
> +                           errarg->origin_name,
> +                           logicalrep_message_type(errarg->command),
> +                           errarg->rel->remoterel.nspname,
> +                           errarg->rel->remoterel.relname,
> +                           errarg->remote_xid);
> +            else
> +                errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
> +                           errarg->origin_name,
> +                           logicalrep_message_type(errarg->command),
> +                           errarg->rel->remoterel.nspname,
> +                           errarg->rel->remoterel.relname,
> +                           errarg->remote_xid,
> +                           LSN_FORMAT_ARGS(errarg->finish_lsn));
> +        }
> +        else
> +        {
> +            if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> +                errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
> +                           errarg->origin_name,
> +                           logicalrep_message_type(errarg->command),
> +                           errarg->rel->remoterel.nspname,
> +                           errarg->rel->remoterel.relname,
> +                           errarg->rel->remoterel.attnames[errarg->remote_attnum],
> +                           errarg->remote_xid);
> +            else
> +                errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
> +                           errarg->origin_name,
> +                           logicalrep_message_type(errarg->command),
> +                           errarg->rel->remoterel.nspname,
> +                           errarg->rel->remoterel.relname,
> +                           errarg->rel->remoterel.attnames[errarg->remote_attnum],
> +                           errarg->remote_xid,
> +                           LSN_FORMAT_ARGS(errarg->finish_lsn));
> +        }
> +    }
>
> Hou-san had asked [3](comment #14) me how the above code can be
> shortened. Below is one idea, but maybe you won't like it ;-)
>
> #define MSG_O_T_S_R "processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" "
> #define O_T_S_R\
>     errarg->origin_name,\
>     logicalrep_message_type(errarg->command),\
>     errarg->rel->remoterel.nspname,\
>     errarg->rel->remoterel.relname
>
> if (errarg->remote_attnum < 0)
> {
>     if (XLogRecPtrIsInvalid(errarg->finish_lsn))
>         errcontext(MSG_O_T_S_R "in transaction %u",
>                    O_T_S_R,
>                    errarg->remote_xid);
>     else
>         errcontext(MSG_O_T_S_R "in transaction %u finished at %X/%X",
>                    O_T_S_R,
>                    errarg->remote_xid,
>                    LSN_FORMAT_ARGS(errarg->finish_lsn));
> }
> else
> {
>     if (XLogRecPtrIsInvalid(errarg->finish_lsn))
>         errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u",
>                    O_T_S_R,
>                    errarg->rel->remoterel.attnames[errarg->remote_attnum],
>                    errarg->remote_xid);
>     else
>         errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u finished at %X/%X",
>                    O_T_S_R,
>                    errarg->rel->remoterel.attnames[errarg->remote_attnum],
>                    errarg->remote_xid,
>                    LSN_FORMAT_ARGS(errarg->finish_lsn));
> }
> #undef O_T_S_R
> #undef MSG_O_T_S_R
>
> ======
>

I don't like this much. I think this reduces readability.

> 47. src/include/replication/logicalproto.h
>
> @@ -32,12 +32,17 @@
>  *
>  * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
>  * support for two-phase commit decoding (at prepare time). Introduced in PG15.
> + *
> + * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
> + * with support for streaming large transactions using apply background
> + * workers. Introduced in PG16.
>  */
>  #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
>  #define LOGICALREP_PROTO_VERSION_NUM 1
>  #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
>  #define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
> -#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
> +#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
> +#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
>
> 47a.
> I don't think that comment is strictly true. IIUC the new protocol
> version 4 is currently only affecting the *extra* STREAM_ABORT members
> – but in fact streaming=parallel is still functional without using
> those extra members, isn't it? So maybe this description needed to be
> modified a bit to be more accurate?
>

The reason for sending these extra abort members is to ensure that,
after the transaction is aborted, the subscriber/apply worker doesn't
need to request the transaction again if it restarts. Do you have
suggestions for improving this comment?

> 52.
>
> +/* Apply background worker setup and interactions */
> +extern ApplyBgworkerInfo *apply_bgworker_start(TransactionId xid);
> +extern ApplyBgworkerInfo *apply_bgworker_find(TransactionId xid);
> +extern void apply_bgworker_wait_for(ApplyBgworkerInfo *wstate,
> +                                    ApplyBgworkerStatus wait_for_status);
> +extern void apply_bgworker_send_data(ApplyBgworkerInfo *wstate, Size nbytes,
> +                                     const void *data);
> +extern void apply_bgworker_free(ApplyBgworkerInfo *wstate);
> +extern void apply_bgworker_check_status(void);
> +extern void apply_bgworker_set_status(ApplyBgworkerStatus status);
> +extern void apply_bgworker_subxact_info_add(TransactionId current_xid);
> +extern void apply_bgworker_savepoint_name(Oid suboid, Oid relid,
> +                                          char *spname, int szsp);
>
> This big block of similarly named externs might as well be in
> alphabetical order instead of apparently random.
>

I think it is better to order them by related functionality, if they
are not already ordered that way, rather than alphabetically.

--
With Regards,
Amit Kapila.