Changeset: 9415374da46d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9415374da46d
Modified Files:
    clients/Tests/exports.stable.out
    gdk/gdk.h
    gdk/gdk_utils.c
    gdk/gdk_utils.h
    monetdb5/mal/mal.c
    monetdb5/mal/mal_profiler.c
    monetdb5/modules/mal/mal_mapi.c
    sql/backends/monet5/sql_scenario.c
Branch: embedded
Log Message:
merge with default diffs (truncated from 326 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -233,9 +233,11 @@ size_t GDKmem_cursize(void); void *GDKmmap(const char *path, int mode, size_t len); int GDKms(void); int GDKnr_threads; +void GDKprepareExit(void); void GDKqsort(void *h, void *t, const void *base, size_t n, int hs, int ts, int tpe); void GDKqsort_rev(void *h, void *t, const void *base, size_t n, int hs, int ts, int tpe); void *GDKrealloc(void *pold, size_t size) __attribute__((__warn_unused_result__)); +void GDKregister(MT_Id pid); void GDKreset(int status); void GDKsetenv(str name, str value); ssize_t GDKstrFromStr(unsigned char *dst, const unsigned char *src, ssize_t len); diff --git a/gdk/gdk.h b/gdk/gdk.h --- a/gdk/gdk.h +++ b/gdk/gdk.h @@ -2459,7 +2459,6 @@ VALptr(const ValRecord *v) typedef struct threadStruct { int tid; /* logical ID by MonetDB; val == index into this array + 1 (0 is invalid) */ - int waitfor; /* waitfor on exit */ MT_Id pid; /* physical thread id (pointer-sized) from the OS thread library */ str name; ptr data[THREADDATA]; diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c --- a/gdk/gdk_utils.c +++ b/gdk/gdk_utils.c @@ -1240,15 +1240,44 @@ GDKexiting(void) return stopped; } +void +GDKprepareExit(void) +{ + if (ATOMIC_TAS(GDKstopped, GDKstoppedLock) != 0) + return; + if (GDKvmtrim_id) + MT_join_thread(GDKvmtrim_id); +} + +static struct serverthread { + struct serverthread *next; + MT_Id pid; +} *serverthread; + +/* Register a thread that should be waited for in GDKreset. The + * thread must exit by itself when GDKexiting() returns true. 
*/ +void +GDKregister(MT_Id pid) +{ + struct serverthread *st; + + if ((st = GDKmalloc(sizeof(struct serverthread))) == NULL) + return; + st->pid = pid; + MT_lock_set(&GDKthreadLock); + st->next = serverthread; + serverthread = st; + MT_lock_unset(&GDKthreadLock); +} + /* coverity[+kill] */ void GDKreset(int status) { MT_Id pid = MT_getpid(); Thread t, s; + struct serverthread *st; - if (ATOMIC_TAS(GDKstopped, GDKstoppedLock) != 0) - return ; if( GDKkey){ BBPunfix(GDKkey->batCacheid); GDKkey = 0; @@ -1257,16 +1286,15 @@ GDKreset(int status) BBPunfix(GDKval->batCacheid); GDKval = 0; } - if (GDKvmtrim_id) - MT_join_thread(GDKvmtrim_id); MT_lock_set(&GDKthreadLock); - for (t = GDKthreads, s = t + THREADS; t < s; t++) - if (t->pid && t->pid != pid && t->waitfor) { - MT_lock_unset(&GDKthreadLock); - MT_join_thread(t->pid); - MT_lock_set(&GDKthreadLock); - } + for (st = serverthread; st; st = serverthread) { + MT_lock_unset(&GDKthreadLock); + MT_join_thread(st->pid); + MT_lock_set(&GDKthreadLock); + serverthread = st->next; + GDKfree(st); + } MT_lock_unset(&GDKthreadLock); if (status == 0) { @@ -1335,6 +1363,7 @@ GDKexit(int status) /* no database lock, so no threads, so exit now */ exit(status); } + GDKprepareExit(); GDKreset(status); MT_exit_thread(-1); } diff --git a/gdk/gdk_utils.h b/gdk/gdk_utils.h --- a/gdk/gdk_utils.h +++ b/gdk/gdk_utils.h @@ -94,6 +94,8 @@ gdk_export void GDKexit(int status); #endif gdk_export int GDKexiting(void); +gdk_export void GDKregister(MT_Id pid); +gdk_export void GDKprepareExit(void); gdk_export void GDKreset(int status); gdk_export const char *GDKversion(void); diff --git a/monetdb5/mal/mal.c b/monetdb5/mal/mal.c --- a/monetdb5/mal/mal.c +++ b/monetdb5/mal/mal.c @@ -106,6 +106,7 @@ int mal_init(void){ initProfiler(); return 0; } + /* * Upon exit we should attempt to remove all allocated memory explicitly. * This seemingly superflous action is necessary to simplify analyis of @@ -116,13 +117,15 @@ int mal_init(void){ * activity first. 
* This function should be called after you have issued sql_reset(); */ -void mserver_reset(void){ +void mserver_reset(void) +{ str err = 0; - MCstopClients(0); - setHeartbeat(-1); - stopProfiler(); - RECYCLEdrop(mal_clients); + GDKprepareExit(); + MCstopClients(0); + setHeartbeat(-1); + stopProfiler(); + RECYCLEdrop(mal_clients); AUTHreset(); if ((err = msab_wildRetreat()) != NULL) { fprintf(stderr, "!%s", err); @@ -141,18 +144,17 @@ void mserver_reset(void){ mal_client_reset(); mal_module_reset(); mal_module_reset(); - mal_linker_reset(); + mal_linker_reset(); mal_resource_reset(); mal_runtime_reset(); mal_scenario_reset(); - + memset((char*)monet_cwd,0, sizeof(monet_cwd)); monet_memory = 0; memset((char*)monet_characteristics,0, sizeof(monet_characteristics)); - mal_trace = 0; -/* No need to clean up the namespace, it will simply be extended upon restart - mal_namespace_reset(); -*/ + mal_trace = 0; + /* No need to clean up the namespace, it will simply be extended + * upon restart mal_namespace_reset(); */ } diff --git a/monetdb5/mal/mal_profiler.c b/monetdb5/mal/mal_profiler.c --- a/monetdb5/mal/mal_profiler.c +++ b/monetdb5/mal/mal_profiler.c @@ -953,19 +953,17 @@ static void profilerHeartbeat(void *dumm int t; (void) dummy; - while (ATOMIC_GET(hbrunning, mal_beatLock) && !GDKexiting()) { + for (;;) { /* wait until you need this info */ - while (ATOMIC_GET(hbdelay, mal_beatLock) == 0 || eventstream == NULL) { - for (t = 1000; t > 0 && ! GDKexiting(); t -= 25) { - MT_sleep_ms(25); - if (!ATOMIC_GET(hbrunning, mal_beatLock)) - return; - } + while (ATOMIC_GET(hbdelay, mal_beatLock) == 0 || eventstream == NULL) { + if (GDKexiting() || !ATOMIC_GET(hbrunning, mal_beatLock)) + return; + MT_sleep_ms(25); } - for (t = (int) ATOMIC_GET(hbdelay, mal_beatLock); t > 0 && !GDKexiting(); t -= 25) { + for (t = (int) ATOMIC_GET(hbdelay, mal_beatLock); t > 0; t -= 25) { + if (GDKexiting() || !ATOMIC_GET(hbrunning, mal_beatLock)) + return; MT_sleep_ms(t > 25 ? 
25 : t); - if (!ATOMIC_GET(hbrunning, mal_beatLock)) - return; } profilerHeartbeatEvent("ping"); } @@ -980,7 +978,7 @@ void setHeartbeat(int delay) return; } if (delay <= 10) - hbdelay =10; + delay = 10; ATOMIC_SET(hbdelay, (ATOMIC_TYPE) delay, mal_beatLock); } @@ -995,11 +993,11 @@ void initHeartbeat(void) #ifdef NEED_MT_LOCK_INIT ATOMIC_INIT(mal_beatLock, "beatLock"); #endif - hbrunning = 1; + ATOMIC_SET(hbrunning, 1, mal_beatLock); if (MT_create_thread(&hbthread, profilerHeartbeat, NULL, MT_THR_JOINABLE) < 0) { /* it didn't happen */ hbthread = 0; - hbrunning = 0; + ATOMIC_SET(hbrunning, 0, mal_beatLock); } } diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c --- a/monetdb5/modules/mal/mal_mapi.c +++ b/monetdb5/modules/mal/mal_mapi.c @@ -366,7 +366,7 @@ SERVERlistenThread(SOCKET *Sock) data = GDKmalloc(sizeof(*data)); data->in = socket_rastream(msgsock, "Server read"); data->out = socket_wastream(msgsock, "Server write"); - if (MT_create_thread(&tid, doChallenge, data, MT_THR_DETACHED)) { + if (MT_create_thread(&tid, doChallenge, data, MT_THR_JOINABLE)) { mnstr_printf(data->out, "!internal server error (cannot fork new " "client thread), please try again later\n"); mnstr_flush(data->out); @@ -374,6 +374,7 @@ SERVERlistenThread(SOCKET *Sock) "cannot fork new client thread"); GDKfree(data); } + GDKregister(tid); } while (!ATOMIC_GET(serverexiting, atomicLock) && !GDKexiting()); (void) ATOMIC_DEC(nlistener, atomicLock); @@ -440,7 +441,7 @@ SERVERlisten(int *Port, str *Usockfile, SOCKLEN length = 0; int on = 1; int i = 0; - MT_Id pid, *pidp = &pid; + MT_Id pid; int port; int maxusers; char *usockfile; @@ -647,12 +648,13 @@ SERVERlisten(int *Port, str *Usockfile, psock[1] = INVALID_SOCKET; #endif psock[2] = INVALID_SOCKET; - if (MT_create_thread(pidp, (void (*)(void *)) SERVERlistenThread, psock, MT_THR_DETACHED) != 0) { + if (MT_create_thread(&pid, (void (*)(void *)) SERVERlistenThread, psock, MT_THR_JOINABLE) != 0) { GDKfree(psock); if 
(usockfile) GDKfree(usockfile); throw(MAL, "mal_mapi.listen", OPERATION_FAILED ": starting thread failed"); } + GDKregister(pid); #ifdef DEBUG_SERVER gethostname(host, (int) 512); snprintf(msg, (int) 512, "#Ready to accept connections on %s:%d\n", host, port); @@ -758,7 +760,7 @@ SERVERclient(void *res, const Stream *In data = GDKmalloc(sizeof(*data)); data->in = *In; data->out = *Out; - if (MT_create_thread(&tid, doChallenge, data, MT_THR_DETACHED)) { + if (MT_create_thread(&tid, doChallenge, data, MT_THR_JOINABLE)) { mnstr_printf(data->out, "!internal server error (cannot fork new " "client thread), please try again later\n"); mnstr_flush(data->out); @@ -766,6 +768,7 @@ SERVERclient(void *res, const Stream *In "cannot fork new client thread"); free(data); } + GDKregister(tid); return MAL_SUCCEED; } diff --git a/sql/backends/monet5/sql_scenario.c b/sql/backends/monet5/sql_scenario.c --- a/sql/backends/monet5/sql_scenario.c +++ b/sql/backends/monet5/sql_scenario.c @@ -256,10 +256,12 @@ SQLinit(void) if (MT_create_thread(&sqllogthread, (void (*)(void *)) mvc_logmanager, NULL, MT_THR_JOINABLE) != 0) { throw(SQL, "SQLinit", "Starting log manager failed"); } + GDKregister(sqllogthread); #if 0 _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list