;; Oops, I managed to send the message by accident without finishing it.
;; The server part is similar, so I will drop the actual pipeline and highlight the difference,
;; which is that we get a message, stream it into a Scheme object, call server-lambda (below)
;; with it, and finally reply to the client with that lambda's return value. We create a loop
;; where the server waits for questions.
(define* (make-server server-lambda address ip-server? #:key (context #f))
  ;; ch1 and ch2 come from a pipeline like the client's (dropped here)
  (define schemer (fpipe-schemer ch1 ch2))

  (spawn-fiber
   (lambda ()
     (let lp ()
       (schemer %fpipe-eof%)
       (lp)))))

Finally, the idea is to use it as

(define context (make-zmq-context))
(define address "")      ;; ZeroMQ address
(define ip-server? #t)   ;; whether we use bind or connect

(make-server (lambda (x) (cons '() x)) address ip-server? #:context context)
(define client (make-client address ip-server? #:context context))

;; send a message that is not compressed
> (client "abc")
"abc"

> (length (client (iota 1000000) #:compress? #t))
1000000

in a run-fibers context.

On Wed, Aug 25, 2021 at 12:25 AM Stefan Israelsson Tampe <
stefan.ita...@gmail.com> wrote:

> I have now made a fiber-enabled stream library called fpipe in my
> stis-engine repository, see
>
> https://gitlab.com/tampe/stis-engine/-/tree/master/
>
> The idea is to focus on a high-performance byte streaming library on top
> of wingo's fibers library and make heavy use of bytevector buffers. We will
> also go between a stream of Scheme values and these byte streams, to be
> able to seamlessly integrate a good overview of the data pipeline.
>
> The following code uses a C-based serializer and deserializer of Scheme
> data structures, allows for optional streamed compression and
> decompression, and transports the data over ZeroMQ networking, which
> allows for thread/process/computer movement of data. The end result is a
> way to create servers and clients.
>
> It is instructive to show how the code for the client and server pipelines
> is constructed, to show off the fpipe library. This is not the final
> design, but most components are done.
>
> Here is the client
>
> (define* (make-client address ip-server? #:key (context #f))
>   ;; First we set up the zmq networking
>   (define ctx (if context context (make-zmq-context)))
>   (define socket (zmq-socket ctx ZMQ_REQ))
>
>   (if ip-server?
>       (zmq-bind socket address)
>       (zmq-connect socket address))
>
>   ;; we will define two fiber channels, channel in = ch1 and channel out = ch2
>   (define-values (ch1 ch2)
>
>     ;; fpipe-construct is the general pipelining macro
>     (fpipe-construct
>
>       ;; this is a scheme cond that will check a message bound to `it`
>       (cond
>         (#:scm it)
>
>         ;; the format of a matcher is (predicate . translator): if the
>         ;; predicate is true we push the message to the branching pipeline.
>         ;; This assumes a message of the form
>         ;; ((list-of-features) . message)
>         (((memq 'compress (car it)) #:tr (cdr it))
>
>          ;; the C-based streamed serializer that integrates nicely with
>          ;; fibers and streams; the message transport is of the form
>          ;; scm->bytestream
>
>          (mk-c-atom->fpipe)
>
>          ;; the zlib compressor node transports bytestream->bytestream
>          compress-from-fpipe-to-fpipe
>
>          ;; a bytestream->bytestream step that prepends a 1 to the message
>          ;; to indicate that the stream has been compressed
>          #:prepend #u8(1))
>
>         ;; if we do not have the compress feature then we simply generate
>         ;; the stream and prepend a zero, i.e. no compression is done
>         (else
>          (mk-c-atom->fpipe)
>          #:prepend #u8(0)))
>
>       ;; transport the message bytestream over the zmq socket; this returns
>       ;; a scheme stream in which eof survives (as all control messages do)
>       ;; and initiates the next read from the socket (when the request
>       ;; message has been fully sent).
>       (fpipe->zmq socket)
>
>       ;; so here we get the return message
>       (zmq->fpipe socket)
>
>       ;; This is a bytestream cond and has no `it` part.
>       (cond
>         ;; We try to match the beginning of the bytestream message; if it
>         ;; starts with 1 then we know that the reply message has been
>         ;; compressed
>         ((#:match #u8(1) #:skip 1)
>          decompress-from-pipe-to-pipe)
>
>         ;; else no compression.
>         ((else #:skip 1)
>          ))
>
>       ;; the final step is to take the bytestream, make a scheme object and
>       ;; put that on the scheme stream, and the pipe is finished
>       (mk-fpipe->c-atom)))
>
>   ;; fpipe-schemer takes a pipeline from scm to scm and creates a function
>   ;; from it. Each time the function is called with a scheme object we send
>   ;; it to the server and, from the return message, create a scheme object
>   ;; that is returned from the function
>   (define action (fpipe-schemer ch1 ch2))
>
>   ;; A slightly nicer interface and we are finished
>   (lambda* (message #:key (compress? #f))
>     (action (cons (if compress? '(compress) '()) message))))
>
>
> ;; SERVER
> (define* (make-server server-lambda address ip-server? #:key (context #f))
>
>   (define schemer (fpipe-schemer ch1 ch2))
>
>   (spawn-fiber
>    (lambda ()
>      (let lp ()
>        (schemer %fpipe-eof%)
>        (lp)))))
>
>
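
The one-byte prefix in the client pipeline (#:prepend #u8(1) or #u8(0) on the
way out, #:match ... #:skip 1 on the way in) can be illustrated with plain
Guile bytevectors. This is only a sketch of the framing convention, not fpipe
code: frame-payload and unframe-payload are hypothetical names, the helpers
work on whole bytevectors rather than streams, and no compression is done.

```scheme
(use-modules (rnrs bytevectors))

;; Hypothetical helper: prepend the flag byte (1 = compressed, 0 = plain).
(define (frame-payload payload compressed?)
  (let* ((n   (bytevector-length payload))
         (out (make-bytevector (+ n 1))))
    (bytevector-u8-set! out 0 (if compressed? 1 0))
    ;; R6RS argument order: source, source-start, target, target-start, count
    (bytevector-copy! payload 0 out 1 n)
    out))

;; Hypothetical helper: return two values, the compressed? flag and the
;; payload with the flag byte skipped.
(define (unframe-payload framed)
  (let* ((n    (- (bytevector-length framed) 1))
         (body (make-bytevector n)))
    (bytevector-copy! framed 1 body 0 n)
    (values (= 1 (bytevector-u8-ref framed 0)) body)))
```

A receiver would call unframe-payload and run its decompressor only when the
first returned value is true, which mirrors the two branches of the bytestream
cond above.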