On Fri, Jan 15, 2016 at 5:31 PM, Shulgin, Oleksandr < oleksandr.shul...@zalando.de> wrote:
> > POC patch attached. Findings: > > 1) Needs an actual slot for all the decode machinery to work (code depends > on MyReplicationSlot being set). > 2) Requires a core patch. > 3) Currently only supports textual output, adding binary is trivial. > > > Acquiring a slot means this cannot be run in parallel from multiple > backends. Any ideas on how to overcome this (except for opening multiple > slots with the same LSN)? > To obtain a consistent snapshot, the client still needs to take care of > preserving and setting transaction snapshot properly. > Testing revealed a number of problems with memory handling in this code; a corrected v2 is attached. A completely separate problem is the proper handling of the SPI stack and the release of the replication slot. The latter I'd like to avoid dealing with, because at the moment it's not possible to stream a number of relations in parallel using this POC function, so I'd rather move in the direction of not acquiring the replication slot at all. The SPI problem manifests itself if I place a LIMIT on top of the query: # SELECT pg_logical_slot_stream_relation('slot1', 'pg_catalog', 'pg_class') LIMIT 5; WARNING: relcache reference leak: relation "pg_class" not closed WARNING: transaction left non-empty SPI stack HINT: Check for missing "SPI_finish" calls. I wonder if there is a way to install some sort of cleanup handler that will be called by the executor? -- Alex
From 83c2c754066f43111d0f21ff088cc5503e910aab Mon Sep 17 00:00:00 2001 From: Oleksandr Shulgin <oleksandr.shul...@zalando.de> Date: Fri, 15 Jan 2016 17:30:04 +0100 Subject: [PATCH] POC: pg_logical_slot_stream_relation --- src/backend/catalog/system_views.sql | 9 + src/backend/replication/logical/logicalfuncs.c | 355 +++++++++++++++++++++--- src/backend/replication/logical/reorderbuffer.c | 6 +- src/include/catalog/pg_proc.h | 2 + src/include/replication/logicalfuncs.h | 1 + src/include/replication/reorderbuffer.h | 3 + 6 files changed, 333 insertions(+), 43 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 923fe58..5431b61 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -941,6 +941,15 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_peek_binary_changes'; +CREATE OR REPLACE FUNCTION pg_logical_slot_stream_relation( + IN slot_name name, IN relnamespace name, IN relname name, IN nochildren bool DEFAULT FALSE, + VARIADIC options text[] DEFAULT '{}', + OUT data text) +RETURNS SETOF TEXT +LANGUAGE INTERNAL +VOLATILE ROWS 1000 COST 1000 +AS 'pg_logical_slot_stream_relation'; + CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( IN slot_name name, IN immediately_reserve boolean DEFAULT false, OUT slot_name name, OUT xlog_position pg_lsn) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 562c8f6..c1605de 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -21,12 +21,18 @@ #include "funcapi.h" #include "miscadmin.h" +#include "access/htup_details.h" #include "access/xlog_internal.h" +#include "executor/spi.h" + +#include "catalog/namespace.h" #include "catalog/pg_type.h" #include "nodes/makefuncs.h" +#include "lib/stringinfo.h" + #include "mb/pg_wchar.h" #include "utils/array.h" @@ -40,6 +46,7 @@ #include 
"replication/decode.h" #include "replication/logical.h" #include "replication/logicalfuncs.h" +#include "replication/reorderbuffer.h" #include "storage/fd.h" @@ -50,6 +57,12 @@ typedef struct DecodingOutputState TupleDesc tupdesc; bool binary_output; int64 returned_rows; + + /* for pg_logical_stream_relation */ + MemoryContext context; + Relation rel; + Portal cursor; + TupleTableSlot *tupslot; } DecodingOutputState; /* @@ -270,6 +283,53 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, return count; } +static List * +deconstruct_options_array(ArrayType *arr) +{ + Size ndim; + List *options = NIL; + + ndim = ARR_NDIM(arr); + if (ndim > 1) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("array must be one-dimensional"))); + } + else if (array_contains_nulls(arr)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("array must not contain nulls"))); + } + else if (ndim == 1) + { + int nelems; + Datum *datum_opts; + int i; + + Assert(ARR_ELEMTYPE(arr) == TEXTOID); + + deconstruct_array(arr, TEXTOID, -1, false, 'i', + &datum_opts, NULL, &nelems); + + if (nelems % 2 != 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("array must have even number of elements"))); + + for (i = 0; i < nelems; i += 2) + { + char *name = TextDatumGetCString(datum_opts[i]); + char *opt = TextDatumGetCString(datum_opts[i + 1]); + + options = lappend(options, makeDefElem(name, (Node *) makeString(opt))); + } + } + + return options; +} + /* * Helper function for the various SQL callable logical decoding functions. 
*/ @@ -287,7 +347,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; - Size ndim; List *options = NIL; DecodingOutputState *p; @@ -339,44 +398,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; oldcontext = MemoryContextSwitchTo(per_query_ctx); - /* Deconstruct options array */ - ndim = ARR_NDIM(arr); - if (ndim > 1) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("array must be one-dimensional"))); - } - else if (array_contains_nulls(arr)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("array must not contain nulls"))); - } - else if (ndim == 1) - { - int nelems; - Datum *datum_opts; - int i; - - Assert(ARR_ELEMTYPE(arr) == TEXTOID); - - deconstruct_array(arr, TEXTOID, -1, false, 'i', - &datum_opts, NULL, &nelems); - - if (nelems % 2 != 0) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("array must have even number of elements"))); - - for (i = 0; i < nelems; i += 2) - { - char *name = TextDatumGetCString(datum_opts[i]); - char *opt = TextDatumGetCString(datum_opts[i + 1]); - - options = lappend(options, makeDefElem(name, (Node *) makeString(opt))); - } - } + options = deconstruct_options_array(arr); p->tupstore = tuplestore_begin_heap(true, false, work_mem); rsinfo->returnMode = SFRM_Materialize; @@ -515,3 +537,258 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) { return pg_logical_slot_get_changes_guts(fcinfo, false, true); } + +Datum +pg_logical_slot_stream_relation(PG_FUNCTION_ARGS) +{ + Name name; + Name relnamespace; + Name relname; + bool nochildren; + ArrayType *arr; + List *options = NIL; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + + LogicalDecodingContext *ctx; + DecodingOutputState *p; + ReorderBufferTXN *txn; + ReorderBufferChange 
*change; + + FuncCallContext *funcctx; + MemoryContext oldcontext; + const char *relident; + int ret; + SPIPlanPtr plan; + StringInfoData query; + Oid nspoid; + + HeapTuple tuple; + bool isnull; + Datum result; + + oldcontext = CurrentMemoryContext; + + if (SRF_IS_FIRSTCALL()) + { + check_permissions(); + + CheckLogicalDecodingRequirements(); + + if (PG_ARGISNULL(0)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("slot name must not be null"))); + name = PG_GETARG_NAME(0); + + if (PG_ARGISNULL(1)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("relnamespace cannot be null"))); + relnamespace = PG_GETARG_NAME(1); + + if (PG_ARGISNULL(2)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("relname cannot be null"))); + relname = PG_GETARG_NAME(2); + + if (PG_ARGISNULL(3)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("nochildren must not be null"))); + nochildren = PG_GETARG_BOOL(3); + + if (PG_ARGISNULL(4)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("options array must not be null"))); + arr = PG_GETARG_ARRAYTYPE_P(4); + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + /* + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + */ + + MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); + + options = deconstruct_options_array(arr); + + funcctx = SRF_FIRSTCALL_INIT(); + + /* Things allocated in this memory context will live until SRF_RETURN_DONE(). 
*/ + MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + ReplicationSlotAcquire(NameStr(*name)); + + ctx = CreateDecodingContext(InvalidXLogRecPtr, + options, + NULL /*logical_read_local_xlog_page*/, + LogicalOutputPrepareWrite, + LogicalOutputWrite); + funcctx->user_fctx = ctx; + + /* + * Check whether the output plugin writes textual output if that's + * what we need. + */ + if (/*!binary &&*/ ctx->options.output_type != OUTPUT_PLUGIN_TEXTUAL_OUTPUT) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data", + NameStr(MyReplicationSlot->data.plugin), + format_procedure(fcinfo->flinfo->fn_oid)))); + + p = palloc0(sizeof(DecodingOutputState)); + ctx->output_writer_private = p; + + p->binary_output = false /*binary*/; + + /* Build a simple "SELECT ... FROM [ONLY] schema.table" query. */ + initStringInfo(&query); + appendStringInfoString(&query, "SELECT * FROM "); + + /* Exclude data from children tables? */ + if (nochildren) + appendStringInfoString(&query, " ONLY"); + + relident = quote_qualified_identifier(NameStr(*relnamespace), NameStr(*relname)); + appendStringInfoString(&query, relident); + + nspoid = get_namespace_oid(NameStr(*relnamespace), false); + p->rel = RelationIdGetRelation(get_relname_relid(NameStr(*relname), nspoid)); + + /* Build a tuple descriptor for our result type. */ + if (get_func_result_type(3784 /* XXX: piggyback on pg_logical_slot_peek_changes */, + NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + p->context = AllocSetContextCreate(CurrentMemoryContext, + "pg_logical_slot_stream_relation context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContextSwitchTo(p->context); + + /* Make the tuple store. */ + p->tupstore = tuplestore_begin_heap(true, false, work_mem); + + /* And we need a slot to fetch the tuples back. 
*/ + p->tupslot = MakeSingleTupleTableSlot(p->tupdesc); + + /* Initialize SPI (this switches to its own memory context). */ + if ((ret = SPI_connect()) < 0) + elog(ERROR, "SPI_connect returned %d", ret); + + plan = SPI_prepare_cursor(query.data, 0, NULL, CURSOR_OPT_NO_SCROLL); + if (!plan) + elog(ERROR, "SPI_prepare_cursor failed with error %d", SPI_result); + + p->cursor = SPI_cursor_open(NULL, plan, NULL, NULL, true); + + /* XXX: emit BEGIN? */ + } + + funcctx = SRF_PERCALL_SETUP(); + + ctx = (LogicalDecodingContext *) funcctx->user_fctx; + p = (DecodingOutputState *) ctx->output_writer_private; + + MemoryContextSwitchTo(p->context); + + /* + * A single call to logical decoding plugin callback might emit a number + * of writes, so if we have some stuff in tupstore from previous call we + * give it away now. + */ + if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false, + p->tupslot)) + { + tuple = ExecCopySlotTuple(p->tupslot); + result = fastgetattr(tuple, 3, p->tupdesc, &isnull); + + MemoryContextSwitchTo(oldcontext); + + /* XXX: Assert(!isnull) ? */ + if (isnull) + SRF_RETURN_NEXT_NULL(funcctx); + else + SRF_RETURN_NEXT(funcctx, result); + } + + /* Fetch next tuple from the seq scan. */ + SPI_cursor_fetch(p->cursor, true, 1); + if (SPI_processed == 0) + { + /* We're done, release the slot and other resources. */ + /* XXX: emit COMMIT? */ + RelationClose(p->rel); + ExecDropSingleTupleTableSlot(p->tupslot); + + SPI_cursor_close(p->cursor); + SPI_freetuptable(SPI_tuptable); + SPI_finish(); + + FreeDecodingContext(ctx); + ReplicationSlotRelease(); + + MemoryContextSwitchTo(oldcontext); + + SRF_RETURN_DONE(funcctx); + } + if (SPI_processed != 1) + elog(ERROR, "expected exactly 1 row from cursor, but got %d rows", SPI_processed); + + /* SPI_cursor_fetch() leaves us in the SPI memory context, switch back. */ + MemoryContextSwitchTo(p->context); + + /* We're done with the last time results: reset the context, tuplestore and slot. 
*/ + MemoryContextReset(p->context); + p->tupstore = tuplestore_begin_heap(true, false, work_mem); + p->tupslot = MakeSingleTupleTableSlot(p->tupdesc); + + /* emit INSERT */ + txn = ReorderBufferGetTXN(ctx->reorder); + + change = ReorderBufferGetChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_INSERT; + + change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + memset(change->data.tp.newtuple, 0, sizeof(ReorderBufferTupleBuf)); + memcpy(&change->data.tp.newtuple->tuple, SPI_tuptable->vals[0], sizeof(HeapTupleData)); + + ctx->reorder->apply_change(ctx->reorder, txn, p->rel, change); + + ReorderBufferReturnChange(ctx->reorder, change); + ReorderBufferReturnTXN(ctx->reorder, txn); + + /* fetch a tuple from the store */ + if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false, + p->tupslot)) + { + tuple = ExecCopySlotTuple(p->tupslot); + result = fastgetattr(tuple, 3, p->tupdesc, &isnull); + } + else + { + isnull = true; + result = (Datum) 0; + } + + /* don't forget to clear the SPI temp context */ + SPI_freetuptable(SPI_tuptable); + + MemoryContextSwitchTo(oldcontext); + + /* XXX: Assert(!isnull) ? 
*/ + if (isnull) + SRF_RETURN_NEXT_NULL(funcctx); + else + SRF_RETURN_NEXT(funcctx, result); +} diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7402f20..2a9c2e0 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -165,8 +165,6 @@ static const Size max_cached_transactions = 512; * primary reorderbuffer support routines * --------------------------------------- */ -static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb); -static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top); @@ -288,7 +286,7 @@ ReorderBufferFree(ReorderBuffer *rb) /* * Get an unused, possibly preallocated, ReorderBufferTXN. */ -static ReorderBufferTXN * +ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb) { ReorderBufferTXN *txn; @@ -322,7 +320,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb) * Deallocation might be delayed for efficiency purposes, for details check * the comments above max_cached_changes's definition. 
*/ -static void +void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { /* clean the lookup cache if we were cached (quite likely) */ diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 3df5ac5..23c0b5e 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5064,6 +5064,8 @@ DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000 DESCR("peek at changes from replication slot"); DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ )); DESCR("peek at binary changes from replication slot"); +DATA(insert OID = 3997 ( pg_logical_slot_stream_relation PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 5 0 25 "19 19 19 16 1009" "{19,19,19,16,1009,25}" "{i,i,i,i,v,o}" "{slot_name,relnamespace,relname,nochildren,options,data}" _null_ _null_ pg_logical_slot_stream_relation _null_ _null_ _null_ )); +DESCR("stream relation as a series of changes using the replication slot plugin"); /* event triggers */ DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ )); diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h index c87a1df..df60bfe 100644 --- a/src/include/replication/logicalfuncs.h +++ b/src/include/replication/logicalfuncs.h @@ -20,5 +20,6 @@ extern Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS); extern Datum 
pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS); +extern Datum pg_logical_slot_stream_relation(PG_FUNCTION_ARGS); #endif diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 2abee0a..e321e43 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -341,6 +341,9 @@ struct ReorderBuffer ReorderBuffer *ReorderBufferAllocate(void); void ReorderBufferFree(ReorderBuffer *); +ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb); +void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); + ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *); void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); -- 2.5.0
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers