Hi, 

>>From: Thomas Munro [mailto:thomas.mu...@gmail.com]
>>Subject: Re: Copy data to DSA area
>>
>>On Wed, May 8, 2019 at 5:29 PM Ideriha, Takeshi
>><ideriha.take...@jp.fujitsu.com>
>>wrote:
>>> >From: Ideriha, Takeshi [mailto:ideriha.take...@jp.fujitsu.com]
>>> >Sent: Friday, April 26, 2019 11:50 PM Well, after developing PoC, I
>>> >realized that this PoC doesn't solve the local process is crashed
>>> >before the context becomes shared because local process keeps track
>>> >of pointer to chunks.
>>> >Maybe all of you have already noticed and pointed out this case :)
>>> >So it needs another work but this poc is a good step for me to advance 
>>> >more.
>>>
>>> I think the point to prevent memory leak is allocating memory and
>>> storing its address into a structure at the same time. This structure
>>> should be trackable from other process.
>>
>>I'm not sure that it's necessarily wrong to keep tracking information in 
>>private
>memory.
>>If any backend crashes, the postmaster will terminate all backends and
>>reinitialise everything anyway, because shared memory might be corrupted.
>
>I'm going to put keep tracking information in private
>memory and send a patch.

I updated a PoC patch.
This has memory tracking buffer in local process. The old version also has this 
system but I refactored the code. Memory leak while allocating memory seems to 
be solved thanks to memory tracking buffer.

What I haven't addressed is memory leak while freeing objects. In current
sequence a cache (e.g. relcache) is freed after removed from its hash table. 
If cache and hash table gets shared, memory leak is likely to happen
between removal from hash table and free. We lose track of cache objects
if error happens after cache is unlinked from the hash table. And also a cache
consists of graph structure. So we also take care of freeing cache partially.

Maybe we need to remember pointers of objects before unlink from the hash.
Also, we need to free them all at once after we can make sure that all the 
pointers are registered to local buffer. Followings are some idea to implement
this: 
- change the order of removal from hash table and deletion
- pfree in shared memory context doesn't dsa_free but just add pointer to
  the local buffer. 
- remove link from hash table after all pfree() is done
- then, call a function, which does actual dsa_free taking a look at the local
  Buffer

But I'm not sure this solution is good one. Do you have any thoughts?

Regards,
Takeshi Ideriha
/*
 * Another PoC of MemoryContext for DSA based on Thomus Munro's work
 *
 * https://www.postgresql.org/message-id/
 * CAEepm%3D22H0TDCAHWh%3DYWmSV%2BX%2BbTtcTNg8RpP%3DeaCjWJU_d-9A
 * %40mail.gmail.com
 *
 */

#include "postgres.h"

#include "fmgr.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "nodes/memnodes.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "utils/dsa.h"
#include "utils/memutils.h"

#define MY_AREA_SIZE (1024 * 1024)
#define NUM_CHUNKS 64 /* an arbitrary number */


PG_MODULE_MAGIC;

void _PG_init(void);
PG_FUNCTION_INFO_V1(hoge);
PG_FUNCTION_INFO_V1(hoge_list);
PG_FUNCTION_INFO_V1(hoge_list_error);

#define isLocalContext(c) (c->alloc_buffer != NULL)
#define isBufferFull(buf) (buf->tail == NUM_CHUNKS)

/* Support code to make a dsa_area into a MemoryContext. */
typedef struct dsa_temp_buffer dsa_temp_buffer;

static void hoge_shmem_startup_hook(void);
static MemoryContext make_hoge_memory_context(dsa_area *area, void *base, bool 
isLocal);
static void push_new_buffer(dsa_temp_buffer *buffer);

static shmem_startup_hook_type prev_shmem_startup_hook;
static void *my_raw_memory;
static dsa_area *my_area;
static MemoryContext my_shared_dsa_context;
static MemoryContext my_local_dsa_context;

static List **my_list;
void AddToDsaSharedContext(MemoryContext local, MemoryContext shared);
MemoryContext CreateDsaLocalContext(MemoryContext shared);
Size ShmDsaContextSize(void);


void
_PG_init(void)
{
        /* This only works if preloaded by the postmaster. */
        if (!process_shared_preload_libraries_in_progress)
                return;

        /* Request a chunk of traditional shared memory. */
        RequestAddinShmemSpace(MY_AREA_SIZE);

        /* Register our hook for phase II of initialization. */
        prev_shmem_startup_hook = shmem_startup_hook;
        shmem_startup_hook = hoge_shmem_startup_hook;
}

