branch: main
commit 52abfc170ed705c845d5a42b4ba9f0488383a190
Author: Romain GARBAGE <romain.garb...@inria.fr>
AuthorDate: Mon Feb 24 15:38:08 2025 +0100

    base: Add event-log agent.
    
    * src/cuirass/base.scm (event-log-service, spawn-event-log-service): New
    variables.
    * tests/base.scm: Add tests for the event-log-service agent.
    
    Signed-off-by: Ludovic Courtès <l...@gnu.org>
---
 src/cuirass/base.scm | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 tests/base.scm       | 46 ++++++++++++++++++++++++++++++++++
 2 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 631e81f..fa3481f 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -73,6 +73,7 @@
             spawn-remote-builder
             spawn-channel-update-service
             spawn-jobset-evaluator
+            spawn-event-log-service
             spawn-jobset-registry
             spawn-gc-root-cleaner
             spawn-build-maintainer
@@ -113,7 +114,11 @@
 ;;;    'guix-daemon' process, while the remote build delegates builds to
 ;;;    'cuirass remote-server'.
 ;;;
-;;;  - Each jobset as an associated "monitor"; it requests channel updates,
+;;;  - The "event-log" agent receives events from the different agents.  It
+;;;    also dispatches these events to the agents that subscribed to event
+;;;    notifications.
+;;;
+;;;  - Each jobset has an associated "monitor"; it requests channel updates,
 ;;;    evaluations, and builds to the actors above.  It also receives requests
 ;;;    such as evaluation triggers that can come, for example, from the
 ;;;    /jobset/NAME/hook/evaluate HTTP endpoint.
@@ -770,6 +775,70 @@ concurrently; it sends derivation build requests to BUILDER."
                                    max-parallel-evaluations))
     channel))
 
+;;;
+;;; Logging events
+;;;
+
+;; Size handed to 'ring-buffer' when the event-log service starts.
+(define event-log-buffer-size (make-parameter 1000))
+
+(define (event-log-service channel)
+  "Return a thunk implementing the event-log agent.  The agent reads messages
+from CHANNEL, stores incoming events in a ring buffer created with
+'event-log-buffer-size', and forwards every new event to the channels that
+subscribed to notifications.
+
+Messages understood on CHANNEL:
+
+  (subscribe SUBSCRIBER)    send every subsequent event to SUBSCRIBER
+  (unsubscribe SUBSCRIBER)  stop sending events to SUBSCRIBER
+  (new-event EVENT)         record EVENT, a pair (EVENT-TYPE . REST), and
+                            send it to all subscribers
+  (recent-events REPLY)     send the list of buffered events to REPLY
+
+Any other message, including a 'new-event' whose payload is not a pair, is
+logged as malformed and otherwise ignored."
+  (lambda ()
+    (let loop ((events (ring-buffer (event-log-buffer-size)))
+               (subscribers '()))
+      (match (get-message channel)
+        (`(subscribe ,subscriber)
+         (loop events (cons subscriber subscribers)))
+        (`(unsubscribe ,subscriber)
+         (loop events (delq subscriber subscribers)))
+        ;; Destructure EVENT in the pattern itself: a payload that is not a
+        ;; pair then falls through to the catch-all clause below instead of
+        ;; raising a match error inside this loop.
+        (`(new-event ,(and event (event-type . rest)))
+         ;; Events are stored as a list in the format
+         ;; (event-type timestamp rest-of-data)
+         (let* ((data (cons* event-type (current-time time-utc) rest))
+                (events (ring-buffer-insert data events)))
+           (match event
+             ;; This is what is received from the builders.  This code aims to
+             ;; rebuild the original build object related to the derivation
+             ;; (when it exists), so it can be used by other agents.
+             (`(derivation-built ,derivation ,status)
+              ;; The follow-up event is posted from a separate fiber: this
+              ;; loop is the one reading CHANNEL, so it must not block on a
+              ;; 'put-message' to CHANNEL itself.
+              (spawn-fiber
+               (lambda ()
+                 (let ((build (db-get-build derivation)))
+                   (when build
+                     (put-message channel
+                                  `(new-event
+                                    (build-status-changed ,build))))))))
+             (_ #t))
+           ;; For now, every new event is sent to all subscribers.
+           ;; NOTE(review): 'put-message' waits for the receiver, so a
+           ;; subscriber that stops reading stalls this agent -- confirm that
+           ;; subscribers always drain their channel.
+           (for-each (lambda (reply)
+                       (put-message reply data))
+                     subscribers)
+           (loop events subscribers)))
+        (`(recent-events ,reply)
+         (put-message reply (ring-buffer->list events))
+         (loop events subscribers))
+        ;; Catchall for malformed messages.
+        (message
+         (log-error "malformed message sent to the event-log-service: ~s"
+                    message)
+         (loop events subscribers))))))
+
+(define (spawn-event-log-service)
+  "Start the event-log agent in a new fiber and return the channel on which
+it receives its messages."
+  (define mailbox (make-channel))
+  (spawn-fiber (event-log-service mailbox))
+  mailbox)
+
+;;;
+;;; Monitoring jobsets
+;;;
+
 (define %jobset-trigger-rate-window
   ;; Window (seconds) over which the jobset trigger rate is computed.
   (* 5 60))                                       ;5 minutes
diff --git a/tests/base.scm b/tests/base.scm
index 1cfbecb..e3709ee 100644
--- a/tests/base.scm
+++ b/tests/base.scm
@@ -17,6 +17,10 @@
 ;;; along with Cuirass.  If not, see <http://www.gnu.org/licenses/>.
 
 (use-modules (cuirass base)
+             (fibers)
+             (fibers channels)
+             (ice-9 match)
+             (srfi srfi-1)
              (srfi srfi-64))
 
 (test-begin "base")
@@ -25,4 +29,46 @@
   'wrong-type-arg
   (%package-cachedir #f))
 
+(test-equal "event-log recent-events"
+  ;; The most recently logged event is expected first: the list comes from
+  ;; the ring buffer, which yields events in reverse insertion order.
+  '((test-event "Third message")
+    (test-event "Second message")
+    (test-event "First message"))
+  (run-fibers
+   (lambda ()
+     (define service (spawn-event-log-service))
+     (define inbox (make-channel))
+     (for-each (lambda (text)
+                 (put-message service `(new-event (test-event ,text))))
+               '("First message" "Second message" "Third message"))
+     (put-message service `(recent-events ,inbox))
+     ;; Keep the three-element entries only, without their timestamp.
+     (filter-map (match-lambda
+                   ((kind _ payload) (list kind payload))
+                   (_ #f))
+                 (get-message inbox)))))
+
+(test-equal "event-log recent-events (empty buffer)"
+  '()
+  (run-fibers
+   (lambda ()
+     (define service (spawn-event-log-service))
+     (define inbox (make-channel))
+     ;; Nothing was logged beforehand, so the reply is the empty list.
+     (put-message service `(recent-events ,inbox))
+     (get-message inbox))))
+
+(test-equal "event-log subscribe"
+  '(test-event "test-value")
+  (run-fibers
+   (lambda ()
+     (define service (spawn-event-log-service))
+     (define inbox (make-channel))
+     (put-message service `(subscribe ,inbox))
+     (put-message service '(new-event (test-event "test-value")))
+     ;; The notification is (TYPE TIMESTAMP VALUE); compare without the
+     ;; timestamp.
+     (match (get-message inbox)
+       ((kind _ payload) (list kind payload))))))
+
 (test-end)

Reply via email to