Changeset: c6f8733a08cf for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c6f8733a08cf
Modified Files:
        gdk/shared_memory.c
        gdk/shared_memory.h
        monetdb5/extras/pyapi/pyapi.c
Branch: pyapi
Log Message:

Better error reporting for multiprocessing/shared memory.


diffs (truncated from 391 to 300 lines):

diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c
--- a/gdk/shared_memory.c
+++ b/gdk/shared_memory.c
@@ -3,8 +3,7 @@
 
 #ifndef _WIN32
 
-#include "monetdb_config.h"
-#include "gdk.h"
+#include "../monetdb5/mal/mal_exception.h"
 
 #include <stdlib.h>
 #include <assert.h>
@@ -29,24 +28,30 @@ static int shm_current_id = 0;
 static int shm_max_id = 32;
 static int shm_is_initialized = false;
 static char shm_keystring[] = ".";
+static MT_Lock release_memory_lock;
 
-void *init_shared_memory(int id, size_t size, int flags);
+str init_shared_memory(int id, size_t size, void **ptr, int flags);
 void store_shared_memory(int memory_id, void *ptr);
-int release_shared_memory_id(int memory_id, void *ptr);
+str release_shared_memory_id(int memory_id, void *ptr);
 
 int init_process_semaphore(int id, int count, int flags);
 
-void initialize_shared_memory(void)
+str initialize_shared_memory(void)
 {
-       if (shm_is_initialized) return;
-
+       if (shm_is_initialized) //maybe this should just return MAL_SUCCEED as well
+        return createException(MAL, "shared_memory.init", "Attempting to initialize shared memory when it was already initialized.");
+                
+    //initialize the pointer to memory ID structure
        shm_ptrs = malloc(shm_max_id * sizeof(void*));
        shm_memory_ids = malloc(shm_max_id * sizeof(int));
        shm_current_id = 0;
        shm_max_id = 32;
        shm_unique_id = 2;
 
-       shm_is_initialized = true;
+    MT_lock_init(&release_memory_lock, "release_memory_lock");
+
+    shm_is_initialized = true;
+    return MAL_SUCCEED;
 }
 
 void store_shared_memory(int memory_id, void *ptr)
@@ -95,17 +100,17 @@ int get_unique_shared_memory_id(int offs
        return id;
 }
 
-void* create_shared_memory(int id, size_t size)
+str create_shared_memory(int id, size_t size, void **return_ptr)
 {
-       return init_shared_memory(id, size, IPC_CREAT);
+       return init_shared_memory(id, size, return_ptr, IPC_CREAT);
 }
 
-void *get_shared_memory(int id, size_t size)
+str get_shared_memory(int id, size_t size, void **return_ptr)
 {
-       return init_shared_memory(id, size, 0);
+       return init_shared_memory(id, size, return_ptr, 0);
 }
 
-void *init_shared_memory(int id, size_t size, int flags)
+str init_shared_memory(int id, size_t size, void **return_ptr, int flags)
 {
     int shmid;
     void *ptr;
@@ -113,8 +118,9 @@ void *init_shared_memory(int id, size_t 
        int key = ftok(shm_keystring, id);
     if (key == (key_t) -1)
     {
-        perror("ftok");
-        return NULL;
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "shared_memory.get", "Error calling ftok(keystring:%s,id:%d): %s", shm_keystring, id, err);
     }
 
        assert(shm_is_initialized);
@@ -122,8 +128,9 @@ void *init_shared_memory(int id, size_t 
        shmid = shmget(key, size, flags | 0666);
     if (shmid < 0)
     {
-       perror("shmget");
-        return NULL;
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "shared_memory.get", "Error calling shmget(key:%d,size:%zu,flags:%d): %s", key, size, flags, err);
     }
 
     //check if the shared memory segment is already created, if it is we do not need to add it to the table and can simply return the pointer
@@ -131,28 +138,32 @@ void *init_shared_memory(int id, size_t 
     {
         if (shm_memory_ids[i] == shmid)
         {
-            return shm_ptrs[i];
+            if (return_ptr != NULL) *return_ptr = shm_ptrs[i];
+            return MAL_SUCCEED;
         }
     }
 
        ptr = shmat(shmid, NULL, 0);
     if (ptr == (void*)-1)
     {
-       perror("shmat");
-        return NULL;
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "shared_memory.get", "Error calling shmat(id:%d,NULL,0): %s", shmid, err);
     }
 
     store_shared_memory(shmid, ptr);
-    return ptr;
+    if (return_ptr != NULL) *return_ptr = ptr;
+    return MAL_SUCCEED;
 }
 
