Changeset: 08d8961ace58 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=08d8961ace58 Modified Files: gdk/shared_memory.c gdk/shared_memory.h monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh monetdb5/extras/pyapi/pyapi.c monetdb5/extras/pyapi/pyapi.h monetdb5/extras/pyapi/pyapi.mal Branch: pyapi Log Message:
Merge. diffs (truncated from 475 to 300 lines): diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c --- a/gdk/shared_memory.c +++ b/gdk/shared_memory.c @@ -3,8 +3,7 @@ #ifndef _WIN32 -#include "monetdb_config.h" -#include "gdk.h" +#include "../monetdb5/mal/mal_exception.h" #include <stdlib.h> #include <assert.h> @@ -28,25 +27,33 @@ static int shm_unique_id = 1; static int shm_current_id = 0; static int shm_max_id = 32; static int shm_is_initialized = false; -static char shm_keystring[] = "."; +static char shm_keystring[] = BINDIR; +static MT_Lock release_memory_lock; +static key_t base_key = 800000000; -void *init_shared_memory(int id, size_t size, int flags); + +str init_shared_memory(int id, size_t size, void **ptr, int flags); void store_shared_memory(int memory_id, void *ptr); -int release_shared_memory_id(int memory_id, void *ptr); +str release_shared_memory_id(int memory_id, void *ptr); int init_process_semaphore(int id, int count, int flags); -void initialize_shared_memory(void) +str initialize_shared_memory(void) { - if (shm_is_initialized) return; - + if (shm_is_initialized) //maybe this should just return MAL_SUCCEED as well + return createException(MAL, "shared_memory.init", "Attempting to initialize shared memory when it was already initialized."); + + //initialize the pointer to memory ID structure shm_ptrs = malloc(shm_max_id * sizeof(void*)); shm_memory_ids = malloc(shm_max_id * sizeof(int)); shm_current_id = 0; shm_max_id = 32; shm_unique_id = 2; - shm_is_initialized = true; + MT_lock_init(&release_memory_lock, "release_memory_lock"); + + shm_is_initialized = true; + return MAL_SUCCEED; } void store_shared_memory(int memory_id, void *ptr) @@ -79,7 +86,6 @@ void store_shared_memory(int memory_id, shm_max_id *= 2; } - shm_memory_ids[shm_current_id] = memory_id; shm_ptrs[shm_current_id] = ptr; shm_current_id++; @@ -95,26 +101,34 @@ int get_unique_shared_memory_id(int offs return id; } -void* create_shared_memory(int id, size_t size) +str create_shared_memory(int id, size_t size, void **return_ptr) { - return init_shared_memory(id, size, IPC_CREAT); + return init_shared_memory(id, size, return_ptr, IPC_CREAT); } -void *get_shared_memory(int id, size_t size) +str get_shared_memory(int id, size_t size, void **return_ptr) { - return init_shared_memory(id, size, 0); + return init_shared_memory(id, size, return_ptr, 0); } -void *init_shared_memory(int id, size_t size, int flags) +str ftok_enhanced(int id, key_t *return_key); +str ftok_enhanced(int id, key_t *return_key) +{ + *return_key = base_key + id; + return MAL_SUCCEED; +} + +str init_shared_memory(int id, size_t size, void **return_ptr, int flags) { int shmid; void *ptr; int i; - int key = ftok(shm_keystring, id); - if (key == (key_t) -1) + key_t key; + + str msg = ftok_enhanced(id, &key); + if (msg != MAL_SUCCEED) { - perror("ftok"); - return NULL; + return msg; } assert(shm_is_initialized); @@ -122,8 +136,9 @@ void *init_shared_memory(int id, size_t shmid = shmget(key, size, flags | 0666); if (shmid < 0) { - perror("shmget"); - return NULL; + char *err = strerror(errno); + errno = 0; + return createException(MAL, "shared_memory.get", "Error calling shmget(key:%d,size:%zu,flags:%d): %s", key, size, flags, err); } //check if the shared memory segment is already created, if it is we do not need to add it to the table and can simply return the pointer @@ -131,28 +146,32 @@ void *init_shared_memory(int id, size_t { if (shm_memory_ids[i] == shmid) { - return shm_ptrs[i]; + if (return_ptr != NULL) *return_ptr = shm_ptrs[i]; + return MAL_SUCCEED; } } ptr = shmat(shmid, NULL, 0); if (ptr == (void*)-1) { - perror("shmat"); - return NULL; + char *err = strerror(errno); + errno = 0; + return createException(MAL, "shared_memory.get", "Error calling shmat(id:%d,NULL,0): %s", shmid, err); } store_shared_memory(shmid, ptr); - return ptr; + if (return_ptr != NULL) *return_ptr = ptr; + return MAL_SUCCEED; } -int release_shared_memory(void *ptr) +str release_shared_memory(void *ptr) { int i = 0; int memory_id = -1; assert(shm_is_initialized); + MT_lock_set(&release_memory_lock, "release_memory_lock"); //find the memory_id accompanying the given pointer in the structure for(i = 0; i < shm_current_id; i++) { @@ -164,25 +183,28 @@ int release_shared_memory(void *ptr) break; } } + MT_lock_unset(&release_memory_lock, "release_memory_lock"); assert(memory_id); //actually release the memory at the given ID return release_shared_memory_id(memory_id, ptr); } -int release_shared_memory_id(int memory_id, void *ptr) +str release_shared_memory_id(int memory_id, void *ptr) { if (shmctl(memory_id, IPC_RMID, NULL) == -1) { - perror("shmctl"); - return false; + char *err = strerror(errno); + errno = 0; + return createException(MAL, "shared_memory.release", "Error calling shmctl(id:%d,IPC_RMID,NULL): %s", memory_id, err); } if (shmdt(ptr) == -1) { - perror("shmdt"); - return false; + char *err = strerror(errno); + errno = 0; + return createException(MAL, "shared_memory.release", "Error calling shmdt(ptr:%p): %s", ptr, err); } - return true; + return MAL_SUCCEED; } int init_process_semaphore(int id, int count, int flags) diff --git a/gdk/shared_memory.h b/gdk/shared_memory.h --- a/gdk/shared_memory.h +++ b/gdk/shared_memory.h @@ -14,18 +14,21 @@ #ifndef _SHAREDMEMORY_LIB_ #define _SHAREDMEMORY_LIB_ +#include "monetdb_config.h" +#include "gdk.h" + #include <stddef.h> //! Initialize the shared memory module -void initialize_shared_memory(void); +str initialize_shared_memory(void); //! Not thread safe -void* create_shared_memory(int id, size_t size); -//! Not thread safe -int release_shared_memory(void *ptr); +str create_shared_memory(int id, size_t size, void **return_ptr); +//! This is thread safe +str release_shared_memory(void *ptr); //! Not thread safe int get_unique_shared_memory_id(int offset); //! This is thread safe -void *get_shared_memory(int id, size_t size); +str get_shared_memory(int id, size_t size, void **return_ptr); //! Returns semaphore ID, id = unique id within the program, count = amount of semaphores int create_process_semaphore(int id, int count); diff --git a/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh b/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh --- a/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh +++ b/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh @@ -11,7 +11,7 @@ export MSERVERTEST='netstat -ant | grep # Testing parameters # Input test (zero copy vs copy) # The input sizes to test (in MB) -export INPUT_TESTING_SIZES="0.1 1 10 100 1000" +export INPUT_TESTING_SIZES="10" # Amount of tests to run for each size export INPUT_TESTING_NTESTS=10 @@ -130,6 +130,11 @@ function pyapi_build() { fi } +function pyapi_run_single_test_echo() { + echo \$PYAPI_BUILD_DIR/bin/mserver5 --set mapi_port=\$MSERVER_PORT --set embedded_py=true --set enable_pyverbose=true --set pyapi_benchmark_output=\$PYAPI_OUTPUT_DIR/temp_output.tsv $2 + echo python \$PYAPI_TESTFILE $3 $4 $5 \$MSERVER_PORT $6 +} + function pyapi_run_single_test() { echo "Beginning Test $1" if [ $SETSID -eq 1 ]; then @@ -155,6 +160,7 @@ function pyapi_run_single_test() { return 1 } + function pyapi_test_input() { echo "Beginning Input Testing (Copy vs Zero Copy)" pyapi_run_single_test "Input Testing (Zero Copy)" "" "INPUT" input_zerocopy "$INPUT_TESTING_NTESTS" "$INPUT_TESTING_SIZES" diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c --- a/monetdb5/extras/pyapi/pyapi.c +++ b/monetdb5/extras/pyapi/pyapi.c @@ -532,13 +532,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st VERBOSE_MESSAGE("Initializing shared memory.\n"); + assert(memory_size > 0); //create the shared memory for the header MT_lock_set(&pyapiLock, "pyapi.evaluate"); - ptr = create_shared_memory(shm_id, memory_size); + msg = create_shared_memory(shm_id, memory_size, (void**) &ptr); MT_lock_unset(&pyapiLock, "pyapi.evaluate"); - if (ptr == NULL) + if (msg != MAL_SUCCEED) { - msg = createException(MAL, "pyapi.eval", "Failed to initialize shared memory"); GDKfree(pids); process_id = 0; goto wrapup; @@ -613,16 +613,19 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st { //a child failed, get the error message from the child ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[failedprocess * pci->retc + 0]); + char *err_ptr; - char *err_ptr = get_shared_memory(shm_id + 1, descr->bat_size); - if (err_ptr != NULL) - { - msg = createException(MAL, "pyapi.eval", "%s", err_ptr); - release_shared_memory(err_ptr); - } - else - { - msg = createException(MAL, "pyapi.eval", "Error in child process, but no exception was thrown."); + if (descr->bat_size == 0) { + msg = createException(MAL, "pyapi.eval", "Failure in child process with unknown error."); + } else { + MT_lock_set(&pyapiLock, "pyapi.evaluate"); + msg = get_shared_memory(shm_id + 1, descr->bat_size, (void**) &err_ptr); + MT_lock_unset(&pyapiLock, "pyapi.evaluate"); + if (msg == MAL_SUCCEED) + { + msg = createException(MAL, "pyapi.eval", "%s", err_ptr); + release_shared_memory(err_ptr); + } } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list