Hi all, As there has been a bump of CATALOG_VERSION_NO on REL9_4_STABLE recently, I am coming back to the OUTPUT_PLUGIN_* options, which are rather confusing for developers of decoder plugins. In short, when extracting changes from a replication slot, a decoder plugin is free to set one option that influences the output that is fetched by a logical receiver: OUTPUT_PLUGIN_TEXTUAL_OUTPUT or OUTPUT_PLUGIN_BINARY_OUTPUT. The interesting point about this format option is that it does not directly influence the changes generated by an output plugin: its value only has an effect on the set of functions pg_logical_slot_[get|peek]_[binary_|]changes that can be used by a non-replication connection to get individual changes from a replication slot.
So a receiver fetching changes using PQgetCopyData is not really impacted by this format option. Better yet, it is even possible to use a custom option that is part of output_plugin_options to switch the OUTPUT_PLUGIN_* value (cf. the option force-binary in contrib/test_decoding). My point is: logical decoding is presenting in its infrastructure API a format option that could be added as a custom option in a decoder plugin. Isn't that orthogonal? To illustrate this argument, here is an example: we could remove force-binary in test_decoding and replace it by an option that allows the user to directly choose the output format, like format=[binary|textual], and do the conversion in the plugin. In the same manner, the functions pg_logical_slot_[get|peek]_binary_changes are equivalent to their cousins pg_logical_slot_[get|peek]_changes cast to bytea. I am raising this point before the 9.4 ship sails, thinking long-term and to facilitate the maintenance of existing code. Attached is a patch that simplifies the current logical decoding API regarding that. Regards, -- Michael
From dea334ff685c9587b9a28fe4feb324c4dcc870c2 Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@otacoo.com> Date: Thu, 18 Sep 2014 01:53:22 -0500 Subject: [PATCH] Remove OUTPUT_PLUGIN_* from existing logical decoding APIs This is confusing for users as this option only influences the output of get/peek options that the logical decoding facility is offering. Also, there exist other possibilities for the user to generate binary output: - Use a bytea cast on the textual get/peek functions - integrate an option within the plugin that generates binary output at will. In short, the concept of options should remain exclusively in output_plugin_options and not elsewhere. --- contrib/test_decoding/Makefile | 2 +- contrib/test_decoding/sql/binary.sql | 14 ------- contrib/test_decoding/test_decoding.c | 21 +--------- doc/src/sgml/func.sgml | 36 ----------------- doc/src/sgml/logicaldecoding.sgml | 13 +------ src/backend/catalog/system_views.sql | 16 -------- src/backend/replication/logical/logical.c | 11 +++--- src/backend/replication/logical/logicalfuncs.c | 53 ++++---------------------- src/include/catalog/pg_proc.h | 6 +-- src/include/replication/logical.h | 1 - src/include/replication/logicalfuncs.h | 2 - src/include/replication/output_plugin.h | 15 -------- src/tools/pgindent/typedefs.list | 2 - 13 files changed, 17 insertions(+), 175 deletions(-) delete mode 100644 contrib/test_decoding/sql/binary.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index d7f32c3..9f48947 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -37,7 +37,7 @@ submake-isolation: submake-test_decoding: $(MAKE) -C $(top_builddir)/contrib/test_decoding -REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact binary prepared +REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact prepared regresscheck: all | submake-regress submake-test_decoding $(MKDIR_P) regression_output diff --git 
a/contrib/test_decoding/sql/binary.sql b/contrib/test_decoding/sql/binary.sql deleted file mode 100644 index 619f00b..0000000 --- a/contrib/test_decoding/sql/binary.sql +++ /dev/null @@ -1,14 +0,0 @@ --- predictability -SET synchronous_commit = on; - -SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); --- succeeds, textual plugin, textual consumer -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0'); --- fails, binary plugin, textual consumer -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '1'); --- succeeds, textual plugin, binary consumer -SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot', NULL, NULL, 'force-binary', '0'); --- succeeds, binary plugin, binary consumer -SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot', NULL, NULL, 'force-binary', '1'); - -SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index fdbd313..b5ccd45 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -45,7 +45,7 @@ typedef struct bool xact_wrote_changes; } TestDecodingData; -static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, +static void pg_decode_startup(LogicalDecodingContext *ctx, bool is_init); static void pg_decode_shutdown(LogicalDecodingContext *ctx); static void pg_decode_begin_txn(LogicalDecodingContext *ctx, @@ -82,7 +82,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) /* initialize this plugin */ static void -pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, +pg_decode_startup(LogicalDecodingContext *ctx, bool is_init) { ListCell *option; @@ -100,8 +100,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ctx->output_plugin_private = data; - opt->output_type = 
OUTPUT_PLUGIN_TEXTUAL_OUTPUT; - foreach(option, ctx->output_plugin_options) { DefElem *elem = lfirst(option); @@ -129,21 +127,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } - else if (strcmp(elem->defname, "force-binary") == 0) - { - bool force_binary; - - if (elem->arg == NULL) - continue; - else if (!parse_bool(strVal(elem->arg), &force_binary)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("could not parse value \"%s\" for parameter \"%s\"", - strVal(elem->arg), elem->defname))); - - if (force_binary) - opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; - } else if (strcmp(elem->defname, "skip-empty-xacts") == 0) { diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 7195df8..fee25db 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -16725,42 +16725,6 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); again on future calls. </entry> </row> - - <row> - <entry> - <indexterm> - <primary>pg_logical_slot_get_binary_changes</primary> - </indexterm> - <literal><function>pg_logical_slot_get_binary_changes(<parameter>slot_name</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal> - </entry> - <entry> - (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>bytea</type>) - </entry> - <entry> - Behaves just like - the <function>pg_logical_slot_get_changes()</function> function, - except that changes are returned as <type>bytea</type>. 
- </entry> - </row> - - <row> - <entry> - <indexterm> - <primary>pg_logical_slot_peek_binary_changes</primary> - </indexterm> - <literal><function>pg_logical_slot_peek_binary_changes(<parameter>slot_name</parameter> <type>name</type>, <parameter>upto_lsn</parameter> <type>pg_lsn</type>, <parameter>upto_nchanges</parameter> <type>int</type>, VARIADIC <parameter>options</parameter> <type>text[]</type>)</function></literal> - </entry> - <entry> - (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>bytea</type>) - </entry> - <entry> - Behaves just like - the <function>pg_logical_slot_get_changes()</function> function, - except that changes are returned as <type>bytea</type> and that - changes are not consumed; that is, they will be returned again - on future calls. - </entry> - </row> </tbody> </tgroup> </table> diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index e6880d0..5de8629 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -389,23 +389,12 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); <programlisting> typedef void (*LogicalDecodeStartupCB) ( struct LogicalDecodingContext *ctx, - OutputPluginOptions *options, bool is_init ); </programlisting> The <literal>is_init</literal> parameter will be true when the replication slot is being created and false - otherwise. <parameter>options</parameter> points to a struct of options - that output plugins can set: -<programlisting> -typedef struct OutputPluginOptions -{ - OutputPluginOutputType output_type; -} OutputPluginOptions; -</programlisting> - <literal>output_type</literal> has to either be set to - <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> - or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. + otherwise. 
</para> <para> The startup callback should validate the options present in diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 22663c3..afeed3f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -843,22 +843,6 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_peek_changes'; -CREATE OR REPLACE FUNCTION pg_logical_slot_get_binary_changes( - IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}', - OUT location pg_lsn, OUT xid xid, OUT data bytea) -RETURNS SETOF RECORD -LANGUAGE INTERNAL -VOLATILE ROWS 1000 COST 1000 -AS 'pg_logical_slot_get_binary_changes'; - -CREATE OR REPLACE FUNCTION pg_logical_slot_peek_binary_changes( - IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}', - OUT location pg_lsn, OUT xid xid, OUT data bytea) -RETURNS SETOF RECORD -LANGUAGE INTERNAL -VOLATILE ROWS 1000 COST 1000 -AS 'pg_logical_slot_peek_binary_changes'; - CREATE OR REPLACE FUNCTION make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0, days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0, diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 49f9c7d..b5c7ca4 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -55,8 +55,7 @@ typedef struct LogicalErrorCallbackState /* wrappers around output plugin callbacks */ static void output_plugin_error_callback(void *arg); -static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, - bool is_init); +static void startup_cb_wrapper(LogicalDecodingContext *ctx, bool is_init); static void shutdown_cb_wrapper(LogicalDecodingContext *ctx); static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ 
-325,7 +324,7 @@ CreateInitDecodingContext(char *plugin, /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); if (ctx->callbacks.startup_cb != NULL) - startup_cb_wrapper(ctx, &ctx->options, true); + startup_cb_wrapper(ctx, true); MemoryContextSwitchTo(old_context); return ctx; @@ -406,7 +405,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); if (ctx->callbacks.startup_cb != NULL) - startup_cb_wrapper(ctx, &ctx->options, false); + startup_cb_wrapper(ctx, false); MemoryContextSwitchTo(old_context); ereport(LOG, @@ -561,7 +560,7 @@ output_plugin_error_callback(void *arg) } static void -startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) +startup_cb_wrapper(LogicalDecodingContext *ctx, bool is_init) { LogicalErrorCallbackState state; ErrorContextCallback errcallback; @@ -579,7 +578,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i ctx->accept_writes = false; /* do the actual work: call callback */ - ctx->callbacks.startup_cb(ctx, opt, is_init); + ctx->callbacks.startup_cb(ctx, is_init); /* Pop the error context stack */ error_context_stack = errcallback.previous; diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 9692f98..7cb12d4 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -46,7 +46,6 @@ typedef struct DecodingOutputState { Tuplestorestate *tupstore; TupleDesc tupdesc; - bool binary_output; int64 returned_rows; } DecodingOutputState; @@ -81,14 +80,10 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi values[0] = LSNGetDatum(lsn); values[1] = TransactionIdGetDatum(xid); - /* - * Assert ctx->out is in database encoding when we're writing textual - * output. 
- */ - if (!p->binary_output) - Assert(pg_verify_mbstr(GetDatabaseEncoding(), - ctx->out->data, ctx->out->len, - false)); + /* Check that ctx->out is in database encoding */ + Assert(pg_verify_mbstr(GetDatabaseEncoding(), + ctx->out->data, ctx->out->len, + false)); /* ick, but cstring_to_text_with_len works for bytea perfectly fine */ values[2] = PointerGetDatum( @@ -272,7 +267,7 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * Helper function for the various SQL callable logical decoding functions. */ static Datum -pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary) +pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm) { Name name = PG_GETARG_NAME(0); XLogRecPtr upto_lsn; @@ -316,8 +311,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* state to write output to */ p = palloc0(sizeof(DecodingOutputState)); - p->binary_output = binary; - /* Build a tuple descriptor for our result type */ if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); @@ -393,16 +386,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin MemoryContextSwitchTo(oldcontext); - /* - * Check whether the output plugin writes textual output if that's - * what we need. 
- */ - if (!binary && - ctx->options.output_type != OUTPUT_PLUGIN_TEXTUAL_OUTPUT) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("output plugin cannot produce binary output"))); - ctx->output_writer_private = p; startptr = MyReplicationSlot->data.restart_lsn; @@ -476,7 +459,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, false); + Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true); return ret; } @@ -487,29 +470,7 @@ pg_logical_slot_get_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, false); - - return ret; -} - -/* - * SQL function returning the changestream in binary, consuming the data. - */ -Datum -pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) -{ - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, true); - - return ret; -} - -/* - * SQL function returning the changestream in binary, only peeking ahead. 
- */ -Datum -pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) -{ - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, true); + Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false); return ret; } diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index e66430d..c0fdbc4 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -4976,12 +4976,8 @@ DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 DESCR("set up a logical replication slot"); DATA(insert OID = 3782 ( pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_get_changes _null_ _null_ _null_ )); DESCR("get changes from replication slot"); -DATA(insert OID = 3783 ( pg_logical_slot_get_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_get_binary_changes _null_ _null_ _null_ )); -DESCR("get binary changes from replication slot"); -DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_peek_changes _null_ _null_ _null_ )); +DATA(insert OID = 3783 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_peek_changes _null_ _null_ _null_ )); DESCR("peek at changes from replication slot"); -DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 
2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ )); -DESCR("peek at binary changes from replication slot"); /* event triggers */ DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ )); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 26be127..5b78382 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -38,7 +38,6 @@ typedef struct LogicalDecodingContext struct SnapBuild *snapshot_builder; OutputPluginCallbacks callbacks; - OutputPluginOptions options; /* * User specified options diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h index 21bf44e..06373be 100644 --- a/src/include/replication/logicalfuncs.h +++ b/src/include/replication/logicalfuncs.h @@ -17,8 +17,6 @@ extern int logical_read_local_xlog_page(XLogReaderState *state, char *cur_page, TimeLineID *pageTLI); extern Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS); -extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS); -extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS); #endif diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index a58e68d..764a822 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -14,20 +14,6 @@ struct LogicalDecodingContext; struct OutputPluginCallbacks; -typedef enum OutputPluginOutputType -{ - OUTPUT_PLUGIN_BINARY_OUTPUT, - OUTPUT_PLUGIN_TEXTUAL_OUTPUT -} OutputPluginOutputType; - -/* - * 
Options set by the output plugin, in the startup callback. - */ -typedef struct OutputPluginOptions -{ - OutputPluginOutputType output_type; -} OutputPluginOptions; - /* * Type of the shared library symbol _PG_output_plugin_init that is looked up * when loading an output plugin shared library. @@ -43,7 +29,6 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); */ typedef void (*LogicalDecodeStartupCB) ( struct LogicalDecodingContext *ctx, - OutputPluginOptions *options, bool is_init ); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ab36aa3..73cbeb1 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1102,8 +1102,6 @@ OprProofCacheEntry OprProofCacheKey OutputContext OutputPluginCallbacks -OutputPluginOptions -OutputPluginOutputType OverrideSearchPath OverrideStackEntry PACE_HEADER -- 2.1.0
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers