On Tue, Nov 10, 2009 at 07:41:41AM -0800, pmf wrote:

>> This thing could easily create a lazy sequence, in fact, the code
>> would look a lot like the code for seque, with just a separation of
>> the writer from the reader.  I'll have to think about it to make sure
>> that it can be used safely.
>
>You might want to look at the (recently added) fill-queue (in
>clojure.contrib.seq-utils), which provides a lazy seq that is filled
>by another thread and blocks if readers consume faster than the queue
>is filled; maybe your problem fits into this mechanism.

Well, almost.  Except that it would create a future that I really
don't have any use for.  Last night, I posted very similar code to do
the same kind of thing with an agent (I have since fixed the exception
handling in the agent so it terminates the queue so the reader will
get an exception rather than just hang).

With fill-queue, send-queued, and seque all looking nearly identical,
I wonder if we're missing how this should be abstracted.

I did learn, though from reading it that seque stops short if the
computation runs ahead enough to fill up the queue.  I'm not quite
sure what this would be useful for.

(import '(java.util.concurrent BlockingQueue LinkedBlockingQueue))
(defn send-queued
   "Dispatch blocking action to agent.  The state of the agent will be
   set to the value of:
   (apply action-fn state-of-agent enqueue args)
   The agent should call enqueue for each item to return to the caller
   of send-queued.  send-queued returns a lazy sequence of the items the
   agent passes to enqueue (in order).  The agent may enqueue 'n' items
   before blocking on its call to enqueue."
   [a n f & args]
   (let [#^BlockingQueue q (LinkedBlockingQueue. (int n))
         NIL (Object.) ;nil sentinel since LBQ doesn't support nils
         enqueue (fn [x]
                   (.put q (if (nil? x) NIL x)))
         action (fn [state1]
                  (try
                    (let [state2 (apply f state1 enqueue args)]
                      (.put q q) ; q itself is eos sentinel
                      state2)
                    (catch Exception e
                      (.put q q)
                      (throw e))))
         drain (fn drain []
                 (lazy-seq
                   (let [x (.take q)]
                     (if (identical? x q) ;q itself is eos sentinel
                       (do @a nil)  ;touch agent just to propagate errors
                       (cons (if (identical? x NIL) nil x) (drain))))))]
     (send-off a action)
     (drain)))

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en

Reply via email to