Hi,
here is another tidbit from our experience with using logical decoding.
The attached patch adds a callback to notify the output plugin of a
concurrent abort. Below, I describe the problem in more detail
and how this additional callback solves it.
Streamed transactions as well as two-phase commit transactions may get
decoded before they finish. When the begin_cb is invoked and the
first changes are delivered to the output plugin, it is not necessarily
known yet whether the transaction will commit or abort.
This leads to the possibility of the transaction getting aborted
concurrently with logical decoding. In that case, the decoder is likely
to error out on a catalog scan that conflicts with the abort of the
transaction. The reorderbuffer sports a PG_CATCH block to clean up.
However, it does not currently inform the output plugin. From its point
of view, the transaction is left dangling until another one comes along
or until the final ROLLBACK or ROLLBACK PREPARED record from WAL gets
decoded. Therefore, what the output plugin might see in this case is:
* filter_prepare_cb (txn A)
* begin_prepare_cb (txn A)
* apply_change (txn A)
* apply_change (txn A)
* apply_change (txn A)
* begin_cb (txn B)
In other words, in this example, only the begin_cb of the following
transaction implicitly tells the output plugin that txn A could not be
fully decoded, and there is no upper bound on how long it may take for
that to happen. (It could also be another filter_prepare_cb, if the
subsequent transaction happens to be a two-phase transaction as well, or
an explicit rollback_prepared_cb or stream_abort if there is no other
transaction in between.)
An alternative and arguably cleaner approach for streamed transactions
may be to directly invoke stream_abort. However, the lsn argument
passed could not be that of the abort record, as that record is not yet
known at the time the concurrent abort is detected. Plus, this seems like
a bad fit for two-phase commit transactions.
Again, this callback is especially important for output plugins that
invoke further actions on downstream nodes that delay the COMMIT
PREPARED of a transaction upstream, e.g. until it is prepared on other
nodes. Currently, the output plugin has no way to learn about a
concurrent abort of the transaction being decoded (2PC or streamed),
short of continuously polling the transaction status.
I also think it generally improves the API by allowing the output plugin
to rely on such a callback, rather than having to implicitly deduce this
from other callbacks.
Thoughts or comments? If this is agreed on, I can look into adding
tests (concurrent aborts are not currently covered, it seems).
Regards
Markus
From: Markus Wanner <markus.wan...@enterprisedb.com>
Date: Thu, 11 Feb 2021 13:49:55 +0100
Subject: [PATCH] Add a concurrent_abort callback for the output plugin.
Logical decoding of a prepared or streamed transaction may fail if the
transaction got aborted after invoking the begin_cb (and likely having
sent some changes via change_cb), but before the necessary catalog scans
could be performed. In this case, decoding the transaction is neither
possible nor necessary (given it got rolled back).
To give the output plugin a chance to clean up the aborted transaction as
well, introduce a concurrent_abort callback. It is only ever invoked to
terminate unfinished transactions, not for normal aborts.
Adjust contrib/test_decoding to define a concurrent_abort callback.
---
contrib/test_decoding/test_decoding.c | 29 ++++++++++++
doc/src/sgml/logicaldecoding.sgml | 43 +++++++++++++++++-
src/backend/replication/logical/logical.c | 45 +++++++++++++++++++
.../replication/logical/reorderbuffer.c | 6 +++
src/include/replication/output_plugin.h | 9 ++++
src/include/replication/reorderbuffer.h | 7 +++
6 files changed, 138 insertions(+), 1 deletion(-)
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..a5dd80e0957 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -83,6 +83,9 @@ static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
+static void pg_decode_concurrent_abort_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, bool streaming,
+ XLogRecPtr lsn);
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
@@ -137,6 +140,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->change_cb = pg_decode_change;
cb->truncate_cb = pg_decode_truncate;
cb->commit_cb = pg_decode_commit_txn;
+ cb->concurrent_abort_cb = pg_decode_concurrent_abort_txn;
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
@@ -386,6 +390,31 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_concurrent_abort_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, bool streaming,
+ XLogRecPtr lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
+
+ if (data->skip_empty_xacts && !xact_wrote_changes)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ if (data->include_xids)
+ appendStringInfo(ctx->out, "<concurrent abort> %u", txn->xid);
+ else
+ appendStringInfoString(ctx->out, "<concurrent abort>");
+
+ if (data->include_timestamp)
+ appendStringInfo(ctx->out, " (at %s)",
+ timestamptz_to_str(txn->commit_time));
+
+ OutputPluginWrite(ctx, true);
+}
+
/* COMMIT PREPARED callback */
static void
pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..c44dee55a02 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -441,6 +441,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeFilterPrepareCB filter_prepare_cb;
LogicalDecodeBeginPrepareCB begin_prepare_cb;
LogicalDecodePrepareCB prepare_cb;
+ LogicalDecodeConcurrentAbortCB concurrent_abort_cb;
LogicalDecodeCommitPreparedCB commit_prepared_cb;
LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
LogicalDecodeStreamStartCB stream_start_cb;
@@ -459,7 +460,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
and <function>commit_cb</function> callbacks are required,
while <function>startup_cb</function>,
<function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
- and <function>shutdown_cb</function> are optional.
+ <function>shutdown_cb</function>, and
+ <function>concurrent_abort_cb</function> are optional.
If <function>truncate_cb</function> is not set but a
<command>TRUNCATE</command> is to be decoded, the action will be ignored.
</para>
@@ -847,6 +849,45 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
</para>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-concurrent-abort">
+ <title>Concurrent Abort Callback</title>
+
+ <para>
+ The optional <function>concurrent_abort_cb</function> callback is called
+ whenever a transaction is aborted while its changes are being sent
+ to the output plugin. This can happen when decoding a
+ two-phase commit transaction or with streaming enabled, and only if the
+ transaction got aborted after its <function>begin_cb</function> has been
+ invoked. This never happens for normal single-phase transactions
+ that are not streamed.
+<programlisting>
+typedef void (*LogicalDecodeConcurrentAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ bool streaming,
+ XLogRecPtr abort_lsn);
+</programlisting>
+ The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
+ have the same contents as for the <function>begin_cb</function>
+ and <function>commit_cb</function> callbacks.
+ The <parameter>streaming</parameter> parameter indicates whether this
+ happened during streaming or at the <command>PREPARE</command> of a
+ two-phase transaction. In the case of a two-phase transaction,
+ the <parameter>abort_lsn</parameter> parameter additionally points to
+ the end of the prepare record in WAL; for streaming transactions, it is
+ invalid.
+ </para>
+
+ <note>
+ <para>
+ This is just an early indication for the output plugin. For
+ concurrently aborted transactions, either
+ the <function>rollback_prepared_cb</function> or
+ the <function>stream_abort_cb</function> callback will be invoked once
+ the decoder reaches the corresponding WAL records.
+ </para>
+ </note>
+ </sect3>
+
<sect3 id="logicaldecoding-output-plugin-commit-prepared">
<title>Transaction Commit Prepared Callback</title>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..4b99fe5cc4e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -66,6 +66,8 @@ static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *t
XLogRecPtr commit_lsn);
static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
+static void concurrent_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ bool streaming, XLogRecPtr end_lsn);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -276,6 +278,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->prepare = prepare_cb_wrapper;
ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
+ ctx->reorder->concurrent_abort = concurrent_abort_cb_wrapper;
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
@@ -1005,6 +1008,48 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+concurrent_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ bool streaming, XLogRecPtr end_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /*
+ * We're only supposed to call this when either two-phase commits are
+ * supported or we're streaming.
+ */
+ Assert(ctx->twophase || streaming);
+
+ /* The callback is optional, invoke only if provided. */
+ if (ctx->callbacks.concurrent_abort_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "concurrent_abort";
+ state.report_location = txn->final_lsn; /* beginning of record attempted to
+ decode */
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+ ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+ /* do the actual work: call callback */
+ ctx->callbacks.concurrent_abort_cb(ctx, txn, streaming, end_lsn);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c291b05a423..a6d044b870b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2488,6 +2488,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
errdata = NULL;
curtxn->concurrent_abort = true;
+ /*
+ * Call the cleanup hook to inform the output plugin that the
+ * transaction it has started to receive was aborted concurrently.
+ */
+ rb->concurrent_abort(rb, txn, streaming, commit_lsn);
+
/* Reset the TXN so that it is allowed to stream remaining data. */
ReorderBufferResetTXN(rb, txn, snapshot_now,
command_id, prev_lsn,
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..e122dc48139 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -77,6 +77,14 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+/*
+ * Called for decoding errors due to concurrent aborts.
+ */
+typedef void (*LogicalDecodeConcurrentAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ bool streaming,
+ XLogRecPtr end_lsn);
+
/*
* Called for the generic logical decoding messages.
*/
@@ -227,6 +235,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodePrepareCB prepare_cb;
LogicalDecodeCommitPreparedCB commit_prepared_cb;
LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
+ LogicalDecodeConcurrentAbortCB concurrent_abort_cb;
/* streaming of changes */
LogicalDecodeStreamStartCB stream_start_cb;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 565a961d6ab..153474baa84 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -460,6 +460,12 @@ typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time);
+/* concurrent abort callback signature */
+typedef void (*ReorderBufferConcurrentAbortCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ bool streaming,
+ XLogRecPtr end_lsn);
+
/* start streaming transaction callback signature */
typedef void (*ReorderBufferStreamStartCB) (
ReorderBuffer *rb,
@@ -559,6 +565,7 @@ struct ReorderBuffer
ReorderBufferPrepareCB prepare;
ReorderBufferCommitPreparedCB commit_prepared;
ReorderBufferRollbackPreparedCB rollback_prepared;
+ ReorderBufferConcurrentAbortCB concurrent_abort;
/*
* Callbacks to be called when streaming a transaction.
--
2.30.2