From a5811a850ebc66b6b6267afe341c3929cbb57b17 Mon Sep 17 00:00:00 2001
From: Amul Sul <sulamul@gmail.com>
Date: Fri, 6 Jan 2017 18:13:37 +0530
Subject: [PATCH 2/2] pg_background as client of BackgroundSession-v1

---
 contrib/Makefile                                 |   1 +
 contrib/pg_background/Makefile                   |  20 ++
 contrib/pg_background/expected/pg_background.out |  75 +++++
 contrib/pg_background/pg_background--1.0.sql     |  37 +++
 contrib/pg_background/pg_background.c            | 397 +++++++++++++++++++++++
 contrib/pg_background/pg_background.control      |   4 +
 contrib/pg_background/sql/pg_background.sql      |  30 ++
 7 files changed, 564 insertions(+)
 create mode 100644 contrib/pg_background/Makefile
 create mode 100644 contrib/pg_background/expected/pg_background.out
 create mode 100644 contrib/pg_background/pg_background--1.0.sql
 create mode 100644 contrib/pg_background/pg_background.c
 create mode 100644 contrib/pg_background/pg_background.control
 create mode 100644 contrib/pg_background/sql/pg_background.sql

diff --git a/contrib/Makefile b/contrib/Makefile
index 25263c0..04ec28a 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -29,6 +29,7 @@ SUBDIRS = \
 		oid2name	\
 		pageinspect	\
 		passwordcheck	\
+		pg_background \
 		pg_buffercache	\
 		pg_freespacemap \
 		pg_prewarm	\
