On Wed, 18 Nov 2020 at 03:04, David Pirotte <dpiro...@gmail.com> wrote:
> On Fri, Nov 6, 2020 at 7:05 AM Ashutosh Bapat < > ashutosh.bapat....@gmail.com> wrote: > >> +/* >> + * Write MESSAGE to stream >> + */ >> +void >> +logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, >> XLogRecPtr lsn, >> + bool transactional, const char *prefix, Size sz, >> + const char *message) >> +{ >> + uint8 flags = 0; >> + >> + pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE); >> + >> >> Similar to the UPDATE/DELETE/INSERT records decoded when streaming is >> being >> used, we need to add transaction id for transactional messages. May be we >> add >> that even in case of non-streaming case and use it to decide whether it's >> a >> transactional message or not. That might save us a byte when we are >> adding a >> transaction id. >> > > I also reviewed your patch. This feature would be really useful for replication scenarios. Supporting this feature means that you don't need to use a table to pass messages from one node to another one. Here are a few comments/ideas. @@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s) apply_handle_origin(s); return; + case LOGICAL_REP_MSG_MESSAGE: + return; + I added a comment explaining that this message is not used by logical replication but it could possibly be useful for other applications using pgoutput. See patch 0003. Andres mentioned in this thread [1] that we could simplify the parse_output_parameters. I refactored this function to pass only PGOutputData to it and also move enable_streaming to this struct. I use a similar approach in wal2json; it is easier to get the options since it is available in the logical decoding context. See patch 0004. > My preference is to add in the xid when streaming is enabled. (1) It is a > more consistent implementation with the other message types, and (2) it > saves 3 bytes when streaming is disabled. I've attached an updated patch. > It is not a strong preference, though, if you suggest a different approach. > > I agree with this approach. xid is available in the BEGIN message if the MESSAGE is transactional. For non-transactional messages, xid is not available. Your implementation is not consistent with the other pgoutput_XXX functions that check in_streaming in the pgoutput_XXX and pass parameters to other functions that require it. See patch 005. The last patch 0006 overhauls your tests. I added/changed some comments, replaced identifiers with uppercase letters, used 'pgoutput' as prefix, checked the prefix, and avoided a checkpoint during the test. There are possibly other improvements that I didn't mention here. Maybe you can use encode(substr(data, 1, 1), 'escape') instead of comparing the ASCII code (77). > Should we add the logical message to the WAL downstream so that it flows >> > further down to a cascaded logical replica. Should that be controlled >> by an option? >> > > Hmm, I can't think of a use case for this, but perhaps someone could. Do > you, or does anyone, have something in mind? I think we provide a lot of > value with logical messages in pgoutput without supporting consumption from > a downstream replica, so perhaps this is better considered separately. > > If we want this, I think we would add a "messages" option on the > subscription. If present, the subscriber will receive messages and pass > them to any downstream subscribers. I started working on this and it does > expand the change's footprint. As is, a developer would consume messages by > connecting to a pgoutput slot on the message's origin. (e.g. via Debezium > or a custom client) The subscription and logical worker infrastructure > don't know about messages, but they would need to in order to support > consuming an origin's messages on a downstream logical replica. In > any case, I'll keep working on it so we can see what it looks like. > > The decision to send received messages to downstream nodes should be made by the subscriber. If the subscriber wants to replicate messages to downstream nodes, the worker should call LogLogicalMessage. This does not belong to this patch but when/if this patch is committed, I will submit a patch to filter messages by prefix. wal2json has a similar (filter-msg-prefixes / add-msg-prefixes) feature and it is useful for cases where you are handling multiple output plugins like wal2json and pgoutput. The idea is to avoid sending useless messages to some node that (i) don't know how to process it and (ii) has no interest in it. PS> I'm attaching David's patches (0001 and 0002) again to keep cfbot happy. [1] https://www.postgresql.org/message-id/20200908191823.pmsoobzearkrmtg4%40alap3.anarazel.de -- Euler Taveira http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 18a619617c2cc19d0e304e33fb11e72d23731061 Mon Sep 17 00:00:00 2001 From: Dave Pirotte <dpiro...@gmail.com> Date: Thu, 5 Nov 2020 03:14:54 +0000 Subject: [PATCH 1/6] Add logical decoding messages to pgoutput This patch adds a "messages" option to the pgoutput output plugin. When "messages" is true, logical decoding messages (i.e. generated via pg_logical_emit_message) are sent to the slot consumer. --- doc/src/sgml/protocol.sgml | 65 ++++++++ src/backend/replication/logical/proto.c | 24 +++ src/backend/replication/logical/worker.c | 3 + src/backend/replication/pgoutput/pgoutput.c | 40 ++++- src/include/replication/logicalproto.h | 3 + src/include/replication/pgoutput.h | 1 + src/test/subscription/t/020_messages.pl | 158 ++++++++++++++++++++ 7 files changed, 293 insertions(+), 1 deletion(-) create mode 100644 src/test/subscription/t/020_messages.pl diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index cee28889e1..02449bf792 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -6413,6 +6413,71 @@ Begin </listitem> </varlistentry> +<varlistentry> +<term> +Message +</term> +<listitem> +<para> + +<variablelist> +<varlistentry> +<term> + Byte1('M') +</term> +<listitem> +<para> + Identifies the message as a logical decoding message. +</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + Int8 +</term> +<listitem> +<para> + Flags; Either 0 for no flags or 1 if the logical decoding + message is transactional. +</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + Int64 +</term> +<listitem> +<para> + The LSN of the logical decoding message. +</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + String +</term> +<listitem> +<para> + The prefix of the logical decoding message. +</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + Byte<replaceable>n</replaceable> +</term> +<listitem> +<para> + The content of the logical decoding message. +</para> +</listitem> +</varlistentry> + +</variablelist> +</para> +</listitem> +</varlistentry> + <varlistentry> <term> Commit diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index fdb31182d7..deba2a321c 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -25,6 +25,7 @@ */ #define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define MESSAGE_TRANSACTIONAL (1<<0) #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) @@ -361,6 +362,29 @@ logicalrep_read_truncate(StringInfo in, return relids; } +/* + * Write MESSAGE to stream + */ +void +logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn, + bool transactional, const char *prefix, Size sz, + const char *message) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE); + + /* encode and send message flags */ + if (transactional) + flags |= MESSAGE_TRANSACTIONAL; + + pq_sendint8(out, flags); + pq_sendint64(out, lsn); + pq_sendstring(out, prefix); + pq_sendint32(out, sz); + pq_sendbytes(out, message, sz); +} + /* * Write relation description to the output stream. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 04684912de..6890603622 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s) apply_handle_origin(s); return; + case LOGICAL_REP_MSG_MESSAGE: + return; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); return; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9c997aed83..cd849c10a4 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx, static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +static void pgoutput_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, @@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; + cb->message_cb = pgoutput_message; cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -158,15 +163,17 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names, bool *binary, - bool *enable_streaming) + bool *messages, bool *enable_streaming) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; bool binary_option_given = false; + bool messages_option_given = false; bool streaming_given = false; *binary = false; + *messages = false; foreach(lc, options) { @@ -222,6 +229,16 @@ parse_output_parameters(List *options, uint32 *protocol_version, *binary = defGetBoolean(defel); } + else if (strcmp(defel->defname, "messages") == 0) + { + if (messages_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + messages_option_given = true; + + *messages = defGetBoolean(defel); + } else if (strcmp(defel->defname, "streaming") == 0) { if (streaming_given) @@ -269,6 +286,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, &data->protocol_version, &data->publication_names, &data->binary, + &data->messages, &enable_streaming); /* Check if we support requested protocol */ @@ -683,6 +701,26 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContextReset(data->context); } +static void +pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, + const char *message) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + if (!data->messages) + return; + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_message(ctx->out, + txn, + message_lsn, + transactional, + prefix, + sz, + message); + OutputPluginWrite(ctx, true); +} + /* * Currently we always forward. */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 1f2535df80..f3c8f95e2c 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_TRUNCATE = 'T', LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_MESSAGE = 'M', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', @@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, bool cascade, bool restart_seqs); extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); +extern void logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn, + bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index a8c676ed23..3b7273bd89 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -25,6 +25,7 @@ typedef struct PGOutputData List *publication_names; List *publications; bool binary; + bool messages; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl new file mode 100644 index 0000000000..8e59e324e3 --- /dev/null +++ b/src/test/subscription/t/020_messages.pl @@ -0,0 +1,158 @@ +# Tests that logical decoding messages are emitted and that +# they do not break subscribers +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 7; + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +# +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab (a int PRIMARY KEY)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab (a int PRIMARY KEY)"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub FOR TABLE tab"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub" +); + +# ensure a transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); + +$node_publisher->safe_psql('postgres', + "select pg_logical_emit_message(true, 'a prefix', 'a transactional message')" +); + +my $slot_codes_with_message = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 0) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub', + 'messages', 'true') +)); + +# 66 77 67 == B M C == BEGIN MESSAGE COMMIT +is($slot_codes_with_message, "66\n77\n67", + 'messages on slot are B M C with message option'); + +my $transactional_message_flags = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 1) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub', + 'messages', 'true') + offset 1 limit 1 +)); + +is($transactional_message_flags, "1", + "transactional message flags are set to 1"); + +my $slot_codes_without_message = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 0) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub') +)); + +# 66 67 == B C == BEGIN COMMIT +is($slot_codes_without_message, "66\n67", + 'messages on slot are B C without message option'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); +$node_publisher->wait_for_catchup('sub'); + +# ensure a non-transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (3)"); + +my $message_lsn = $node_publisher->safe_psql('postgres', + "select pg_logical_emit_message(false, 'prefix', 'nontransactional')"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (4)"); + +my $slot_message_code = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 0) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub', + 'messages', 'true') + where lsn = '$message_lsn' and xid = 0 +)); + +is($slot_message_code, "77", "non-transactional message on slot is M"); + +my $nontransactional_message_flags = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 1) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub', + 'messages', 'true') + offset 1 limit 1 +)); + +is($nontransactional_message_flags, + "0", "non-transactional message flags are set to 0"); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); +$node_publisher->wait_for_catchup('sub'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); +$node_subscriber->safe_psql('postgres', "checkpoint;"); + +# wait for the replication connection to drop from the publisher +$node_publisher->poll_query_until('postgres', + 'SELECT count(*) from pg_catalog.pg_stat_replication', 0); + +# ensure a non-transactional logical decoding message shows up on the slot +# when it is emitted within an aborted transaction. the message won't emit +# until something advances the LSN, which we intentionally do here with a +# checkpoint. +$node_publisher->safe_psql( + 'postgres', qq( + BEGIN; + SELECT pg_logical_emit_message(false, 'prefix', + 'nontransactional aborted 1'); + INSERT INTO tab VALUES (5); + SELECT pg_logical_emit_message(false, 'prefix', + 'nontransactional aborted 2'); + ROLLBACK; + CHECKPOINT; +)); + +my $aborted_txn_message_codes = $node_publisher->safe_psql( + 'postgres', qq( + select get_byte(data, 0) + from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'pub', + 'messages', 'true') +)); + +is($aborted_txn_message_codes, "77\n77", + "non-transactional message on slot from aborted transaction is M"); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); +$node_publisher->wait_for_catchup('sub'); + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab"); +is($result, qq(2), 'rows move'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 2.20.1
From 520eeba6d022cbd0b5fa1a7f54971ecba57fff2d Mon Sep 17 00:00:00 2001 From: Euler Taveira <euler.tave...@enterprisedb.com> Date: Mon, 16 Nov 2020 16:19:44 -0300 Subject: [PATCH 3/6] Explain why this message is ignored This message is ignored in the logical replication worker. However, it could be used by applications that use pgoutput as output plugin. --- src/backend/replication/logical/worker.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 6890603622..452eb02600 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1937,6 +1937,12 @@ apply_dispatch(StringInfo s) return; case LOGICAL_REP_MSG_MESSAGE: + + /* + * Logical replication does not use generic logical messages yet. + * Although, it could be used by other applications that use this + * output plugin. + */ return; case LOGICAL_REP_MSG_STREAM_START: -- 2.20.1
From 5a39e28d9508d5ad7c3759100da5d64d5ff9c08e Mon Sep 17 00:00:00 2001 From: Euler Taveira <euler.tave...@enterprisedb.com> Date: Thu, 19 Nov 2020 20:14:08 -0300 Subject: [PATCH 5/6] Adjust in_streaming for messages Use the same pattern as pgoutput_XXX functions, e.g., check in_streaming into pgoutput_message and then pass xid as argument to logicalrep_write_message. Since ReorderBufferTXN is not used in logicalrep_write_message, remove it. --- src/backend/replication/logical/proto.c | 16 ++++------------ src/backend/replication/pgoutput/pgoutput.c | 11 +++++++++-- src/include/replication/logicalproto.h | 3 +-- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index d5eeee4784..a598658a8d 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -366,28 +366,20 @@ logicalrep_read_truncate(StringInfo in, * Write MESSAGE to stream */ void -logicalrep_write_message(StringInfo out, - bool in_streaming, - ReorderBufferTXN *txn, - XLogRecPtr lsn, - bool transactional, - const char *prefix, - Size sz, +logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, + bool transactional, const char *prefix, Size sz, const char *message) { uint8 flags = 0; - TransactionId xid = InvalidTransactionId; pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE); /* encode and send message flags */ if (transactional) - { flags |= MESSAGE_TRANSACTIONAL; - xid = txn->xid; - } - if (in_streaming) + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) pq_sendint32(out, xid); pq_sendint8(out, flags); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index b25e67edcb..f9bc15da40 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -700,14 +700,21 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, const char *message) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + TransactionId xid = InvalidTransactionId; if (!data->messages) return; + /* + * Remember the xid for the message in streaming mode. See + * pgoutput_change. + */ + if (in_streaming) + xid = txn->xid; + OutputPluginPrepareWrite(ctx, true); logicalrep_write_message(ctx->out, - in_streaming, - txn, + xid, message_lsn, transactional, prefix, diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index f22c9436e0..d3d656dfb2 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -152,8 +152,7 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, bool cascade, bool restart_seqs); extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); -extern void logicalrep_write_message(StringInfo out, bool in_streaming, - ReorderBufferTXN *txn, XLogRecPtr lsn, +extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel); -- 2.20.1
From bf991e458df1f03708450ddd76a75a9476b5809a Mon Sep 17 00:00:00 2001 From: Dave Pirotte <dpiro...@gmail.com> Date: Tue, 17 Nov 2020 04:01:34 +0000 Subject: [PATCH 2/6] Add xid to messages when streaming --- src/backend/replication/logical/proto.c | 16 ++++++++++++++-- src/backend/replication/pgoutput/pgoutput.c | 2 ++ src/include/replication/logicalproto.h | 3 ++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index deba2a321c..d5eeee4784 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -366,17 +366,29 @@ logicalrep_read_truncate(StringInfo in, * Write MESSAGE to stream */ void -logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn, - bool transactional, const char *prefix, Size sz, +logicalrep_write_message(StringInfo out, + bool in_streaming, + ReorderBufferTXN *txn, + XLogRecPtr lsn, + bool transactional, + const char *prefix, + Size sz, const char *message) { uint8 flags = 0; + TransactionId xid = InvalidTransactionId; pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE); /* encode and send message flags */ if (transactional) + { flags |= MESSAGE_TRANSACTIONAL; + xid = txn->xid; + } + + if (in_streaming) + pq_sendint32(out, xid); pq_sendint8(out, flags); pq_sendint64(out, lsn); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index cd849c10a4..bd3c2a3b99 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -707,11 +707,13 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, const char *message) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + if (!data->messages) return; OutputPluginPrepareWrite(ctx, true); logicalrep_write_message(ctx->out, + in_streaming, txn, message_lsn, transactional, diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index f3c8f95e2c..f22c9436e0 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -152,7 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, bool cascade, bool restart_seqs); extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); -extern void logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn, +extern void logicalrep_write_message(StringInfo out, bool in_streaming, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel); -- 2.20.1
From 2cc05a67f69cab1e98c295ce23ae95592f167422 Mon Sep 17 00:00:00 2001 From: Euler Taveira <euler.tave...@enterprisedb.com> Date: Mon, 16 Nov 2020 18:49:57 -0300 Subject: [PATCH 4/6] Simplify parse_output_parameters function Instead of individual variables, pass PGOutputData to parse_output_parameters. It is easier to read and maintain while adding new options to pgoutput. --- src/backend/replication/pgoutput/pgoutput.c | 29 ++++++++------------- src/include/replication/pgoutput.h | 1 + 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index bd3c2a3b99..b25e67edcb 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -161,9 +161,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) } static void -parse_output_parameters(List *options, uint32 *protocol_version, - List **publication_names, bool *binary, - bool *messages, bool *enable_streaming) +parse_output_parameters(List *options, PGOutputData *data) { ListCell *lc; bool protocol_version_given = false; @@ -172,8 +170,9 @@ parse_output_parameters(List *options, uint32 *protocol_version, bool messages_option_given = false; bool streaming_given = false; - *binary = false; - *messages = false; + data->binary = false; + data->messages = false; + data->streaming = false; foreach(lc, options) { @@ -203,7 +202,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, errmsg("proto_version \"%s\" out of range", strVal(defel->arg)))); - *protocol_version = (uint32) parsed; + data->protocol_version = (uint32) parsed; } else if (strcmp(defel->defname, "publication_names") == 0) { @@ -214,7 +213,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, publication_names_given = true; if (!SplitIdentifierString(strVal(defel->arg), ',', - publication_names)) + &data->publication_names)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("invalid publication_names syntax"))); @@ -227,7 +226,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, errmsg("conflicting or redundant options"))); binary_option_given = true; - *binary = defGetBoolean(defel); + data->binary = defGetBoolean(defel); } else if (strcmp(defel->defname, "messages") == 0) { @@ -237,7 +236,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, errmsg("conflicting or redundant options"))); messages_option_given = true; - *messages = defGetBoolean(defel); + data->messages = defGetBoolean(defel); } else if (strcmp(defel->defname, "streaming") == 0) { @@ -247,7 +246,7 @@ parse_output_parameters(List *options, uint32 *protocol_version, errmsg("conflicting or redundant options"))); streaming_given = true; - *enable_streaming = defGetBoolean(defel); + data->streaming = defGetBoolean(defel); } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); @@ -261,7 +260,6 @@ static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { - bool enable_streaming = false; PGOutputData *data = palloc0(sizeof(PGOutputData)); /* Create our memory context for private allocations. */ @@ -282,12 +280,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, if (!is_init) { /* Parse the params and ERROR if we see any we don't recognize */ - parse_output_parameters(ctx->output_plugin_options, - &data->protocol_version, - &data->publication_names, - &data->binary, - &data->messages, - &enable_streaming); + parse_output_parameters(ctx->output_plugin_options, data); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM) @@ -313,7 +306,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, * we only allow it with sufficient version of the protocol, and when * the output plugin supports it. */ - if (!enable_streaming) + if (!data->streaming) ctx->streaming = false; else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM) ereport(ERROR, diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 3b7273bd89..62a7d0a6d3 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -26,6 +26,7 @@ typedef struct PGOutputData List *publications; bool binary; bool messages; + bool streaming; } PGOutputData; #endif /* PGOUTPUT_H */ -- 2.20.1
From 73444c7e6979e6493043f011600a3c49bcea2aa6 Mon Sep 17 00:00:00 2001 From: Euler Taveira <euler.tave...@enterprisedb.com> Date: Thu, 19 Nov 2020 21:53:29 -0300 Subject: [PATCH 6/6] Overhaul tests --- src/test/subscription/t/020_messages.pl | 160 +++++++++++------------- 1 file changed, 75 insertions(+), 85 deletions(-) diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl index 8e59e324e3..d9123ed3ef 100644 --- a/src/test/subscription/t/020_messages.pl +++ b/src/test/subscription/t/020_messages.pl @@ -1,158 +1,148 @@ -# Tests that logical decoding messages are emitted and that -# they do not break subscribers +# Tests that logical decoding messages use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 7; +use Test::More tests => 5; +# Create publisher node my $node_publisher = get_new_node('publisher'); $node_publisher->init(allows_streaming => 'logical'); $node_publisher->start; +# Create subscriber node my $node_subscriber = get_new_node('subscriber'); $node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->start; +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup logical replication my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; -# -$node_publisher->safe_psql('postgres', - "CREATE TABLE tab (a int PRIMARY KEY)"); +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test"); + $node_subscriber->safe_psql('postgres', - "CREATE TABLE tab (a int PRIMARY KEY)"); -$node_publisher->safe_psql('postgres', - "CREATE PUBLICATION pub FOR TABLE tab"); -$node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" ); -# ensure a transactional logical decoding message shows up on the slot -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); +# Ensure a transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); $node_publisher->safe_psql('postgres', - "select pg_logical_emit_message(true, 'a prefix', 'a transactional message')" + "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')" ); -my $slot_codes_with_message = $node_publisher->safe_psql( +my $result = $node_publisher->safe_psql( 'postgres', qq( - select get_byte(data, 0) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, 'proto_version', '1', - 'publication_names', 'pub', + 'publication_names', 'tap_pub', 'messages', 'true') )); # 66 77 67 == B M C == BEGIN MESSAGE COMMIT -is($slot_codes_with_message, "66\n77\n67", +is($result, qq(66 +77 +67), 'messages on slot are B M C with message option'); -my $transactional_message_flags = $node_publisher->safe_psql( +$result = $node_publisher->safe_psql( 'postgres', qq( - select get_byte(data, 1) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape') + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, 'proto_version', '1', - 'publication_names', 'pub', + 'publication_names', 'tap_pub', 'messages', 'true') - offset 1 limit 1 + OFFSET 1 LIMIT 1 )); -is($transactional_message_flags, "1", - "transactional message flags are set to 1"); +is($result, qq(1|pgoutput), + "flag transactional is set to 1 and prefix is pgoutput"); -my $slot_codes_without_message = $node_publisher->safe_psql( +$result = $node_publisher->safe_psql( 'postgres', qq( - select get_byte(data, 0) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, 'proto_version', '1', - 'publication_names', 'pub') + 'publication_names', 'tap_pub') )); # 66 67 == B C == BEGIN COMMIT -is($slot_codes_without_message, "66\n67", - 'messages on slot are B C without message option'); +is($result, qq(66 +67), + 'option messages defaults to false so message (M) is not available on slot'); -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); -$node_publisher->wait_for_catchup('sub'); +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); +$node_publisher->wait_for_catchup('tap_sub'); # ensure a non-transactional logical decoding message shows up on the slot -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); -$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (3)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)"); my $message_lsn = $node_publisher->safe_psql('postgres', - "select pg_logical_emit_message(false, 'prefix', 'nontransactional')"); + "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')"); -$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (4)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)"); -my $slot_message_code = $node_publisher->safe_psql( +$result = $node_publisher->safe_psql( 'postgres', qq( - select get_byte(data, 0) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, 'proto_version', '1', - 'publication_names', 'pub', + 'publication_names', 'tap_pub', 'messages', 'true') - where lsn = '$message_lsn' and xid = 0 + WHERE lsn = '$message_lsn' AND xid = 0 )); -is($slot_message_code, "77", "non-transactional message on slot is M"); +is($result, qq(77|0), 'non-transactional message on slot is M'); -my $nontransactional_message_flags = $node_publisher->safe_psql( - 'postgres', qq( - select get_byte(data, 1) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, - 'proto_version', '1', - 'publication_names', 'pub', - 'messages', 'true') - offset 1 limit 1 -)); +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); +$node_publisher->wait_for_catchup('tap_sub'); -is($nontransactional_message_flags, - "0", "non-transactional message flags are set to 0"); - -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); -$node_publisher->wait_for_catchup('sub'); - -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); -$node_subscriber->safe_psql('postgres', "checkpoint;"); +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); # wait for the replication connection to drop from the publisher $node_publisher->poll_query_until('postgres', - 'SELECT count(*) from pg_catalog.pg_stat_replication', 0); + 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0); -# ensure a non-transactional logical decoding message shows up on the slot -# when it is emitted within an aborted transaction. the message won't emit -# until something advances the LSN, which we intentionally do here with a -# checkpoint. +# Ensure a non-transactional logical decoding message shows up on the slot when +# it is emitted within an aborted transaction. The message won't emit until +# something advances the LSN, hence, we intentionally forces the server to +# switch to a new WAL file. $node_publisher->safe_psql( 'postgres', qq( BEGIN; - SELECT pg_logical_emit_message(false, 'prefix', - 'nontransactional aborted 1'); - INSERT INTO tab VALUES (5); - SELECT pg_logical_emit_message(false, 'prefix', - 'nontransactional aborted 2'); + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 1'); + INSERT INTO tab_test VALUES (3); + SELECT pg_logical_emit_message(true, 'pgoutput', + 'a transactional message is not available if the transaction is aborted'); + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 2'); ROLLBACK; - CHECKPOINT; + SELECT pg_switch_wal(); )); -my $aborted_txn_message_codes = $node_publisher->safe_psql( +$result = $node_publisher->safe_psql( 'postgres', qq( - select get_byte(data, 0) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, 'proto_version', '1', - 'publication_names', 'pub', + 'publication_names', 'tap_pub', 'messages', 'true') )); -is($aborted_txn_message_codes, "77\n77", - "non-transactional message on slot from aborted transaction is M"); - -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); -$node_publisher->wait_for_catchup('sub'); - -my $result = - $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab"); -is($result, qq(2), 'rows move'); +is($result, qq(77|0 +77|0), + 'non-transactional message on slot from aborted transaction is M'); $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.20.1