static void
hoge_shmem_startup_hook(void)
{
        MemoryContext   old_context;
        bool            found;

        if (prev_shmem_startup_hook)
                prev_shmem_startup_hook();

        old_context = MemoryContextSwitchTo(TopMemoryContext);

        LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

        /* Allocate, or look up, a chunk of raw fixed-address shared memory. */
        my_raw_memory = ShmemInitStruct("hoge", MY_AREA_SIZE, &found);
        if (!found)
        {
                /*
                 * Create a new DSA area, and clamp its size so it can't make 
any
                 * segments outside the provided space.
                 */
                my_area = dsa_create_in_place(my_raw_memory, MY_AREA_SIZE, 0, 
NULL);
                dsa_set_size_limit(my_area, MY_AREA_SIZE);
        }
        else
        {
                /* Attach to an existing area. */
                my_area = dsa_attach_in_place(my_raw_memory, NULL);
        }

        /* Also allocate or look up a list header. */
        my_list = ShmemInitStruct("hoge_list", MY_AREA_SIZE, &found);
        if (!found)
                *my_list = NIL;

        /* Create a memory context. */
        my_shared_dsa_context = 
                make_hoge_memory_context(my_area, my_raw_memory, false);

        LWLockRelease(AddinShmemInitLock);

        MemoryContextSwitchTo(old_context);
}

Datum
hoge(PG_FUNCTION_ARGS)
{
        char *s;
        MemoryContext old_context;

        old_context = MemoryContextSwitchTo(TopTransactionContext);
        my_local_dsa_context = CreateDsaLocalContext(my_shared_dsa_context);
        MemoryContextSwitchTo(old_context);

        old_context = MemoryContextSwitchTo(my_local_dsa_context);

        /* Simple smoke test: allocate and free immediately. */
        s = pstrdup("hello world");
        pfree(s);
        AddToDsaSharedContext(my_local_dsa_context, my_shared_dsa_context);

        MemoryContextSwitchTo(old_context);

        PG_RETURN_VOID();
}

Datum
hoge_list(PG_FUNCTION_ARGS)
{
        int i = PG_GETARG_INT32(0);
        ListCell *lc;
        MemoryContext old_context;

        old_context = MemoryContextSwitchTo(TopTransactionContext);
        my_local_dsa_context = CreateDsaLocalContext(my_shared_dsa_context);
        MemoryContextSwitchTo(old_context);

        old_context = MemoryContextSwitchTo(my_local_dsa_context);

        /* Manipulate a list in shared memory. */
        LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
        if (i < 0)
                *my_list = list_delete_int(*my_list, -i);
        else
                *my_list = lappend_int(*my_list, i);
        LWLockRelease(AddinShmemInitLock);

        AddToDsaSharedContext(my_local_dsa_context, my_shared_dsa_context);

        /* Dump list. */
        elog(NOTICE, "Contents of list:");
        foreach(lc, *my_list)
                elog(NOTICE, " %d", lfirst_int(lc));

        MemoryContextSwitchTo(old_context);

        PG_RETURN_VOID();
}


/*
 * Error test to check chunk is dsa_freed
 * Currently after this function is called by user
 * process goes down because it tries to access dangling pointer
 */
Datum
hoge_list_error(PG_FUNCTION_ARGS)
{
        int i = PG_GETARG_INT32(0);
        ListCell *lc;
        MemoryContext old_context;

        old_context = MemoryContextSwitchTo(TopTransactionContext);
        my_local_dsa_context = CreateDsaLocalContext(my_shared_dsa_context);
        MemoryContextSwitchTo(old_context);

        old_context = MemoryContextSwitchTo(my_local_dsa_context);

        /* Manipulate a list in shared memory. */
        LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
        if (i < 0)
                *my_list = list_delete_int(*my_list, -i);
        else
                *my_list = lappend_int(*my_list, i);

        elog(ERROR, "error test");
        LWLockRelease(AddinShmemInitLock);
        AddToDsaSharedContext(my_local_dsa_context, my_shared_dsa_context);

        /* Dump list. */
        elog(NOTICE, "Contents of list:");
        foreach(lc, *my_list)
                elog(NOTICE, " %d", lfirst_int(lc));

        MemoryContextSwitchTo(old_context);

        PG_RETURN_VOID();
}


