Hi all,

While implementing the logical replication protocol for pgjdbc (https://github.com/pgjdbc/pgjdbc/pull/550), I ran into some strange behavior in *walsender.c*:
1. Once a WAL consumer has caught up with the master and switched to the streaming state, it cannot cleanly finish replication by sending CopyDone until a new WAL record is generated. This happens because logical decoding sits in *WalSndWaitForWal* until the next WAL record becomes available. That function does receive messages from the consumer, and even replies to CopyDone with CopyDone, but it does not exit its loop, so the consumer may wait a long time for the CommandComplete and ReadyForQuery messages.

2. Logical decoding ignores messages from the consumer while it is decoding a transaction and writing it to the socket (*WalSndWriteData*). This affects long transactions with many changes. For example, if we start decoding a transaction that inserts 1 million records and decide to stop replication after consuming 1% of its data, the stop cannot complete until all million records have been sent to the consumer. This causes the following problems:
- It generates a lot of unnecessary network traffic.
- Logical replication cannot be stopped cleanly, because the consumer fails with a timeout. This breaks the scenario where the consumer pushes data to an external system asynchronously: the success callback sends feedback to the master with the flushed LSN, while the failure callback stops replication and restarts it from the last LSN successfully sent to the external system. Since the slot stays busy, the restart is delayed, which increases latency for the streaming system.
- The consumer may send a keepalive message with the reply-required flag to a master that is busy decoding and sending a huge transaction, and then disconnect after some time because no reply arrives. I ran into this problem while researching the bottledwater-pg extension <https://github.com/confluentinc/bottledwater-pg>: it failed every time it received a transaction that modified 1 million records, disconnecting from the master because it got no keepalive reply for too long (restart_lsn was before the huge transaction, so the extension failed again and again).
I prepared a small patch that fixes the problems described above. *WalSndWriteData* now checks for messages from the consumer first, and while a transaction is being decoded it checks that replication is still active. Keepalive messages are no longer sent once CopyDone has been received (they are unnecessary at that point, because we are preparing to finish). *WalSndWaitForWal* now exits its loop after receiving CopyDone.

With this patch applied I get the following measurements:

Before
-----
logical start and stopping: *15446ms*
logical stopping: *13820ms*
physical start and stopping: 462ms
physical stopping: 348ms

After
-----
logical start and stopping: 2424ms
logical stopping: *26ms*
physical start and stopping: 458ms
physical stopping: 329ms

For the measurements I used the following code from pgjdbc. For physical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    // generate a single transaction that inserts 1 million rows
    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + " select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .physical()
            .withStartPosition(startLSN)
            .start();

    // read a single message, then stop replication
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));

For logical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    // generate a single transaction that inserts 1 million rows
    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + " select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName(SLOT_NAME)
            .withStartPosition(startLSN)
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();

    // read a single message, then stop replication
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));

Branch with the patch: https://github.com/Gordiychuk/postgres/tree/stopping_logical_replication
From 5f6f3b5241c535b452c1866b7fbe174ac27ce130 Mon Sep 17 00:00:00 2001
From: Vladimir Gordiychuk <fol...@gmail.com>
Date: Fri, 6 May 2016 17:06:33 +0300
Subject: [PATCH] Stop logical decoding by get CopyDone

While decoding WAL, logical decoding ignores messages that the receiver may send in response to XLogData. So during a big transaction, for example one that changes 1 million records, this can lead to two problems:

1. The receiver can disconnect from the server because the server does not respond to a keepalive message that has the reply-required marker set.
2. The receiver cannot stop replication until the whole transaction has been sent to it.

Being unable to stop replication is the main problem, because the receiver fails with a timeout while stopping replication, and the backend also generates a lot of unnecessary network traffic.

This problem was found while implementing the physical/logical replication protocol in the pgjdbc driver: https://github.com/pgjdbc/pgjdbc/pull/550

It breaks the scenario where a WAL consumer receives decoded WAL and pushes it to an external system asynchronously: if a problem occurs, a callback reports which LSN failed, so we can roll back to the last successfully processed LSN and restart logical replication from that point.
I measured stopping replication with and without the fix using this test.

For physical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + " select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .physical()
            .withStartPosition(startLSN)
            .start();

    // read a single message
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));

For logical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + " select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName(SLOT_NAME)
            .withStartPosition(startLSN)
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();

    // read a single message
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));

And got the following timings:

Before
-----
logical start and stopping: 15446ms
logical stopping: 13820ms
physical start and stopping: 462ms
physical stopping: 348ms

After
-----
logical start and stopping: 2424ms
logical stopping: 26ms
physical start and stopping: 458ms
physical stopping: 329ms

