On Mon, Sep 11, 2023 at 02:42:16PM +0900, bt23nguyent wrote: > A minor issue with the description here is that while the description for > the new flush argument in pg_logical_emit_message() with bytea type is > clearly declared, there is no description of flush argument in the former > pg_logical_emit_message() with text type at all.
Indeed, I forgot to update the first function signature. Fixed in the attached. > On a side note, could you also include a bit more information that "flush is > set to false by default" in the document as well? It could be helpful for > the users. With the function signature saying that, that did not seem strictly necessary to me, but no objection to adding a few words about that. I'll need a bit more input from Fujii-san before doing anything about his comments, still it looks like a doc issue to me that may need a backpatch to clarify how the non-transactional case behaves. Attaching a v4 with the two doc changes, for now. -- Michael
From fea24a7f6f5451388945307bdd2ebefd6e5ecd15 Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@paquier.xyz> Date: Mon, 11 Sep 2023 16:14:02 +0900 Subject: [PATCH v4] Add flush argument to pg_logical_emit_message() The default is false, to not flush messages. If set to true, the message is flushed before returning back to the caller. --- src/include/catalog/pg_proc.dat | 4 ++-- src/include/replication/message.h | 3 ++- src/backend/catalog/system_functions.sql | 20 +++++++++++++++++++ .../replication/logical/logicalfuncs.c | 3 ++- src/backend/replication/logical/message.c | 13 ++++++++++-- doc/src/sgml/func.sgml | 9 +++++++-- contrib/test_decoding/expected/messages.out | 5 +++-- contrib/test_decoding/sql/messages.sql | 5 +++-- 8 files changed, 50 insertions(+), 12 deletions(-) diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9805bc6118..3e52e0d4ac 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11158,11 +11158,11 @@ prosrc => 'pg_replication_slot_advance' }, { oid => '3577', descr => 'emit a textual logical decoding message', proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u', - prorettype => 'pg_lsn', proargtypes => 'bool text text', + prorettype => 'pg_lsn', proargtypes => 'bool text text bool', prosrc => 'pg_logical_emit_message_text' }, { oid => '3578', descr => 'emit a binary logical decoding message', proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u', - prorettype => 'pg_lsn', proargtypes => 'bool text bytea', + prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool', prosrc => 'pg_logical_emit_message_bytea' }, # event triggers diff --git a/src/include/replication/message.h b/src/include/replication/message.h index 6ce7f2038b..0f168d572c 100644 --- a/src/include/replication/message.h +++ b/src/include/replication/message.h @@ -30,7 +30,8 @@ typedef struct xl_logical_message #define SizeOfLogicalMessage 
(offsetof(xl_logical_message, message)) extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, - size_t size, bool transactional); + size_t size, bool transactional, + bool flush); /* RMGR API */ #define XLOG_LOGICAL_MESSAGE 0x00 diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 07c0d89c4f..714f5da941 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -446,6 +446,26 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_peek_binary_changes'; +CREATE OR REPLACE FUNCTION pg_catalog.pg_logical_emit_message( + transactional boolean, + prefix text, + message text, + flush boolean DEFAULT false) +RETURNS pg_lsn +LANGUAGE INTERNAL +VOLATILE STRICT +AS 'pg_logical_emit_message_text'; + +CREATE OR REPLACE FUNCTION pg_catalog.pg_logical_emit_message( + transactional boolean, + prefix text, + message bytea, + flush boolean DEFAULT false) +RETURNS pg_lsn +LANGUAGE INTERNAL +VOLATILE STRICT +AS 'pg_logical_emit_message_bytea'; + CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( IN slot_name name, IN immediately_reserve boolean DEFAULT false, IN temporary boolean DEFAULT false, diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 197169d6b0..1067aca08f 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -362,10 +362,11 @@ pg_logical_emit_message_bytea(PG_FUNCTION_ARGS) bool transactional = PG_GETARG_BOOL(0); char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1)); bytea *data = PG_GETARG_BYTEA_PP(2); + bool flush = PG_GETARG_BOOL(3); XLogRecPtr lsn; lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data), - transactional); + transactional, flush); PG_RETURN_LSN(lsn); } diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index 
c5de14afc6..d49557f46e 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -44,9 +44,10 @@ */ XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, size_t size, - bool transactional) + bool transactional, bool flush) { xl_logical_message xlrec; + XLogRecPtr lsn; /* * Force xid to be allocated if we're emitting a transactional message. @@ -71,7 +72,15 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size, /* allow origin filtering */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); + lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); + + /* + * Make sure that the message hits disk before leaving if not emitting a + * transactional message, if flush is requested. + */ + if (!transactional && flush) + XLogFlush(lsn); + return lsn; } /* diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 24ad87f910..8c60539aeb 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -27642,11 +27642,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset <indexterm> <primary>pg_logical_emit_message</primary> </indexterm> - <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> ) + <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] ) <returnvalue>pg_lsn</returnvalue> </para> <para role="func_signature"> - <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, 
<parameter>content</parameter> <type>bytea</type> ) + <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] ) <returnvalue>pg_lsn</returnvalue> </para> <para> @@ -27660,6 +27660,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset recognize messages that are interesting for them. The <parameter>content</parameter> parameter is the content of the message, given either in text or binary form. + The <parameter>flush</parameter> parameter (default set to + <literal>false</literal>) controls if the message is immediately + flushed to WAL or not. <parameter>flush</parameter> has no effect + with <parameter>transactional</parameter>, as the message's WAL + record is flushed when its transaction is committed. </para></entry> </row> </tbody> diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out index 0fd70036bd..84baf8af3e 100644 --- a/contrib/test_decoding/expected/messages.out +++ b/contrib/test_decoding/expected/messages.out @@ -6,13 +6,14 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d init (1 row) -SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1'); +-- These two cover the path for the flush variant. +SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true); ?column? ---------- msg1 (1 row) -SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2'); +SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true); ?column? 
---------- msg2 diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql index 3d8500f99c..1f3dcb63ee 100644 --- a/contrib/test_decoding/sql/messages.sql +++ b/contrib/test_decoding/sql/messages.sql @@ -3,8 +3,9 @@ SET synchronous_commit = on; SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); -SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1'); -SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2'); +-- These two cover the path for the flush variant. +SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true); +SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true); BEGIN; SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3'); -- 2.40.1
signature.asc
Description: PGP signature