/* this buffer is used to free all "local" dsa chunks */
struct dsa_temp_buffer
{
        dsa_temp_buffer *next; /* single linked list */
        int tail;                       /* index of array to be allocated */
        dsa_pointer chunks[NUM_CHUNKS]; /* relative address of chunks */
};

struct hoge_memory_context
{
        struct MemoryContextData memory_context;
        void     *base;
        dsa_area   *area;
        /* array of pointer to dsa_allocated chunks to be freed if error occurs 
*/
        dsa_temp_buffer *alloc_buffer;
};


static void *
hoge_alloc(MemoryContext context, Size size)
{
        struct hoge_memory_context *c = (struct hoge_memory_context *) context;
        
        char *chunk_backp;      
        dsa_temp_buffer *buf;

        /* we only allow palloc in temporary DSA-MemoryContext */
        if (!isLocalContext(c))
        {
                elog(ERROR, "hoge_alloc should be run in "
                         "local DSA MemoryContext");
                return NULL;
        }

        /* if buffer is full, allocate a new buffer */
        if (isBufferFull(c->alloc_buffer))
                push_new_buffer(c->alloc_buffer);

        /* Add space for the secret context pointer. */
        buf = c->alloc_buffer;
        buf->chunks[buf->tail] = dsa_allocate(c->area, sizeof(void *) + size);
        
        chunk_backp = (char *)c->base + 
                buf->chunks[buf->tail];
        buf->tail++;
        *(void **) chunk_backp = context;

        return chunk_backp + sizeof(void *);
}

static void
hoge_free(MemoryContext context, void *pointer)
{
        struct hoge_memory_context *c = (struct hoge_memory_context *) context;
        dsa_temp_buffer *buf;
        char *chunk_backp;
        dsa_pointer dp;
        int idx;

        /* Rewind to the secret start of the chunk */
        chunk_backp = (char *) pointer - sizeof(void *);
        dp = (dsa_pointer) ((char *)chunk_backp - (char *)c->base);

        dsa_free(c->area, dp);

        /* early return if shared context */
        if (!isLocalContext(c))
                return;

        /* To avoid free twice at MemoryContextDelete, remove its reference */
        for (buf = c->alloc_buffer; buf != NULL; buf = buf->next)
        {
                for (idx = 0; idx < buf->tail; idx++) 
                {
                        if (buf->chunks[idx] == dp)
                        {
                                buf->chunks[idx] = 0;
                                break;
                        }
                }
        }       
}

static void *
hoge_realloc(MemoryContext context, void *pointer, Size size)
{
        elog(ERROR, "hoge_realloc not implemented");
        return NULL;
}

static void
hoge_reset(MemoryContext context)
{
        elog(ERROR, "hoge_reset not implemented");
}

