Hi,
thanks for looking Andres,
On 27/02/16 01:05, Andres Freund wrote:
Hi,
I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
not much documentation about what it actually is supposed to
accomplish. Afaics you're basically forced to use
shared_preload_libraries with it right now? Also, iterating through a
linked list every time something is logged doesn't seem very satisfying?
Well, my reasoning there was to stop multiple plugins from using the same
prefix, and for that you need some kind of registry. Making this a shared
catalog seemed like huge overkill given the potentially transient nature
of output plugins, and this was the best I could come up with. And yes, it
requires you to load your plugin before you can log a message for it.
I am not married to this solution, so if you have better ideas or if you
think the clashes are not a real issue, we can change it.
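To make the trade-off concrete, here is a minimal standalone sketch of the kind of registry I mean: a linked list that refuses duplicate prefixes. This is not the code from the patch; PrefixEntry, register_prefix() and prefix_is_registered() are hypothetical names, and it uses plain malloc() instead of memory contexts so that it compiles outside the server.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a per-backend prefix registry; not the patch's code. */
typedef struct PrefixEntry
{
	char	   *prefix;
	struct PrefixEntry *next;
} PrefixEntry;

static PrefixEntry *registered_prefixes = NULL;

/*
 * Linear scan over the list. This is the cost paid on every lookup,
 * which is what the review comment about iterating the list refers to.
 */
bool
prefix_is_registered(const char *prefix)
{
	assert(prefix != NULL);

	for (const PrefixEntry *e = registered_prefixes; e != NULL; e = e->next)
	{
		if (strcmp(e->prefix, prefix) == 0)
			return true;
	}
	return false;
}

/* Returns false if the prefix is already taken or memory runs out. */
bool
register_prefix(const char *prefix)
{
	PrefixEntry *entry;
	size_t		len;

	if (prefix_is_registered(prefix))
		return false;			/* clash with an earlier registration */

	entry = malloc(sizeof(PrefixEntry));
	if (entry == NULL)
		return false;

	len = strlen(prefix) + 1;
	entry->prefix = malloc(len);
	if (entry->prefix == NULL)
	{
		free(entry);
		return false;
	}
	memcpy(entry->prefix, prefix, len);

	/* Push onto the head of the list. */
	entry->next = registered_prefixes;
	registered_prefixes = entry;
	return true;
}
```

A second register_prefix() call with the same string returns false, which is the clash detection. Because the list lives in process memory, the plugin has to be loaded before its prefix is known.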
On 2016-02-24 18:35:16 +0100, Petr Jelinek wrote:
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1');
+ ?column?
+----------
+ msg1
+(1 row)
Hm. Somehow 'sending' a message seems wrong here. Maybe 'emit'?
+ <row>
+ <entry id="pg-logical-send-message-text">
+ <indexterm>
+ <primary>pg_logical_send_message</primary>
+ </indexterm>
+ <literal><function>pg_logical_send_message(<parameter>transactional</parameter> <type>bool</type>,
<parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter>
<type>text</type>)</function></literal>
+ </entry>
+ <entry>
+ void
+ </entry>
+ <entry>
+ Write text logical decoding message. This can be used to pass generic
+ messages to logical decoding plugins through WAL. The parameter
+ <parameter>transactional</parameter> specifies if the message should
+ be part of current transaction or if it should be written and decoded
+ immediately. The <parameter>prefix</parameter> has to be prefix which
+ was registered by a plugin. The <parameter>content</parameter> is
+ content of the message.
+ </entry>
+ </row>
It's not decoded immediately, even if emitted non-transactionally.
Okay, immediately is somewhat misleading. How does "should be written
immediately and decoded as soon as logical decoding reads the WAL
record" sound?
+ <sect3 id="logicaldecoding-output-plugin-message">
+ <title>Generic Message Callback</title>
+
+ <para>
+ The optional <function>message_cb</function> callback is called whenever
+ a logical decoding message has been decoded.
+<programlisting>
+typedef void (*LogicalDecodeMessageCB) (
+ struct LogicalDecodingContext *,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message
+);
We should at least document what txn is set to if not transactional.
Will do (it's NULL).
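For illustration, a plugin callback could rely on that convention roughly as follows. This is only a standalone sketch: DemoTXN, DemoLSN and demo_message_cb() are hypothetical stand-ins for the server types and the real callback, and the output goes to a caller-supplied buffer rather than through the output plugin write API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins so the sketch compiles outside PostgreSQL. */
typedef struct DemoTXN
{
	unsigned int xid;
} DemoTXN;

typedef unsigned long long DemoLSN;

/*
 * Formats a one-line description of the message into "out" and returns
 * the value reported by snprintf().
 */
int
demo_message_cb(char *out, size_t outlen,
				const DemoTXN *txn, DemoLSN message_lsn,
				bool transactional, const char *prefix,
				size_t message_size, const char *message)
{
	assert(prefix != NULL && strlen(prefix) > 0);

	/* Convention under discussion: txn is NULL exactly when non-transactional. */
	assert(transactional == (txn != NULL));

	/* The payload is not assumed to be NUL-terminated, hence "%.*s". */
	if (txn != NULL)
		return snprintf(out, outlen, "xid %u lsn %llu prefix %s: %.*s",
						txn->xid, message_lsn, prefix,
						(int) message_size, message);

	return snprintf(out, outlen, "no xid lsn %llu prefix %s: %.*s",
					message_lsn, prefix, (int) message_size, message);
}
```

The point is only that the callback must not dereference txn before checking it (or the transactional flag).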
+void
+logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+ char *rec = XLogRecGetData(record);
+ xl_logical_message *xlrec = (xl_logical_message *) rec;
+
+ appendStringInfo(buf, "%s message size %zu bytes",
+ xlrec->transactional ? "transactional" : "nontransactional",
+ xlrec->message_size);
+}
Shouldn't we check
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
if (info == XLOG_LOGICAL_MESSAGE)
here?
I thought it's kinda pointless, but we seem to be doing it in other
places so will add.
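Roughly, the guarded version would look like the standalone sketch below. The DEMO_* constants and DemoLogicalMessage are hypothetical stand-ins; the mask and record-type values are assumptions chosen for the example, not taken from the patch, and the output goes to a plain buffer instead of a StringInfo.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Assumed values for illustration only; the real ones live in the server headers. */
#define DEMO_XLR_INFO_MASK			0x0F
#define DEMO_XLOG_LOGICAL_MESSAGE	0x00

/* Hypothetical stand-in for the fields of xl_logical_message used here. */
typedef struct DemoLogicalMessage
{
	bool		transactional;
	size_t		message_size;
} DemoLogicalMessage;

/*
 * Describes the record only if its masked info byte identifies a logical
 * message; otherwise writes an empty string and returns 0.
 */
int
demo_logicalmsg_desc(char *out, size_t outlen, uint8_t raw_info,
					 const DemoLogicalMessage *xlrec)
{
	/* Strip the low flag bits, keeping only the rmgr-specific part. */
	uint8_t		info = (uint8_t) (raw_info & ~DEMO_XLR_INFO_MASK);

	assert(out != NULL && outlen > 0 && xlrec != NULL);

	if (info == DEMO_XLOG_LOGICAL_MESSAGE)
		return snprintf(out, outlen, "%s message size %zu bytes",
						xlrec->transactional ? "transactional" : "nontransactional",
						xlrec->message_size);

	out[0] = '\0';
	return 0;
}
```

Note that the comparison uses the masked info value, not the raw info byte.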
+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const char *prefix, Size msg_sz,
+ const char *msg)
+{
+ ReorderBufferTXN *txn = NULL;
+
+ if (transactional)
+ {
+ ReorderBufferChange *change;
+
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ Assert(xid != InvalidTransactionId);
+ Assert(txn != NULL);
+
+ change = ReorderBufferGetChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+ change->data.msg.transactional = true;
+ change->data.msg.prefix = pstrdup(prefix);
+ change->data.msg.message_size = msg_sz;
+ change->data.msg.message = palloc(msg_sz);
+ memcpy(change->data.msg.message, msg, msg_sz);
+
+ ReorderBufferQueueChange(rb, xid, lsn, change);
+ }
+ else
+ {
+ rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
+ }
+}
This approach prohibits catalog access when processing a nontransactional
message as there's no snapshot set up.
Hmm, I do see the usefulness of having a snapshot, although I wonder if that
does not kill the point of non-transactional messages. The question then is
which snapshot the message should see: the base_snapshot of the
transaction? That would mean we'd have to call SnapBuildProcessChange
for non-transactional messages, which we currently avoid. Alternatively,
we could probably invent a lighter version of that interface that would
just make sure builder->snapshot is valid and, if not, build it, but
I am honestly not sure if that's a win or not.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services