As you can see, logical replication can now be stopped very quickly.
To do this, we now check for replies first and only then send decoded data. After receiving CopyDone from the frontend, we stop decoding as soon as possible. The second part of the fix disables sending keepalive messages to the frontend once CopyDone has been received.
---
 src/backend/replication/logical/logical.c       | 15 ++++++----
 src/backend/replication/logical/logicalfuncs.c  | 15 +++++++++-
 src/backend/replication/logical/reorderbuffer.c |  9 ++++--
 src/backend/replication/slotfuncs.c             |  2 +-
 src/backend/replication/walsender.c             | 38 ++++++++++++++++---------
 src/include/replication/logical.h               | 13 +++++++--
 src/include/replication/reorderbuffer.h         |  7 +++++
 7 files changed, 74 insertions(+), 25 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5ccfd31..cfb4ae4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -116,7 +116,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   ReorderBufferIsActive is_active)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -182,6 +183,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->is_active = is_active;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -214,7 +216,8 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalDecondingContextIsActive is_active)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -290,7 +293,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotSave();
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write, is_active);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -330,7 +333,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalDecondingContextIsActive is_active)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -380,7 +384,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 is_active);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 99112ac..e90e107 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -104,6 +104,17 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	p->returned_rows++;
 }
 
+/**
+ * Stub function required by the LogicalDecodingContext.
+ * It always returns true, which means that WAL decoding through
+ * the SQL API cannot be interrupted, in contrast to logical replication.
+ */
+static bool
+LogicalContextAlwaysActive(void)
+{
+	return true;
+}
+
 static void
 check_permissions(void)
 {
@@ -243,7 +254,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite,
+									LogicalContextAlwaysActive /* converting to Datum (SQL API) can't be interrupted, unlike replication */
+									);
 
 	MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 57821c3..deb09dc 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1420,7 +1420,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL && rb->is_active())
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
@@ -1646,8 +1646,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if(rb->is_active())
+		{
+			/* call commit callback */
+			rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 9cc24ea..8b30bd3 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -127,7 +127,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(
 									NameStr(*plugin), NIL,
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 926a247..ff6fc6d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -218,6 +218,8 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
+static bool IsStreamingActive(void);
+
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -816,7 +818,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										IsStreamingActive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -995,7 +998,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_decoding_ctx = CreateDecodingContext(
 											   cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData, IsStreamingActive);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1086,14 +1089,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
-
-	if (!pq_is_send_pending())
-		return;
-
 	for (;;)
 	{
 		int			wakeEvents;
@@ -1225,7 +1220,14 @@ WalSndWaitForWal(XLogRecPtr loc)
 			break;
 
 		/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client or sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive()) {
+			break;
+		}
+
+		/* We only send regular messages to the client for full decoded
 		 * transactions, but a synchronous replication and walsender shutdown
 		 * possibly are waiting for a later location. So we send pings
 		 * containing the flush location every now and then.
@@ -1853,7 +1855,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneReceiving)
 			send_data();
 		else
 			WalSndCaughtUp = false;
@@ -2911,7 +2913,11 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	/*
+	 * Keepalives should be sent only while the protocol is active. Once CopyDone
+	 * has been received or sent, the protocol is preparing to complete.
+	 */
+	if (waiting_for_ping_response || !IsStreamingActive())
 		return;
 
 	/*
@@ -2931,3 +2937,9 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 			WalSndShutdown();
 	}
 }
+
+static
+bool IsStreamingActive(void)
+{
+	return !streamingDoneReceiving && !streamingDoneSending;
+}
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 947000e..ba54ddc 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,6 +26,13 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+/*
+ * Callback function that allows logical replication to be interrupted during decoding.
+ * The function returns true if decoding may continue; if it returns false,
+ * logical decoding will stop as soon as possible.
+ */
+typedef ReorderBufferIsActive LogicalDecondingContextIsActive;
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -81,13 +88,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalDecondingContextIsActive is_active);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalDecondingContextIsActive is_active);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index e070894..28bd692 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -291,6 +291,8 @@ typedef void (*ReorderBufferMessageCB) (
 												 bool transactional,
 												 const char *prefix, Size sz,
 												 const char *message);
+/* callback signature for checking decoding status */
+typedef bool (*ReorderBufferIsActive) (void);
 
 struct ReorderBuffer
 {
@@ -321,6 +323,11 @@ struct ReorderBuffer
 	ReorderBufferMessageCB message;
 
 	/*
+	 * Callback that reports the decoding status. Returns false if decoding need not continue.
+	 */
+	ReorderBufferIsActive is_active;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
-- 
2.5.0
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)