On Fri, Jan 15, 2016 at 5:31 PM, Shulgin, Oleksandr <
oleksandr.shul...@zalando.de> wrote:

>
> POC patch attached.  Findings:
>
> 1) Needs an actual slot for all the decode machinery to work (code depends
> on MyReplicationSlot being set).
> 2) Requires a core patch.
> 3) Currently only supports textual output, adding binary is trivial.
>
>
> Acquiring a slot means this cannot be run in parallel from multiple
> backends.  Any ideas on how to overcome this (except for opening multiple
> slots with the same LSN)?
> To obtain a consistent snapshot, the client still needs to take care of
> preserving and setting transaction snapshot properly.
>

Testing revealed a number of problems with memory handling in this code; a
corrected v2 is attached.

A completely separate problem is the proper handling of the SPI stack and
releasing the replication slot.  The latter I'd like to avoid dealing with,
because at the moment it's not possible to stream a number of relations in
parallel using this POC function, so I'd rather move in the direction of not
acquiring the replication slot at all.

The SPI problem manifests itself if I place a LIMIT on top of the query:

# SELECT pg_logical_slot_stream_relation('slot1', 'pg_catalog', 'pg_class')
LIMIT 5;
WARNING:  relcache reference leak: relation "pg_class" not closed
WARNING:  transaction left non-empty SPI stack
HINT:  Check for missing "SPI_finish" calls.

I wonder if there is a way to install some sort of cleanup handler that
will be called by the executor?

--
Alex
From 83c2c754066f43111d0f21ff088cc5503e910aab Mon Sep 17 00:00:00 2001
From: Oleksandr Shulgin <oleksandr.shul...@zalando.de>
Date: Fri, 15 Jan 2016 17:30:04 +0100
Subject: [PATCH] POC: pg_logical_slot_stream_relation

---
 src/backend/catalog/system_views.sql            |   9 +
 src/backend/replication/logical/logicalfuncs.c  | 355 +++++++++++++++++++++---
 src/backend/replication/logical/reorderbuffer.c |   6 +-
 src/include/catalog/pg_proc.h                   |   2 +
 src/include/replication/logicalfuncs.h          |   1 +
 src/include/replication/reorderbuffer.h         |   3 +
 6 files changed, 333 insertions(+), 43 deletions(-)

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 923fe58..5431b61 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -941,6 +941,15 @@ LANGUAGE INTERNAL
 VOLATILE ROWS 1000 COST 1000
 AS 'pg_logical_slot_peek_binary_changes';
 
+CREATE OR REPLACE FUNCTION pg_logical_slot_stream_relation(
+    IN slot_name name, IN relnamespace name, IN relname name, IN nochildren bool DEFAULT FALSE,
+    VARIADIC options text[] DEFAULT '{}',
+    OUT data text)
+RETURNS SETOF TEXT
+LANGUAGE INTERNAL
+VOLATILE ROWS 1000 COST 1000
+AS 'pg_logical_slot_stream_relation';
+
 CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
     IN slot_name name, IN immediately_reserve boolean DEFAULT false,
     OUT slot_name name, OUT xlog_position pg_lsn)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 562c8f6..c1605de 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -21,12 +21,18 @@
 #include "funcapi.h"
 #include "miscadmin.h"
 
+#include "access/htup_details.h"
 #include "access/xlog_internal.h"
 
+#include "executor/spi.h"
+
+#include "catalog/namespace.h"
 #include "catalog/pg_type.h"
 
 #include "nodes/makefuncs.h"
 
+#include "lib/stringinfo.h"
+
 #include "mb/pg_wchar.h"
 
 #include "utils/array.h"
@@ -40,6 +46,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
+#include "replication/reorderbuffer.h"
 
 #include "storage/fd.h"
 
@@ -50,6 +57,12 @@ typedef struct DecodingOutputState
 	TupleDesc	tupdesc;
 	bool		binary_output;
 	int64		returned_rows;
