branch: main commit 52abfc170ed705c845d5a42b4ba9f0488383a190 Author: Romain GARBAGE <romain.garb...@inria.fr> AuthorDate: Mon Feb 24 15:38:08 2025 +0100
base: Add event-log agent. * src/cuirass/base.scm (event-log-service, spawn-event-log-service): New variables. * tests/base.scm: Add tests for the event-log-service agent. Signed-off-by: Ludovic Courtès <l...@gnu.org> --- src/cuirass/base.scm | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++- tests/base.scm | 46 ++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 1 deletion(-) diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm index 631e81f..fa3481f 100644 --- a/src/cuirass/base.scm +++ b/src/cuirass/base.scm @@ -73,6 +73,7 @@ spawn-remote-builder spawn-channel-update-service spawn-jobset-evaluator + spawn-event-log-service spawn-jobset-registry spawn-gc-root-cleaner spawn-build-maintainer @@ -113,7 +114,11 @@ ;;; 'guix-daemon' process, while the remote build delegates builds to ;;; 'cuirass remote-server'. ;;; -;;; - Each jobset as an associated "monitor"; it requests channel updates, +;;; - The "event-log" agent receives events from the different agents. It +;;; also dispatches these events to the agents which subscribed to event +;;; notification. +;;; +;;; - Each jobset has an associated "monitor"; it requests channel updates, ;;; evaluations, and builds to the actors above. It also receives requests ;;; such as evaluation triggers that can come, for example, from the ;;; /jobset/NAME/hook/evaluate HTTP endpoint. @@ -770,6 +775,70 @@ concurrently; it sends derivation build requests to BUILDER." max-parallel-evaluations)) channel)) +;;; +;;; Logging events +;;; + +(define event-log-buffer-size + (make-parameter 1000)) + +(define (event-log-service channel) + "Keep events received on CHANNEL in a circular buffer, keeping track of +notification subscriptions." + (lambda () + (define events (ring-buffer (event-log-buffer-size))) + + (let loop ((events events) + (subscribers '())) + (match (get-message channel) + (`(subscribe ,channel) + (loop events (cons channel subscribers))) + (`(unsubscribe ,channel) + (loop events (delq channel subscribers))) + (`(new-event ,event) + ;; Events are stored as a list in the format + ;; (event-type timestamp rest-of-data) + (let* ((data (match event + ((event-type . rest) + (append `(,event-type ,(current-time time-utc)) + rest)))) + (events (ring-buffer-insert data events))) + (match event + ;; This is what is received from the builders. This code aims to + ;; rebuild the original build object related to the derivation + ;; (when it exists), so it can be used by other agents. + (`(derivation-built ,derivation ,status) + (spawn-fiber + (lambda () + (let ((build (db-get-build derivation))) + (when build + (put-message channel + `(new-event (build-status-changed ,build)))))))) + (_ #t)) + ;; For now, every new event is sent to all subscribers. + (for-each (lambda (reply) + (put-message reply data)) + subscribers) + (loop events subscribers))) + (`(recent-events ,reply) + (put-message reply (ring-buffer->list events)) + (loop events subscribers)) + ;; Catchall for malformed messages. + (message + (log-error "malformed message sent to the event-log-service: ~s" + message) + (loop events subscribers)))))) + +(define (spawn-event-log-service) + "Spawn an actor responsible for centralizing events." + (let ((channel (make-channel))) + (spawn-fiber (event-log-service channel)) + channel)) + +;;; +;;; Monitoring jobsets +;;; + (define %jobset-trigger-rate-window ;; Window (seconds) over which the jobset trigger rate is computed. (* 5 60)) ;5 minutes diff --git a/tests/base.scm b/tests/base.scm index 1cfbecb..e3709ee 100644 --- a/tests/base.scm +++ b/tests/base.scm @@ -17,6 +17,10 @@ ;;; along with Cuirass. If not, see <http://www.gnu.org/licenses/>. (use-modules (cuirass base) + (fibers) + (fibers channels) + (ice-9 match) + (srfi srfi-1) (srfi srfi-64)) (test-begin "base") @@ -25,4 +29,46 @@ 'wrong-type-arg (%package-cachedir #f)) +(test-equal "event-log recent-events" + ;; Elements are in reverse order since they come from a ring-buffer. + '((test-event "Third message") + (test-event "Second message") + (test-event "First message")) + (run-fibers + (lambda () + (let ((event-log (spawn-event-log-service)) + (reply-channel (make-channel))) + (put-message event-log '(new-event (test-event "First message"))) + (put-message event-log '(new-event (test-event "Second message"))) + (put-message event-log '(new-event (test-event "Third message"))) + (put-message event-log `(recent-events ,reply-channel)) + (filter-map (match-lambda + ((type timestamp value) + ;; Drop timestamp. + (list type value)) + (_ #f)) + (get-message reply-channel)))))) + +(test-equal "event-log recent-events (empty buffer)" + '() + (run-fibers + (lambda () + (let ((event-log (spawn-event-log-service)) + (reply-channel (make-channel))) + (put-message event-log `(recent-events ,reply-channel)) + (get-message reply-channel))))) + +(test-equal "event-log subscribe" + '(test-event "test-value") + (run-fibers + (lambda () + (let ((event-log (spawn-event-log-service)) + (reply (make-channel))) + (put-message event-log `(subscribe ,reply)) + (put-message event-log '(new-event (test-event "test-value"))) + (match (get-message reply) + ((type timestamp value) + ;; Drop timestamp. + (list type value))))))) + (test-end)