Changeset: 0561eaacb72e for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0561eaacb72e Added Files: sql/backends/monet5/Tests/pyapi19.sql sql/backends/monet5/Tests/pyapi19.stable.err sql/backends/monet5/Tests/pyapi19.stable.out Modified Files: gdk/shared_memory.c gdk/shared_memory.h monetdb5/extras/pyapi/connection.c monetdb5/extras/pyapi/connection.h monetdb5/extras/pyapi/lazyarray.c monetdb5/extras/pyapi/pyapi.c monetdb5/extras/pyapi/pytypes.h sql/backends/monet5/Tests/All sql/backends/monet5/Tests/pyapi25.sql sql/backends/monet5/Tests/pyapi25.stable.err sql/backends/monet5/Tests/pyapi25.stable.out tools/embeddedpy/embeddedpy.c Branch: pyapi Log Message:
When a loopback query occurs in a forked process, send the query back to the main mserver and execute the query there, then ship the result back using shared memory or mmap. Because all queries are executed in the main process this resolves all locking issues, and we can now perform any type of query. diffs (truncated from 1962 to 300 lines): diff --git a/gdk/shared_memory.c b/gdk/shared_memory.c --- a/gdk/shared_memory.c +++ b/gdk/shared_memory.c @@ -23,6 +23,7 @@ #include <sched.h> #include <errno.h> #include <sys/sem.h> +#include <time.h> static lng *shm_memory_ids; static void **shm_ptrs; @@ -33,14 +34,22 @@ static int shm_is_initialized = false; static MT_Lock release_memory_lock; static key_t base_key = 800000000; +#define SHM_SHARED 1 +#define SHM_MEMMAP 2 +#define SHM_EITHER 3 -str init_shared_memory(int id, size_t size, void **ptr, int flags); +static int memtype = SHM_SHARED; + + +str init_shared_memory(int id, size_t size, void **return_ptr, int flags, bool reg, lng *return_shmid); void store_shared_memory(lng memory_id, void *ptr); str release_shared_memory_id(int memory_id, void *ptr); -str init_mmap_memory(int id, size_t size, void **ptr, int flags); +str init_mmap_memory(int id, size_t size, void **return_ptr, int flags, bool reg, lng *return_shmid); str release_mmap_memory(void *ptr, size_t size); +str init_process_semaphore(int id, int count, int flags, int *semid); + str initialize_shared_memory(void) { if (shm_is_initialized) //maybe this should just return MAL_SUCCEED as well @@ -104,11 +113,13 @@ int get_unique_shared_memory_id(int offs return id; } -str init_mmap_memory(int id, size_t size, void **return_ptr, int flags) +str init_mmap_memory(int id, size_t size, void **return_ptr, int flags, bool reg, lng *return_shmid) { char address[100]; void *ptr; int fd, result; + // TODO: memmap shouldn't be in tmp directory + // TODO: we should just use GDKmmap, try to get that to work snprintf(address, 100, "/tmp/temp_pyapi_mmap_%d", id); fd = open(address, flags | O_RDWR, MONETDB_MODE); @@ -141,14 +152,16 @@ str init_mmap_memory(int id, size_t size errno = 0; return createException(MAL, "shared_memory.get", "Failure in mmap(NULL, %zu, PROT_WRITE, MAP_SHARED, %d, 0): %s", size, fd, err); } - store_shared_memory(size, ptr); + if (reg) store_shared_memory(size, ptr); if (return_ptr != NULL) *return_ptr = ptr; + if (return_shmid != NULL) *return_shmid = id; return MAL_SUCCEED; } str release_mmap_memory(void *ptr, size_t size) { int ret; + // TODO: Actually delete files on disk ret = munmap(ptr, size); if (ret != 0) { char *err = strerror(errno); @@ -158,19 +171,46 @@ str release_mmap_memory(void *ptr, size_ return MAL_SUCCEED; } -str create_shared_memory(int id, size_t size, void **return_ptr) +str release_shared_memory_ptr(void *ptr) +{ + if (shmdt(ptr) == -1) + { + char *err = strerror(errno); + errno = 0; + return createException(MAL, "shared_memory.release", "Error calling shmdt(ptr:%p): %s", ptr, err); + } + return MAL_SUCCEED; +} + +str create_shared_memory(int id, size_t size, bool reg, void **return_ptr, lng *return_shmid) { char *shared, *mmap; - if ((shared = init_shared_memory(id, size, return_ptr, IPC_CREAT)) == MAL_SUCCEED) return MAL_SUCCEED; - if ((mmap = init_mmap_memory(id, size, return_ptr, O_CREAT)) == MAL_SUCCEED) return MAL_SUCCEED; + if (memtype == SHM_SHARED) + { + return init_shared_memory(id, size, return_ptr, IPC_CREAT, reg, return_shmid); + } + if (memtype == SHM_MEMMAP) + { + return init_mmap_memory(id, size, return_ptr, O_CREAT, reg, return_shmid); + } + if ((shared = init_shared_memory(id, size, return_ptr, IPC_CREAT, reg, return_shmid)) == MAL_SUCCEED) return MAL_SUCCEED; + if ((mmap = init_mmap_memory(id, size, return_ptr, O_CREAT, reg, return_shmid)) == MAL_SUCCEED) return MAL_SUCCEED; return createException(MAL, "shared_memory.release_mmap_memory", "Failed to create shared memory or mmap space.\nshared memory error: %s\nmmap error: %s", shared, mmap); } -str get_shared_memory(int id, size_t size, void **return_ptr) +str get_shared_memory(int id, size_t size, bool reg, void **return_ptr, lng *return_shmid) { char *shared, *mmap; - if ((shared = init_shared_memory(id, size, return_ptr, 0)) == MAL_SUCCEED) return MAL_SUCCEED; - if ((mmap = init_mmap_memory(id, size, return_ptr, 0)) == MAL_SUCCEED) return MAL_SUCCEED; + if (memtype == SHM_SHARED) + { + return init_shared_memory(id, size, return_ptr, 0, reg, return_shmid); + } + if (memtype == SHM_MEMMAP) + { + return init_mmap_memory(id, size, return_ptr, 0, reg, return_shmid); + } + if ((shared = init_shared_memory(id, size, return_ptr, 0, reg, return_shmid)) == MAL_SUCCEED) return MAL_SUCCEED; + if ((mmap = init_mmap_memory(id, size, return_ptr, 0, reg, return_shmid)) == MAL_SUCCEED) return MAL_SUCCEED; return createException(MAL, "shared_memory.release_mmap_memory", "Failed to get shared memory or mmap space.\nshared memory error: %s\nmmap error: %s", shared, mmap); } @@ -181,7 +221,7 @@ str ftok_enhanced(int id, key_t *return_ return MAL_SUCCEED; } -str init_shared_memory(int id, size_t size, void **return_ptr, int flags) +str init_shared_memory(int id, size_t size, void **return_ptr, int flags, bool reg, lng *return_shmid) { lng shmid; void *ptr; @@ -205,16 +245,19 @@ str init_shared_memory(int id, size_t si return createException(MAL, "shared_memory.get", "Error calling shmget(key:%zu,size:%zu,flags:%d): %s", (size_t)key, size, flags, err); } - //check if the shared memory segment is already created, if it is we do not need to add it to the table and can simply return the pointer - for(i = 0; i < shm_current_id; i++) - { - if (shm_memory_ids[i] == shmid) + if (reg) { + //check if the shared memory segment is already created, if it is we do not need to add it to the table and can simply return the pointer + for(i = 0; i < shm_current_id; i++) { - if (return_ptr != NULL) *return_ptr = shm_ptrs[i]; - return MAL_SUCCEED; + if (shm_memory_ids[i] == shmid) + { + if (return_ptr != NULL) *return_ptr = shm_ptrs[i]; + return MAL_SUCCEED; + } } } + ptr = shmat(shmid, NULL, 0); if (ptr == (void*)-1) { @@ -223,7 +266,8 @@ str init_shared_memory(int id, size_t si return createException(MAL, "shared_memory.get", "Error calling shmat(id:%lld,NULL,0): %s", shmid, err); } - store_shared_memory(shmid, ptr); + if (reg) store_shared_memory(shmid, ptr); + if (return_shmid != NULL) *return_shmid = shmid; if (return_ptr != NULL) *return_ptr = ptr; return MAL_SUCCEED; } @@ -248,30 +292,138 @@ str release_shared_memory(void *ptr) } } MT_lock_unset(&release_memory_lock, "release_memory_lock"); + return release_shared_memory_shmid(memory_id, ptr); +} - assert(memory_id); - //actually release the memory at the given ID - if (release_shared_memory_id(memory_id, ptr) == MAL_SUCCEED) return MAL_SUCCEED; +str release_shared_memory_shmid(int memory_id, void *ptr) +{ + assert(memory_id); + + if (memtype == SHM_SHARED) + { + return release_shared_memory_id(memory_id, ptr); + } + if (memtype == SHM_MEMMAP) + { + return release_mmap_memory(ptr, memory_id); + } + if (release_shared_memory_id(memory_id, ptr) == MAL_SUCCEED) return MAL_SUCCEED; if (release_mmap_memory(ptr, memory_id) == MAL_SUCCEED) return MAL_SUCCEED; return createException(MAL, "shared_memory.release", "Failed to release shared memory."); } str release_shared_memory_id(int memory_id, void *ptr) -{ +{ + if (shmdt(ptr) == -1) + { + char *err = strerror(errno); + errno = 0; + return createException(MAL, "shared_memory.release", "Error calling shmdt(ptr:%p): %s", ptr, err); + } if (shmctl(memory_id, IPC_RMID, NULL) == -1) { char *err = strerror(errno); errno = 0; return createException(MAL, "shared_memory.release", "Error calling shmctl(id:%d,IPC_RMID,NULL): %s", memory_id, err); } - if (shmdt(ptr) == -1) - { + return MAL_SUCCEED; +} + +str init_process_semaphore(int id, int count, int flags, int *semid) +{ + str msg = MAL_SUCCEED; + int key; + msg = ftok_enhanced(id, &key); + if (msg != MAL_SUCCEED) { + return msg; + } + *semid = semget(key, count, flags | 0666); + if (*semid < 0) { char *err = strerror(errno); errno = 0; - return createException(MAL, "shared_memory.release", "Error calling shmdt(ptr:%p): %s", ptr, err); - } - return MAL_SUCCEED; + return createException(MAL, "semaphore.init", "Error calling semget(key:%d,nsems:%d,semflg:%d): %s", key, count, flags | 0666, err); + } + return msg; } + +str create_process_semaphore(int id, int count, int *semid) +{ + return init_process_semaphore(id, count, IPC_CREAT, semid); +} + +str get_process_semaphore(int sem_id, int count, int *semid) +{ + return init_process_semaphore(sem_id, count, 0, semid); +} + +str get_semaphore_value(int sem_id, int number, int *semval) +{ + *semval = semctl(sem_id, number, GETVAL, 0); + if (*semval < 0) + { + char *err = strerror(errno); + errno = 0; + return createException(MAL, "semaphore.init", "Error calling semctl(semid:%d,semnum:%d,cmd:%d,param:0): %s", sem_id, number, GETVAL, err); + } + return MAL_SUCCEED; +} + +str change_semaphore_value(int sem_id, int number, int change) +{ + str msg = MAL_SUCCEED; + struct sembuf buffer; + buffer.sem_num = number; + buffer.sem_op = change; + buffer.sem_flg = 0; + + if (semop(sem_id, &buffer, 1) < 0) + { + char *err = strerror(errno); + errno = 0; + return createException(MAL, "semaphore.init", "Error calling semop(semid:%d, sops: { sem_num:%d, sem_op:%d, sem_flag: %d }, nsops:1): %s", sem_id, number, change, 0, err); + } + return msg; +} + +str change_semaphore_value_timeout(int sem_id, int number, int change, int timeout_mseconds, bool *succeed) +{ + str msg = MAL_SUCCEED; + struct timespec timeout; + struct sembuf buffer; + buffer.sem_num = number; + buffer.sem_op = change; + buffer.sem_flg = 0; + *succeed = false; + + timeout.tv_sec = (timeout_mseconds / 1000); + timeout.tv_nsec = (timeout_mseconds % 1000) * 1000; + + if (semtimedop(sem_id, &buffer, 1, &timeout) < 0) + { + if (errno == EAGAIN) { + errno = 0; + return MAL_SUCCEED; + } else { + char *err = strerror(errno); + errno = 0; + return createException(MAL, "semaphore.init", "Error calling semtimedop(semid:%d, sops: { sem_num:%d, sem_op:%d, sem_flag: %d }, nsops:1): %s", sem_id, number, change, 0, err); + } + } + *succeed = true; + return msg; +} _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list