On Mon, Sep 13, 2021 at 4:30 PM Tom Lane <t...@sss.pgh.pa.us> wrote:
>
> The direct cause of that is that SPI_execute() doesn't permit the called
> query to perform COMMIT/ROLLBACK, which is because most callers would fail
> to cope with that.  You can instruct SPI to allow that by replacing the
> SPI_execute() call with something like
>
>                 SPIExecuteOptions options;
>
>                 ...
>                 memset(&options, 0, sizeof(options));
>                 options.allow_nonatomic = true;
>
>                 ret = SPI_execute_extended(buf.data, &options);
>

I completely forgot about the SPI execute options... Thanks for the
explanation!!!


> However, that's not enough to make this example work :-(.
> I find that it still fails inside the procedure's COMMIT,
> with
>
> 2021-09-13 15:14:54.775 EDT worker_spi[476310] ERROR:  portal snapshots
(0) did not account for all active snapshots (1)
> 2021-09-13 15:14:54.775 EDT worker_spi[476310] CONTEXT:  PL/pgSQL
function schema4.counted_proc() line 1 at COMMIT
>         SQL statement "CALL "schema4"."counted_proc"()"
>
> I think what this indicates is that worker_spi_main's cavalier
> management of the active snapshot isn't up to snuff for this
> use-case.  The error is coming from ForgetPortalSnapshots, which
> is expecting that all active snapshots are attached to Portals;
> but that one isn't.
>

That is exactly the root cause of all my investigation.

At Timescale we have a scheduler (background worker) that launches another
background worker to "execute a job", and by executing a job it means to
call a function [1] or a procedure [2] directly without a SPI.

But now a user raised an issue about snapshots [3] and when I saw the code
for the first time I tried to use SPI and it didn't work as expected.

Even tweaking worker_spi to execute the procedure without SPI by calling
ExecuteCallStmt (attached) we end up with the same situation about the
active snapshots:

2021-09-13 20:14:36.654 -03 [21483] LOG:  worker_spi worker 2 initialized
with schema2.counted
2021-09-13 20:14:36.655 -03 [21484] LOG:  worker_spi worker 1 initialized
with schema1.counted
2021-09-13 20:14:36.657 -03 [21483] ERROR:  portal snapshots (0) did not
account for all active snapshots (1)
2021-09-13 20:14:36.657 -03 [21483] CONTEXT:  PL/pgSQL function
schema2.counted_proc() line 1 at COMMIT
2021-09-13 20:14:36.657 -03 [21484] ERROR:  portal snapshots (0) did not
account for all active snapshots (1)
2021-09-13 20:14:36.657 -03 [21484] CONTEXT:  PL/pgSQL function
schema1.counted_proc() line 1 at COMMIT
2021-09-13 20:14:36.659 -03 [21476] LOG:  background worker "worker_spi"
(PID 21483) exited with exit code 1
2021-09-13 20:14:36.659 -03 [21476] LOG:  background worker "worker_spi"
(PID 21484) exited with exit code 1


> Probably the most appropriate fix is to make worker_spi_main
> set up a Portal to run the query inside of.  There are other
> bits of code that are not happy if they're not inside a Portal,
> so if you're hoping to run arbitrary SQL this way, sooner or
> later you're going to have to cross that bridge.
>

I started digging with it [4] by creating a Portal from scratch to execute
the Function or Procedure and it worked.

We're wondering if we can avoid the parser for PortalRun, can we??

Regards,

[1]
https://github.com/timescale/timescaledb/blob/master/tsl/src/bgw_policy/job.c#L726
[2]
https://github.com/timescale/timescaledb/blob/master/tsl/src/bgw_policy/job.c#L741
[3] https://github.com/timescale/timescaledb/issues/3545
[4]
https://github.com/fabriziomello/timescaledb/blob/issue/3545/tsl/src/bgw_policy/job.c#L824

-- 
Fabrízio de Royes Mello
diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c
index d0acef2652..dace150fa7 100644
--- a/src/test/modules/worker_spi/worker_spi.c
+++ b/src/test/modules/worker_spi/worker_spi.c
@@ -42,6 +42,12 @@
 #include "utils/snapmgr.h"
 #include "tcop/utility.h"
 
+#include "nodes/makefuncs.h"
+#include "nodes/nodes.h"
+#include "nodes/pg_list.h"
+#include "parser/parse_func.h"
+#include "commands/defrem.h"
+
 PG_MODULE_MAGIC;
 
 PG_FUNCTION_INFO_V1(worker_spi_launch);
@@ -59,6 +65,7 @@ typedef struct worktable
 {
 	const char *schema;
 	const char *name;
+	const char *proc;
 } worktable;
 
 /*
@@ -108,8 +115,20 @@ initialize_worker_spi(worktable *table)
 						 "		type text CHECK (type IN ('total', 'delta')), "
 						 "		value	integer)"
 						 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
-						 "WHERE type = 'total'",
-						 table->schema, table->name, table->name, table->name);
+						 "WHERE type = 'total'; "
+						 "CREATE PROCEDURE \"%s\".\"%s\"() AS $$ "
+						 "DECLARE "
+						 "  i INTEGER; "
+						 "BEGIN "
+						 "  FOR i IN 1..10 "
+						 "  LOOP "
+						 "    INSERT INTO \"%s\".\"%s\" VALUES ('delta', i); "
+						 "    COMMIT; "
+						 "  END LOOP; "
+						 "END; "
+						 "$$ LANGUAGE plpgsql; ",
+						 table->schema, table->name, table->name, table->name,
+						 table->schema, table->proc, table->schema, table->name);
 
 		/* set statement start time */
 		SetCurrentStatementStartTimestamp();
