Good evening all,

I have wondered for some time why pmap does not have a transducer form
analogous to map's. Today I took a shot at writing one. The result seems
to work correctly and behaves as I would expect a pmap transducer to
behave, so I thought I would ask here what I might not have considered.
The exercise seemed somewhat easy, and that usually means I've
overlooked something.

My second question: while reading the source of clojure.core/pmap, I
noticed that it does not actually behave the way I had always believed. I
thought that it performed its parallel computation in chunks of (+ 2
availableProcessors). On closer reading, it seems instead to keep the
futures in flight *at least* (+ 2 availableProcessors) items ahead of
consumption. Because those futures are created by a lazy map over the
input, with a chunked input such as a vector or a range they are actually
launched 32 at a time. Was this intended? In my limited benchmarks using
criterium, this did seem to create some (admittedly quite minor)
unnecessary contention.
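The chunking comes from map itself, which pmap uses to create its futures, and it is easy to observe without any futures at all. In this sketch, realized-by-first and unchunk are hypothetical helpers written for illustration (unchunk is a common community idiom, not part of clojure.core): asking for only the first element of a map over a chunked input realizes a whole chunk of 32, while the unchunked input realizes just one.

```clojure
;; Count how many elements map realizes when only the first is requested.
(defn realized-by-first
  [coll]
  (let [realized (atom 0)
        s (map (fn [x] (swap! realized inc) x) coll)]
    (first s)
    @realized))

;; Hypothetical helper (not in clojure.core): rebuild a seq one element
;; at a time so that consumers such as map cannot process it in chunks.
(defn unchunk [s]
  (lazy-seq
    (when-let [s (seq s)]
      (cons (first s) (unchunk (rest s))))))

(realized-by-first (range 100))           ;=> 32
(realized-by-first (vec (range 100)))     ;=> 32
(realized-by-first (unchunk (range 100))) ;=> 1
```

If that reading is right, passing (unchunk coll) to clojure.core/pmap should restore the (+ 2 availableProcessors) lookahead, at the cost of the per-element overhead that chunking normally avoids.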

How might this implementation be improved, or what flaws do you think it
may have?

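(defn pmap
  ([f]
   (fn [rf]
     ;; Per-reduction state lives inside (fn [rf] ...), so reusing the
     ;; transducer never shares buffered futures between reductions.
     (let [n (+ 2 (.. Runtime getRuntime availableProcessors))
           xs (volatile! [])
           ;; Deref buffered futures into rf with reduce rather than
           ;; transduce, which would call rf's completion arity mid-stream.
           ;; reduce unwraps reduced, so re-wrap it to stop the outer loop.
           drain (fn [result futs]
                   (reduce (fn [acc fut]
                             (let [ret (rf acc @fut)]
                               (if (reduced? ret) (reduced ret) ret)))
                           result
                           futs))
           handle (fn [result fut]
                    (let [snap (vswap! xs conj fut)]
                      (if (= (count snap) n)
                        (do (vreset! xs [])
                            (drain result snap))
                        ;; Nothing to emit yet: return result unchanged
                        ;; rather than completing rf with (rf result).
                        result)))]
       (fn ([] (rf))
         ;; transduce is fine here: it drains the buffer, then calls
         ;; rf's completion arity exactly once.
         ([result] (transduce (map deref) rf result @xs))
         ([result input] (handle result (future (f input))))
         ([result input & inputs]
          (handle result (future (apply f input inputs))))))))
  ([f coll] (sequence (pmap f) coll))
  ([f coll & colls] (apply sequence (pmap f) coll colls)))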
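A subtle point about the transducer contract applies to the step function: transduce always finishes by calling the completion (1-arity) of its reducing function, so it belongs in the completion arity, and a step with nothing to emit should return result rather than call (rf result). With sequence or into the extra completion calls are mostly invisible, but a stateful downstream transducer such as partition-all flushes its buffer on completion. The sketch below uses a hypothetical instrumented reducing function, counting-conj, to count completion calls, and a hypothetical drain helper that derefs buffered values with plain reduce, re-wrapping reduced values the way clojure.core/cat does so that early termination still reaches the outer reduction. Delays stand in for futures so that no thread pool is involved.

```clojure
;; Hypothetical instrumented reducing function: counts how many times
;; its completion (1-arity) is invoked.
(def completions (atom 0))

(defn counting-conj
  ([] [])
  ([acc] (swap! completions inc) acc)
  ([acc x] (conj acc x)))

;; transduce = reduce, then exactly one call to the completion arity.
(transduce (map deref) counting-conj [] [(delay 1) (delay 2)])
;=> [1 2], and @completions is now 1

;; Hypothetical drain helper: deref buffered values into rf with plain
;; reduce, so the completion arity is never touched. reduce unwraps a
;; reduced value, so it is re-wrapped to let early termination reach
;; the outer reduction.
(defn drain [rf result futs]
  (reduce (fn [acc fut]
            (let [ret (rf acc @fut)]
              (if (reduced? ret) (reduced ret) ret)))
          result
          futs))

(drain counting-conj [] [(delay 1) (delay 2)])
;=> [1 2], and @completions is still 1

;; Early termination survives: the result comes back still wrapped.
(reduced? (drain (fn [acc x] (if (= x 2) (reduced acc) (conj acc x)))
                 []
                 [(delay 1) (delay 2) (delay 3)]))
;=> true
```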

This can easily be modified to behave more like the original pmap, so
that it always stays *n* items ahead of consumption:

(defn pmap
  ([f]
   (fn [rf]
     ;; Per-reduction FIFO of in-flight futures, oldest at the front.
     (let [n (+ 2 (.. Runtime getRuntime availableProcessors))
           xs (volatile! clojure.lang.PersistentQueue/EMPTY)
           handle (fn [result fut]
                    (let [snap (vswap! xs conj fut)]
                      (if (> (count snap) n)
                        ;; More than n in flight: emit the oldest result.
                        (do (vswap! xs pop)
                            (rf result (-> snap peek deref)))
                        ;; Still filling the window: emit nothing, and
                        ;; do not call the completion arity (rf result).
                        result)))]
       (fn ([] (rf))
         ;; Drain the remaining futures in order, then complete rf once.
         ([result] (transduce (map deref) rf result @xs))
         ([result input] (handle result (future (f input))))
         ([result input & inputs]
          (handle result (future (apply f input inputs))))))))
  ([f coll] (sequence (pmap f) coll))
  ([f coll & colls] (apply sequence (pmap f) coll colls)))
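The difference between the two versions is easiest to see without futures at all. The pure model below (batch-emits and window-emits are hypothetical names chosen for this illustration) replays each version's buffering logic on plain values and records what each step would pass downstream: the vector version emits nothing for n - 1 steps and then a burst of n, while the queue version starts emitting on the (n + 1)th input and then emits exactly one item per input. In both, whatever is still buffered at the end is emitted by the completion arity, which the model leaves out.

```clojure
;; Pure model of the first (batching) version: for each input, the
;; items that step would pass downstream.
(defn batch-emits [n inputs]
  (first
    (reduce (fn [[out buf] x]
              (let [buf (conj buf x)]
                (if (= (count buf) n)
                  [(conj out buf) []]
                  [(conj out []) buf])))
            [[] []]
            inputs)))

;; Pure model of the second (sliding-window) version, using the same
;; PersistentQueue conj/peek/pop operations.
(defn window-emits [n inputs]
  (first
    (reduce (fn [[out q] x]
              (let [q (conj q x)]
                (if (> (count q) n)
                  [(conj out [(peek q)]) (pop q)]
                  [(conj out []) q])))
            [[] clojure.lang.PersistentQueue/EMPTY]
            inputs)))

(batch-emits 3 (range 7))
;=> [[] [] [0 1 2] [] [] [3 4 5] []]
(window-emits 3 (range 7))
;=> [[] [] [] [0] [1] [2] [3]]
```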

~Timothy Dean

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en