-int release_shared_memory(void *ptr)
+str release_shared_memory(void *ptr)
 {
        int i = 0;
        int memory_id = -1;
 
     assert(shm_is_initialized);
 
+    MT_lock_set(&release_memory_lock, "release_memory_lock");
        //find the memory_id accompanying the given pointer in the structure
        for(i = 0; i < shm_current_id; i++)
        {
@@ -164,25 +175,28 @@ int release_shared_memory(void *ptr)
                        break;
                }
        }
+    MT_lock_unset(&release_memory_lock, "release_memory_lock");
 
        assert(memory_id);
        //actually release the memory at the given ID
        return release_shared_memory_id(memory_id, ptr);
 }
 
-int release_shared_memory_id(int memory_id, void *ptr)
+str release_shared_memory_id(int memory_id, void *ptr)
 {
        if (shmctl(memory_id, IPC_RMID, NULL) == -1)
        {
-       perror("shmctl");
-        return false;
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "shared_memory.release", "Error calling shmctl(id:%d,IPC_RMID,NULL): %s", memory_id, err);
        }
        if (shmdt(ptr) == -1)
        {
-       perror("shmdt");
-        return false;
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "shared_memory.release", "Error calling shmdt(ptr:%p): %s", ptr, err);
        }
-       return true;
+       return MAL_SUCCEED;
 }
 
 int init_process_semaphore(int id, int count, int flags)
diff --git a/gdk/shared_memory.h b/gdk/shared_memory.h
--- a/gdk/shared_memory.h
+++ b/gdk/shared_memory.h
@@ -14,18 +14,21 @@
 #ifndef _SHAREDMEMORY_LIB_
 #define _SHAREDMEMORY_LIB_
 
+#include "monetdb_config.h"
+#include "gdk.h"
+
 #include <stddef.h>
 
 //! Initialize the shared memory module
-void initialize_shared_memory(void);
+str initialize_shared_memory(void);
 //! Not thread safe
-void* create_shared_memory(int id, size_t size);
-//! Not thread safe
-int release_shared_memory(void *ptr);
+str create_shared_memory(int id, size_t size, void **return_ptr);
+//! This is thread safe
+str release_shared_memory(void *ptr);
 //! Not thread safe
 int get_unique_shared_memory_id(int offset);
 //! This is thread safe
-void *get_shared_memory(int id, size_t size);
+str get_shared_memory(int id, size_t size, void **return_ptr);
 
 //! Returns semaphore ID, id = unique id within the program, count = amount of semaphores
 int create_process_semaphore(int id, int count);
diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -532,13 +532,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
 
         VERBOSE_MESSAGE("Initializing shared memory.\n");
 
+        assert(memory_size > 0);
         //create the shared memory for the header
         MT_lock_set(&pyapiLock, "pyapi.evaluate");
-        ptr = create_shared_memory(shm_id, memory_size); 
+        msg = create_shared_memory(shm_id, memory_size, (void**) &ptr); 
         MT_lock_unset(&pyapiLock, "pyapi.evaluate");