static void
hoge_delete(MemoryContext context)
{       
        struct hoge_memory_context *c = (struct hoge_memory_context *) context;
        dsa_temp_buffer *buf = c->alloc_buffer;
        int idx;

        /* We don't support MemoryContextDelete if Context is permanent */
        if (!isLocalContext(c))
           elog(ERROR, "hoge_delete is not supported at permanent DSA 
MemoryContext");

        /* free all chunks and buffers */
        while (buf && buf->chunks)
        {
                dsa_temp_buffer *next_buf = buf->next;
                
                for (idx = 0; idx < buf->tail; idx++) 
                {
                        /* Rewind to the secret start of the chunk */
                        if (buf->chunks[idx] != 0)
                        {
                                dsa_free(c->area, buf->chunks[idx]);
                                elog(NOTICE, "chunk is dsa_freed");
                        }
                }
                pfree(buf);
                buf = next_buf;
        }
        /* Finally, free the context header */
        pfree(c);
}

static Size
hoge_get_chunk_space(MemoryContext context, void *pointer)
{
        elog(ERROR, "hoge_get_chunk_space not implemented");
        return 0;
}

static bool
hoge_is_empty(MemoryContext context)
{
        elog(ERROR, "hoge_is_empty not implemented");
        return false;
}

static void
hoge_set_state(MemoryContext context,
                           MemoryStatsPrintFunc printfunc, void *passthru,
                           MemoryContextCounters *totals)
{
        elog(ERROR, "hoge_set_state not implemented");
}

static void
hoge_set_check(MemoryContext context)
{
}

static void
push_new_buffer(dsa_temp_buffer *buffer)
{
        MemoryContext old_context;
        dsa_temp_buffer *new_buffer;

        old_context = MemoryContextSwitchTo(TopTransactionContext);
        
        /* Insert a new buffer into head position */
        new_buffer = (dsa_temp_buffer *) palloc0(sizeof(dsa_temp_buffer));
        new_buffer->next = buffer;
        buffer = new_buffer;
        
        MemoryContextSwitchTo(old_context);
}



MemoryContext
make_hoge_memory_context(dsa_area *area, void *base, bool isLocal)
{
        static const MemoryContextMethods hoge_methods = {
                hoge_alloc,
                hoge_free,
                hoge_realloc,
                hoge_reset,
                hoge_delete,
                hoge_get_chunk_space,
                hoge_is_empty,
                hoge_set_state,
#ifdef MEMORY_CONTEXT_CHECKING
                hoge_set_check
#endif
        };
        
        struct hoge_memory_context *result;
        bool found;

        if (isLocal)
                result = palloc0(sizeof(struct hoge_memory_context));
        else
        {
                result = (struct hoge_memory_context *) 
                        ShmemInitStruct("ShmDsaContext", ShmDsaContextSize(), 
&found);
                if (found)
                        return &result->memory_context;
        }

        MemoryContextCreate(&result->memory_context,
                                                T_SlabContext, /* TODO: this is 
a lie */
                                                &hoge_methods,
                                                NULL,
                                                "hoge");
        result->base = base;
        result->area = area;
        
        /* If context is shared one, chunks become permanent so temp_buffer is 
NULL */
        if (isLocal)
                result->alloc_buffer = (dsa_temp_buffer *) 
palloc0(sizeof(dsa_temp_buffer));
        else
                result->alloc_buffer = NULL;

        return &result->memory_context;
}

Size
ShmDsaContextSize(void)
{
        return sizeof(struct hoge_memory_context);
}

MemoryContext
CreateDsaLocalContext(MemoryContext shared)
{
        MemoryContext local;
        struct hoge_memory_context *shared_context = (struct 
hoge_memory_context *) shared;

        AssertArg(MemoryContextIsValid(shared));
        
        local = make_hoge_memory_context(shared_context->area, 
shared_context->base, true);
        MemoryContextSetParent(local, TopTransactionContext);
        
        Assert(MemoryContextIsValid(local));

        return local;
}

/* 
 * AddToDsaSharedContext
 *
 * We don't want to leak memory in shared memory. Unlike local process, 
 * memory leak still exists even after local process is terminated.
 * If error occurs in transaction, we free all dsa_allocated chunks linked
 * from local DSA-MemoryContext. When the function using DSA-MemoryContext make
 * sure that the memory leak does not happen, AddToDsaSharedContext should be 
called.
 *
 * Assuming following case: when catcache is inserted into hash table on 
 * shared memory and transaction is aborted we wouldn't leak memory because 
 * some eviction alogorithm would clean up useless cache eventually.
 *
 */
void
AddToDsaSharedContext(MemoryContext local, MemoryContext shared)
{
        struct hoge_memory_context *local_dsa_cxt = (struct hoge_memory_context 
*) local;
        dsa_temp_buffer *buf = local_dsa_cxt->alloc_buffer;
        int idx;

        AssertArg(MemoryContextIsValid(local));
        AssertArg(MemoryContextIsValid(shared));
        

        /* change backpointer to shared MemoryContext */        
        while (buf && buf->chunks)
        {
                dsa_temp_buffer *next_buf = buf->next;

                for (idx = 0; idx < buf->tail; idx++) 
                {
                        /* Rewind to the secret start of the chunk */
                        if (buf->chunks[idx] != 0)
                                *(void **)(buf->chunks[idx] + (char 
*)local_dsa_cxt->base)
                                                   = shared;
                }
                /* initialize head of buffer */
                buf->tail = 0;
                buf = next_buf;
        }
}

Attachment: hoge.control
Description: hoge.control

Attachment: hoge--1.0.sql
Description: hoge--1.0.sql

Attachment: Makefile
Description: Makefile

Reply via email to