Changeset: 21f8c1613294 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/21f8c1613294
Modified Files:
        tools/merovingian/ChangeLog.Jul2021
        tools/merovingian/daemon/forkmserver.c
        tools/merovingian/daemon/handlers.c
        tools/merovingian/daemon/merovingian.c
        tools/merovingian/daemon/merovingian.h
        tools/merovingian/daemon/multiplex-funnel.c
Branch: Jul2021
Log Message:

Combine repeated log messages into a single one.


diffs (truncated from 388 to 300 lines):

diff --git a/tools/merovingian/ChangeLog.Jul2021 b/tools/merovingian/ChangeLog.Jul2021
--- a/tools/merovingian/ChangeLog.Jul2021
+++ b/tools/merovingian/ChangeLog.Jul2021
@@ -1,6 +1,10 @@
 # ChangeLog file for sql/src/backends/monet5/merovingian
 # This file is updated with mchangelog
 
+* Thu Jun 23 2022 Sjoerd Mullender <sjo...@acm.org>
+- When multiple identical messages are written to the log, write the
+  first one, and combine subsequent ones in a single message.
+
 * Wed Jun 22 2022 Sjoerd Mullender <sjo...@acm.org>
 - Fixed a leak where the log file wasn't closed when it was reopened
   after a log rotation (SIGHUP signal).
