On Thu, Apr 1, 2021, at 7:19 AM, Amit Kapila wrote: > Your ideas/suggestions look good to me. Don't we need to provide a > read function corresponding to logicalrep_write_message? We have it > for other write functions. Can you please combine all of your changes > into one patch? Thanks for taking a look at this patch. I didn't add a logicalrep_read_message function because logical replication itself doesn't consume these messages yet; the apply worker simply ignores them, as the comment added to apply_dispatch() says:
/* * Logical replication does not use generic logical messages yet. * Although, it could be used by other applications that use this * output plugin. */ Anyone inspecting the code in the future can check this discussion to understand why logicalrep_read_message isn't available. This new version of the patch set has 2 patches because there are 2 separate changes: the parse_output_parameters() refactor and logical decoding message support. -- Euler Taveira EDB https://www.enterprisedb.com/
From d17cda24259bbaa019bedd1175960375c2905112 Mon Sep 17 00:00:00 2001 From: Euler Taveira <euler.tave...@enterprisedb.com> Date: Mon, 16 Nov 2020 18:49:57 -0300 Subject: [PATCH v3 1/2] Refactor function parse_output_parameters Instead of passing multiple parameters in the parse_output_parameters function signature, use the struct PGOutputData, which encapsulates all pgoutput options. This is the right approach in terms of maintainability. --- src/backend/replication/pgoutput/pgoutput.c | 24 ++++++++------------- src/include/replication/pgoutput.h | 1 + 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 1b993fb032..6146c5acdb 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -156,9 +156,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) } static void -parse_output_parameters(List *options, uint32 *protocol_version, - List **publication_names, bool *binary, - bool *enable_streaming) +parse_output_parameters(List *options, PGOutputData *data) { ListCell *lc; bool protocol_version_given = false; @@ -166,7 +164,8 @@ parse_output_parameters(List *options, uint32 *protocol_version, bool binary_option_given = false; bool streaming_given = false; - *binary = false; + data->binary = false; + data->streaming = false; foreach(lc, options) { @@ -196,7 +195,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, errmsg("proto_version \"%s\" out of range", strVal(defel->arg)))); - *protocol_version = (uint32) parsed; + data->protocol_version = (uint32) parsed; } else if (strcmp(defel->defname, "publication_names") == 0) { @@ -207,7 +206,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, publication_names_given = true; if (!SplitIdentifierString(strVal(defel->arg), ',', - publication_names)) + &data->publication_names)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), 
errmsg("invalid publication_names syntax"))); @@ -220,7 +219,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, errmsg("conflicting or redundant options"))); binary_option_given = true; - *binary = defGetBoolean(defel); + data->binary = defGetBoolean(defel); } else if (strcmp(defel->defname, "streaming") == 0) { @@ -230,7 +229,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, errmsg("conflicting or redundant options"))); streaming_given = true; - *enable_streaming = defGetBoolean(defel); + data->streaming = defGetBoolean(defel); } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); @@ -244,7 +243,6 @@ static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { - bool enable_streaming = false; PGOutputData *data = palloc0(sizeof(PGOutputData)); /* Create our memory context for private allocations. */ @@ -265,11 +263,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, if (!is_init) { /* Parse the params and ERROR if we see any we don't recognize */ - parse_output_parameters(ctx->output_plugin_options, - &data->protocol_version, - &data->publication_names, - &data->binary, - &enable_streaming); + parse_output_parameters(ctx->output_plugin_options, data); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM) @@ -295,7 +289,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, * we only allow it with sufficient version of the protocol, and when * the output plugin supports it. 
*/ - if (!enable_streaming) + if (!data->streaming) ctx->streaming = false; else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM) ereport(ERROR, diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 4ba052fe38..bb383d523e 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -25,6 +25,7 @@ typedef struct PGOutputData List *publication_names; List *publications; bool binary; + bool streaming; } PGOutputData; #endif /* PGOUTPUT_H */ -- 2.20.1
From e80c65d04e6b68fed34e44cb32962d078db6c28a Mon Sep 17 00:00:00 2001 From: Dave Pirotte <dpiro...@gmail.com> Date: Thu, 5 Nov 2020 03:14:54 +0000 Subject: [PATCH v3 2/2] Logical decoding message support to pgoutput This feature allows pgoutput to send logical decoding messages. The output plugin accepts a new parameter (messages) that controls whether logical decoding messages are written into the replication stream. It is useful for clients that use pgoutput as their output plugin and need to process messages that were written by pg_logical_emit_message(). Although the logical streaming replication protocol supports logical decoding messages now, logical replication does not use this feature yet. --- doc/src/sgml/protocol.sgml | 65 +++++++++ src/backend/replication/logical/proto.c | 28 ++++ src/backend/replication/logical/worker.c | 9 ++ src/backend/replication/pgoutput/pgoutput.c | 46 ++++++ src/include/replication/logicalproto.h | 3 + src/include/replication/pgoutput.h | 1 + src/test/subscription/t/020_messages.pl | 148 ++++++++++++++++++++ 7 files changed, 300 insertions(+) create mode 100644 src/test/subscription/t/020_messages.pl diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 43092fe62a..7c52d5ab70 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -6433,6 +6433,71 @@ Begin </listitem> </varlistentry> +<varlistentry> +<term> +Message +</term> +<listitem> +<para> + +<variablelist> +<varlistentry> +<term> + Byte1('M') +</term> +<listitem> +<para> + Identifies the message as a logical decoding message. +</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + Int8 +</term> +<listitem> +<para> + Flags; Either 0 for no flags or 1 if the logical decoding + message is transactional. +</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + Int64 +</term> +<listitem> +<para> + The LSN of the logical decoding message. 
+</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + String +</term> +<listitem> +<para> + The prefix of the logical decoding message. +</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + Byte<replaceable>n</replaceable> +</term> +<listitem> +<para> + The content of the logical decoding message. +</para> +</listitem> +</varlistentry> + +</variablelist> +</para> +</listitem> +</varlistentry> + <varlistentry> <term> Commit diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index f2c85cabb5..2a1f9830e0 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -25,6 +25,7 @@ */ #define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define MESSAGE_TRANSACTIONAL (1<<0) #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) @@ -361,6 +362,33 @@ logicalrep_read_truncate(StringInfo in, return relids; } +/* + * Write MESSAGE to stream + */ +void +logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, + bool transactional, const char *prefix, Size sz, + const char *message) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE); + + /* encode and send message flags */ + if (transactional) + flags |= MESSAGE_TRANSACTIONAL; + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + pq_sendint8(out, flags); + pq_sendint64(out, lsn); + pq_sendstring(out, prefix); + pq_sendint32(out, sz); + pq_sendbytes(out, message, sz); +} + /* * Write relation description to the output stream. 
*/ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 354fbe4b4b..74d538b5e3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s) apply_handle_origin(s); return; + case LOGICAL_REP_MSG_MESSAGE: + + /* + * Logical replication does not use generic logical messages yet. + * Although, it could be used by other applications that use this + * output plugin. + */ + return; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); return; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 6146c5acdb..2981b9c430 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx, static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +static void pgoutput_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, @@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; + cb->message_cb = pgoutput_message; cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -162,10 +167,12 @@ parse_output_parameters(List *options, PGOutputData *data) bool protocol_version_given = false; bool publication_names_given = false; bool binary_option_given = false; + bool messages_option_given = false; bool streaming_given = false; 
data->binary = false; data->streaming = false; + data->messages = false; foreach(lc, options) { @@ -221,6 +228,16 @@ parse_output_parameters(List *options, PGOutputData *data) data->binary = defGetBoolean(defel); } + else if (strcmp(defel->defname, "messages") == 0) + { + if (messages_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + messages_option_given = true; + + data->messages = defGetBoolean(defel); + } else if (strcmp(defel->defname, "streaming") == 0) { if (streaming_given) @@ -689,6 +706,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContextReset(data->context); } +static void +pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, + const char *message) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + TransactionId xid = InvalidTransactionId; + + if (!data->messages) + return; + + /* + * Remember the xid for the message in streaming mode. See + * pgoutput_change. + */ + if (in_streaming) + xid = txn->xid; + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_message(ctx->out, + xid, + message_lsn, + transactional, + prefix, + sz, + message); + OutputPluginWrite(ctx, true); +} + /* * Currently we always forward. 
*/ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index fa4c37277b..55b90c03ea 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_MESSAGE = 'M', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', @@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, bool cascade, bool restart_seqs); extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); +extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, + bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index bb383d523e..51e7c0348d 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -26,6 +26,7 @@ typedef struct PGOutputData List *publications; bool binary; bool streaming; + bool messages; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl new file mode 100644 index 0000000000..d9123ed3ef --- /dev/null +++ b/src/test/subscription/t/020_messages.pl @@ -0,0 +1,148 @@ +# Tests that logical decoding messages +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); 
+$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +# Ensure a transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +$node_publisher->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')" +); + +my $result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') +)); + +# 66 77 67 == B M C == BEGIN MESSAGE COMMIT +is($result, qq(66 +77 +67), + 'messages on slot are B M C with message option'); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape') + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') + OFFSET 1 LIMIT 1 +)); + +is($result, qq(1|pgoutput), + "flag transactional is set to 1 and prefix is pgoutput"); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub') +)); + +# 66 67 == B C == BEGIN COMMIT +is($result, qq(66 +67), + 'option messages 
defaults to false so message (M) is not available on slot'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); +$node_publisher->wait_for_catchup('tap_sub'); + +# ensure a non-transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)"); + +my $message_lsn = $node_publisher->safe_psql('postgres', + "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)"); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') + WHERE lsn = '$message_lsn' AND xid = 0 +)); + +is($result, qq(77|0), 'non-transactional message on slot is M'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); +$node_publisher->wait_for_catchup('tap_sub'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +# wait for the replication connection to drop from the publisher +$node_publisher->poll_query_until('postgres', + 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0); + +# Ensure a non-transactional logical decoding message shows up on the slot when +# it is emitted within an aborted transaction. The message won't emit until +# something advances the LSN, hence, we intentionally forces the server to +# switch to a new WAL file. 
+$node_publisher->safe_psql( + 'postgres', qq( + BEGIN; + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 1'); + INSERT INTO tab_test VALUES (3); + SELECT pg_logical_emit_message(true, 'pgoutput', + 'a transactional message is not available if the transaction is aborted'); + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 2'); + ROLLBACK; + SELECT pg_switch_wal(); +)); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') +)); + +is($result, qq(77|0 +77|0), + 'non-transactional message on slot from aborted transaction is M'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 2.20.1