+
+	/* for pg_logical_slot_stream_relation */
+	MemoryContext	context;
+	Relation		rel;
+	Portal			cursor;
+	TupleTableSlot *tupslot;
 } DecodingOutputState;
 
 /*
@@ -270,6 +283,53 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	return count;
 }
 
+/*
+ * Turn a one-dimensional text[] of alternating option names and values into
+ * a List of DefElem nodes (NIL for a zero-dimensional array).  Raises ERROR
+ * on multi-dimensional input, NULL elements, or an odd element count.
+ */
+static List *
+deconstruct_options_array(ArrayType *arr)
+{
+	Size		ndim = ARR_NDIM(arr);
+	List	   *result = NIL;
+	Datum	   *elems;
+	int			nelems;
+	int			pos;
+
+	if (ndim > 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("array must be one-dimensional")));
+
+	if (array_contains_nulls(arr))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("array must not contain nulls")));
+
+	/* A zero-dimensional (empty) array carries no options. */
+	if (ndim != 1)
+		return result;
+
+	Assert(ARR_ELEMTYPE(arr) == TEXTOID);
+	deconstruct_array(arr, TEXTOID, -1, false, 'i', &elems, NULL, &nelems);
+
+	if (nelems % 2 != 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("array must have even number of elements")));
+
+	for (pos = 0; pos < nelems; pos += 2)
+	{
+		char	   *optname = TextDatumGetCString(elems[pos]);
+		char	   *optval = TextDatumGetCString(elems[pos + 1]);
+
+		result = lappend(result, makeDefElem(optname, (Node *) makeString(optval)));
+	}
+
+	return result;
+}
+
 /*
  * Helper function for the various SQL callable logical decoding functions.
  */
@@ -287,7 +347,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
 	ArrayType  *arr;
-	Size		ndim;
 	List	   *options = NIL;
 	DecodingOutputState *p;
 
@@ -339,44 +398,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
 
-	/* Deconstruct options array */
-	ndim = ARR_NDIM(arr);
-	if (ndim > 1)
-	{
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("array must be one-dimensional")));
-	}
-	else if (array_contains_nulls(arr))
-	{
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("array must not contain nulls")));
-	}
-	else if (ndim == 1)
-	{
-		int			nelems;
-		Datum	   *datum_opts;
-		int			i;
-
-		Assert(ARR_ELEMTYPE(arr) == TEXTOID);
-
-		deconstruct_array(arr, TEXTOID, -1, false, 'i',
-						  &datum_opts, NULL, &nelems);
-
-		if (nelems % 2 != 0)
-			ereport(ERROR,
-					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					 errmsg("array must have even number of elements")));
-
-		for (i = 0; i < nelems; i += 2)
-		{
-			char	   *name = TextDatumGetCString(datum_opts[i]);
-			char	   *opt = TextDatumGetCString(datum_opts[i + 1]);
-
-			options = lappend(options, makeDefElem(name, (Node *) makeString(opt)));
-		}
-	}
+	options = deconstruct_options_array(arr);
 
 	p->tupstore = tuplestore_begin_heap(true, false, work_mem);
 	rsinfo->returnMode = SFRM_Materialize;
@@ -515,3 +537,258 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
 {
 	return pg_logical_slot_get_changes_guts(fcinfo, false, true);
 }
