On Fri, Oct 13, 2023 at 03:20:30PM +0530, Amit Kapila wrote:
> I would prefer to associate the new parameter 'flush' with
> non-transactional messages as per the proposed patch.

Check.

> Is there a reason to make the functions strict now when they were not earlier?

These two are already STRICT on HEAD:
=# select proname, provolatile, proisstrict from pg_proc
     where proname ~ 'message';
         proname         | provolatile | proisstrict
-------------------------+-------------+-------------
 pg_logical_emit_message | v           | t
 pg_logical_emit_message | v           | t
(2 rows)

> 2.
> +        The <parameter>flush</parameter> parameter (default set to
> +        <literal>false</literal>) controls if the message is immediately
> +        flushed to WAL or not. <parameter>flush</parameter> has no effect
> +        with <parameter>transactional</parameter>, as the message's WAL
> +        record is flushed when its transaction is committed.
> 
> The last part of the message sounds a bit too specific (".. as the
> message's WAL record is flushed when its transaction is committed.")
> because sometimes the WAL could be flushed by walwriter even before
> the commit. Can we say something along the lines: ".. as the message's
> WAL record is flushed along with its transaction."?

Fine by me.

> 3.
> + /*
> + * Make sure that the message hits disk before leaving if not emitting a
> + * transactional message, if flush is requested.
> + */
> + if (!transactional && flush)
> 
> Two ifs in the above comment sound a bit odd but if we want to keep it
> like that then adding 'and' before the second if may slightly improve
> it.

Sure, I've improved this comment.

An updated version is attached.  How does it look?
--
Michael
From c74c97dd33958e6a4a3adc2b3f5bd5bfe786d256 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Mon, 16 Oct 2023 16:12:05 +0900
Subject: [PATCH v5] Add flush argument to pg_logical_emit_message()

The default is false, to not flush messages.  If set to true, the
message is flushed before returning back to the caller.

Note: Bump catalog version.
---
 src/include/catalog/pg_proc.dat               |  4 ++--
 src/include/replication/message.h             |  3 ++-
 src/backend/catalog/system_functions.sql      | 20 +++++++++++++++++++
 .../replication/logical/logicalfuncs.c        |  3 ++-
 src/backend/replication/logical/message.c     | 13 ++++++++++--
 doc/src/sgml/func.sgml                        |  9 +++++++--
 contrib/test_decoding/expected/messages.out   |  5 +++--
 contrib/test_decoding/sql/messages.sql        |  5 +++--
 8 files changed, 50 insertions(+), 12 deletions(-)

diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 72ea4aa8b8..c92d0631a0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11167,11 +11167,11 @@
   prosrc => 'pg_replication_slot_advance' },
 { oid => '3577', descr => 'emit a textual logical decoding message',
   proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
-  prorettype => 'pg_lsn', proargtypes => 'bool text text',
+  prorettype => 'pg_lsn', proargtypes => 'bool text text bool',
   prosrc => 'pg_logical_emit_message_text' },
 { oid => '3578', descr => 'emit a binary logical decoding message',
   proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
-  prorettype => 'pg_lsn', proargtypes => 'bool text bytea',
+  prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool',
   prosrc => 'pg_logical_emit_message_bytea' },
 
 # event triggers
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
index 6ce7f2038b..0f168d572c 100644
--- a/src/include/replication/message.h
+++ b/src/include/replication/message.h
@@ -30,7 +30,8 @@ typedef struct xl_logical_message
 #define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
 
 extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
-									size_t size, bool transactional);
+									size_t size, bool transactional,
+									bool flush);
 
 /* RMGR API */
 #define XLOG_LOGICAL_MESSAGE	0x00
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 07c0d89c4f..714f5da941 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -446,6 +446,26 @@ LANGUAGE INTERNAL
 VOLATILE ROWS 1000 COST 1000
 AS 'pg_logical_slot_peek_binary_changes';
 
+CREATE OR REPLACE FUNCTION pg_catalog.pg_logical_emit_message(
+    transactional boolean,
+    prefix text,
+    message text,
+    flush boolean DEFAULT false)
+RETURNS pg_lsn
+LANGUAGE INTERNAL
+VOLATILE STRICT
+AS 'pg_logical_emit_message_text';
+
+CREATE OR REPLACE FUNCTION pg_catalog.pg_logical_emit_message(
+    transactional boolean,
+    prefix text,
+    message bytea,
+    flush boolean DEFAULT false)
+RETURNS pg_lsn
+LANGUAGE INTERNAL
+VOLATILE STRICT
+AS 'pg_logical_emit_message_bytea';
+
 CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
     IN slot_name name, IN immediately_reserve boolean DEFAULT false,
     IN temporary boolean DEFAULT false,
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 197169d6b0..1067aca08f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -362,10 +362,11 @@ pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
 	bool		transactional = PG_GETARG_BOOL(0);
 	char	   *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
 	bytea	   *data = PG_GETARG_BYTEA_PP(2);
+	bool		flush = PG_GETARG_BOOL(3);
 	XLogRecPtr	lsn;
 
 	lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
-							transactional);
+							transactional, flush);
 	PG_RETURN_LSN(lsn);
 }
 
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
index c5de14afc6..b5d29382f5 100644
--- a/src/backend/replication/logical/message.c
+++ b/src/backend/replication/logical/message.c
@@ -44,9 +44,10 @@
  */
 XLogRecPtr
 LogLogicalMessage(const char *prefix, const char *message, size_t size,
-				  bool transactional)
+				  bool transactional, bool flush)
 {
 	xl_logical_message xlrec;
+	XLogRecPtr	lsn;
 
 	/*
 	 * Force xid to be allocated if we're emitting a transactional message.
@@ -71,7 +72,15 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
 	/* allow origin filtering */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-	return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+	lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+
+	/*
+	 * Make sure that the message hits disk before leaving if emitting a
+	 * non-transactional message when flush is requested.
+	 */
+	if (!transactional && flush)
+		XLogFlush(lsn);
+	return lsn;
 }
 
 /*
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index affd1254bb..7c3e940afe 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27740,11 +27740,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_logical_emit_message</primary>
         </indexterm>
-        <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> )
+        <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] )
         <returnvalue>pg_lsn</returnvalue>
        </para>
        <para role="func_signature">
-        <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> )
+        <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] )
         <returnvalue>pg_lsn</returnvalue>
        </para>
        <para>
@@ -27758,6 +27758,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         recognize messages that are interesting for them.
         The <parameter>content</parameter> parameter is the content of the
         message, given either in text or binary form.
+        The <parameter>flush</parameter> parameter (default set to
+        <literal>false</literal>) controls if the message is immediately
+        flushed to WAL or not. <parameter>flush</parameter> has no effect
+        with <parameter>transactional</parameter>, as the message's WAL
+        record is flushed along with its transaction.
        </para></entry>
       </row>
      </tbody>
diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index 0fd70036bd..84baf8af3e 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -6,13 +6,14 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
  init
 (1 row)
 
-SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+-- These two cover the path for the flush variant.
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true);
  ?column? 
 ----------
  msg1
 (1 row)
 
-SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true);
  ?column? 
 ----------
  msg2
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index 3d8500f99c..1f3dcb63ee 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -3,8 +3,9 @@ SET synchronous_commit = on;
 
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 
-SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
-SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+-- These two cover the path for the flush variant.
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true);
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true);
 
 BEGIN;
 SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
-- 
2.42.0

Attachment: signature.asc
Description: PGP signature

Reply via email to