Changeset: c6f8733a08cf for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c6f8733a08cf
Modified Files:
	gdk/shared_memory.c
	gdk/shared_memory.h
	monetdb5/extras/pyapi/pyapi.c
Branch: pyapi
Log Message:
Better error reporting for multiprocessing/shared memory. diffs (truncated from 391 to 300 lines): diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c --- a/gdk/shared_memory.c +++ b/gdk/shared_memory.c @@ -3,8 +3,7 @@ #ifndef _WIN32 -#include "monetdb_config.h" -#include "gdk.h" +#include "../monetdb5/mal/mal_exception.h" #include <stdlib.h> #include <assert.h> @@ -29,24 +28,30 @@ static int shm_current_id = 0; static int shm_max_id = 32; static int shm_is_initialized = false; static char shm_keystring[] = "."; +static MT_Lock release_memory_lock; -void *init_shared_memory(int id, size_t size, int flags); +str init_shared_memory(int id, size_t size, void **ptr, int flags); void store_shared_memory(int memory_id, void *ptr); -int release_shared_memory_id(int memory_id, void *ptr); +str release_shared_memory_id(int memory_id, void *ptr); int init_process_semaphore(int id, int count, int flags); -void initialize_shared_memory(void) +str initialize_shared_memory(void) { - if (shm_is_initialized) return; - + if (shm_is_initialized) //maybe this should just return MAL_SUCCEED as well + return createException(MAL, "shared_memory.init", "Attempting to initialize shared memory when it was already initialized."); + + //initialize the pointer to memory ID structure shm_ptrs = malloc(shm_max_id * sizeof(void*)); shm_memory_ids = malloc(shm_max_id * sizeof(int)); shm_current_id = 0; shm_max_id = 32; shm_unique_id = 2; - shm_is_initialized = true; + MT_lock_init(&release_memory_lock, "release_memory_lock"); + + shm_is_initialized = true; + return MAL_SUCCEED; } void store_shared_memory(int memory_id, void *ptr) @@ -95,17 +100,17 @@ int get_unique_shared_memory_id(int offs return id; } -void* create_shared_memory(int id, size_t size) +str create_shared_memory(int id, size_t size, void **return_ptr) { - return init_shared_memory(id, size, IPC_CREAT); + return init_shared_memory(id, size, return_ptr, IPC_CREAT); } -void *get_shared_memory(int id, size_t size) +str 
get_shared_memory(int id, size_t size, void **return_ptr) { - return init_shared_memory(id, size, 0); + return init_shared_memory(id, size, return_ptr, 0); } -void *init_shared_memory(int id, size_t size, int flags) +str init_shared_memory(int id, size_t size, void **return_ptr, int flags) { int shmid; void *ptr; @@ -113,8 +118,9 @@ void *init_shared_memory(int id, size_t int key = ftok(shm_keystring, id); if (key == (key_t) -1) { - perror("ftok"); - return NULL; + char *err = strerror(errno); + errno = 0; + return createException(MAL, "shared_memory.get", "Error calling ftok(keystring:%s,id:%d): %s", shm_keystring, id, err); } assert(shm_is_initialized); @@ -122,8 +128,9 @@ void *init_shared_memory(int id, size_t shmid = shmget(key, size, flags | 0666); if (shmid < 0) { - perror("shmget"); - return NULL; + char *err = strerror(errno); + errno = 0; + return createException(MAL, "shared_memory.get", "Error calling shmget(key:%d,size:%zu,flags:%d): %s", key, size, flags, err); } //check if the shared memory segment is already created, if it is we do not need to add it to the table and can simply return the pointer @@ -131,28 +138,32 @@ void *init_shared_memory(int id, size_t { if (shm_memory_ids[i] == shmid) { - return shm_ptrs[i]; + if (return_ptr != NULL) *return_ptr = shm_ptrs[i]; + return MAL_SUCCEED; } } ptr = shmat(shmid, NULL, 0); if (ptr == (void*)-1) { - perror("shmat"); - return NULL; + char *err = strerror(errno); + errno = 0; + return createException(MAL, "shared_memory.get", "Error calling shmat(id:%d,NULL,0): %s", shmid, err); } store_shared_memory(shmid, ptr); - return ptr; + if (return_ptr != NULL) *return_ptr = ptr; + return MAL_SUCCEED; } -int release_shared_memory(void *ptr) +str release_shared_memory(void *ptr) { int i = 0; int memory_id = -1; assert(shm_is_initialized); + MT_lock_set(&release_memory_lock, "release_memory_lock"); //find the memory_id accompanying the given pointer in the structure for(i = 0; i < shm_current_id; i++) { @@ -164,25 
+175,28 @@ int release_shared_memory(void *ptr) break; } } + MT_lock_unset(&release_memory_lock, "release_memory_lock"); assert(memory_id); //actually release the memory at the given ID return release_shared_memory_id(memory_id, ptr); } -int release_shared_memory_id(int memory_id, void *ptr) +str release_shared_memory_id(int memory_id, void *ptr) { if (shmctl(memory_id, IPC_RMID, NULL) == -1) { - perror("shmctl"); - return false; + char *err = strerror(errno); + errno = 0; + return createException(MAL, "shared_memory.release", "Error calling shmctl(id:%d,IPC_RMID,NULL): %s", memory_id, err); } if (shmdt(ptr) == -1) { - perror("shmdt"); - return false; + char *err = strerror(errno); + errno = 0; + return createException(MAL, "shared_memory.release", "Error calling shmdt(ptr:%p): %s", ptr, err); } - return true; + return MAL_SUCCEED; } int init_process_semaphore(int id, int count, int flags) diff --git a/gdk/shared_memory.h b/gdk/shared_memory.h --- a/gdk/shared_memory.h +++ b/gdk/shared_memory.h @@ -14,18 +14,21 @@ #ifndef _SHAREDMEMORY_LIB_ #define _SHAREDMEMORY_LIB_ +#include "monetdb_config.h" +#include "gdk.h" + #include <stddef.h> //! Initialize the shared memory module -void initialize_shared_memory(void); +str initialize_shared_memory(void); //! Not thread safe -void* create_shared_memory(int id, size_t size); -//! Not thread safe -int release_shared_memory(void *ptr); +str create_shared_memory(int id, size_t size, void **return_ptr); +//! This is thread safe +str release_shared_memory(void *ptr); //! Not thread safe int get_unique_shared_memory_id(int offset); //! This is thread safe -void *get_shared_memory(int id, size_t size); +str get_shared_memory(int id, size_t size, void **return_ptr); //! 
Returns semaphore ID, id = unique id within the program, count = amount of semaphores int create_process_semaphore(int id, int count); diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c --- a/monetdb5/extras/pyapi/pyapi.c +++ b/monetdb5/extras/pyapi/pyapi.c @@ -532,13 +532,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st VERBOSE_MESSAGE("Initializing shared memory.\n"); + assert(memory_size > 0); //create the shared memory for the header MT_lock_set(&pyapiLock, "pyapi.evaluate"); - ptr = create_shared_memory(shm_id, memory_size); + msg = create_shared_memory(shm_id, memory_size, (void**) &ptr); MT_lock_unset(&pyapiLock, "pyapi.evaluate"); - if (ptr == NULL) + if (msg != MAL_SUCCEED) { - msg = createException(MAL, "pyapi.eval", "Failed to initialize shared memory"); GDKfree(pids); process_id = 0; goto wrapup; @@ -613,16 +613,17 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st { //a child failed, get the error message from the child ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[failedprocess * pci->retc + 0]); + char *err_ptr; - char *err_ptr = get_shared_memory(shm_id + 1, descr->bat_size); - if (err_ptr != NULL) - { - msg = createException(MAL, "pyapi.eval", "%s", err_ptr); - release_shared_memory(err_ptr); - } - else - { - msg = createException(MAL, "pyapi.eval", "Error in child process, but no exception was thrown."); + if (descr->bat_size == 0) { + msg = createException(MAL, "pyapi.eval", "Failure in child process with unknown error."); + } else { + msg = get_shared_memory(shm_id + 1, descr->bat_size, (void**) &err_ptr); + if (msg == MAL_SUCCEED) + { + msg = createException(MAL, "pyapi.eval", "%s", err_ptr); + release_shared_memory(err_ptr); + } } if (process_count > 1) release_process_semaphore(sem_id); @@ -670,13 +671,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st //get the shared memory address for this return value VERBOSE_MESSAGE("Parent requesting memory at id %d of size %d\n", shm_id + (i + 1), total_size); + assert(total_size > 0); 
MT_lock_set(&pyapiLock, "pyapi.evaluate"); - ret->array_data = get_shared_memory(shm_id + (i + 1), total_size); + msg = get_shared_memory(shm_id + (i + 1), total_size, &ret->array_data); MT_lock_unset(&pyapiLock, "pyapi.evaluate"); - if (ret->array_data == NULL) + if (msg != MAL_SUCCEED) { - msg = createException(MAL, "pyapi.eval", "Shared memory does not exist.\n"); if (process_count > 1) release_process_semaphore(sem_id); release_shared_memory(ptr); GDKfree(pids); @@ -690,13 +691,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st { int mask_size = ret->count * sizeof(bool); + assert(mask_size > 0); MT_lock_set(&pyapiLock, "pyapi.evaluate"); - ret->mask_data = get_shared_memory(shm_id + pci->retc + (i + 1), mask_size); + msg = get_shared_memory(shm_id + pci->retc + (i + 1), mask_size, (void**) &ret->mask_data); MT_lock_unset(&pyapiLock, "pyapi.evaluate"); - if (ret->mask_data == NULL) + if (msg != MAL_SUCCEED) { - msg = createException(MAL, "pyapi.eval", "Shared memory does not exist.\n"); if (process_count > 1) release_process_semaphore(sem_id); release_shared_memory(ptr); release_shared_memory(ret->array_data); @@ -869,9 +870,8 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st // First we will fill in the header information, we will need to get a pointer to the header data first // The main process has already created the header data for all the child processes VERBOSE_MESSAGE("Getting shared memory.\n"); - shm_ptr = get_shared_memory(shm_id, memory_size); - if (shm_ptr == NULL) { - msg = createException(MAL, "pyapi.eval", "Failed to allocate shared memory for header data.\n"); + msg = get_shared_memory(shm_id, memory_size, (void**) &shm_ptr); + if (msg != MAL_SUCCEED) { goto wrapup; } @@ -952,16 +952,18 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st mask_size += descr->bat_count * sizeof(bool); has_mask = has_mask || descr->has_mask; } + assert(return_size > 0); // Then we allocate the shared memory for this return value VERBOSE_MESSAGE("Child creating shared memory at id 
%d of size %d\n", shm_id + (i + 1), return_size); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list