Yeah Dan, Thomas's code is an awesome example for anyone looking to solve a similar problem with variable load. In my case this is a predictable batch process and I know the load exactly :-) I ended up implementing the futures approach for the reasons you listed, as it was our only use of core.async and now it's gone from our dependencies :-)
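For anyone reading this later, here is a minimal sketch of what a futures-only version can look like. It is an illustration under assumptions, not the production code: `process-in-parallel` is a hypothetical name, and the `long-running-widget-processor` below is a stand-in that just sleeps and doubles a number.

```clojure
;; Stand-in for the real job (hypothetical): sleeps briefly, returns a value.
(defn long-running-widget-processor [widget]
  (Thread/sleep 10)
  (* widget 2))

(defn process-in-parallel
  "Splits widgets into at most num-workers buckets, runs each bucket in its
  own future, and blocks until every bucket has finished."
  [num-workers widgets]
  (let [per-worker (max 1 (long (Math/ceil (/ (count widgets)
                                              (double num-workers)))))
        buckets    (partition-all per-worker widgets)
        ;; mapv is eager, so every future is started before any deref,
        ;; which is why no doall is needed here.
        workers    (mapv (fn [bucket]
                           (future (mapv long-running-widget-processor bucket)))
                         buckets)]
    ;; deref blocks until a worker is done; results keep the input order.
    (into [] (mapcat deref) workers)))

(process-in-parallel 4 (range 10))
```

If this runs as a standalone script, calling `(shutdown-agents)` at the end lets the JVM exit promptly, since futures run on a non-daemon thread pool.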
On Thursday, September 18, 2014 6:11:24 PM UTC+10, Dan Kersten wrote:
>
> I personally think Thomas' is best if load may vary as it is more
> predictable and straightforward to understand. If we're talking about line
> count, here's a shortened version that I don't feel sacrifices readability
> (typed on a phone so please excuse typos...):
>
> (let [exec (Executors/newFixedThreadPool 4)
>       results (->> widgets
>                    (mapv (fn [it] (.submit exec
>                                            #(long-running-widget-processor it))))
>                    (mapv #(.get %)))]
>   (.shutdown exec)
>   results)
>
> I think the pure futures version kinda like what Gary suggested is best
> though because it's straightforward, to the point, idiomatic and you
> eliminate the need for core.async.
>
> Having said that, if you are already using core.async elsewhere in your
> project then I don't see anything particularly wrong with your way.
> On 18 Sep 2014 06:41, "Beau Fabry" <imf...@gmail.com> wrote:
>
>> Larry your solution includes the cognitive overhead of another entire
>> library and process model. "future" is part of core, and as I realised when
>> Gary posted the doall's were unnecessary anyway.
>>
>> On Thursday, September 18, 2014 3:26:36 PM UTC+10, larry google groups
>> wrote:
>>>
>>> > Thanks for that Larry but I think this is a bit of overkill for my
>>> > scenario.
>>>
>>> If I'm counting correctly, your original example has 10 lines of code,
>>> and my example has 11 lines of code (minus the try/catch and the closure
>>> and the namespace declaration). So these 2 solutions are the same length.
>>> These are the 11 lines of code:
>>>
>>> (def ^:private persistence-channel (lamina/channel))
>>>
>>> (defn persist-this-item [context-wrapper-for-database-call]
>>>   (lamina/enqueue persistence-channel
>>>     (fn [] (persistence/make-consistent
>>>              context-wrapper-for-database-call))))
>>>
>>> (defn worker []
>>>   (loop [closure-with-item-inside @(lamina/read-channel
>>>                                      persistence-channel)]
>>>     (closure-with-item-inside)
>>>     (recur @(lamina/read-channel persistence-channel))))
>>>
>>> (defn start-workers []
>>>   (dotimes [_ 6]
>>>     (future (worker))))
>>>
>>> Maybe it's just that I'm familiar with the code, but these 11 lines of
>>> code seem cleaner to me than your 10 lines of code, at least in part
>>> because you are doing stuff like calling (doall) which strikes me as a bit
>>> suspicious.
>>>
>>> Do you really feel the 1 extra line of code is overkill? This solution
>>> seems to do what you want, and it's the same length as your solution.
>>>
>>> On Wednesday, September 17, 2014 8:48:09 PM UTC-4, Beau Fabry wrote:
>>>>
>>>> Thanks for that Larry but I think this is a bit of overkill for my
>>>> scenario. The code I pasted is almost verbatim what we have in our
>>>> production codebase, so the ability to queue new jobs etc is really not
>>>> needed. Cheers though.
>>>>
>>>> On Thursday, September 18, 2014 9:38:47 AM UTC+10, larry google groups
>>>> wrote:
>>>>>
>>>>> > We don't have streams of data here, the long running tasks have
>>>>> > side-effects. I would prefer to avoid adding another whole framework
>>>>> > just to run a few long running jobs in p//.
>>>>>
>>>>> I guess I should show you some code, so you can see how simple this
>>>>> is. I'll copy-and-paste some code that I use.
>>>>>
>>>>> One simple way I use Lamina is to save stuff to a database.
>>>>> I don't want the "save" action happening in my main thread, so I put
>>>>> the data in a channel, and I let some workers pull that data off the
>>>>> channel and put it in the database. So what follows is the whole file,
>>>>> this is about 30 lines of code, including some try/catch stuff that you
>>>>> probably don't need:
>>>>>
>>>>> (ns loupi.persistence-queue
>>>>>   (:require
>>>>>    [loupi.persistence :as persistence]
>>>>>    [slingshot.slingshot :as ss]
>>>>>    [lamina.core :as lamina]))
>>>>>
>>>>> (def ^:private persistence-channel (lamina/channel))
>>>>>
>>>>> (defn persist-this-item [context-wrapper-for-database-call]
>>>>>   (lamina/enqueue persistence-channel
>>>>>     (fn []
>>>>>       (ss/try+
>>>>>         (persistence/make-consistent context-wrapper-for-database-call)
>>>>>         (catch Object o
>>>>>           (ss/throw+ {:type :loupi.supervisor/problem
>>>>>                       :message "Error in persistence-queue/persist-this-item."
>>>>>                       :data o}))))))
>>>>>
>>>>> (defn worker []
>>>>>   (loop [closure-with-item-inside @(lamina/read-channel
>>>>>                                      persistence-channel)]
>>>>>     (ss/try+
>>>>>       (closure-with-item-inside)
>>>>>       (catch Object o
>>>>>         (ss/throw+ {:type :loupi.supervisor/problem
>>>>>                     :message "Error in persistence-queue/worker."
>>>>>                     :closure closure-with-item-inside
>>>>>                     :data o})))
>>>>>     (recur @(lamina/read-channel persistence-channel))))
>>>>>
>>>>> (defn start-workers []
>>>>>   (dotimes [_ 6]
>>>>>     (println "Starting up the persist queue workers.")
>>>>>     (future (worker))))
>>>>>
>>>>> I call (start-workers) when the app starts. When I save something to
>>>>> the database, I call (persist-this-item) and I put a closure on the
>>>>> channel. The workers eventually grab that closure and execute it.
>>>>>
>>>>> Clearly, that closure can do whatever you like.
>>>>> To borrow from your original example, that closure is where you
>>>>> would put:
>>>>>
>>>>> (long-running-widget-processor widget)
>>>>>
>>>>> On Tuesday, September 16, 2014 10:00:07 PM UTC-4, Beau Fabry wrote:
>>>>>>
>>>>>> We don't have streams of data here, the long running tasks have
>>>>>> side-effects. I would prefer to avoid adding another whole framework
>>>>>> just to run a few long running jobs in p//.
>>>>>>
>>>>>> I have a list of jobs to do, I'm partitioning that list up into 4 sub
>>>>>> lists to be worked through by 4 p// workers, I then want to block and
>>>>>> wait until all 4 workers have finished their tasks.
>>>>>>
>>>>>> On Wednesday, September 17, 2014 3:27:07 AM UTC+10, larry google
>>>>>> groups wrote:
>>>>>>>
>>>>>>> This does not look correct to me. Perhaps someone else has more
>>>>>>> insight into this. I am suspicious about 2 things:
>>>>>>>
>>>>>>> 1.) your use of doall
>>>>>>>
>>>>>>> 2.) your use of (thread)
>>>>>>>
>>>>>>> It looks to me like you are trying to hack together a kind of
>>>>>>> pipeline or channel. Clojure has a wealth of libraries that can
>>>>>>> handle that for you. The main thing you are trying to do is this:
>>>>>>>
>>>>>>> (long-running-widget-processor widget)
>>>>>>>
>>>>>>> You go to some trouble to set up workers, all to ensure that
>>>>>>> long-running-widget-processor is handled in its own thread.
>>>>>>>
>>>>>>> I would suggest you look at Lamina:
>>>>>>>
>>>>>>> https://github.com/ztellman/lamina
>>>>>>>
>>>>>>> In particular, look at pipelines:
>>>>>>>
>>>>>>> https://github.com/ztellman/lamina/wiki/Pipelines
>>>>>>>
>>>>>>> On Friday, September 5, 2014 1:46:02 AM UTC-4, Beau Fabry wrote:
>>>>>>>>
>>>>>>>> Is the kinda ugly constant (doall usage a sign that I'm doing
>>>>>>>> something silly?
>>>>>>>>
>>>>>>>> (let [num-workers 4
>>>>>>>>       widgets-per-worker (inc (int (/ (count widgets) num-workers)))
>>>>>>>>       bucketed-widgets (partition-all widgets-per-worker widgets)
>>>>>>>>       workers (doall (map (fn [widgets]
>>>>>>>>                             (thread
>>>>>>>>                               (doseq [widget widgets]
>>>>>>>>                                 (long-running-widget-processor widget))
>>>>>>>>                               true))
>>>>>>>>                           bucketed-widgets))]
>>>>>>>>   (doall (map <!! workers)))
>>>>>>>>
>>>>>>>> https://gist.github.com/bfabry/ad830b1888e4fc550f88
>>>>>>>>
>>>>>>>> All comments appreciated :-)
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Beau

--
You received this message because you are subscribed to the Google Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.