On Mon, Sep 13, 2021 at 4:30 PM Tom Lane <t...@sss.pgh.pa.us> wrote: > > The direct cause of that is that SPI_execute() doesn't permit the called > query to perform COMMIT/ROLLBACK, which is because most callers would fail > to cope with that. You can instruct SPI to allow that by replacing the > SPI_execute() call with something like > > SPIExecuteOptions options; > > ... > memset(&options, 0, sizeof(options)); > options.allow_nonatomic = true; > > ret = SPI_execute_extended(buf.data, &options); >
I completely forgot about the SPI execute options... Thanks for the explanation!!! > However, that's not enough to make this example work :-(. > I find that it still fails inside the procedure's COMMIT, > with > > 2021-09-13 15:14:54.775 EDT worker_spi[476310] ERROR: portal snapshots (0) did not account for all active snapshots (1) > 2021-09-13 15:14:54.775 EDT worker_spi[476310] CONTEXT: PL/pgSQL function schema4.counted_proc() line 1 at COMMIT > SQL statement "CALL "schema4"."counted_proc"()" > > I think what this indicates is that worker_spi_main's cavalier > management of the active snapshot isn't up to snuff for this > use-case. The error is coming from ForgetPortalSnapshots, which > is expecting that all active snapshots are attached to Portals; > but that one isn't. > That is exactly the problem that started my whole investigation. At Timescale we have a scheduler (a background worker) that launches another background worker to "execute a job", and by executing a job I mean calling a function [1] or a procedure [2] directly, without SPI. But now a user has raised an issue about snapshots [3], and when I first looked at the code I tried to use SPI instead and it didn't work as expected. 
Even after tweaking worker_spi to execute the procedure without SPI, by calling ExecuteCallStmt directly (patch attached), we end up with the same active-snapshot error: 2021-09-13 20:14:36.654 -03 [21483] LOG: worker_spi worker 2 initialized with schema2.counted 2021-09-13 20:14:36.655 -03 [21484] LOG: worker_spi worker 1 initialized with schema1.counted 2021-09-13 20:14:36.657 -03 [21483] ERROR: portal snapshots (0) did not account for all active snapshots (1) 2021-09-13 20:14:36.657 -03 [21483] CONTEXT: PL/pgSQL function schema2.counted_proc() line 1 at COMMIT 2021-09-13 20:14:36.657 -03 [21484] ERROR: portal snapshots (0) did not account for all active snapshots (1) 2021-09-13 20:14:36.657 -03 [21484] CONTEXT: PL/pgSQL function schema1.counted_proc() line 1 at COMMIT 2021-09-13 20:14:36.659 -03 [21476] LOG: background worker "worker_spi" (PID 21483) exited with exit code 1 2021-09-13 20:14:36.659 -03 [21476] LOG: background worker "worker_spi" (PID 21484) exited with exit code 1 > Probably the most appropriate fix is to make worker_spi_main > set up a Portal to run the query inside of. There are other > bits of code that are not happy if they're not inside a Portal, > so if you're hoping to run arbitrary SQL this way, sooner or > later you're going to have to cross that bridge. > I started digging into it [4] by creating a Portal from scratch to execute the function or procedure, and it worked. We're wondering whether we can avoid going through the parser when using PortalRun. Is that possible? Regards, [1] https://github.com/timescale/timescaledb/blob/master/tsl/src/bgw_policy/job.c#L726 [2] https://github.com/timescale/timescaledb/blob/master/tsl/src/bgw_policy/job.c#L741 [3] https://github.com/timescale/timescaledb/issues/3545 [4] https://github.com/fabriziomello/timescaledb/blob/issue/3545/tsl/src/bgw_policy/job.c#L824 -- Fabrízio de Royes Mello
diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c index d0acef2652..dace150fa7 100644 --- a/src/test/modules/worker_spi/worker_spi.c +++ b/src/test/modules/worker_spi/worker_spi.c @@ -42,6 +42,12 @@ #include "utils/snapmgr.h" #include "tcop/utility.h" +#include "nodes/makefuncs.h" +#include "nodes/nodes.h" +#include "nodes/pg_list.h" +#include "parser/parse_func.h" +#include "commands/defrem.h" + PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(worker_spi_launch); @@ -59,6 +65,7 @@ typedef struct worktable { const char *schema; const char *name; + const char *proc; } worktable; /* @@ -108,8 +115,20 @@ initialize_worker_spi(worktable *table) " type text CHECK (type IN ('total', 'delta')), " " value integer)" "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) " - "WHERE type = 'total'", - table->schema, table->name, table->name, table->name); + "WHERE type = 'total'; " + "CREATE PROCEDURE \"%s\".\"%s\"() AS $$ " + "DECLARE " + " i INTEGER; " + "BEGIN " + " FOR i IN 1..10 " + " LOOP " + " INSERT INTO \"%s\".\"%s\" VALUES ('delta', i); " + " COMMIT; " + " END LOOP; " + "END; " + "$$ LANGUAGE plpgsql; ", + table->schema, table->name, table->name, table->name, + table->schema, table->proc, table->schema, table->name); /* set statement start time */ SetCurrentStatementStartTimestamp(); @@ -137,11 +156,16 @@ worker_spi_main(Datum main_arg) worktable *table; StringInfoData buf; char name[20]; + FuncExpr *funcexpr; + Oid proc; + ObjectWithArgs *object; + MemoryContext oldcontext = CurrentMemoryContext; table = palloc(sizeof(worktable)); sprintf(name, "schema%d", index); table->schema = pstrdup(name); table->name = pstrdup("counted"); + table->proc = pstrdup("counted_proc"); /* Establish signal handlers before unblocking signals. 
*/ pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -157,6 +181,27 @@ worker_spi_main(Datum main_arg) MyBgworkerEntry->bgw_name, table->schema, table->name); initialize_worker_spi(table); + StartTransactionCommand(); + + /* build a function expression call */ + object = makeNode(ObjectWithArgs); + object->objname = list_make2(makeString((char *)table->schema), + makeString((char *)table->proc)); + proc = LookupFuncWithArgs(OBJECT_ROUTINE, object, false); + + CommitTransactionCommand(); + + MemoryContextSwitchTo(oldcontext); + + funcexpr = makeFuncExpr(proc, + VOIDOID, + NIL, + InvalidOid, + InvalidOid, + COERCE_EXPLICIT_CALL); + + MemoryContextSwitchTo(oldcontext); + /* * Quote identifiers passed to us. Note that this must be done after * initialize_worker_spi, because that routine assumes the names are not @@ -166,22 +211,10 @@ worker_spi_main(Datum main_arg) */ table->schema = quote_identifier(table->schema); table->name = quote_identifier(table->name); + table->proc = quote_identifier(table->proc); initStringInfo(&buf); - appendStringInfo(&buf, - "WITH deleted AS (DELETE " - "FROM %s.%s " - "WHERE type = 'delta' RETURNING value), " - "total AS (SELECT coalesce(sum(value), 0) as sum " - "FROM deleted) " - "UPDATE %s.%s " - "SET value = %s.value + total.sum " - "FROM total WHERE type = 'total' " - "RETURNING %s.value", - table->schema, table->name, - table->schema, table->name, - table->name, - table->name); + appendStringInfo(&buf, "CALL %s.%s()", table->schema, table->proc); /* * Main loop: do this until SIGTERM is received and processed by @@ -189,7 +222,8 @@ worker_spi_main(Datum main_arg) */ for (;;) { - int ret; + CallStmt *call; + DestReceiver *dest; /* * Background workers mustn't call usleep() or any direct equivalent: @@ -232,39 +266,19 @@ worker_spi_main(Datum main_arg) */ SetCurrentStatementStartTimestamp(); StartTransactionCommand(); - SPI_connect(); PushActiveSnapshot(GetTransactionSnapshot()); - debug_query_string = buf.data; 
pgstat_report_activity(STATE_RUNNING, buf.data); - /* We can now execute queries via SPI */ - ret = SPI_execute(buf.data, false, 0); - - if (ret != SPI_OK_UPDATE_RETURNING) - elog(FATAL, "cannot select from table %s.%s: error code %d", - table->schema, table->name, ret); - - if (SPI_processed > 0) - { - bool isnull; - int32 val; - - val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], - SPI_tuptable->tupdesc, - 1, &isnull)); - if (!isnull) - elog(LOG, "%s: count in %s.%s is now %d", - MyBgworkerEntry->bgw_name, - table->schema, table->name, val); - } + call = makeNode(CallStmt); + call->funcexpr = funcexpr; + dest = CreateDestReceiver(DestNone); + ExecuteCallStmt(call, NULL, false, dest); /* * And finish our transaction. */ - SPI_finish(); PopActiveSnapshot(); CommitTransactionCommand(); - debug_query_string = NULL; pgstat_report_stat(false); pgstat_report_activity(STATE_IDLE, NULL); }