On Mon, Jan 4, 2021 at 2:38 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> Few other comments:
> =================
>

Few more comments on v9:
======================
1.
+ /* Drop the tablesync slot. */
+ {
+     char *syncslotname = ReplicationSlotNameForTablesync(subid, relid);
+
+     /*
+      * If the subscription slotname is NONE/NULL and the connection to publisher is
+      * broken, but the DropSubscription should still be allowed to complete.
+      * But without a connection it is not possible to drop any tablesync slots.
+      */
+     if (!wrconn)
+     {
+         /* FIXME - OK to just log a warning? */
+         elog(WARNING, "!!>> DropSubscription: no connection. Cannot drop tablesync slot \"%s\".",
+              syncslotname);
+     }

Why is this not an ERROR? We don't want to keep the table slots
lingering after DropSubscription. If there is any tablesync slot that
needs to be dropped and the publisher is not available, then we should
raise an error.

2.
+ /*
+  * Tablesync resource cleanup (slots and origins).
+  *
+  * Any READY-state relations would already have dealt with clean-ups.
+  */
+ {

There is no need to start a separate block '{' here.

3.
+#define SUBREL_STATE_COPYDONE 'C' /* tablesync copy phase is completed */

You can mention in the comments that sublsn will be NULL for this
state, as is done for other similar states. Can we use a lower-case
letter for this? All other states are lower-case, so this one looks a
bit odd. We can use 'f' or 'e' and describe it as 'copy finished' or
'copy end'. I am fine if you have any better ideas.

4.
LogicalRepSyncTableStart()
{
..
..
+copy_table_done:
+
+    /* Setup replication origin tracking. */
+    {
+        char originname[NAMEDATALEN];
+        RepOriginId originid;
+
+        snprintf(originname, sizeof(originname), "pg_%u_%u", MySubscription->oid, MyLogicalRepWorker->relid);
+        originid = replorigin_by_name(originname, true);
+        if (!OidIsValid(originid))
+        {
+            /*
+             * Origin tracking does not exist. Create it now, and advance to LSN got from walrcv_create_slot.
+             */
+            elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_create \"%s\".", originname);
+            originid = replorigin_create(originname);
+            elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_session_setup \"%s\".", originname);
+            replorigin_session_setup(originid);
+            replorigin_session_origin = originid;
+            elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_advance \"%s\".", originname);
+            replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+                               true /* go backward */ , true /* WAL log */ );
+        }
+        else
+        {
+            /*
+             * Origin tracking already exists.
+             */
+            elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_setup \"%s\".", originname);
+            replorigin_session_setup(originid);
+            replorigin_session_origin = originid;
+            elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_get_progress \"%s\".", originname);
+            *origin_startpos = replorigin_session_get_progress(false);
+        }
..
..
}

I am not sure if this code is correct because, the very first time the
copy is done, we don't expect the replication origin to exist, whereas
this code will silently use an already existing replication origin,
which can lead to a wrong start position for the slot. In such a case
it should error out. I guess we should create the replication origin
before marking the state as copydone. I feel we should even have a
test case for this, as it is not difficult to have a pre-existing
replication origin.

5. Is it possible to write a test case where we fail (say due to a pk
violation or some other error) after the initial copy is done, then
remove the conflicting row and allow the copy to be completed? If we
already have any such test then it is fine.

6.
+/*
+ * Drop the replication slot at the publisher node
+ * using the replication connection.
+ */

This comment looks a bit odd. The first line appears to be too short.
We have a limit of 80 chars but this is much less than that.

7.
@@ -905,7 +905,7 @@ replorigin_advance(RepOriginId node,
  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);

  /* Make sure it's not used by somebody else */
- if (replication_state->acquired_by != 0)
+ if (replication_state->acquired_by != 0 && replication_state->acquired_by != MyProcPid)
  {

I think you won't need this change if you do replorigin_advance before
replorigin_session_setup in your patch.

8.
- * that ensures we won't loose knowledge about that after a crash if the
+ * that ensures we won't lose knowledge about that after a crash if the

It is better to submit this as a separate patch.

--
With Regards,
Amit Kapila.