So this is what I came up with on the plane. I generalized the loading functionality into load_library_function, which can either load known postgres functions or call load_external_function.
I am not quite sure if fmgr.c is the best place to put it, but I didn't want to include stuff from the executor in bgworker.c. I was thinking about putting it in dfmgr.c (as that's where load_external_function lives), but again it seemed weird to include the executor there. As with the previous patch, 9.6 will need a quite different patch as we need to keep compatibility there, but since I am traveling I think it's better to share what I have so far. -- Petr Jelinek http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel index db9ac3d..b360887 100644 --- a/src/backend/access/transam/README.parallel +++ b/src/backend/access/transam/README.parallel @@ -198,7 +198,7 @@ pattern looks like this: EnterParallelMode(); /* prohibit unsafe state changes */ - pcxt = CreateParallelContext(entrypoint, nworkers); + pcxt = CreateParallelContext("library_name", "function_name", nworkers); /* Allow space for application-specific data here. */ shm_toc_estimate_chunk(&pcxt->estimator, size); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index b3d3853..326d4f9 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -61,7 +61,7 @@ #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006) #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007) #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) -#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009) +#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -77,9 +77,6 @@ typedef struct FixedParallelState pid_t parallel_master_pid; BackendId parallel_master_backend_id; - /* Entrypoint for parallel workers. */ - parallel_worker_main_type entrypoint; - /* Mutex protects remaining fields. */ slock_t mutex; @@ -109,7 +106,6 @@ static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list); /* Private functions. */ static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg); -static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc); static void WaitForParallelWorkersToExit(ParallelContext *pcxt); @@ -119,7 +115,8 @@ static void WaitForParallelWorkersToExit(ParallelContext *pcxt); * destroyed before exiting the current subtransaction. 
*/ ParallelContext * -CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) +CreateParallelContext(const char *library_name, const char *function_name, + int nworkers) { MemoryContext oldcontext; ParallelContext *pcxt; @@ -152,7 +149,8 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) pcxt = palloc0(sizeof(ParallelContext)); pcxt->subid = GetCurrentSubTransactionId(); pcxt->nworkers = nworkers; - pcxt->entrypoint = entrypoint; + pcxt->library_name = pstrdup(library_name); + pcxt->function_name = pstrdup(function_name); pcxt->error_context_stack = error_context_stack; shm_toc_initialize_estimator(&pcxt->estimator); dlist_push_head(&pcxt_list, &pcxt->node); @@ -164,33 +162,6 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) } /* - * Establish a new parallel context that calls a function provided by an - * extension. This works around the fact that the library might get mapped - * at a different address in each backend. - */ -ParallelContext * -CreateParallelContextForExternalFunction(char *library_name, - char *function_name, - int nworkers) -{ - MemoryContext oldcontext; - ParallelContext *pcxt; - - /* We might be running in a very short-lived memory context. */ - oldcontext = MemoryContextSwitchTo(TopTransactionContext); - - /* Create the context. */ - pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers); - pcxt->library_name = pstrdup(library_name); - pcxt->function_name = pstrdup(function_name); - - /* Restore previous memory context. */ - MemoryContextSwitchTo(oldcontext); - - return pcxt; -} - -/* * Establish the dynamic shared memory segment for a parallel context and * copy state and other bookkeeping information that will be needed by * parallel workers into it. @@ -249,15 +220,10 @@ InitializeParallelDSM(ParallelContext *pcxt) pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); - /* Estimate how much we'll need for extension entrypoint info. 
*/ - if (pcxt->library_name != NULL) - { - Assert(pcxt->entrypoint == ParallelExtensionTrampoline); - Assert(pcxt->function_name != NULL); - shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) - + strlen(pcxt->function_name) + 2); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } + /* Estimate how much we'll need for the entrypoint info. */ + shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) + + strlen(pcxt->function_name) + 2); + shm_toc_estimate_keys(&pcxt->estimator, 1); } /* @@ -297,7 +263,6 @@ InitializeParallelDSM(ParallelContext *pcxt) fps->parallel_master_pgproc = MyProc; fps->parallel_master_pid = MyProcPid; fps->parallel_master_backend_id = MyBackendId; - fps->entrypoint = pcxt->entrypoint; SpinLockInit(&fps->mutex); fps->last_xlog_end = 0; shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps); @@ -312,6 +277,8 @@ InitializeParallelDSM(ParallelContext *pcxt) char *asnapspace; char *tstatespace; char *error_queue_space; + char *entrypointstate; + Size lnamelen; /* Serialize shared libraries we have loaded. */ libraryspace = shm_toc_allocate(pcxt->toc, library_len); @@ -368,19 +335,18 @@ InitializeParallelDSM(ParallelContext *pcxt) } shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space); - /* Serialize extension entrypoint information. */ - if (pcxt->library_name != NULL) - { - Size lnamelen = strlen(pcxt->library_name); - char *extensionstate; - - extensionstate = shm_toc_allocate(pcxt->toc, lnamelen - + strlen(pcxt->function_name) + 2); - strcpy(extensionstate, pcxt->library_name); - strcpy(extensionstate + lnamelen + 1, pcxt->function_name); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE, - extensionstate); - } + /* + * Serialize extension entrypoint information. It's unsafe to pass + * function pointers across parallel processes as the function pointer + * may be different in new process in EXEC_BACKEND builds so we + * always pass library and function name. 
+ */ + lnamelen = strlen(pcxt->library_name); + entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen + + strlen(pcxt->function_name) + 2); + strcpy(entrypointstate, pcxt->library_name); + strcpy(entrypointstate + lnamelen + 1, pcxt->function_name); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate); } /* Restore previous memory context. */ @@ -946,7 +912,11 @@ ParallelWorkerMain(Datum main_arg) char *tsnapspace; char *asnapspace; char *tstatespace; + char *entrypointstate; + char *library_name; + char *function_name; StringInfoData msgbuf; + parallel_worker_main_type entrypt; /* Set flag to indicate that we're initializing a parallel worker. */ InitializingParallelWorker = true; @@ -1077,6 +1047,15 @@ ParallelWorkerMain(Datum main_arg) Assert(asnapspace != NULL); PushActiveSnapshot(RestoreSnapshot(asnapspace)); + /* Load the entry point. */ + entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT); + Assert(entrypointstate != NULL); + library_name = entrypointstate; + function_name = entrypointstate + strlen(library_name) + 1; + + entrypt = (parallel_worker_main_type) + load_library_function(library_name, function_name); + /* * We've changed which tuples we can see, and must therefore invalidate * system caches. @@ -1102,11 +1081,8 @@ ParallelWorkerMain(Datum main_arg) /* * Time to do the real work: invoke the caller-supplied code. - * - * If you get a crash at this line, see the comments for - * ParallelExtensionTrampoline. */ - fps->entrypoint(seg, toc); + entrypt(seg, toc); /* Must exit parallel mode to pop active snapshot. */ ExitParallelMode(); @@ -1122,33 +1098,6 @@ ParallelWorkerMain(Datum main_arg) } /* - * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a - * function living in a dynamically loaded module, because the module might - * not be loaded in every process, or might be loaded but not at the same - * address. 
To work around that problem, CreateParallelContextForExtension() - * arranges to call this function rather than calling the extension-provided - * function directly; and this function then looks up the real entrypoint and - * calls it. - */ -static void -ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc) -{ - char *extensionstate; - char *library_name; - char *function_name; - parallel_worker_main_type entrypt; - - extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE); - Assert(extensionstate != NULL); - library_name = extensionstate; - function_name = extensionstate + strlen(library_name) + 1; - - entrypt = (parallel_worker_main_type) - load_external_function(library_name, function_name, true, NULL); - entrypt(seg, toc); -} - -/* * Update shared memory with the ending location of the last WAL record we * wrote, if it's greater than the value already stored there. */ diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 469a32c..49ea1e0 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -112,8 +112,7 @@ static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation); -/* Helper functions that run in the parallel worker. */ -static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); +/* Helper function that run in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); /* @@ -393,7 +392,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) pstmt_data = ExecSerializePlan(planstate->plan, estate); /* Create a parallel context. 
*/ - pcxt = CreateParallelContext(ParallelQueryMain, nworkers); + pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers); pei->pcxt = pcxt; /* @@ -814,7 +813,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) * to do this are also stored in the dsm_segment and can be accessed through * the shm_toc. */ -static void +void ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { BufferUsage *buffer_usage; diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 0823317..0a9ca98 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -15,14 +15,11 @@ #include <unistd.h> #include "libpq/pqsignal.h" -#include "access/parallel.h" #include "miscadmin.h" #include "pgstat.h" #include "port/atomics.h" #include "postmaster/bgworker_internals.h" #include "postmaster/postmaster.h" -#include "replication/logicallauncher.h" -#include "replication/logicalworker.h" #include "storage/dsm.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -111,27 +108,6 @@ struct BackgroundWorkerHandle static BackgroundWorkerArray *BackgroundWorkerData; /* - * List of internal background workers. These are used for mapping the - * function name to actual function when building with EXEC_BACKEND and also - * to allow these to be loaded outside of shared_preload_libraries. - */ -typedef struct InternalBGWorkerMain -{ - char *bgw_function_name; - bgworker_main_type bgw_main; -} InternalBGWorkerMain; - -static const InternalBGWorkerMain InternalBGWorkers[] = { - {"ParallelWorkerMain", ParallelWorkerMain}, - {"ApplyLauncherMain", ApplyLauncherMain}, - {"ApplyWorkerMain", ApplyWorkerMain}, - /* Dummy entry marking end of the array. */ - {NULL, NULL} -}; - -static bgworker_main_type GetInternalBgWorkerMain(BackgroundWorker *worker); - -/* * Calculate shared memory needed. */ Size @@ -776,18 +752,14 @@ StartBackgroundWorker(void) } /* - * For internal workers set the entry point to known function address. 
- * Otherwise use the entry point specified by library name (which will - * be loaded, if necessary) and a function name (which will be looked up - * in the named library). + * Load the function. For internal workers the function will be loaded + * according to known function map (see fmgr.c). Otherwise use the entry + * point specified by library name (which will be loaded, if necessary) + * and a function name (which will be looked up in the named library). */ - entrypt = GetInternalBgWorkerMain(worker); - - if (entrypt == NULL) - entrypt = (bgworker_main_type) - load_external_function(worker->bgw_library_name, - worker->bgw_function_name, - true, NULL); + entrypt = (bgworker_main_type) + load_library_function(worker->bgw_library_name, + worker->bgw_function_name); /* * Note that in normal processes, we would call InitPostgres here. For a @@ -806,10 +778,11 @@ StartBackgroundWorker(void) } /* - * Register a new background worker while processing shared_preload_libraries. + * Register a new static background worker. * - * This can only be called in the _PG_init function of a module library - * that's loaded by shared_preload_libraries; otherwise it has no effect. + * This can only be called directly from postmaster or in the _PG_init + * function of a module library that's loaded by shared_preload_libraries; + * otherwise it will have no effect. 
*/ void RegisterBackgroundWorker(BackgroundWorker *worker) @@ -822,7 +795,7 @@ RegisterBackgroundWorker(BackgroundWorker *worker) (errmsg("registering background worker \"%s\"", worker->bgw_name))); if (!process_shared_preload_libraries_in_progress && - GetInternalBgWorkerMain(worker) == NULL) + strcmp(worker->bgw_library_name, "postgres") != 0) { if (!IsUnderPostmaster) ereport(LOG, @@ -1152,28 +1125,3 @@ TerminateBackgroundWorker(BackgroundWorkerHandle *handle) if (signal_postmaster) SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE); } - -/* - * Search the known internal worker array and return its main function - * pointer if found. - * - * Returns NULL if not known internal worker. - */ -static bgworker_main_type -GetInternalBgWorkerMain(BackgroundWorker *worker) -{ - int i; - - /* Internal workers always have to use postgres as library name. */ - if (strncmp(worker->bgw_library_name, "postgres", BGW_MAXLEN) != 0) - return NULL; - - for (i = 0; InternalBGWorkers[i].bgw_function_name; i++) - { - if (strncmp(InternalBGWorkers[i].bgw_function_name, - worker->bgw_function_name, BGW_MAXLEN) == 0) - return InternalBGWorkers[i].bgw_main; - } - - return NULL; -} diff --git a/src/backend/utils/fmgr/fmgr.c b/src/backend/utils/fmgr/fmgr.c index 68d2110..c23db6a 100644 --- a/src/backend/utils/fmgr/fmgr.c +++ b/src/backend/utils/fmgr/fmgr.c @@ -15,14 +15,18 @@ #include "postgres.h" +#include "access/parallel.h" #include "access/tuptoaster.h" #include "catalog/pg_language.h" #include "catalog/pg_proc.h" +#include "executor/execParallel.h" #include "executor/functions.h" #include "lib/stringinfo.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" #include "pgstat.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/fmgrtab.h" @@ -61,6 +65,25 @@ static void record_C_func(HeapTuple procedureTuple, PGFunction user_fn, const Pg_finfo_record *inforec); static Datum 
fmgr_security_definer(PG_FUNCTION_ARGS); +/* + * These are used for mapping the function name to actual function when + * building with EXEC_BACKEND and also to allow bgworkers using these outside + * of shared_preload_libraries. + */ +typedef struct KnownFunctionPointer +{ + char *fn_name; + void *fn_addr; +} KnownFunctionPointer; + +static const KnownFunctionPointer known_functions_map[] = { + {"ParallelWorkerMain", ParallelWorkerMain}, + {"ParallelQueryMain", ParallelQueryMain}, + {"ApplyLauncherMain", ApplyLauncherMain}, + {"ApplyWorkerMain", ApplyWorkerMain}, + /* Dummy entry marking end of the array. */ + {NULL, NULL} +}; /* * Lookup routines for builtin-function table. We can search by either Oid @@ -691,6 +714,36 @@ fmgr_security_definer(PG_FUNCTION_ARGS) return result; } +/* + * This is similar to load_external_function (which is may call) but will + * also try to match funct name to internal known functions map when asked to + * load postgres function. + */ +void * +load_library_function(char *libraryname, char *funcname) +{ + /* + * If the function is to be loaded from postgres itself, search the known + * functions map. + */ + if (strcmp(libraryname, "postgres") == 0) + { + int i; + + for (i = 0; known_functions_map[i].fn_name; i++) + { + if (strcmp(known_functions_map[i].fn_name, + funcname) == 0) + return known_functions_map[i].fn_addr; + } + + /* We can only reach this by programming error. */ + elog(ERROR, "internal function \"%s\" not found", funcname); + } + + /* Otherwise load from external library. 
*/ + return load_external_function(libraryname, funcname, true, NULL); +} /*------------------------------------------------------------------------- * Support routines for callers of fmgr-compatible functions diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 5065a38..b869727 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -52,8 +52,7 @@ extern bool InitializingParallelWorker; #define IsParallelWorker() (ParallelWorkerNumber >= 0) -extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers); -extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers); +extern ParallelContext *CreateParallelContext(const char *library_name, const char *function_name, int nworkers); extern void InitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelDSM(ParallelContext *pcxt); extern void LaunchParallelWorkers(ParallelContext *pcxt); diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 8bc4270..0b7ca59 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -38,4 +38,6 @@ extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei); extern void ExecParallelReinitialize(ParallelExecutorInfo *pei); +extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); + #endif /* EXECPARALLEL_H */ diff --git a/src/include/fmgr.h b/src/include/fmgr.h index 0c695e2..2e41edf 100644 --- a/src/include/fmgr.h +++ b/src/include/fmgr.h @@ -667,6 +667,7 @@ extern bool get_fn_expr_arg_stable(FmgrInfo *flinfo, int argnum); extern bool get_call_expr_arg_stable(fmNodePtr expr, int argnum); extern bool get_fn_expr_variadic(FmgrInfo *flinfo); extern bool CheckFunctionValidatorAccess(Oid validatorOid, Oid functionOid); +extern void *load_library_function(char *libraryname, char *funcname); /* 
* Routines in dfmgr.c
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers