Hi, hackers!

I want to propose a patch that allows extensions to define custom signals and their handlers. It is based on the ProcSignal module: it defines a fixed set of custom signals (the number is specified by a constant) that can be reserved at the postgres initialization stage (in the _PG_init function of a shared preloaded module) through specific interface functions. The functions that serve as custom signal handlers are defined in the extension. The relationship between custom signals and their assigned handlers (function addresses) is replicated from the postmaster to child processes, including working backends. Using these signals we are able to call a specific handler (via the SendProcSignal function) on a remote backend; the handler actually comes into action at the next CHECK_FOR_INTERRUPTS point.

In the long run, this mechanism provides a low-level instrument for implementing remote procedure calls on the extension side. A simple RPC that reports the effective userid of a remote backend (the remote_effective_user function) is attached as an example.

C&C welcome!


--
Regards,
Maksim Milyutin

diff --git a/src/backend/storage/ipc/procsignal.c 
b/src/backend/storage/ipc/procsignal.c
index b9302ac630..75d4ea72b7 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -27,6 +27,7 @@
 #include "storage/shmem.h"
 #include "storage/sinval.h"
 #include "tcop/tcopprot.h"
+#include "utils/memutils.h"
 
 
 /*
@@ -60,12 +61,17 @@ typedef struct
  */
 #define NumProcSignalSlots     (MaxBackends + NUM_AUXPROCTYPES)
 
+static bool CustomSignalPendings[NUM_CUSTOM_PROCSIGNALS];
+static ProcSignalHandler_type CustomHandlers[NUM_CUSTOM_PROCSIGNALS];
+
 static ProcSignalSlot *ProcSignalSlots = NULL;
 static volatile ProcSignalSlot *MyProcSignalSlot = NULL;
 
 static bool CheckProcSignal(ProcSignalReason reason);
 static void CleanupProcSignalState(int status, Datum arg);
 
+static void CustomSignalInterrupt(ProcSignalReason reason);
+
 /*
  * ProcSignalShmemSize
  *             Compute space needed for procsignal's shared memory
@@ -166,6 +172,70 @@ CleanupProcSignalState(int status, Datum arg)
 }
 
 /*
+ * RegisterCustomProcSignalHandler
+ *             Assign specific handler of custom process signal with new 
ProcSignalReason key
+ *
+ * Return INVALID_PROCSIGNAL if all custom signals have been assigned.
+ */
+ProcSignalReason
+RegisterCustomProcSignalHandler(ProcSignalHandler_type handler)
+{
+       ProcSignalReason reason;
+
+       /* iterate through custom signal keys to find free spot */
+       for (reason = PROCSIG_CUSTOM_1; reason <= PROCSIG_CUSTOM_N; reason++)
+               if (!CustomHandlers[reason - PROCSIG_CUSTOM_1])
+               {
+                       CustomHandlers[reason - PROCSIG_CUSTOM_1] = handler;
+                       return reason;
+               }
+       return INVALID_PROCSIGNAL;
+}
+
+/*
+ * ReleaseCustomProcSignal
+ *      Release slot of specific custom signal
+ */
+void
+ReleaseCustomProcSignal(ProcSignalReason reason)
+{
+       CustomHandlers[reason - PROCSIG_CUSTOM_1] = NULL;
+}
+
+/*
+ * AssignCustomProcSignalHandler
+ *             Assign handler of custom process signal with specific 
ProcSignalReason key
+ *
+ * Return old ProcSignal handler.
+ * Assume incoming reason is one of custom ProcSignals.
+ */
+ProcSignalHandler_type
+AssignCustomProcSignalHandler(ProcSignalReason reason, ProcSignalHandler_type 
handler)
+{
+       ProcSignalHandler_type old;
+
+       Assert(reason >= PROCSIG_CUSTOM_1 && reason <= PROCSIG_CUSTOM_N);
+
+       old = CustomHandlers[reason - PROCSIG_CUSTOM_1];
+       CustomHandlers[reason - PROCSIG_CUSTOM_1] = handler;
+       return old;
+}
+
+/*
+ * GetCustomProcSignalHandler
+ *             Get handler of custom process signal
+ *
+ * Assume incoming reason is one of custom ProcSignals.
+ */
+ProcSignalHandler_type
+GetCustomProcSignalHandler(ProcSignalReason reason)
+{
+       Assert(reason >= PROCSIG_CUSTOM_1 && reason <= PROCSIG_CUSTOM_N);
+
+       return CustomHandlers[reason - PROCSIG_CUSTOM_1];
+}
+
+/*
  * SendProcSignal
  *             Send a signal to a Postgres process
  *
@@ -260,7 +330,8 @@ CheckProcSignal(ProcSignalReason reason)
 void
 procsignal_sigusr1_handler(SIGNAL_ARGS)
 {
-       int                     save_errno = errno;
+       int                                     save_errno = errno;
+       ProcSignalReason        reason;
 
        if (CheckProcSignal(PROCSIG_CATCHUP_INTERRUPT))
                HandleCatchupInterrupt();
@@ -292,9 +363,84 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
                RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
 
+       for (reason = PROCSIG_CUSTOM_1; reason <= PROCSIG_CUSTOM_N; reason++)
+               if (CheckProcSignal(reason))
+                       CustomSignalInterrupt(reason);
+
        SetLatch(MyLatch);
 
        latch_sigusr1_handler();
 
        errno = save_errno;
 }
+
+/*
+ * Handle receipt of an interrupt indicating a custom process signal.
+ */
+static void
+CustomSignalInterrupt(ProcSignalReason reason)
+{
+       int     save_errno = errno;
+
+       Assert(reason >= PROCSIG_CUSTOM_1 && reason <= PROCSIG_CUSTOM_N);
+
+       /* set interrupt flags */
+       InterruptPending = true;
+       CustomSignalPendings[reason - PROCSIG_CUSTOM_1] = true;
+
+       /* make sure the event is processed in due course */
+       SetLatch(MyLatch);
+
+       errno = save_errno;
+}
+
+/*
+ * CheckAndHandleCustomSignals
+ *             Check custom signal flags and call handler assigned to that 
signal
+ *             if it is not NULL
+ *
+ * This function is called within CHECK_FOR_INTERRUPTS if an interrupt has
+ * occurred. Skeleton is the same as in HandleParallelMessageInterrupt()
+ */
+void
+CheckAndHandleCustomSignals(void)
+{
+       int i;
+       MemoryContext oldcontext;
+
+       static MemoryContext hcs_context = NULL;
+
+       /* Disable interrupts to avoid recursive calls */
+       HOLD_INTERRUPTS();
+
+       /*
+        * Create specific subtop-level memory context in first call of this 
handler
+        */
+       if (hcs_context == NULL)
+               hcs_context = AllocSetContextCreate(TopMemoryContext,
+                                                                               
        "HandleCustomSignals",
+                                                                               
        ALLOCSET_DEFAULT_SIZES);
+       else
+               MemoryContextReset(hcs_context);
+
+       oldcontext = MemoryContextSwitchTo(hcs_context);
+
+       /* Check for pending custom signals and call their handlers, if any */
+       for (i = 0; i < NUM_CUSTOM_PROCSIGNALS; i++)
+               if (CustomSignalPendings[i])
+               {
+                       ProcSignalHandler_type handler;
+
+                       CustomSignalPendings[i] = false;
+                       handler = CustomHandlers[i];
+                       if (handler)
+                               handler();
+               }
+
+       MemoryContextSwitchTo(oldcontext);
+
+       /* Might as well clear the context on our way out */
+       MemoryContextReset(hcs_context);
+
+       RESUME_INTERRUPTS();
+}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 1b24dddbce..b8f54dc74c 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3051,6 +3051,8 @@ ProcessInterrupts(void)
 
        if (ParallelMessagePending)
                HandleParallelMessages();
+
+       CheckAndHandleCustomSignals();
 }
 
 
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 20bb05b177..8122deda86 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -17,6 +17,8 @@
 #include "storage/backendid.h"
 
 
+#define NUM_CUSTOM_PROCSIGNALS 64
+
 /*
  * Reasons for signalling a Postgres child process (a backend or an auxiliary
  * process, like checkpointer).  We can cope with concurrent signals for 
different
@@ -29,6 +31,8 @@
  */
 typedef enum
 {
+       INVALID_PROCSIGNAL = -1,        /* Must be first */
+
        PROCSIG_CATCHUP_INTERRUPT,      /* sinval catchup interrupt */
        PROCSIG_NOTIFY_INTERRUPT,       /* listen/notify interrupt */
        PROCSIG_PARALLEL_MESSAGE,       /* message from cooperating parallel 
backend */
@@ -42,9 +46,20 @@ typedef enum
        PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
        PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
+       PROCSIG_CUSTOM_1,
+       /*
+        * PROCSIG_CUSTOM_2,
+        * ...,
+        * PROCSIG_CUSTOM_N-1,
+        */
+       PROCSIG_CUSTOM_N = PROCSIG_CUSTOM_1 + NUM_CUSTOM_PROCSIGNALS - 1,
+
        NUM_PROCSIGNALS                         /* Must be last! */
 } ProcSignalReason;
 
+/* Handler of custom process signal */
+typedef void (*ProcSignalHandler_type) (void);
+
 /*
  * prototypes for functions in procsignal.c
  */
@@ -52,9 +67,16 @@ extern Size ProcSignalShmemSize(void);
 extern void ProcSignalShmemInit(void);
 
 extern void ProcSignalInit(int pss_idx);
+extern ProcSignalReason RegisterCustomProcSignalHandler(ProcSignalHandler_type 
handler);
+extern void ReleaseCustomProcSignal(ProcSignalReason reason);
+extern ProcSignalHandler_type AssignCustomProcSignalHandler(ProcSignalReason 
reason,
+                          ProcSignalHandler_type handler);
+extern ProcSignalHandler_type GetCustomProcSignalHandler(ProcSignalReason 
reason);
 extern int SendProcSignal(pid_t pid, ProcSignalReason reason,
                           BackendId backendId);
 
+extern void CheckAndHandleCustomSignals(void);
+
 extern void procsignal_sigusr1_handler(SIGNAL_ARGS);
 
 #endif                                                 /* PROCSIGNAL_H */
#include "postgres.h"

#include "fmgr.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "portability/instr_time.h"
#include "port/atomics.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/procsignal.h"
#include "storage/s_lock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
#include "utils/elog.h"

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

/*
 * Single slot, placed in shared memory, through which the signalled backend
 * hands one user OID back to the backend that requested it.
 */
typedef struct {
        /* spinlock taken around the userid/isSet accesses in this file */
        slock_t  mutex;
        /* value stored by sendEffectiveUserId() */
        Oid              userid;
        /* cleared by the requester, set to true by sendEffectiveUserId() */
        bool     isSet;
        /* requester's latch; sendEffectiveUserId() sets it after answering */
        Latch   *callerLatch;
} UserIdSlot;
/* Buffer-aligned size used both to request and to allocate the slot */
const Size UserIdSlotSize = BUFFERALIGN(sizeof(UserIdSlot));

/* Custom ProcSignal reason obtained from RegisterCustomProcSignalHandler() */
static ProcSignalReason extractEffectiveUserReason;
/* Shared-memory slot; assigned in foo_shmem_startup() */
static UserIdSlot               *userIdSlot = NULL;
/* Hook that was installed before ours, chained from foo_shmem_startup() */
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;

/*
 * Forward declarations.  "(void)" is spelled out so that these are real
 * prototypes rather than old-style declarations with unspecified parameters.
 */
static void foo_shmem_startup(void);
static void sendEffectiveUserId(void);
static Oid extractEffectiveUserId(pid_t remoteSessionId);

void
_PG_init()
{
    elog(LOG, "Load. My PID = %d", MyProcPid);

    extractEffectiveUserReason =
        RegisterCustomProcSignalHandler(sendEffectiveUserId);
        if (extractEffectiveUserReason == INVALID_PROCSIGNAL)
        {
                elog(WARNING, "Insufficient custom ProcSignal slots");
                return;
        }

        RequestAddinShmemSpace(UserIdSlotSize);

        prev_shmem_startup_hook = shmem_startup_hook;
        shmem_startup_hook = foo_shmem_startup;
}

void
foo_shmem_startup()
{
        bool    found;

    elog(LOG, "Stand out shmem. My PID = %d", MyProcPid);

        userIdSlot = ShmemInitStruct("foo userid slot", UserIdSlotSize, &found);

        if (prev_shmem_startup_hook)
                prev_shmem_startup_hook();
}

void
_PG_fini()
{
    elog(LOG, "Unload. My PID = %d", MyProcPid);

    ReleaseCustomProcSignal(extractEffectiveUserReason);
        shmem_startup_hook = prev_shmem_startup_hook;
}

PG_FUNCTION_INFO_V1(remote_effective_user);

Datum
remote_effective_user(PG_FUNCTION_ARGS)
{
    pid_t       pid = PG_GETARG_INT32(0);

    PG_RETURN_INT32(extractEffectiveUserId(pid));
}

/*
 * extractEffectiveUserId
 *		Ask the process with pid remoteSessionId for its user OID.
 *
 * Publishes our latch in the shared slot, sends the custom ProcSignal to the
 * target and then waits (5 seconds in total) until the target's handler has
 * filled in the slot.
 *
 * Returns the OID stored by the remote handler, or InvalidOid (with a
 * WARNING) when the module is not initialized, the signal cannot be sent, or
 * no answer arrives before the timeout.
 */
static Oid
extractEffectiveUserId(pid_t remoteSessionId)
{
    Oid     result = InvalidOid;
    int     sendSignalStatus;
    long    timeout = 5000;     /* remaining wait budget, in milliseconds */
    int     rc = 0;

    /* Without the shared slot and a registered signal nothing can work */
    if (userIdSlot == NULL ||
        extractEffectiveUserReason == INVALID_PROCSIGNAL)
    {
        elog(WARNING, "Remote user lookup is not initialized");
        return InvalidOid;
    }

    /*
     * Arm the slot under the same spinlock the remote handler takes, so the
     * handler never observes a half-updated request.
     */
    SpinLockAcquire(&userIdSlot->mutex);
    userIdSlot->isSet = false;
    userIdSlot->callerLatch = MyLatch;
    SpinLockRelease(&userIdSlot->mutex);

    sendSignalStatus = SendProcSignal(remoteSessionId,
                                      extractEffectiveUserReason,
                                      InvalidBackendId);
    if (sendSignalStatus == -1)
    {
        switch (errno)
        {
            case ESRCH:
                elog(WARNING, "Process not found");
                break;
            default:
                elog(WARNING, "Error with sending signal");
                break;
        }
        return InvalidOid;
    }

    for (;;)
    {
        bool        isSet;
        instr_time  start_time;
        instr_time  end_time;

        SpinLockAcquire(&userIdSlot->mutex);
        result = userIdSlot->userid;
        isSet = userIdSlot->isSet;
        SpinLockRelease(&userIdSlot->mutex);

        if (isSet)
            break;
        if ((rc & WL_TIMEOUT) || timeout <= 0)
        {
            elog(WARNING, "Remote session is not retry");
            return InvalidOid;
        }

        /* Measure the time actually slept so the total stays bounded */
        INSTR_TIME_SET_CURRENT(start_time);
        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       timeout,
                       PG_WAIT_EXTENSION);
        INSTR_TIME_SET_CURRENT(end_time);
        INSTR_TIME_SUBTRACT(end_time, start_time);

        timeout -= (long) INSTR_TIME_GET_MILLISEC(end_time);

        /* No point in waiting for an answer once the postmaster is gone */
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);

        /*
         * Reset the latch before looking at pending interrupts and before
         * re-reading the slot: anything that sets the latch after this point
         * is then noticed by the next WaitLatch() instead of being wiped out.
         */
        ResetLatch(MyLatch);
        CHECK_FOR_INTERRUPTS();
    }

    return result;
}

void
sendEffectiveUserId()
{
        bool    fakeFlag;

    elog(LOG, "Extract effective user. My PID = %d", MyProcPid);

        SpinLockAcquire(&userIdSlot->mutex);
        GetUserIdAndContext(&userIdSlot->userid, &fakeFlag);
        userIdSlot->isSet = true;
        SpinLockRelease(&userIdSlot->mutex);

        SetLatch(userIdSlot->callerLatch);
}

Reply via email to