Hi,

Below are my review comments for the PoC patch 0001.

In addition, the patch needed rebasing, and after I rebased it locally in my private environment there were still test failures:
a) The 'make check' tests fail, but only in a minor way due to column name changes.
b) The subscription TAP test did not work at all for me -- many errors.

======
Commit message.

1.
- Add oid column to the pg_subscription_rel.
- use it as the primary key.
- use it in the names of origin and slot the tablesync workers use.

~

IIUC, there were lots of variables called 'subrelid' referring to this new 'oid' field. But I found that very confusing alongside the similarly named 'relid'. I wonder if all of those could be named something like 'sroid' or 'srid' to reduce the confusion between such similar names?

======
src/backend/catalog/pg_subscription.c

2. AddSubscriptionRelState

I felt there should be some sanity-check Asserts for the args here. E.g. cannot have a valid relid when copy_schema == true, etc.

~~~

3.
+ if (nspname)
+     values[Anum_pg_subscription_rel_srnspname - 1] = CStringGetDatum(nspname);
+ else
+     nulls[Anum_pg_subscription_rel_srnspname - 1] = true;
+
+ if (relname)
+     values[Anum_pg_subscription_rel_srrelname - 1] = CStringGetDatum(relname);
+ else
+     nulls[Anum_pg_subscription_rel_srrelname - 1] = true;

Here is where I was wondering why not pass the nspname and relname all the time, even for a valid 'relid' (when copy_schema is false). It should simplify some code, as well as put more useful/readable information into the catalog.

~~~

4. UpdateSubscriptionRelRelid

+ /* XXX: need to distinguish from message in UpdateSubscriptionRelState() */
+ if (!HeapTupleIsValid(tup))
+     elog(ERROR, "subscription table %u in subscription %u does not exist",
+          subrelid, subid);

Is that ERROR msg correct? IIUC the 'subrelid' is the Oid of the row in the catalog -- it is not the "subscription table" Oid.

~~~

5.
UpdateSubscriptionRelState

  if (!HeapTupleIsValid(tup))
      elog(ERROR, "subscription table %u in subscription %u does not exist",
-          relid, subid);
+          subrelid, subid);

(ditto previous review comment)

Is that ERROR msg correct? IIUC the 'subrelid' is the Oid of the row in the catalog -- it is not the "subscription table" Oid.

~~~

6. GetSubscriptoinRelStateByRelid

There is a spelling mistake in this function name.

/Subscriptoin/Subscription/

~~~

7.
+ ScanKeyInit(&skey[0],
+             Anum_pg_subscription_rel_srrelid,
+             BTEqualStrategyNumber, F_OIDEQ,
+             ObjectIdGetDatum(relid));
+ ScanKeyInit(&skey[1],
+             Anum_pg_subscription_rel_srsubid,
+             BTEqualStrategyNumber, F_OIDEQ,
+             ObjectIdGetDatum(subid));

Wouldn't it be better to swap the order of these so it matches the function comment "(srsubid, srrelid)"?

~~~

8.
+ tup = systable_getnext(scan);
+
+
+ if (!HeapTupleIsValid(tup))

Double blank lines.

~~~

9.
/* Get palloc'ed SubscriptionRelState of the given subrelid */
SubscriptionRelState *
GetSubscriptionRelByOid(Oid subrelid)

~

There seems to be some function name confusion because the struct is called SubscriptionRelState and it also has a 'state' field.

e.g. The functions named GetSubscriptionRelStateXXX return only the state field of the struct. OTOH, this function returns the SubscriptionRelState* but it is NOT called GetSubscriptionRelStateByOid (??).

~~~

10. deconstruct_subrelstate

+ /* syncflags */
+ relstate->syncflags =
+     (((subrel_form->srsyncschema) ? SUBREL_SYNC_KIND_SCHEMA : 0) |
+      ((subrel_form->srsyncdata) ? SUBREL_SYNC_KIND_DATA : 0));

Seems excessive parens.

~~~

11.
+ return relstate;
+}
 /*
  * Drop subscription relation mapping. These can be for a particular
  * subscription, or for a particular relation, or both.
  */
 void
-RemoveSubscriptionRel(Oid subid, Oid relid)
+RemoveSubscriptionRel(Oid subid, Oid relid, Oid subrelid)

~

There is no blank line before this function.

~~~

12.
RemoveSubscriptionRel

-RemoveSubscriptionRel(Oid subid, Oid relid)
+RemoveSubscriptionRel(Oid subid, Oid relid, Oid subrelid)
 {

~

IIUC what you called 'subrelid' is the PK, so would it make more sense for that to be the 1st parameter for this function?

======
src/backend/commands/subscriptioncmds.c

13. struct SubOpts

  bool copy_data;
+ /* XXX: want to choose synchronizing only tables or all objects? */
+ bool copy_schema;

I wonder if it would be more natural to put the 'copy_schema' field before the 'copy_data' field?

~~~

14. parse_subscription_options

  if (IsSet(supported_opts, SUBOPT_COPY_DATA))
      opts->copy_data = true;
+ if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA))
+     opts->copy_data = true;

14a.
I wonder if it would be more natural to put the COPY_SCHEMA logic before the COPY_DATA logic?

~

14b.
Is this a bug? Why is this assigning copy_data = true, instead of copy_schema = true?

~~~

15.
  opts->specified_opts |= SUBOPT_COPY_DATA;
  opts->copy_data = defGetBoolean(defel);
  }
+ else if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA) &&
+          strcmp(defel->defname, "copy_schema") == 0)
+ {
+     if (IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+         errorConflictingDefElem(defel, pstate);
+
+     opts->specified_opts |= SUBOPT_COPY_SCHEMA;
+     opts->copy_schema = defGetBoolean(defel);
+ }

I wonder if it would be more natural to put the COPY_SCHEMA logic before the COPY_DATA logic?

~~~

16.
+ if (opts->copy_schema &&
+     IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA))
+     ereport(ERROR,
+             (errcode(ERRCODE_SYNTAX_ERROR),
+              errmsg("%s and %s are mutually exclusive options",
+                     "connect = false", "copy_schema = true")));
+

I wonder if it would be more natural to put the COPY_SCHEMA logic before the COPY_DATA logic?

~~~

17. CreateSubscription

  * Set sync state based on if we were asked to do data copy or
  * not.
  */
- table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+ if (opts.copy_data || opts.copy_schema)
+     table_state = SUBREL_STATE_INIT;
+ else
+     table_state = SUBREL_STATE_READY;

The comment prior to this code needs updating; it still only mentions "data copy".

~~~

18. AlterSubscription_refresh

+ sub_remove_rels[remove_rel_len].relid = subrelid;
  sub_remove_rels[remove_rel_len++].state = state;

~

Is that right? IIUC that 'subrelid' is the OID PK of the row in pg_subscription_rel, which is not the same as the 'relid'.

Shouldn't this be:
sub_remove_rels[remove_rel_len].relid = relstate->relid;

~~~

19.
+ if (OidIsValid(relstate->relid))
+     ereport(DEBUG1,
+             (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+                              get_namespace_name(get_rel_namespace(relstate->relid)),
+                              get_rel_name(relstate->relid),
+                              sub->name)));
+ else
+     ereport(DEBUG1,
+             (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+                              relstate->nspname, relstate->relname,
+                              sub->name)));

I wondered why we can't just always store nspname and relname, even for a valid 'relid' when there is no copy_schema? Wouldn't that simplify code such as this?

======
src/backend/replication/logical/launcher.c

20. logicalrep_worker_find

- if (w->in_use && w->subid == subid && w->relid == relid &&
+ if (w->in_use && w->subid == subid && w->subrelid == subrelid &&
      (!only_running || w->proc))
  {

~

Maybe I misunderstand something, but it seems strange to be checking both the 'subid' and the Oid PK ('subrelid') here. Isn't it the case that when subrelid is valid (aka a tablesync worker) you need to test only 'subrelid' for equality, but when subrelid is InvalidOid (aka not a tablesync worker) you need to test only subid for equality?

~~~

21.
logicalrep_worker_launch

  bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);

  /* Sanity check - tablesync worker cannot be a subworker */
- Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+ Assert(!(is_parallel_apply_worker && OidIsValid(subrelid)));

I thought this code might be easier to understand if you introduced another variable:

bool is_tablesync_worker = OidIsValid(subrelid);

~~~

22.
+ if (OidIsValid(subrelid) && nsyncworkers >= max_sync_workers_per_subscription)

(ditto previous comment)

~~~

23.
- if (OidIsValid(relid))
+ if (OidIsValid(subrelid))
      snprintf(bgw.bgw_name, BGW_MAXLEN,
-              "logical replication worker for subscription %u sync %u", subid, relid);
+              "logical replication worker for subscription %u sync %u", subid, subrelid);