@@ -137,11 +156,16 @@ worker_spi_main(Datum main_arg)
 	worktable  *table;
 	StringInfoData buf;
 	char		name[20];
+	FuncExpr 	*funcexpr;
+	Oid 		proc;
+	ObjectWithArgs *object;
+	MemoryContext oldcontext = CurrentMemoryContext;
 
 	table = palloc(sizeof(worktable));
 	sprintf(name, "schema%d", index);
 	table->schema = pstrdup(name);
 	table->name = pstrdup("counted");
+	table->proc = pstrdup("counted_proc");
 
 	/* Establish signal handlers before unblocking signals. */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -157,6 +181,27 @@ worker_spi_main(Datum main_arg)
 		 MyBgworkerEntry->bgw_name, table->schema, table->name);
 	initialize_worker_spi(table);
 
+	StartTransactionCommand();
+
+	/* build a function expression call */
+	object = makeNode(ObjectWithArgs);
+	object->objname = list_make2(makeString((char *)table->schema),
+								 makeString((char *)table->proc));
+	proc = LookupFuncWithArgs(OBJECT_ROUTINE, object, false);
+
+	CommitTransactionCommand();
+
+	MemoryContextSwitchTo(oldcontext);
+
+	funcexpr = makeFuncExpr(proc,
+							VOIDOID,
+							NIL,
+							InvalidOid,
+							InvalidOid,
+							COERCE_EXPLICIT_CALL);
+
+	MemoryContextSwitchTo(oldcontext);
+
 	/*
 	 * Quote identifiers passed to us.  Note that this must be done after
 	 * initialize_worker_spi, because that routine assumes the names are not
@@ -166,22 +211,10 @@ worker_spi_main(Datum main_arg)
 	 */
 	table->schema = quote_identifier(table->schema);
 	table->name = quote_identifier(table->name);
+	table->proc = quote_identifier(table->proc);
 
 	initStringInfo(&buf);
-	appendStringInfo(&buf,
-					 "WITH deleted AS (DELETE "
-					 "FROM %s.%s "
-					 "WHERE type = 'delta' RETURNING value), "
-					 "total AS (SELECT coalesce(sum(value), 0) as sum "
-					 "FROM deleted) "
-					 "UPDATE %s.%s "
-					 "SET value = %s.value + total.sum "
-					 "FROM total WHERE type = 'total' "
-					 "RETURNING %s.value",
-					 table->schema, table->name,
-					 table->schema, table->name,
-					 table->name,
-					 table->name);
+	appendStringInfo(&buf, "CALL %s.%s()", table->schema, table->proc);
 
 	/*
 	 * Main loop: do this until SIGTERM is received and processed by
@@ -189,7 +222,8 @@ worker_spi_main(Datum main_arg)
 	 */
 	for (;;)
 	{
-		int			ret;
+		CallStmt 		*call;
+		DestReceiver 	*dest;
 
 		/*
 		 * Background workers mustn't call usleep() or any direct equivalent:
@@ -232,39 +266,19 @@ worker_spi_main(Datum main_arg)
 		 */
 		SetCurrentStatementStartTimestamp();
 		StartTransactionCommand();
-		SPI_connect();
 		PushActiveSnapshot(GetTransactionSnapshot());
-		debug_query_string = buf.data;
 		pgstat_report_activity(STATE_RUNNING, buf.data);
 
-		/* We can now execute queries via SPI */
-		ret = SPI_execute(buf.data, false, 0);
-
-		if (ret != SPI_OK_UPDATE_RETURNING)
-			elog(FATAL, "cannot select from table %s.%s: error code %d",
-				 table->schema, table->name, ret);
-
-		if (SPI_processed > 0)
-		{
-			bool		isnull;
-			int32		val;
-
-			val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
-											  SPI_tuptable->tupdesc,
-											  1, &isnull));
-			if (!isnull)
-				elog(LOG, "%s: count in %s.%s is now %d",
-					 MyBgworkerEntry->bgw_name,
-					 table->schema, table->name, val);
-		}
+		call = makeNode(CallStmt);
+		call->funcexpr = funcexpr;
+		dest = CreateDestReceiver(DestNone);
+		ExecuteCallStmt(call, NULL, false, dest);
 
 		/*
 		 * And finish our transaction.
 		 */
-		SPI_finish();
 		PopActiveSnapshot();
 		CommitTransactionCommand();
-		debug_query_string = NULL;
 		pgstat_report_stat(false);
 		pgstat_report_activity(STATE_IDLE, NULL);
 	}

Reply via email to