diff --git a/tools/merovingian/daemon/forkmserver.c b/tools/merovingian/daemon/forkmserver.c
--- a/tools/merovingian/daemon/forkmserver.c
+++ b/tools/merovingian/daemon/forkmserver.c
@@ -393,8 +393,8 @@ forkMserver(const char *database, sabdb*
                /* fill in the rest of the dpair entry */
                pthread_mutex_lock(&_mero_topdp_lock);
 
-               dp->out = pfdo[0];
-               dp->err = pfde[0];
+               dp->input[0].fd = pfdo[0];
+               dp->input[1].fd = pfde[0];
                dp->type = MEROFUN;
                dp->pid = getpid();
                dp->flag = 0;
@@ -745,9 +745,9 @@ forkMserver(const char *database, sabdb*
                exit(1);
        } else if (pid > 0) {
                /* parent: fine, let's add the pipes for this child */
-               dp->out = pfdo[0];
+               dp->input[0].fd = pfdo[0];
                close(pfdo[1]);
-               dp->err = pfde[0];
+               dp->input[1].fd = pfde[0];
                close(pfde[1]);
                dp->type = MERODB;
                dp->pid = pid;
diff --git a/tools/merovingian/daemon/handlers.c b/tools/merovingian/daemon/handlers.c
--- a/tools/merovingian/daemon/handlers.c
+++ b/tools/merovingian/daemon/handlers.c
@@ -250,14 +250,14 @@ void reinitialize(void)
                                "caught SIGHUP, closing logfile\n",
                                mytime, (long long int)_mero_topdp->next->pid);
                fflush(_mero_logfile);
-               _mero_topdp->out = _mero_topdp->err = t;
+               _mero_topdp->input[0].fd = _mero_topdp->input[1].fd = t;
                FILE *f = _mero_logfile;
                if ((_mero_logfile = fdopen(t, "a")) == NULL) {
                        /* revert to old log so that we have something */
                        Mfprintf(f, "%s ERR merovingian[%lld]: "
                                         "failed to reopen logfile\n",
                                         mytime, (long long 
int)_mero_topdp->next->pid);
-                       _mero_topdp->out = _mero_topdp->err = fileno(f);
+                       _mero_topdp->input[0].fd = _mero_topdp->input[1].fd = 
fileno(f);
                        _mero_logfile = f;
                } else {
                        fclose(f);
@@ -293,14 +293,14 @@ childhandler(void)
                while (p != NULL) {
                        if (p->pid == pid) {
                                /* log everything that's still in the pipes */
-                               logFD(p->out, "MSG", p->dbname, (long long 
int)p->pid, _mero_logfile, 1);
+                               logFD(p, 0, "MSG", p->dbname, (long long 
int)p->pid, _mero_logfile, 1);
                                p->pid = -1;    /* indicate the process is dead 
*/
 
                                /* close the descriptors */
-                               close(p->out);
-                               close(p->err);
-                               p->out = -1;
-                               p->err = -1;
+                               close(p->input[0].fd);
+                               close(p->input[1].fd);
+                               p->input[0].fd = -1;
+                               p->input[1].fd = -1;
                                if (WIFEXITED(wstatus)) {
                                        Mfprintf(stdout, "database '%s' (%lld) 
has exited with "
                                                         "exit status %d\n", 
p->dbname,
@@ -354,7 +354,7 @@ segvhandler(int sig) {
                                "\nand include the tail of this log in your 
bugreport with your explanation of "
                                "\nwhat you were doing, if possible.\n"
                                "\nABORTING NOW, YOU HAVE TO MANUALLY KILL ALL 
REMAINING mserver5 PROCESSES\n";
-               if (write(_mero_topdp->err, errmsg, sizeof(errmsg) - 1) >= 0)
+               if (write(_mero_topdp->input[1].fd, errmsg, sizeof(errmsg) - 1) 
>= 0)
                        sync();
        }
        abort();
diff --git a/tools/merovingian/daemon/merovingian.c b/tools/merovingian/daemon/merovingian.c
--- a/tools/merovingian/daemon/merovingian.c
+++ b/tools/merovingian/daemon/merovingian.c
@@ -132,39 +132,76 @@ confkeyval *_mero_props = NULL;
 /* funcs */
 
 inline void
-logFD(int fd, char *type, char *dbname, long long int pid, FILE *stream, int 
rest)
+logFD(dpair dp, int fd, const char *type, const char *dbname, long long pid, 
FILE *stream, bool rest)
 {
        time_t now;
        char buf[8096];
-       int len = 0;
+       ssize_t len = 0;
        char *p, *q;
        struct tm *tmp;
        char mytime[20];
-       bool writeident = true;
 
+       assert(fd == 0 || fd == 1);
        do {
-               if ((len = read(fd, buf, sizeof(buf) - 1)) <= 0)
+               do {
+                 repeat:
+                       ssize_t n = read(dp->input[fd].fd, buf + len, 
sizeof(buf) - len - 1);
+                       if (n <= 0)
+                               break;
+                       len += n;
+                       buf[len] = 0;
+               } while (buf[len - 1] != '\n' && len < (ssize_t) sizeof(buf) - 
1);
+               if (len == 0)
                        break;
-               buf[len] = '\0';
-               q = buf;
                now = time(NULL);
                tmp = localtime(&now);
                strftime(mytime, sizeof(mytime), "%Y-%m-%d %H:%M:%S", tmp);
-               while ((p = strchr(q, '\n')) != NULL) {
-                       if (writeident)
-                               fprintf(stream, "%s %s %s[%lld]: ",
-                                               mytime, type, dbname, pid);
-                       *p = '\0';
-                       fprintf(stream, "%s\n", q);
-                       q = p + 1;
-                       writeident = true;
-               }
-               if ((int)(q - buf) < len) {
-                       if (writeident)
-                               fprintf(stream, "%s %s %s[%lld]: ",
-                                               mytime, type, dbname, pid);
-                       writeident = false;
-                       fprintf(stream, "%s\n", q);
+               for (q = buf; *q; q = p + 1) {
+                       p = strchr(q, '\n');
+                       if (p == NULL) {
+                               if (q > buf) {
+                                       /* the last part of the last line 
didn't fit, so
+                                        * just continue reading */
+                                       len = strlen(q);
+                                       memmove(buf, q, len);
+                                       goto repeat;
+                               }
+                               /* we must have received a ridiculously long 
line */
+                               dp->input[fd].ts = now;
+                               dp->input[fd].cnt = 0;
+                               dp->input[fd].buf[0] = '\0';
+                               fprintf(stream, "%s %s %s[%lld]: %s\n",
+                                               mytime, type, dbname, pid, q);
+                               break;
+                       }
+                       if (p == q) {
+                               /* empty line, don't bother */
+                               continue;
+                       }
+                       if (dp->input[fd].cnt < 30000 && 
strncmp(dp->input[fd].buf, q, (size_t) (p + 1 - q)) == 0) {
+                               /* repeat of last message */
+                               dp->input[fd].cnt++;
+                               dp->input[fd].ts = now;
+                       } else {
+                               if (dp->input[fd].cnt > 0) {
+                                       /* last message was repeated but not 
all repeats reported */
+                                       char tmptime[20];
+                                       strftime(tmptime, sizeof(tmptime), 
"%Y-%m-%d %H:%M:%S",
+                                                        
localtime(&dp->input[fd].ts));
+                                       if (dp->input[fd].cnt == 1)
+                                               fprintf(stream, "%s %s 
%s[%lld]: %s",
+                                                               tmptime, type, 
dbname, pid, dp->input[fd].buf);
+                                       else
+                                               fprintf(stream, "%s %s 
%s[%lld]: message repeated %d times: %s",
+                                                               tmptime, type, 
dbname, pid, dp->input[fd].cnt, dp->input[fd].buf);
+                               }
+                               dp->input[fd].ts = now;
+                               dp->input[fd].cnt = 0;
+                               strncpy(dp->input[fd].buf, q, p + 1 - q);
+                               dp->input[fd].buf[p + 1 - q] = '\0';
+                               fprintf(stream, "%s %s %s[%lld]: %.*s\n",
+                                               mytime, type, dbname, pid, 
(int) (p - q), q);
+                       }
                }
        } while (rest);
        fflush(stream);
@@ -208,9 +245,9 @@ logListener(void *x)
                for (w = d; w != NULL; w = w->next) {
                        if (w->pid <= 0)
                                continue;
-                       pfd[nfds++] = (struct pollfd) {.fd = w->out, .events = 
POLLIN};
-                       if (w->out != w->err)
-                               pfd[nfds++] = (struct pollfd) {.fd = w->err, 
.events = POLLIN};
+                       pfd[nfds++] = (struct pollfd) {.fd = w->input[0].fd, 
.events = POLLIN};
+                       if (w->input[0].fd != w->input[1].fd)
+                               pfd[nfds++] = (struct pollfd) {.fd = 
w->input[1].fd, .events = POLLIN};
                        w->flag |= 1;
                }
 #else
@@ -219,12 +256,12 @@ logListener(void *x)
                for (w = d; w != NULL; w = w->next) {
                        if (w->pid <= 0)
                                continue;
-                       FD_SET(w->out, &readfds);
-                       if (nfds < w->out)
-                               nfds = w->out;
-                       FD_SET(w->err, &readfds);
-                       if (nfds < w->err)
-                               nfds = w->err;
+                       FD_SET(w->input[0].fd, &readfds);
+                       if (nfds < w->input[0].fd)
+                               nfds = w->input[0].fd;
+                       FD_SET(w->input[1].fd, &readfds);
+                       if (nfds < w->input[1].fd)
+                               nfds = w->input[1].fd;
                        w->flag |= 1;
                }
 #endif
@@ -257,19 +294,19 @@ logListener(void *x)
                        if (w->pid > 0 && w->flag & 1) {
 #ifdef HAVE_POLL
                                for (int i = 0; i < nfds; i++) {
-                                       if (pfd[i].fd == w->out && 
pfd[i].revents & POLLIN)
-                                               logFD(w->out, "MSG", w->dbname,
+                                       if (pfd[i].fd == w->input[0].fd && 
pfd[i].revents & POLLIN)
+                                               logFD(w, 0, "MSG", w->dbname,
                                                          (long long 
int)w->pid, _mero_logfile, 0);
-                                       else if (pfd[i].fd == w->err && 
pfd[i].revents & POLLIN)
-                                               logFD(w->err, "ERR", w->dbname,
+                                       else if (pfd[i].fd == w->input[1].fd && 
pfd[i].revents & POLLIN)
+                                               logFD(w, 1, "ERR", w->dbname,
                                                          (long long 
int)w->pid, _mero_logfile, 0);
                                }
 #else
-                               if (FD_ISSET(w->out, &readfds) != 0)
-                                       logFD(w->out, "MSG", w->dbname,
+                               if (FD_ISSET(w->input[0].fd, &readfds) != 0)
+                                       logFD(w, 0, "MSG", w->dbname,
                                                  (long long int)w->pid, 
_mero_logfile, 0);
-                               if (w->err != w->out && FD_ISSET(w->err, 
&readfds) != 0)
-                                       logFD(w->err, "ERR", w->dbname,
+                               if (w->input[1].fd != w->input[0].fd && 
FD_ISSET(w->input[1].fd, &readfds) != 0)
+                                       logFD(w, 1, "ERR", w->dbname,
                                                  (long long int)w->pid, 
_mero_logfile, 0);
 #endif
                                w->flag &= ~1;
@@ -734,19 +771,19 @@ main(int argc, char *argv[])
        /* where should our msg output go to? */
        p = getConfVal(_mero_props, "logfile");
        /* write to the given file */
-       _mero_topdp->out = open(p, O_WRONLY | O_APPEND | O_CREAT | O_CLOEXEC,
+       _mero_topdp->input[0].fd = open(p, O_WRONLY | O_APPEND | O_CREAT | 
O_CLOEXEC,
                        S_IRUSR | S_IWUSR);
-       if (_mero_topdp->out == -1) {
+       if (_mero_topdp->input[0].fd == -1) {
                Mfprintf(stderr, "unable to open '%s': %s\n",
                                p, strerror(errno));
                MERO_EXIT_CLEAN(1);
        }
 #if O_CLOEXEC == 0
-       (void) fcntl(_mero_topdp->out, F_SETFD, FD_CLOEXEC);
+       (void) fcntl(_mero_topdp->input[0].fd, F_SETFD, FD_CLOEXEC);
 #endif
-       _mero_topdp->err = _mero_topdp->out;
+       _mero_topdp->input[1].fd = _mero_topdp->input[0].fd;
 
-       if(!(_mero_logfile = fdopen(_mero_topdp->out, "a"))) {
+       if(!(_mero_logfile = fdopen(_mero_topdp->input[0].fd, "a"))) {
                Mfprintf(stderr, "unable to open file descriptor: %s\n",
                                 strerror(errno));
                MERO_EXIT(1);
@@ -769,7 +806,7 @@ main(int argc, char *argv[])
 #if !defined(HAVE_PIPE2) || O_CLOEXEC == 0
        (void) fcntl(pfd[0], F_SETFD, FD_CLOEXEC);
 #endif
-       d->out = pfd[0];
+       d->input[0].fd = pfd[0];
        dup_err = dup2(pfd[1], 1);
        close(pfd[1]);
        if(dup_err == -1) {
@@ -804,7 +841,7 @@ main(int argc, char *argv[])
                Mfprintf(stderr, "unable to dup stderr\n");
                MERO_EXIT(1);
        }
-       d->err = pfd[0];
+       d->input[1].fd = pfd[0];
        dup_err = dup2(pfd[1], 2);
        close(pfd[1]);
        if(dup_err == -1) {
@@ -830,7 +867,7 @@ main(int argc, char *argv[])
        (void) fcntl(pfd[0], F_SETFD, FD_CLOEXEC);
        (void) fcntl(pfd[1], F_SETFD, FD_CLOEXEC);
 #endif
-       d->out = pfd[0];
+       d->input[0].fd = pfd[0];
        if(!(_mero_discout = fdopen(pfd[1], "a"))) {
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to