Rob - I’d go with Gary's approach, which essentially moves the splitting of each chunk of results out of the core.async channel’s transducer and into the producing function. You can do that with a channel that has a fixed buffer of 50 and >!!. As long as the next db query does not start until every result from the previous query has been put onto the channel, it’ll work as you want.
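To make that concrete, here is a minimal sketch of what I have in mind, not a definitive implementation. The names fetch-batch, start-producer and max-batches are hypothetical; fetch-batch just stands in for your `select ... limit 50` query, and the batch limit exists only so that the example terminates.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the bulk DB query (`select ... limit 50`).
;; Returns [batch next-cursor], in the same spirit as produce-next-batch.
(defn fetch-batch
  [cursor batch-size]
  (let [next-cursor (+ cursor batch-size)]
    [(range cursor next-cursor) next-cursor]))

(defn start-producer
  "Starts a producer on its own thread and returns the result channel.
  Assumption for this sketch: the producer stops after max-batches
  batches and then closes the channel."
  [batch-size max-batches]
  (let [ch (async/chan batch-size)]      ; fixed buffer, one batch wide
    (async/thread
      (loop [cursor 0
             n      0]
        (if (< n max-batches)
          (let [[batch next-cursor] (fetch-batch cursor batch-size)]
            ;; >!! blocks while the buffer is full, so the next query is
            ;; not issued until every row of this batch is on the channel.
            ;; >!! returns false once the channel is closed, which ends
            ;; the loop early.
            (when (every? #(async/>!! ch %) batch)
              (recur next-cursor (inc n))))
          (async/close! ch))))
    ch))

;; Usage: drain two batches of 50 into a vector.
(def results (async/<!! (async/into [] (start-producer 50 2))))
```

One thing to be aware of: with the buffer the same size as a batch, the first batch fits entirely in the buffer, so the producer will run the second query right away and then block on its first put. In other words it stays about one batch ahead of the consumers rather than waiting for the channel to be completely empty.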
I think the behavior in our examples differs because a blocking put completes whenever there is a take and the buffer is not full, regardless of whether the transducer is still outputting values. This bug may be relevant, though there it arises in a less common scenario (a fixed buffer of size 0, which is now disallowed):
https://dev.clojure.org/jira/browse/ASYNC-140

> On Jan 5, 2018, at 8:01 PM, Gary Verhaegen <gary.verhae...@gmail.com> wrote:
>
> On 5 January 2018 at 19:44, Rob Nikander <rob.nikan...@gmail.com> wrote:
> Hi,
>
> I’m wondering if there is a core.async design idiom for this situation...
>
> - A buffered channel
> - One producer feeding it
> - A bunch of consumers pulling from it.
> - Producer should wake up and fill the channel only when it’s empty. In other
>   words, the producer should work in chunks.
>
> My first idea is to have two channels. The second will be used by consumers
> to signal the producer that the primary channel is empty. But I'm wondering
> if there is a better way.
>
> The motive for this is that the producer is doing a DB query that is more
> efficient in bulk. `select ... limit 50` rather than `select ... limit 1` 50
> times.
>
> Rob
>
> What about simply having the producer put items one by one on the channel?
>
> (ns t.core
>   (:require [clojure.core.async :as async]))
>
> (defn ap
>   "atomic print"
>   [m]
>   (print (str (pr-str m) "\n")))
>
> (defn produce-next-batch
>   [s]
>   (let [m (+ s 10)]
>     [(range s m) m]))
>
> (defn chunked-producer
>   [init-state]
>   (let [result-chan (async/chan)]
>     (async/go
>       (loop [[batch cursor] (produce-next-batch init-state)]
>         (ap {:produced batch})
>         (doseq [elem batch]
>           (async/>! result-chan elem))
>         (recur (produce-next-batch cursor))))
>     result-chan))
>
> (defn run-consumer
>   [ch id n]
>   (async/go
>     (dotimes [_ n]
>       (ap {:id id :received (async/<! ch)})
>       (async/<! (async/timeout 10)))))
>
> (defn run
>   []
>   (let [c (chunked-producer 0)
>         c1 (run-consumer c 1 10)
>         c2 (run-consumer c 2 10)
>         c3 (run-consumer c 3 10)]
>     (->> [c1 c2 c3]
>          (mapv async/<!!))
>     (flush)))
>
> Here produce-next-batch has been deliberately written to evoke the idea that
> you have some sort of state or cursor that lets you produce the next batch.
> Real code would obviously need to account for exceptions, handle channel
> closing, etc., but hopefully this illustrates the idea.
>
> --
> You received this message because you are subscribed to the Google
> Groups "Clojure" group.
> To post to this group, send email to clojure@googlegroups.com
> Note that posts from new members are moderated - please be patient with your
> first post.
> To unsubscribe from this group, send email to
> clojure+unsubscr...@googlegroups.com
> For more options, visit this group at
> http://groups.google.com/group/clojure?hl=en
> ---
> You received this message because you are subscribed to the Google Groups
> "Clojure" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to clojure+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.