+
+/*
+ * Stream the current contents of a relation through the slot's output plugin
+ * as a series of INSERT changes, one result row per call (value-per-call).
+ * NOTE(review): resources are released only when the scan runs to completion;
+ * an early stop (e.g. LIMIT) leaves the SPI connection and relation open.
+ */
+Datum
+pg_logical_slot_stream_relation(PG_FUNCTION_ARGS)
+{
+	Name		name;
+	Name		relnamespace;
+	Name		relname;
+	bool		nochildren;
+	ArrayType  *arr;
+	List	   *options = NIL;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+	LogicalDecodingContext	   *ctx;
+	DecodingOutputState		   *p;
+	ReorderBufferTXN		   *txn;
+	ReorderBufferChange		   *change;
+
+	FuncCallContext	   *funcctx;
+	MemoryContext		oldcontext;
+	const char		   *relident;
+	int					ret;
+	SPIPlanPtr			plan;
+	StringInfoData		query;
+	Oid					nspoid;
+
+	HeapTuple			tuple;
+	bool				isnull;
+	Datum				result;
+
+	oldcontext = CurrentMemoryContext;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		check_permissions();
+
+		CheckLogicalDecodingRequirements();
+
+		if (PG_ARGISNULL(0))
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("slot name must not be null")));
+		name = PG_GETARG_NAME(0);
+
+		if (PG_ARGISNULL(1))
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("relnamespace cannot be null")));
+		relnamespace = PG_GETARG_NAME(1);
+
+		if (PG_ARGISNULL(2))
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("relname cannot be null")));
+		relname = PG_GETARG_NAME(2);
+
+		if (PG_ARGISNULL(3))
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("nochildren must not be null")));
+		nochildren = PG_GETARG_BOOL(3);
+
+		if (PG_ARGISNULL(4))
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("options array must not be null")));
+		arr = PG_GETARG_ARRAYTYPE_P(4);
+
+		/* check that the caller can accept a set-valued result */
+		if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("set-valued function called in context that cannot accept a set")));
+
+		MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
+
+		options = deconstruct_options_array(arr);
+
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* Things allocated in this memory context will live until SRF_RETURN_DONE(). */
+		MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		ReplicationSlotAcquire(NameStr(*name));
+
+		ctx = CreateDecodingContext(InvalidXLogRecPtr,
+									options,
+									NULL /*logical_read_local_xlog_page*/,
+									LogicalOutputPrepareWrite,
+									LogicalOutputWrite);
+		funcctx->user_fctx = ctx;
+
+		/*
+		 * Check whether the output plugin writes textual output if that's
+		 * what we need.
+		 */
+		if (/*!binary &&*/ ctx->options.output_type != OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
+							NameStr(MyReplicationSlot->data.plugin),
+							format_procedure(fcinfo->flinfo->fn_oid))));
+
+		p = palloc0(sizeof(DecodingOutputState));
+		ctx->output_writer_private = p;
+
+		p->binary_output = false /*binary*/;
+
+		/* Build a simple "SELECT ... FROM [ONLY] schema.table" query. */
+		initStringInfo(&query);
+		appendStringInfoString(&query, "SELECT * FROM ");
+
+		/* Exclude data from children tables?  The trailing space is required. */
+		if (nochildren)
+			appendStringInfoString(&query, "ONLY ");
+
+		relident = quote_qualified_identifier(NameStr(*relnamespace), NameStr(*relname));
+		appendStringInfoString(&query, relident);
+
+		nspoid = get_namespace_oid(NameStr(*relnamespace), false);
+		p->rel = RelationIdGetRelation(get_relname_relid(NameStr(*relname), nspoid));
+
+		/* Build a tuple descriptor for our result type. */
+		if (get_func_result_type(3784 /* XXX: piggyback on pg_logical_slot_peek_changes */,
+								 NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
+			elog(ERROR, "return type must be a row type");
+
+		p->context = AllocSetContextCreate(CurrentMemoryContext,
+										   "pg_logical_slot_stream_relation context",
+										   ALLOCSET_DEFAULT_MINSIZE,
+										   ALLOCSET_DEFAULT_INITSIZE,
+										   ALLOCSET_DEFAULT_MAXSIZE);
+		MemoryContextSwitchTo(p->context);
+
+		/* Make the tuple store. */
+		p->tupstore = tuplestore_begin_heap(true, false, work_mem);
+
+		/* And we need a slot to fetch the tuples back. */
+		p->tupslot = MakeSingleTupleTableSlot(p->tupdesc);
+
+		/* Initialize SPI (this switches to its own memory context). */
+		if ((ret = SPI_connect()) < 0)
+			elog(ERROR, "SPI_connect returned %d", ret);
+
+		plan = SPI_prepare_cursor(query.data, 0, NULL, CURSOR_OPT_NO_SCROLL);
+		if (!plan)
+			elog(ERROR, "SPI_prepare_cursor failed with error %d", SPI_result);
+
+		p->cursor = SPI_cursor_open(NULL, plan, NULL, NULL, true);
+
+		/* XXX: emit BEGIN? */
+	}
+
+	funcctx = SRF_PERCALL_SETUP();
+
+	ctx = (LogicalDecodingContext *) funcctx->user_fctx;
+	p = (DecodingOutputState *) ctx->output_writer_private;
+
+	MemoryContextSwitchTo(p->context);
+
+	/*
+	 * A single call to logical decoding plugin callback might emit a number
+	 * of writes, so if we have some stuff in tupstore from previous call we
+	 * give it away now.
+	 */
+	if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false,
+								p->tupslot))
+	{
+		tuple = ExecCopySlotTuple(p->tupslot);
+		result = fastgetattr(tuple, 3, p->tupdesc, &isnull);
+
+		MemoryContextSwitchTo(oldcontext);
+
+		/* XXX: Assert(!isnull) ? */
+		if (isnull)
+			SRF_RETURN_NEXT_NULL(funcctx);
+		else
+			SRF_RETURN_NEXT(funcctx, result);
+	}
+
+	/* Fetch next tuple from the seq scan. */
+	SPI_cursor_fetch(p->cursor, true, 1);
+	if (SPI_processed == 0)
+	{
+		/* We're done, release the slot and other resources. */
+		/* XXX: emit COMMIT? */
+		RelationClose(p->rel);
+		ExecDropSingleTupleTableSlot(p->tupslot);
+
+		SPI_cursor_close(p->cursor);
+		SPI_freetuptable(SPI_tuptable);
+		SPI_finish();
+
+		FreeDecodingContext(ctx);
+		ReplicationSlotRelease();
+
+		MemoryContextSwitchTo(oldcontext);
+
+		SRF_RETURN_DONE(funcctx);
+	}
+	if (SPI_processed != 1)
+		elog(ERROR, "expected exactly 1 row from cursor, but got %u rows", SPI_processed);
+
+	/* SPI_cursor_fetch() leaves us in the SPI memory context, switch back. */
+	MemoryContextSwitchTo(p->context);
+
+	/* We're done with the last time results: reset the context, tuplestore and slot. */
+	MemoryContextReset(p->context);
+	p->tupstore = tuplestore_begin_heap(true, false, work_mem);
+	p->tupslot = MakeSingleTupleTableSlot(p->tupdesc);
+
+	/* emit INSERT */
+	txn = ReorderBufferGetTXN(ctx->reorder);
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_INSERT;
+
+	change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+	memset(change->data.tp.newtuple, 0, sizeof(ReorderBufferTupleBuf));
+	memcpy(&change->data.tp.newtuple->tuple, SPI_tuptable->vals[0], sizeof(HeapTupleData));
+
+	ctx->reorder->apply_change(ctx->reorder, txn, p->rel, change);
+
+	ReorderBufferReturnChange(ctx->reorder, change);
+	ReorderBufferReturnTXN(ctx->reorder, txn);
+
+	/* fetch a tuple from the store */
+	if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false,
+								p->tupslot))
+	{
+		tuple = ExecCopySlotTuple(p->tupslot);
+		result = fastgetattr(tuple, 3, p->tupdesc, &isnull);
+	}
+	else
+	{
+		isnull = true;
+		result = (Datum) 0;
+	}
+
+	/* don't forget to clear the SPI temp context */
+	SPI_freetuptable(SPI_tuptable);
+
+	MemoryContextSwitchTo(oldcontext);
+
+	/* XXX: Assert(!isnull) ? */
+	if (isnull)
+		SRF_RETURN_NEXT_NULL(funcctx);
+	else
+		SRF_RETURN_NEXT(funcctx, result);
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7402f20..2a9c2e0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -165,8 +165,6 @@ static const Size max_cached_transactions = 512;
  * primary reorderbuffer support routines
  * ---------------------------------------
  */
