The floodgates are open, Timothy! On the back of that, I'm going to dump some code in here, so please excuse me. It's on GitHub too <https://github.com/raymcdermott/kafka-proxy/blob/master/src/kafka_proxy/core.clj>, so maybe look there if it's too heavy for email.

I have this code in the go-loop, and it works. But if I move it into the transducer, the code throws an exception stating that the object (which I can print) is a String.

(defn sse-data [kafka-record] (...))

(defn matching-event [client-event-filter kafka-record] (...))

;------>>>>>>>>>>>> INLINE version - works: record on ConsumerRecord
(defn kafka-proxy-handler
  [request]
  (let [client-filter (or (get (:params request) "filter") ".*")
        consumer (topic-consumer "topic")
        kafka-ch (chan)]
    (go-loop []
      (if-let [records (.poll consumer 100)]
        (doseq [record records]
          (if (matching-event client-filter record)
            (>! kafka-ch (sse-data record)))))
      (recur))
    {:status 200
     :body (s/->source kafka-ch)}))

;------>>>>>>>>>>>> TRANSDUCER version - fails: says method .offset does not exist on String
(defn kafka-proxy-handler
  [request]
  (let [client-filter (or (get (:params request) "filter") ".*")
        consumer (topic-consumer "topic")
        kafka-ch (chan 1 (comp (map #(fn [kr] (sse-data kr)))
                               (filter #(fn [kr] (matching-event client-filter kr)))))]
    (go-loop []
      (if-let [records (.poll consumer 100)]
        (doseq [record records]
          (>! kafka-ch record)))
      (recur))
    {:status 200
     :body (s/->source kafka-ch)}))

Is that the same thing, or have I made another, different mistake?

Cheers

Ray

On Friday, 26 August 2016 02:02:24 UTC+2, tbc++ wrote:
>
> I'm not sure I've ever addressed this publicly, so I assume now's as good
> a time as ever.
>
> The reason the go block stops at function boundaries is that changing a
> function to be "async" changes its return type. With code transforms like
> these, a function that performs a parking take on a channel no longer
> returns an object; it returns an "async object" that eventually returns an
> object. Notice the difference here:
>
> (fn [c]
>   (<!! c))
>
> (fn [c]
>   (go (<! c)))
>
> Adding the "go" to the function (which is required to kick off the
> transformation) changes the return type to being a channel of objects
> instead of a single object.
> This is surfaced in other languages that support parking behavior as
> well. Performing an async/await operation in C#, for example, changes the
> type from "Object" to "Task<Object>". In essence, parking is infectious.
> Any function that calls an async function must itself either become
> blocking or parking.
>
> For example, for this code to work:
>
> (go (vec (map <! [c1 c2 c3 c4])))
>
> You would have to transform all the code in map, persistent vectors, seqs
> and a few other bits of code.
>
> Now there are some languages/libraries that support this behavior on the
> JVM; two are Erjang and Pulsar. Both of these provide this functionality
> via whole program code transformation. That is to say, they perform
> async/go-like transforms to all the code in the JVM, or at least all the
> code that interfaces with async code. I consider this a valid approach, but
> it is rather invasive. As such, we made a design decision early on in the
> development of core.async to only perform local transformation.
>
> Hopefully that provides some context.
>
> Timothy
>
> On Thu, Aug 25, 2016 at 5:29 PM, Kevin Downey <red...@gmail.com> wrote:
>
>> The analysis for the go macro to determine that the fn never escapes the
>> go block is not something core.async does. Because of that, functions are
>> sort of a black box to the transforms the go macro does.
>>
>> http://dev.clojure.org/jira/browse/ASYNC-93 is a declined issue
>> regarding this. http://dev.clojure.org/jira/browse/ASYNC-57 is another
>> similar declined issue.
>>
>> On 08/25/2016 04:21 PM, hiskennyness wrote:
>> > I am getting an error about >!
>> > not being in a go block with this code:
>> >
>> > (go-loop [state :nl
>> >           column 0
>> >           last-ws nil
>> >           buff ""]
>> >   (let [line-out (fn [c b]
>> >                    (>! out (apply str b
>> >                                   (repeat (- col-width (count b)) \space))))]
>> >     (cond
>> >       (>= column col-width)
>> >       (condp = state
>> >         :ws (do
>> >               (line-out \| buff)
>> >               (recur :nl 0 nil ""))
>> > ..etc etc
>> >
>> > I just changed the line-out call to just do...
>> >
>> > (>! out-chan buff)
>> >
>> > ...and it worked fine.
>> >
>> > So the failing code is both dynamically and lexically within the scope
>> > of the go-loop --- is that supposed to be that way? Or am I completely
>> > missing something?
>> >
>> > -kt
>> >
>> > --
>> > You received this message because you are subscribed to the Google
>> > Groups "Clojure" group.
>> > To post to this group, send email to clo...@googlegroups.com
>> > Note that posts from new members are moderated - please be patient with
>> > your first post.
>> > To unsubscribe from this group, send email to
>> > clojure+u...@googlegroups.com
>> > For more options, visit this group at
>> > http://groups.google.com/group/clojure?hl=en
>> > ---
>> > You received this message because you are subscribed to the Google
>> > Groups "Clojure" group.
>> > To unsubscribe from this group and stop receiving emails from it, send
>> > an email to clojure+u...@googlegroups.com.
>> > For more options, visit https://groups.google.com/d/optout.
>>
>>
>> --
>> And what is good, Phaedrus,
>> And what is not good—
>> Need we ask anyone to tell us these things?
>
>
> --
> “One of the main causes of the fall of the Roman Empire was that–lacking
> zero–they had no way to indicate successful termination of their C
> programs.”
> (Robert Firth)
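
For what it's worth, here is a minimal sketch of how the transducer in the second kafka-proxy-handler might be written. It rests on some loud assumptions: the bodies of sse-data and matching-event below are invented stand-ins (the real ones are elided in the message), a plain map replaces the Kafka ConsumerRecord, and record-xform and sample-records are hypothetical names, so the snippet runs without Kafka or core.async. Two things in the posted transducer version look suspect. First, #(...) is already a function literal, so #(fn [kr] ...) is a zero-argument function that returns another function instead of calling sse-data. Second, comp on transducers applies its steps left to right, so with map first the filter predicate receives the output of sse-data rather than the record. That would be consistent with the complaint about .offset on a String, if sse-data returns a String.

```clojure
;; ASSUMPTION: hypothetical stand-ins for the elided functions.
;; A plain map with :offset and :value replaces the Kafka ConsumerRecord.
(defn sse-data [record]
  (str "data: " (:value record) "\n\n"))

(defn matching-event [client-filter record]
  (re-find (re-pattern client-filter) (:value record)))

;; comp on transducers runs left to right: filter sees the raw record,
;; and only matching records are passed on to sse-data.
;; Note the plain (fn ...) and the bare sse-data: no #(fn ...) wrapper.
(defn record-xform [client-filter]
  (comp (filter (fn [record] (matching-event client-filter record)))
        (map sse-data)))

;; Hypothetical sample data, for illustration only.
(def sample-records
  [{:offset 0 :value "alpha"}
   {:offset 1 :value "beta"}])

;; Apply the transducer to a plain collection to check its behaviour.
(def result (into [] (record-xform "al") sample-records))
```

If this is the right shape, the same transducer could be attached to the channel with (chan 1 (record-xform client-filter)), leaving the go-loop to put raw records on kafka-ch as in the posted version.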