Good evening all,

I have wondered for some time why pmap did not have a transducer form analogous to map's. Today I took a shot at writing such a transducer form. The result seems to work correctly, and the implementation behaves as I would expect a pmap transducer to behave, so I thought I would ask here what factors I might not have considered. The exercise seemed somewhat easy, and that usually means I've overlooked something.
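For reference, the shape I was trying to mirror is roughly the one-argument arity of map. This is a minimal sketch written from memory rather than copied from clojure.core, and the name map-xf is my own, used purely for illustration:

```clojure
;; Illustrative only: `map-xf` is a hypothetical name, not part of clojure.core.
(defn map-xf
  [f]
  (fn [rf]
    (fn
      ([] (rf))                                  ; init: delegate to the downstream rf
      ([result] (rf result))                     ; completion: nothing buffered, just pass along
      ([result input] (rf result (f input)))     ; step: transform one input
      ([result input & inputs]                   ; step for multiple input collections
       (rf result (apply f input inputs))))))

(into [] (map-xf inc) [1 2 3])            ;=> [2 3 4]
(sequence (map-xf +) [1 2 3] [10 20 30])  ;=> (11 22 33)
```

A pmap transducer has to keep the same four arities, but it also needs somewhere to hold futures that have been started and not yet dereferenced, which is where the extra state in my attempt comes from.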
My second question: while examining the original source, I noticed that pmap does not behave the way I had always believed. I thought it performed its parallel computation in chunks of (+ 2 availableProcessors). On closer reading, it seems that it simply makes sure the futures in flight stay *at least* (+ 2 availableProcessors) items ahead of consumption, and that the futures are in fact realized in chunks of 32 due to the implementation details of map. Was this intended? In my limited benchmarks using criterium, this seemed to create (admittedly quite minor) unnecessary contention.

How might this implementation be improved, or what flaws do you think it may have?

    (defn pmap
      ([f]
       (let [n  (+ 2 (.. Runtime getRuntime availableProcessors))
             xs (volatile! [])]
         (fn [rf]
           (let [handle (fn [result fut]
                          (let [snap (vswap! xs conj fut)]
                            ;; Once n futures have accumulated, deref them all
                            ;; and pass the results downstream.
                            (if (= (count snap) n)
                              (do (vreset! xs [])
                                  (transduce (map deref) rf result snap))
                              (rf result))))]
             (fn
               ([] (rf))
               ;; On completion, drain any futures still pending.
               ([result]
                (if (empty? @xs)
                  (rf result)
                  (transduce (map deref) rf result @xs)))
               ([result input]
                (handle result (future (f input))))
               ([result input & inputs]
                (handle result (future (apply f input inputs)))))))))
      ([f coll]
       (sequence (pmap f) coll))
      ([f coll & colls]
       (apply sequence (pmap f) coll colls)))

This can easily be modified to behave more like the original pmap, so that it always stays *n* items ahead of consumption:

    (defn pmap
      ([f]
       (let [n  (+ 2 (.. Runtime getRuntime availableProcessors))
             xs (volatile! clojure.lang.PersistentQueue/EMPTY)]
         (fn [rf]
           (let [handle (fn [result fut]
                          (let [snap (vswap! xs conj fut)]
                            ;; Once more than n futures are queued, deref the
                            ;; oldest one and pass its result downstream.
                            (if (> (count snap) n)
                              (do (vswap! xs pop)
                                  (rf result (-> snap peek deref)))
                              (rf result))))]
             (fn
               ([] (rf))
               ;; On completion, drain any futures still pending.
               ([result]
                (if (empty? @xs)
                  (rf result)
                  (transduce (map deref) rf result @xs)))
               ([result input]
                (handle result (future (f input))))
               ([result input & inputs]
                (handle result (future (apply f input inputs)))))))))
      ([f coll]
       (sequence (pmap f) coll))
      ([f coll & colls]
       (apply sequence (pmap f) coll colls)))

~Timothy Dean
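P.S. In case anyone wants to check the chunking observation for themselves, here is a rough sketch of how one might count the futures that the core pmap starts when only the first result is consumed. The helper name futures-started and the 200 ms settling delay are my own assumptions, and the counts depend on the number of processors, so I have left expected values out.

```clojure
;; Hypothetical helper (not part of clojure.core): runs clojure.core/pmap over
;; coll, consumes only the first result, waits briefly so that already-started
;; futures can finish, and returns how many times the mapped function ran.
(defn futures-started
  [coll]
  (let [calls (atom 0)
        out   (clojure.core/pmap (fn [x] (swap! calls inc) x) coll)]
    (first out)            ; consume a single item
    (Thread/sleep 200)     ; assumed long enough for started futures to run
    @calls))

;; Input backed by a chunked seq:
(futures-started (range 1000))

;; Input that is not chunked:
(futures-started (take 1000 (iterate inc 0)))
```

If my reading of the source is right, the first count should come out as a multiple of 32, while the second should stay close to (+ 2 availableProcessors).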