-static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
-static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
 					  TransactionId xid, bool create, bool *is_new,
 					  XLogRecPtr lsn, bool create_as_top);
@@ -288,7 +286,7 @@ ReorderBufferFree(ReorderBuffer *rb)
 /*
  * Get an unused, possibly preallocated, ReorderBufferTXN.
  */
-static ReorderBufferTXN *
+ReorderBufferTXN *
 ReorderBufferGetTXN(ReorderBuffer *rb)
 {
 	ReorderBufferTXN *txn;
@@ -322,7 +320,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
  * Deallocation might be delayed for efficiency purposes, for details check
  * the comments above max_cached_changes's definition.
  */
-static void
+void
 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	/* clean the lookup cache if we were cached (quite likely) */
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 3df5ac5..23c0b5e 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5064,6 +5064,8 @@ DATA(insert OID = 3784 (  pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
 DESCR("peek at changes from replication slot");
 DATA(insert OID = 3785 (  pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
 DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3997 (  pg_logical_slot_stream_relation PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 5 0 25 "19 19 19 16 1009" "{19,19,19,16,1009,25}" "{i,i,i,i,v,o}" "{slot_name,relnamespace,relname,nochildren,options,data}" _null_ _null_ pg_logical_slot_stream_relation _null_ _null_ _null_ ));
+DESCR("stream relation as a series of changes using the replication slot plugin");
 
 /* event triggers */
 DATA(insert OID = 3566 (  pg_event_trigger_dropped_objects		PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index c87a1df..df60bfe 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -20,5 +20,6 @@ extern Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
+extern Datum pg_logical_slot_stream_relation(PG_FUNCTION_ARGS);
 
 #endif
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 2abee0a..e321e43 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -341,6 +341,9 @@ struct ReorderBuffer
 ReorderBuffer *ReorderBufferAllocate(void);
 void		ReorderBufferFree(ReorderBuffer *);
 
+ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
+void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+
 ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *);
 void		ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
 ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
-- 
2.5.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to