At Thu, 17 May 2018 13:54:07 +0300, Arseny Sher <a.s...@postgrespro.ru> wrote in <87in7md034.fsf@ars-thinkpad>
> 
> Konstantin Knizhnik <k.knizh...@postgrespro.ru> writes:
> 
> > I think that using restart_lsn instead of confirmed_flush is not right
> > approach.
> > If restart_lsn is not available and confirmed_flush is pointing to page
> > boundary, then in any case we should somehow handle this case and adjust
> > startlsn to point on the valid record position (by jjust adding page header
> > size?).
> 
> Well, restart_lsn is always available on live slot: it is initially set
> in ReplicationSlotReserveWal during slot creation.

restart_lsn stays at the beginning of a transaction until the
transaction ends, so just using restart_lsn allows repeated decoding
of a transaction; in short, rewinding occurs. The function works only
on an inactive slot, so the current code works fine on this point. In
addition, restart_lsn can also be on a page boundary.

We can see the problem easily.

1. Just create a logical replication slot, which is set at the current LSN.

  select pg_create_logical_replication_slot('s1', 'pgoutput');

2. Advance the LSN by two or three pages by doing anything.

3. Advance the slot to a page boundary.

  e.g. select pg_replication_slot_advance('s1', '0/9624000');

4. Advance the slot further, then crash.

So directly setting ctx->reader->EndRecPtr to startlsn fixes the
problem, but I found another problem here.

The function accepts any LSN even if it is not at the beginning of a
record. We will see errors, crashes, infinite waiting, or maybe any
other kind of trouble with such values. The moved LSN must always be
at the "end of a record" (that is, at the start of the next
record). The attached patch also fixes this.

The documentation doesn't seem to require a fix.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

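As a side note on step 3 above, the following is a minimal standalone sketch (not part of the attached patch) showing why '0/9624000' is a page boundary. It assumes the default WAL page size of 8192 bytes (XLOG_BLCKSZ in PostgreSQL); the names SketchLsn, sketch_parse_lsn and sketch_on_page_boundary are hypothetical and exist only for this illustration.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Assumption: default WAL page size (PostgreSQL's XLOG_BLCKSZ). */
#define SKETCH_XLOG_BLCKSZ 8192u

/* Hypothetical stand-in for XLogRecPtr: a 64-bit WAL position. */
typedef uint64_t SketchLsn;

/*
 * Parse the textual "hi/lo" hexadecimal form, e.g. "0/9624000".
 * Returns false if the input is malformed or has trailing characters.
 */
static bool
sketch_parse_lsn(const char *text, SketchLsn *out)
{
    unsigned int hi;
    unsigned int lo;
    char         extra;

    /* Exactly two conversions must succeed; a third means trailing junk. */
    if (sscanf(text, "%X/%X%c", &hi, &lo, &extra) != 2)
        return false;

    *out = ((SketchLsn) hi << 32) | lo;
    return true;
}

/*
 * An LSN that is a multiple of the page size points at a page header,
 * not at the start of a record.
 */
static bool
sketch_on_page_boundary(SketchLsn lsn)
{
    return lsn % SKETCH_XLOG_BLCKSZ == 0;
}
```

With these helpers, sketch_on_page_boundary() is true for "0/9624000" and false for a position such as "0/9624028". Not being on a page boundary is necessary but not sufficient for being at a record start; only reading the WAL can establish that.
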
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..d3cb777f9f 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -318,6 +318,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 /*
  * Helper function for advancing physical replication slot forward.
+ *
+ * This function accepts arbitrary LSN even if the LSN is not at the beginning
+ * of a record. This can lead to any kind of misbehavior but currently the
+ * value is used only to determine up to what wal segment to keep and
+ * successive implicit advancing fixes the state.
  */
 static XLogRecPtr
 pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
@@ -344,6 +349,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
 	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	upto;
 
 	PG_TRY();
 	{
@@ -354,6 +360,13 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 									logical_read_local_xlog_page,
 									NULL, NULL, NULL);
 
+		/*
+		 * startlsn can be on page boundary but it is not accepted as explicit
+		 * parameter to XLogReadRecord. Set it in reader context.
+		 */
+		Assert(startlsn != InvalidXLogRecPtr);
+		upto = ctx->reader->EndRecPtr = startlsn;
+
 		CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
 												   "logical decoding");
 
@@ -361,22 +374,18 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 		InvalidateSystemCaches();
 
 		/* Decode until we run out of records */
-		while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
-			   (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+		while (ctx->reader->EndRecPtr <= moveto)
 		{
 			XLogRecord *record;
 			char	   *errm = NULL;
+
+			/* ctx->reader->EndRecPtr cannot be go backward here */
+			upto = ctx->reader->EndRecPtr;
 
-			record = XLogReadRecord(ctx->reader, startlsn, &errm);
+			record = XLogReadRecord(ctx->reader, InvalidXLogRecPtr, &errm);
 			if (errm)
 				elog(ERROR, "%s", errm);
 
-			/*
-			 * Now that we've set up the xlog reader state, subsequent calls
-			 * pass InvalidXLogRecPtr to say "continue from last record"
-			 */
-			startlsn = InvalidXLogRecPtr;
-
 			/*
 			 * The {begin_txn,change,commit_txn}_wrapper callbacks above will
 			 * store the description into our tuplestore.
@@ -384,18 +393,14 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 			if (record != NULL)
 				LogicalDecodingProcessRecord(ctx, ctx->reader);
 
-			/* check limits */
-			if (moveto <= ctx->reader->EndRecPtr)
-				break;
-
 			CHECK_FOR_INTERRUPTS();
 		}
 
 		CurrentResourceOwner = old_resowner;
 
-		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+		if (startlsn != upto)
 		{
-			LogicalConfirmReceivedLocation(moveto);
+			LogicalConfirmReceivedLocation(upto);
 
 			/*
 			 * If only the confirmed_flush_lsn has changed the slot won't get