Hi,

here are two patches for imjournal dealing with the module's termination. They should resolve problems with duplicate messages and delayed shutdown. More info is in the patches.

Tomas
>From 837c30918e909759e0164a3dfbcbd62e447b32c1 Mon Sep 17 00:00:00 2001
From: Tomas Heinrich <[email protected]>
Date: Thu, 23 May 2013 20:22:47 +0200
Subject: [PATCH 1/2] bugfix: imjournal's thread shouldn't be canceled

Because of cancellation, the correct position in the journal was
lost. This resulted in duplicate messages appearing in the logs.
---
 plugins/imjournal/imjournal.c |   10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c
index 07ab446..aad7194 100755
--- a/plugins/imjournal/imjournal.c
+++ b/plugins/imjournal/imjournal.c
@@ -546,16 +546,22 @@ finalize_it:
 ENDsetModCnf
 
 
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+	if(eFeat == sFEATURENonCancelInputTermination)
+		iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
 BEGINqueryEtryPt
 CODESTARTqueryEtryPt
 CODEqueryEtryPt_STD_IMOD_QUERIES
 CODEqueryEtryPt_STD_CONF2_QUERIES
 CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
 ENDqueryEtryPt
 
 
-
-
 BEGINmodInit()
 CODESTARTmodInit
 	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
-- 
1.7.10.4

>From 1fab15333900a12c29750211290286ba6a5f319d Mon Sep 17 00:00:00 2001
From: Tomas Heinrich <[email protected]>
Date: Fri, 24 May 2013 04:14:35 +0200
Subject: [PATCH 2/2] bugfix: imjournal should respect termination request

sd_journal_wait() ignores EINTR and this behavior interferes with
the module's termination signaling. Therefore, poll() is used instead.
---
 plugins/imjournal/imjournal.c |   78 +++++++++++++++++++++++++++++++++++------
 1 file changed, 67 insertions(+), 11 deletions(-)

diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c
index aad7194..ae29154 100755
--- a/plugins/imjournal/imjournal.c
+++ b/plugins/imjournal/imjournal.c
@@ -30,6 +30,7 @@
 #include <ctype.h>
 #include <stdlib.h>
 #include <time.h>
+#include <sys/poll.h>
 #include <sys/socket.h>
 #include <errno.h>
 
@@ -164,14 +165,6 @@ readjournal() {
 	int priority = 0;
 	int facility = 0;
 
-	/* Get next journal message, if there is none, wait for next */
-	while (sd_journal_next(j) == 0) {
-		if (sd_journal_wait(j, (uint64_t) -1) < 0) {
-			iRet = RS_RET_ERR;
-			goto ret;
-		}
-	}
-
 	/* Get message text */
 	if (sd_journal_get_data(j, "MESSAGE", &get, &length) < 0) {
 		logmsgInternal(NO_ERRCODE, LOG_SYSLOG|LOG_INFO, (uchar *)"log message from journal doesn't have MESSAGE", 0);
@@ -388,14 +381,58 @@ persistJournalState () {
 }
 
 
+/* Waits (via poll()) for activity on the journal's file descriptor and
+ * then lets sd_journal_process() consume it. Returns RS_RET_OK early on EINTR.
+ */
+static rsRetVal
+pollJournal()
+{
+	DEFiRet;
+	struct pollfd pollfd;
+	int r;
+
+	pollfd.fd = sd_journal_get_fd(j);
+	pollfd.events = sd_journal_get_events(j);
+	r = poll(&pollfd, 1, -1);
+	if (r == -1) {
+		if (errno == EINTR) {
+			/* EINTR is also received during termination
+			 * so return now to check the term state.
+			 */
+			ABORT_FINALIZE(RS_RET_OK);
+		} else {
+			char errStr[256];
+
+			rs_strerror_r(errno, errStr, sizeof(errStr));
+			errmsg.LogError(0, RS_RET_ERR,
+				"poll() failed: '%s'", errStr);
+			ABORT_FINALIZE(RS_RET_ERR);
+		}
+	}
+
+	assert(r == 1);
+
+	r = sd_journal_process(j);
+	if (r < 0) {
+		char errStr[256];
+		/* sd-journal reports failure as a negative errno value in r */
+		rs_strerror_r(-r, errStr, sizeof(errStr));
+		errmsg.LogError(0, RS_RET_ERR,
+			"sd_journal_process() failed: '%s'", errStr);
+		ABORT_FINALIZE(RS_RET_ERR);
+	}
+
+finalize_it:
+	RETiRet;
+}
+
+
 BEGINrunInput
 CODESTARTrunInput
 	/* this is an endless loop - it is terminated when the thread is
 	 * signalled to do so. This, however, is handled by the framework,
 	 * right into the sleep below.
 	 */
-	int count = 0;
-
 	if (cs.stateFile[0] != '/') {
 		char *new_stateFile;
 
@@ -436,14 +473,32 @@ CODESTARTrunInput
 	}
 
 	while (glbl.GetGlobalInputTermState() == 0) {
+		int count = 0, r;
+
+		r = sd_journal_next(j);
+		if (r < 0) {
+			char errStr[256];
+
+			rs_strerror_r(errno, errStr, sizeof(errStr));
+			errmsg.LogError(0, RS_RET_ERR,
+				"sd_journal_next() failed: '%s'", errStr);
+			ABORT_FINALIZE(RS_RET_ERR);
+		}
+
+		if (r == 0) {
+			/* No new messages, wait for activity. */
+			CHKiRet(pollJournal());
+			continue;
+		}
+
 		CHKiRet(readjournal());
+		/* TODO: This could use some finer metric. */
 		count++;
 		if (count == cs.iPersistStateInterval) {
 			count = 0;
 			persistJournalState();
 		}
 	}
-	persistJournalState();
 
 finalize_it:
 ENDrunInput
@@ -490,6 +545,7 @@ ENDwillRun
 /* close journal */
 BEGINafterRun
 CODESTARTafterRun
+	persistJournalState();
 	sd_journal_close(j);
 ENDafterRun
 
-- 
1.7.10.4

_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com/professional-services/
What's up with rsyslog? Follow https://twitter.com/rgerhards
NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of 
sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE 
THAT.

Reply via email to