-        if (ptr == NULL) 
+        if (msg != MAL_SUCCEED) 
         {
-            msg = createException(MAL, "pyapi.eval", "Failed to initialize shared memory");
             GDKfree(pids);
             process_id = 0;
             goto wrapup;
@@ -613,16 +613,17 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
             {
                 //a child failed, get the error message from the child
                 ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[failedprocess * pci->retc + 0]);
+                char *err_ptr;
 
-                char *err_ptr = get_shared_memory(shm_id + 1, descr->bat_size);
-                if (err_ptr != NULL)
-                {
-                    msg = createException(MAL, "pyapi.eval", "%s", err_ptr);
-                    release_shared_memory(err_ptr);
-                }
-                else
-                {
-                    msg = createException(MAL, "pyapi.eval", "Error in child process, but no exception was thrown.");
+                if (descr->bat_size == 0) {
+                    msg = createException(MAL, "pyapi.eval", "Failure in child process with unknown error.");
+                } else {
+                    msg = get_shared_memory(shm_id + 1, descr->bat_size, (void**) &err_ptr);
+                    if (msg == MAL_SUCCEED)
+                    {
+                        msg = createException(MAL, "pyapi.eval", "%s", err_ptr);
+                        release_shared_memory(err_ptr);
+                    }
                 }
 
                 if (process_count > 1) release_process_semaphore(sem_id);
@@ -670,13 +671,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
                 //get the shared memory address for this return value
                 VERBOSE_MESSAGE("Parent requesting memory at id %d of size %d\n", shm_id + (i + 1), total_size);
 
+                assert(total_size > 0);
                 MT_lock_set(&pyapiLock, "pyapi.evaluate");
-                ret->array_data = get_shared_memory(shm_id + (i + 1), total_size);
+                msg = get_shared_memory(shm_id + (i + 1), total_size, &ret->array_data);
                 MT_lock_unset(&pyapiLock, "pyapi.evaluate");
 
-                if (ret->array_data == NULL)
+                if (msg != MAL_SUCCEED)
                 {
-                    msg = createException(MAL, "pyapi.eval", "Shared memory does not exist.\n");
                     if (process_count > 1) release_process_semaphore(sem_id);
                     release_shared_memory(ptr);
                     GDKfree(pids);
@@ -690,13 +691,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
                 {
                     int mask_size = ret->count * sizeof(bool);
 
+                    assert(mask_size > 0);
                     MT_lock_set(&pyapiLock, "pyapi.evaluate");
-                    ret->mask_data = get_shared_memory(shm_id + pci->retc + (i + 1), mask_size);
+                    msg = get_shared_memory(shm_id + pci->retc + (i + 1), mask_size, (void**) &ret->mask_data);
                     MT_lock_unset(&pyapiLock, "pyapi.evaluate");
 
-                    if (ret->mask_data == NULL)
+                    if (msg != MAL_SUCCEED)
                     {
-                        msg = createException(MAL, "pyapi.eval", "Shared memory does not exist.\n");
                         if (process_count > 1) release_process_semaphore(sem_id);
                         release_shared_memory(ptr);
                         release_shared_memory(ret->array_data);
@@ -869,9 +870,8 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
         // First we will fill in the header information, we will need to get a pointer to the header data first
         // The main process has already created the header data for all the child processes
         VERBOSE_MESSAGE("Getting shared memory.\n");
-        shm_ptr = get_shared_memory(shm_id, memory_size);
-        if (shm_ptr == NULL) {
-            msg = createException(MAL, "pyapi.eval", "Failed to allocate shared memory for header data.\n");
+        msg = get_shared_memory(shm_id, memory_size, (void**) &shm_ptr);
+        if (msg != MAL_SUCCEED) {
             goto wrapup;
         }
 
@@ -952,16 +952,18 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
                      mask_size += descr->bat_count * sizeof(bool);
                      has_mask = has_mask || descr->has_mask;
                 }
+                assert(return_size > 0);
                 // Then we allocate the shared memory for this return value
                 VERBOSE_MESSAGE("Child creating shared memory at id %d of size %d\n", shm_id + (i + 1), return_size);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to