diff --git a/contrib/pg_background/Makefile b/contrib/pg_background/Makefile
new file mode 100644
index 0000000..085fbff
--- /dev/null
+++ b/contrib/pg_background/Makefile
@@ -0,0 +1,20 @@
+# contrib/pg_background/Makefile
+
+MODULE_big = pg_background
+OBJS = pg_background.o
+
+EXTENSION = pg_background
+DATA = pg_background--1.0.sql
+
+REGRESS = pg_background
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_background
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_background/expected/pg_background.out b/contrib/pg_background/expected/pg_background.out
new file mode 100644
index 0000000..23679e6
--- /dev/null
+++ b/contrib/pg_background/expected/pg_background.out
@@ -0,0 +1,75 @@
+CREATE EXTENSION pg_background;
+--launch 6 workers which wait  .0, .1, .2, .3, .4, .5 seconds respectively
+CREATE TABLE input AS
+	SELECT pg_background_start() pid, x, row_number() OVER (ORDER BY x) n
+	FROM generate_series(0,.5,0.1) x
+	ORDER BY x DESC;
+CREATE TABLE output(place int,value float);
+--display active background workers
+SELECT count(*) as active_background_workers FROM pg_background_show();
+ active_background_workers 
+---------------------------
+                         6
+(1 row)
+
+--run sql query
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_run(input.pid,
+										  format($$
+												 SELECT pg_sleep(%s)::text;
+												 INSERT INTO output VALUES (%s, %s);
+												 $$,
+												 x, n, x
+												)
+										 ) ON (true);
+ worker_num 
+------------
+          6
+          5
+          4
+          3
+          2
+          1
+(6 rows)
+
+--wait until everyone finishes
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_result(input.pid) AS (x TEXT) ON (true);
+ worker_num 
+------------
+          6
+          5
+          4
+          3
+          2
+          1
+(6 rows)
+
+--output results
+SELECT * FROM output ORDER BY place;
+ place | value 
+-------+-------
+     1 |     0
+     2 |   0.1
+     3 |   0.2
+     4 |   0.3
+     5 |   0.4
+     6 |   0.5
+(6 rows)
+
+--cleanup
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_close(input.pid) ON (true);
+ worker_num 
+------------
+          6
+          5
+          4
+          3
+          2
+          1
+(6 rows)
+
+DROP TABLE input;
+DROP TABLE output;
+DROP EXTENSION pg_background;
diff --git a/contrib/pg_background/pg_background--1.0.sql b/contrib/pg_background/pg_background--1.0.sql
new file mode 100644
index 0000000..62a9d18
--- /dev/null
+++ b/contrib/pg_background/pg_background--1.0.sql
@@ -0,0 +1,37 @@
+/* contrib/pg_background/pg_background--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_background" to load this file. \quit
+
+CREATE FUNCTION pg_background_show()
+    RETURNS TABLE(background_worker_id pg_catalog.int4,
+				  num_of_results_waiting_to_read pg_catalog.int4) STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_start()
+    RETURNS pg_catalog.int4 STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_run(pid pg_catalog.int4, sql pg_catalog.text)
+    RETURNS pg_catalog.void STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_result(pid pg_catalog.int4)
+    RETURNS SETOF pg_catalog.record STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_close(pid pg_catalog.int4)
+    RETURNS pg_catalog.void STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+REVOKE ALL ON FUNCTION pg_background_show()
+	FROM public;
+REVOKE ALL ON FUNCTION pg_background_start()
+	FROM public;
+REVOKE ALL ON FUNCTION pg_background_run(pid pg_catalog.int4,
+										 sql pg_catalog.text)
+	FROM public;
+REVOKE ALL ON FUNCTION pg_background_result(pg_catalog.int4)
+	FROM public;
+REVOKE ALL ON FUNCTION pg_background_close(pg_catalog.int4)
+	FROM public;
diff --git a/contrib/pg_background/pg_background.c b/contrib/pg_background/pg_background.c
new file mode 100644
index 0000000..e5a5b35
--- /dev/null
+++ b/contrib/pg_background/pg_background.c
@@ -0,0 +1,397 @@
+/*--------------------------------------------------------------------------
+ *
+ * pg_background.c
+ *		Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2017, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/pg_background/pg_background.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "catalog/pg_type.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "tcop/bgsession.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+
+/*  Private state maintained by the launching backend for IPC. */
+typedef struct pg_background_worker_info
+{
+	int32				pid;
+	Oid					current_user_id;
+	BackgroundSession  *session;
+	uint32				result_count;
+} pg_background_worker_info;
+
+/* Private state maintained across calls to pg_background_result. */
+typedef struct pg_background_result_state
+{
+	pg_background_worker_info  *info;
+	BackgroundSessionResult	   *result;
+} pg_background_result_state;
+
+static HTAB *worker_hash = NULL;
+
+static void remove_worker_info(int32 pid);
+static pg_background_worker_info *find_worker_info(int32 pid);
+static void save_worker_info(int32 pid, BackgroundSession *session);
+static void check_rights(pg_background_worker_info *info);
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(pg_background_show);
+PG_FUNCTION_INFO_V1(pg_background_start);
+PG_FUNCTION_INFO_V1(pg_background_run);
+PG_FUNCTION_INFO_V1(pg_background_result);
+PG_FUNCTION_INFO_V1(pg_background_close);
+
+/*
+ * Display the list of background worker previously launched in this session.
+ */
+Datum
+pg_background_show(PG_FUNCTION_ARGS)
+{
+	FuncCallContext			   *funcctx;
+	HASH_SEQ_STATUS			   *hash_seq;
+	TupleDesc					tupdesc;
+	pg_background_worker_info  *info;
+
+	/* First-time setup. */
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext	oldcontext;
+
+		funcctx = SRF_FIRSTCALL_INIT();
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* Construct a tuple descriptor for the result rows. */
+		tupdesc = CreateTemplateTupleDesc(2, false);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pid",
+						   INT4OID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "results",
+						   INT4OID, -1, 0);
+
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+		if (worker_hash)
+		{
+			hash_seq = palloc(sizeof(HASH_SEQ_STATUS));
+
+			hash_seq_init(hash_seq, worker_hash);
+			funcctx->user_fctx = hash_seq;
+		}
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	funcctx = SRF_PERCALL_SETUP();
+	hash_seq = (HASH_SEQ_STATUS *) funcctx->user_fctx;
+
+	while (hash_seq && (info = hash_seq_search(hash_seq)) != NULL)
+	{
+		Datum		values[2];
+		bool		nulls[2];
+		HeapTuple	tuple;
+
+		values[0] = Int32GetDatum(info->pid);
+		nulls[0] = false;
+		values[1] = Int32GetDatum(info->result_count);
+		nulls[1] = false;
+
+		/* Build and return the tuple. */
+		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * Start a dynamic background worker.
+ */
+Datum
+pg_background_start(PG_FUNCTION_ARGS)
+{
+	BackgroundSession  *session;
+	int32				pid;
+
+	session = BackgroundSessionStart(TopMemoryContext);
+	pid = session->pid;
+
+	/* Save worker info */
+	save_worker_info(pid, session);
+
+	/*  Return the worker's PID. */
+	PG_RETURN_INT32(pid);
+}
+
+/*
+ * Run a user-specified SQL command.
+ */
+Datum
+pg_background_run(PG_FUNCTION_ARGS)
+{
+	char	   *sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+	int32		pid = PG_GETARG_INT32(0);
+	pg_background_worker_info *info;
+
+	/* See if we have a connection to the specified PID. */
+	if ((info = find_worker_info(pid)) == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("PID %d is not attached to this session", pid)));
+	check_rights(info);
+
+	/* Execute give SQL query */
+	BackgroundSessionSend(info->session, sql);
+	info->result_count++;
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Retrieve the results of a background query previously launched in this
+ * session.
+ */
+Datum
+pg_background_result(PG_FUNCTION_ARGS)
+{
+	int32						pid = PG_GETARG_INT32(0);
+	FuncCallContext			   *funcctx;
+	pg_background_result_state *state;
+	TupleDesc					tupdesc;
+	BackgroundSessionResult	   *result;
+
+	/* First-time setup. */
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext				oldcontext;
+		pg_background_worker_info  *info;
+
+		funcctx = SRF_FIRSTCALL_INIT();
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* See if we have a connection to the specified PID. */
+		if ((info = find_worker_info(pid)) == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("PID %d is not attached to this session", pid)));
+		check_rights(info);
+
+		/* Can't read results twice. */
+		if (info->result_count <= 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("results for PID %d have already been consumed", pid)));
+		info->result_count--;
+
+		/* Set up tuple-descriptor based on column definition list. */
+		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("function returning record called in context "
+							"that cannot accept type record"),
+					 errhint("Try calling the function in the FROM clause "
+							 "using a column definition list.")));
+		result = BackgroundSessionGetResult(info->session);
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+		if (result->tupdesc)
+		{
+			bool datatype_mismatch = false;
+
+			if (tupdesc->natts != result->tupdesc->natts)
+				datatype_mismatch = true;
+			else
+			{
+				int i;
+				for (i = 0; i < tupdesc->natts; ++i)
+					if(tupdesc->attrs[i]->atttypid !=
+					   result->tupdesc->attrs[i]->atttypid)
+					{
+						datatype_mismatch = true;
+						break;
+					}
+			}
+
+			if (datatype_mismatch)
+				ereport(ERROR,
+						(errcode(ERRCODE_DATATYPE_MISMATCH),
+						 errmsg("remote query result rowtype does not match the specified FROM clause rowtype")));
+		}
+
+		/* Cache state that will be needed on every call. */
+		state = palloc0(sizeof(pg_background_result_state));
+		state->info = info;
+		state->result = result;
+
+		funcctx->user_fctx = state;
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	funcctx = SRF_PERCALL_SETUP();
+	tupdesc = funcctx->tuple_desc;
+	state = funcctx->user_fctx;
+	result = state->result;
+
+	if (result->tupdesc)
+	{
+		if (result->tuples != NIL && funcctx->call_cntr > 0)
+			result->tuples = list_delete_first(result->tuples);
+
+		if (result->tuples != NIL)
+		{
+			HeapTuple tuple = (HeapTuple) linitial(result->tuples);
+
+			/*
+			 * Set the tuple type ID information fields correctly because
+			 * BackgroundSessionFetchResult returns it as an anonymous record
+			 * type.
+			 */
+			HeapTupleHeaderSetTypeId(tuple->t_data, tupdesc->tdtypeid);
+			HeapTupleHeaderSetTypMod(tuple->t_data, tupdesc->tdtypmod);
+
+			SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+		}
+	}
+	else	/* If no data rows, return the command tags instead. */
+	{
+		if (tupdesc->natts != 1 || tupdesc->attrs[0]->atttypid != TEXTOID)
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("remote query did not return a result set, but result rowtype is not a single text column")));
+
+		if (result->command != NULL)
+		{
+			bool	isnull = false;
+			Datum	value = PointerGetDatum(cstring_to_text(result->command));
+			HeapTuple	tuple = heap_form_tuple(tupdesc, &value, &isnull);
+
+			result->command = NULL;
+
+			SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+		}
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * End background session and remove hashtable entry.
+ */
+Datum
+pg_background_close(PG_FUNCTION_ARGS)
+{
+	int32						pid = PG_GETARG_INT32(0);
+	pg_background_worker_info  *info;
+
+	info = find_worker_info(pid);
+	if (info == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("PID %d is not attached to this session", pid)));
+
+	check_rights(info);
+	remove_worker_info(pid);
+	BackgroundSessionEnd(info->session);
+
+	PG_RETURN_VOID();
+}
+
+static void
+remove_worker_info(int32 pid)
+{
+	bool	found;
+
+	/* Remove the hashtable entry. */
+	hash_search(worker_hash, (void *) &pid, HASH_REMOVE, &found);
+	if (!found)
+		elog(ERROR, "pg_background worker_hash table corrupted");
+}
+
+/*
+ * Find the background worker information for the worker with a given PID.
+ */
+static pg_background_worker_info *
+find_worker_info(int32 pid)
+{
+	pg_background_worker_info *info = NULL;
+
+	if (worker_hash != NULL)
+		info = hash_search(worker_hash, (void *) &pid, HASH_FIND, NULL);
+
+	return info;
+}
+
+/*
+ * Save worker info.
+ */
+static void
+save_worker_info(int32 pid, BackgroundSession *session)
+{
+	pg_background_worker_info  *info;
+	Oid							current_user_id;
+	int							sec_context;
+
+	/* If the hash table hasn't been set up yet, do that now. */
+	if (worker_hash == NULL)
+	{
+		HASHCTL	ctl;
+
+		ctl.keysize = sizeof(int32);
+		ctl.entrysize = sizeof(pg_background_worker_info);
+		worker_hash = hash_create("pg_background worker_hash", 8, &ctl,
+								  HASH_ELEM);
+	}
+
+	/* Get current authentication information. */
+	GetUserIdAndSecContext(&current_user_id, &sec_context);
+
+	/*
+	 * In the unlikely event that there's an older worker with this PID,
+	 * just detach it - unless it has a different user ID than the
+	 * currently-active one, in which case someone might be trying to pull
+	 * a fast one.  Let's kill the backend to make sure we don't break
+	 * anyone's expectations.
+	 */
+	if ((info = find_worker_info(pid)) != NULL)
+	{
+		if (current_user_id != info->current_user_id)
+			ereport(FATAL,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+			 errmsg("background worker with PID \"%d\" already exists",
+						pid)));
+	}
+
+	/* Create a new entry for this worker. */
+	info = hash_search(worker_hash, (void *) &pid, HASH_ENTER, NULL);
+	info->session = session;
+	info->result_count = 0;
+	info->current_user_id = current_user_id;
+}
+
+/*
+ * Check whether the current user has rights to manipulate the background
+ * worker with the given PID.
+ */
+static void
+check_rights(pg_background_worker_info *info)
+{
+	Oid	current_user_id;
+	int	sec_context;
+
+	GetUserIdAndSecContext(&current_user_id, &sec_context);
+	if (!has_privs_of_role(current_user_id, info->current_user_id))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+			 errmsg("permission denied for background worker with PID \"%d\"",
+						info->pid)));
+}
diff --git a/contrib/pg_background/pg_background.control b/contrib/pg_background/pg_background.control
new file mode 100644
index 0000000..733d0e1
--- /dev/null
+++ b/contrib/pg_background/pg_background.control
@@ -0,0 +1,4 @@
+comment = 'Run SQL queries in the background'
+default_version = '1.0'
+module_pathname = '$libdir/pg_background'
+relocatable = true
diff --git a/contrib/pg_background/sql/pg_background.sql b/contrib/pg_background/sql/pg_background.sql
new file mode 100644
index 0000000..be49dd5
--- /dev/null
+++ b/contrib/pg_background/sql/pg_background.sql
@@ -0,0 +1,30 @@
+CREATE EXTENSION pg_background;
+--launch 6 workers which wait  .0, .1, .2, .3, .4, .5 seconds respectively
+CREATE TABLE input AS
+	SELECT pg_background_start() pid, x, row_number() OVER (ORDER BY x) n
+	FROM generate_series(0,.5,0.1) x
+	ORDER BY x DESC;
+CREATE TABLE output(place int,value float);
+--display active background workers
+SELECT count(*) as active_background_workers FROM pg_background_show();
+--run sql query
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_run(input.pid,
+										  format($$
+												 SELECT pg_sleep(%s)::text;
+												 INSERT INTO output VALUES (%s, %s);
+												 $$,
+												 x, n, x
+												)
+										 ) ON (true);
+--wait until everyone finishes
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_result(input.pid) AS (x TEXT) ON (true);
+--output results
+SELECT * FROM output ORDER BY place;
+--cleanup
+SELECT n as worker_num
+FROM input JOIN LATERAL pg_background_close(input.pid) ON (true);
+DROP TABLE input;
+DROP TABLE output;
+DROP EXTENSION pg_background;
-- 
2.6.2