This name seems somehow less useful to the user now. IIUC 'subrelid' is just the PK of the pg_subscription_rel catalog instead of the relid. Does this require changes to the documentation that might have been saying this is the relid?

~~~

24. logicalrep_worker_stop

  * Stop the logical replication worker for subid/relid, if any.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid subid, Oid subrelid)

The function comment is still talking about relid.

======
src/backend/replication/logical/snapbuild.c

25. SnapBuildExportSnapshot

-SnapBuildExportSnapshot(SnapBuild *builder)
+SnapBuildExportSnapshot(SnapBuild *builder, bool use_it)

'use_it' does not seem like a good parameter name. At least, maybe the function comment can describe the meaning of use_it.

~~~

26.
- /* There doesn't seem to a nice API to set these */
- XactIsoLevel = XACT_REPEATABLE_READ;
- XactReadOnly = true;
+     /* There doesn't seem to a nice API to set these */
+     XactIsoLevel = XACT_REPEATABLE_READ;
+     XactReadOnly = true;
+ }
+ else
+     Assert(IsTransactionBlock());

Although it is not introduced by this patch, since you change the indent on this line you might as well at the same time fix the typo on this line.

/seem to a nice/seem to be a nice/

======
src/backend/replication/logical/tablesync.c

27. process_syncing_tables_for_sync

  UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-                            MyLogicalRepWorker->relid,
+                            MyLogicalRepWorker->subrelid,
                             MyLogicalRepWorker->relstate,
                             MyLogicalRepWorker->relstate_lsn);

IIUC the 'subrelid' is now the PK. Isn't it better for that to be the 1st param?

~~~

28.
+ if ((syncflags & SUBREL_SYNC_KIND_SCHEMA) != 0)

There are several checks like the code shown above. Would it be better to have some macro for that expression? Or maybe simply assign this result to a local variable instead of testing the same thing multiple times.

~~~

29. synchronize_table_schema

FILE *handle;
Oid relid;
Oid nspoid;
StringInfoData command;
StringInfoData querybuf;
char full_path[MAXPGPATH];
char buf[1024];
int ret;

if (find_my_exec("pg_dump", full_path) < 0)
elog(ERROR, "\"%s\" was not found", "pg_dump")

~

Something is not quite right with the indentation in this new function.

~~~

30.
+ * XXX what if the table already doesn't exist?

I didn't understand the meaning of the comment. Is it supposed to say "What if the table already exists?" (??)

======
src/backend/replication/logical/worker.c

31. InitializeApplyWorker

+ {
+     if (OidIsValid(MyLogicalRepWorker->relid))
+         ereport(LOG,
+                 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+                         MySubscription->name,
+                         get_rel_name(MyLogicalRepWorker->relid))));
+     else
+         ereport(LOG,
+                 (errmsg("logical replication table synchronization worker for subscription \"%s\", relid %u has started",
+                         MySubscription->name,
+                         MyLogicalRepWorker->subrelid)));
+ }

~

IIUC it doesn't seem right to say "relid %u has started", because that's not really a relid, is it? I thought it is just the PK Oid of the row in the catalog.

======
src/include/catalog/pg_subscription_rel.h

32. pg_subscription_rel

+ /* What part do we need to synchronize? */
+ bool srsyncschema;
+ bool srsyncdata;

These aren't really "parts".

SUGGESTION
/* What to synchronize? */

~~~

33.
 typedef struct SubscriptionRelState
 {
+ Oid oid;

Is that the pg_subscription_rel's oid? Maybe it would be better to call this field 'sroid'? (see the general comment in the commit message)

======
src/include/replication/walsender.h

34. CRSSnapshotAction

  CRS_EXPORT_SNAPSHOT,
  CRS_NOEXPORT_SNAPSHOT,
- CRS_USE_SNAPSHOT
+ CRS_USE_SNAPSHOT,
+ CRS_EXPORT_USE_SNAPSHOT
 } CRSSnapshotAction;

~

Should CRS_USE_SNAPSHOT be renamed to CRS_NOEXPORT_USE_SNAPSHOT to have a more consistent naming pattern?

======
src/include/replication/worker_internal.h

35.
- /* Used for initial table synchronization. */
+ /*
+  * Used for initial table synchronization.
+  *
+  * relid is an invalid oid if the table is not created on the subscriber
+  * yet.
+  */
+ Oid subrelid;
  Oid relid;

It would be good to have more explanation of the different meanings of 'subrelid' versus 'relid' (see also the general comment suggesting to rename this).

------
Kind Regards,
Peter Smith.
Fujitsu Australia