Changeset: 08d8961ace58 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=08d8961ace58
Modified Files:
        gdk/shared_memory.c
        gdk/shared_memory.h
        monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh
        monetdb5/extras/pyapi/pyapi.c
        monetdb5/extras/pyapi/pyapi.h
        monetdb5/extras/pyapi/pyapi.mal
Branch: pyapi
Log Message:

Merge.


diffs (truncated from 475 to 300 lines):

diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c
--- a/gdk/shared_memory.c
+++ b/gdk/shared_memory.c
@@ -3,8 +3,7 @@
 
 #ifndef _WIN32
 
-#include "monetdb_config.h"
-#include "gdk.h"
+#include "../monetdb5/mal/mal_exception.h"
 
 #include <stdlib.h>
 #include <assert.h>
@@ -28,25 +27,33 @@ static int shm_unique_id = 1;
 static int shm_current_id = 0;
 static int shm_max_id = 32;
 static int shm_is_initialized = false;
-static char shm_keystring[] = ".";
+static char shm_keystring[] = BINDIR;
+static MT_Lock release_memory_lock;
+static key_t base_key = 800000000;
 
-void *init_shared_memory(int id, size_t size, int flags);
+
+str init_shared_memory(int id, size_t size, void **ptr, int flags);
 void store_shared_memory(int memory_id, void *ptr);
-int release_shared_memory_id(int memory_id, void *ptr);
+str release_shared_memory_id(int memory_id, void *ptr);
 
 int init_process_semaphore(int id, int count, int flags);
 
-void initialize_shared_memory(void)
+str initialize_shared_memory(void)
 {
-       if (shm_is_initialized) return;
-
+       if (shm_is_initialized) //maybe this should just return MAL_SUCCEED as 
well
+        return createException(MAL, "shared_memory.init", "Attempting to 
initialize shared memory when it was already initialized.");
+                
+    //initialize the pointer to memory ID structure
        shm_ptrs = malloc(shm_max_id * sizeof(void*));
        shm_memory_ids = malloc(shm_max_id * sizeof(int));
        shm_current_id = 0;
        shm_max_id = 32;
        shm_unique_id = 2;
 
-       shm_is_initialized = true;
+    MT_lock_init(&release_memory_lock, "release_memory_lock");
+
+    shm_is_initialized = true;
+    return MAL_SUCCEED;
 }
 
 void store_shared_memory(int memory_id, void *ptr)
@@ -79,7 +86,6 @@ void store_shared_memory(int memory_id, 
                shm_max_id *= 2;
        }
 
-
        shm_memory_ids[shm_current_id] = memory_id;
        shm_ptrs[shm_current_id] = ptr;
        shm_current_id++;
@@ -95,26 +101,34 @@ int get_unique_shared_memory_id(int offs
        return id;
 }
 
-void* create_shared_memory(int id, size_t size)
+str create_shared_memory(int id, size_t size, void **return_ptr)
 {
-       return init_shared_memory(id, size, IPC_CREAT);
+       return init_shared_memory(id, size, return_ptr, IPC_CREAT);
 }
 
-void *get_shared_memory(int id, size_t size)
+str get_shared_memory(int id, size_t size, void **return_ptr)
 {
-       return init_shared_memory(id, size, 0);
+       return init_shared_memory(id, size, return_ptr, 0);
 }
 
-void *init_shared_memory(int id, size_t size, int flags)
+str ftok_enhanced(int id, key_t *return_key);
+str ftok_enhanced(int id, key_t *return_key)
+{
+    *return_key = base_key + id;
+    return MAL_SUCCEED;
+}
+
+str init_shared_memory(int id, size_t size, void **return_ptr, int flags)
 {
     int shmid;
     void *ptr;
     int i;
-       int key = ftok(shm_keystring, id);
-    if (key == (key_t) -1)
+    key_t key;
+
+       str msg = ftok_enhanced(id, &key);
+    if (msg != MAL_SUCCEED)
     {
-        perror("ftok");
-        return NULL;
+        return msg;
     }
 
        assert(shm_is_initialized);
@@ -122,8 +136,9 @@ void *init_shared_memory(int id, size_t 
        shmid = shmget(key, size, flags | 0666);
     if (shmid < 0)
     {
-       perror("shmget");
-        return NULL;
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "shared_memory.get", "Error calling 
shmget(key:%d,size:%zu,flags:%d): %s", key, size, flags, err);
     }
 
     //check if the shared memory segment is already created, if it is we do 
not need to add it to the table and can simply return the pointer
@@ -131,28 +146,32 @@ void *init_shared_memory(int id, size_t 
     {
         if (shm_memory_ids[i] == shmid)
         {
-            return shm_ptrs[i];
+            if (return_ptr != NULL) *return_ptr = shm_ptrs[i];
+            return MAL_SUCCEED;
         }
     }
 
        ptr = shmat(shmid, NULL, 0);
     if (ptr == (void*)-1)
     {
-       perror("shmat");
-        return NULL;
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "shared_memory.get", "Error calling 
shmat(id:%d,NULL,0): %s", shmid, err);
     }
 
     store_shared_memory(shmid, ptr);
-    return ptr;
+    if (return_ptr != NULL) *return_ptr = ptr;
+    return MAL_SUCCEED;
 }
 
-int release_shared_memory(void *ptr)
+str release_shared_memory(void *ptr)
 {
        int i = 0;
        int memory_id = -1;
 
     assert(shm_is_initialized);
 
+    MT_lock_set(&release_memory_lock, "release_memory_lock");
        //find the memory_id accompanying the given pointer in the structure
        for(i = 0; i < shm_current_id; i++)
        {
@@ -164,25 +183,28 @@ int release_shared_memory(void *ptr)
                        break;
                }
        }
+    MT_lock_unset(&release_memory_lock, "release_memory_lock");
 
        assert(memory_id);
        //actually release the memory at the given ID
        return release_shared_memory_id(memory_id, ptr);
 }
 
-int release_shared_memory_id(int memory_id, void *ptr)
+str release_shared_memory_id(int memory_id, void *ptr)
 {
        if (shmctl(memory_id, IPC_RMID, NULL) == -1)
        {
-       perror("shmctl");
-        return false;
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "shared_memory.release", "Error calling 
shmctl(id:%d,IPC_RMID,NULL): %s", memory_id, err);
        }
        if (shmdt(ptr) == -1)
        {
-       perror("shmdt");
-        return false;
+        char *err = strerror(errno);
+        errno = 0;
+        return createException(MAL, "shared_memory.release", "Error calling 
shmdt(ptr:%p): %s", ptr, err);
        }
-       return true;
+       return MAL_SUCCEED;
 }
 
 int init_process_semaphore(int id, int count, int flags)
diff --git a/gdk/shared_memory.h b/gdk/shared_memory.h
--- a/gdk/shared_memory.h
+++ b/gdk/shared_memory.h
@@ -14,18 +14,21 @@
 #ifndef _SHAREDMEMORY_LIB_
 #define _SHAREDMEMORY_LIB_
 
+#include "monetdb_config.h"
+#include "gdk.h"
+
 #include <stddef.h>
 
 //! Initialize the shared memory module
-void initialize_shared_memory(void);
+str initialize_shared_memory(void);
 //! Not thread safe
-void* create_shared_memory(int id, size_t size);
-//! Not thread safe
-int release_shared_memory(void *ptr);
+str create_shared_memory(int id, size_t size, void **return_ptr);
+//! This is thread safe
+str release_shared_memory(void *ptr);
 //! Not thread safe
 int get_unique_shared_memory_id(int offset);
 //! This is thread safe
-void *get_shared_memory(int id, size_t size);
+str get_shared_memory(int id, size_t size, void **return_ptr);
 
 //! Returns semaphore ID, id = unique id within the program, count = amount of 
semaphores
 int create_process_semaphore(int id, int count);
diff --git a/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh 
b/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh
--- a/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh
+++ b/monetdb5/extras/pyapi/Benchmarks/pyapi_test.sh
@@ -11,7 +11,7 @@ export MSERVERTEST='netstat -ant | grep 
 # Testing parameters
 # Input test (zero copy vs copy)
 # The input sizes to test (in MB)
-export INPUT_TESTING_SIZES="0.1 1 10 100 1000"
+export INPUT_TESTING_SIZES="10"
 # Amount of tests to run for each size
 export INPUT_TESTING_NTESTS=10
 
@@ -130,6 +130,11 @@ function pyapi_build() {
     fi
 }
 
+function pyapi_run_single_test_echo() {
+    echo \$PYAPI_BUILD_DIR/bin/mserver5 --set mapi_port=\$MSERVER_PORT --set 
embedded_py=true --set enable_pyverbose=true --set 
pyapi_benchmark_output=\$PYAPI_OUTPUT_DIR/temp_output.tsv $2
+    echo python \$PYAPI_TESTFILE $3 $4 $5 \$MSERVER_PORT $6
+}
+
 function pyapi_run_single_test() {
     echo "Beginning Test $1"
     if [ $SETSID -eq 1 ]; then
@@ -155,6 +160,7 @@ function pyapi_run_single_test() {
     return 1
 }
 
+
 function pyapi_test_input() {
     echo "Beginning Input Testing (Copy vs Zero Copy)"
     pyapi_run_single_test "Input Testing (Zero Copy)" "" "INPUT" 
input_zerocopy "$INPUT_TESTING_NTESTS" "$INPUT_TESTING_SIZES"
diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -532,13 +532,13 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
 
         VERBOSE_MESSAGE("Initializing shared memory.\n");
 
+        assert(memory_size > 0);
         //create the shared memory for the header
         MT_lock_set(&pyapiLock, "pyapi.evaluate");
-        ptr = create_shared_memory(shm_id, memory_size); 
+        msg = create_shared_memory(shm_id, memory_size, (void**) &ptr); 
         MT_lock_unset(&pyapiLock, "pyapi.evaluate");
-        if (ptr == NULL) 
+        if (msg != MAL_SUCCEED) 
         {
-            msg = createException(MAL, "pyapi.eval", "Failed to initialize 
shared memory");
             GDKfree(pids);
             process_id = 0;
             goto wrapup;
@@ -613,16 +613,19 @@ str PyAPIeval(MalBlkPtr mb, MalStkPtr st
             {
                 //a child failed, get the error message from the child
                 ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[failedprocess 
* pci->retc + 0]);
+                char *err_ptr;
 
-                char *err_ptr = get_shared_memory(shm_id + 1, descr->bat_size);
-                if (err_ptr != NULL)
-                {
-                    msg = createException(MAL, "pyapi.eval", "%s", err_ptr);
-                    release_shared_memory(err_ptr);
-                }
-                else
-                {
-                    msg = createException(MAL, "pyapi.eval", "Error in child 
process, but no exception was thrown.");
+                if (descr->bat_size == 0) {
+                    msg = createException(MAL, "pyapi.eval", "Failure in child 
process with unknown error.");
+                } else {
+                    MT_lock_set(&pyapiLock, "pyapi.evaluate");
+                    msg = get_shared_memory(shm_id + 1, descr->bat_size, 
(void**) &err_ptr);
+                    MT_lock_unset(&pyapiLock, "pyapi.evaluate");
+                    if (msg == MAL_SUCCEED)
+                    {
+                        msg = createException(MAL, "pyapi.eval", "%s", 
err_ptr);
+                        release_shared_memory(err_ptr);
+                    }
                 }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to