The flood gates are open Timothy! and on the back of that, I'm gonna dump 
some code in here so pls excuse me - it's on GitHub 
<https://github.com/raymcdermott/kafka-proxy/blob/master/src/kafka_proxy/core.clj>
 
too so maybe look there if it's too heavy for email.

I have this code in the go-loop but if I put in the transducer, that code 
throws an exception stating that the object (which I can print) is a String

(defn sse-data 
  [kafka-record] (...))

(defn matching-event
  [client-event-filter kafka-record] (...))

;------>>>>>>>>>>>> INLINE version - works: record on ConsumerRecord
(defn kafka-proxy-handler
  [request]
  (let [client-filter (or (get (:params request) "filter") ".*")
        consumer (topic-consumer "topic")
        kafka-ch (chan)]
    (go-loop []
      (if-let [records (.poll consumer 100)]
        (doseq [record records]
          (if (matching-event client-filter record)
            (>! kafka-ch (sse-data record)))))
      (recur))
    {:status  200
     :body    (s/->source kafka-ch)}))

;------>>>>>>>>>>>> TRANSDUCER version - fails: says method .offset does 
not exist on String 
(defn kafka-proxy-handler
  [request]
  (let [client-filter (or (get (:params request) "filter") ".*")
        consumer (topic-consumer "topic")
        kafka-ch (chan 1 (comp (map #(fn [kr] (sse-data kr)))
                               (filter #(fn [kr] (matching-event 
client-filter kr)))))]
    (go-loop []
      (if-let [records (.poll consumer 100)]
        (doseq [record records]
          (>! kafka-ch record)))
      (recur))
    {:status  200
     :body    (s/->source kafka-ch)}))


Is that the same thing or have I made a(nother) / different mistake?

Cheers

Ray


On Friday, 26 August 2016 02:02:24 UTC+2, tbc++ wrote:
>
> I'm not sure I've ever addressed this publicly, so I assume now's as good 
> a time as ever. 
>
> The reason the go block stops at function boundaries, is that changing a 
> function to be "async" changes it return type. With code transforms like 
> these a function that performs a parking take on a channel no longer 
> returns a object, it returns a "async object" that eventually returns an 
> object. Notice the difference here: 
>
> (fn [c]
>   (<!! c))
>
> (fn [c]
>   (go (<! c))
>
> Adding the "go" to the function (which is required to kick off the 
> transformation) changes the return type to being a channel of objects 
> instead of a single object. This is surfaced in other languages as well 
> that support parking behavior. Performing an async/await operation in C# 
> for example changes the type from "Object" to "Task<Object>". In essence, 
> parking is infectious. Any function that calls an async function must 
> itself either become blocking or parking.
>
> For example, for this code to work:
>
> (go (vec (map <! [c1 c2 c3 c4])))
>
> You would have to transform all the code in map, persistent vectors, seqs 
> and a few other bits of code. 
>
> Now there are some languages/libraries that support this behavior on the 
> JVM, two are Erjang and Pulsar. Both of these provide this functionality 
> via whole program code transformation. That is to say they perform 
> async/go-like transforms to all the code in the JVM, or at least all the 
> code that interfaces with async code. I consider this a valid approach, but 
> it is rather invasive. As such, we made a design decision early on in the 
> development of core.async to only perform local transformation. 
>
> Hopefully that provides some context. 
>
> Timothy
>
> On Thu, Aug 25, 2016 at 5:29 PM, Kevin Downey <red...@gmail.com 
> <javascript:>> wrote:
>
>> The analysis for the go macro to determine that the fn never escapes the
>> go block is not something core.async does. Because of that functions are
>> sort of a black box to the transforms the go macro does.
>>
>> http://dev.clojure.org/jira/browse/ASYNC-93 is a declined issue
>> regarding this. http://dev.clojure.org/jira/browse/ASYNC-57 is another
>> similar declined issue.
>>
>> On 08/25/2016 04:21 PM, hiskennyness wrote:
>> > I am getting an error about >! not being in a go block with this code:
>> >
>> > |
>> >       (go-loop [state :nl
>> >                 column 0
>> >                 last-ws nil
>> >                 buff ""]
>> >         (let [line-out(fn [c b]
>> >                          (>!out(apply str b (repeat (-col-width (count
>> > b))\space))))]
>> >           (cond
>> >             (>=column col-width)
>> >             (condp =state
>> >               :ws (do
>> >                     (line-out\|buff)
>> >                     (recur :nl 0nil""))
>> >          ..etc etc
>> > |
>> >
>> > I just changed the line-out call to just do...
>> >
>> > |
>> > (>!out-chan buff)
>> > |
>> >
>> > ...and it worked fine.
>> >
>> > So the failing code is both dynamically and lexically within the scope
>> > of the go-loop --- is that supposed to be that way? Or am I completely
>> > missing something?
>> >
>> > -kt
>> >
>> > --
>> > You received this message because you are subscribed to the Google
>> > Groups "Clojure" group.
>> > To post to this group, send email to clo...@googlegroups.com 
>> <javascript:>
>> > Note that posts from new members are moderated - please be patient with
>> > your first post.
>> > To unsubscribe from this group, send email to
>> > clojure+u...@googlegroups.com <javascript:>
>> > For more options, visit this group at
>> > http://groups.google.com/group/clojure?hl=en
>> > ---
>> > You received this message because you are subscribed to the Google
>> > Groups "Clojure" group.
>> > To unsubscribe from this group and stop receiving emails from it, send
>> > an email to clojure+u...@googlegroups.com <javascript:>
>> > <mailto:clojure+u...@googlegroups.com <javascript:>>.
>> > For more options, visit https://groups.google.com/d/optout.
>>
>>
>> --
>> And what is good, Phaedrus,
>> And what is not good—
>> Need we ask anyone to tell us these things?
>>
>> --
>> You received this message because you are subscribed to the Google
>> Groups "Clojure" group.
>> To post to this group, send email to clo...@googlegroups.com 
>> <javascript:>
>> Note that posts from new members are moderated - please be patient with 
>> your first post.
>> To unsubscribe from this group, send email to
>> clojure+u...@googlegroups.com <javascript:>
>> For more options, visit this group at
>> http://groups.google.com/group/clojure?hl=en
>> ---
>> You received this message because you are subscribed to the Google Groups 
>> "Clojure" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to clojure+u...@googlegroups.com <javascript:>.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> “One of the main causes of the fall of the Roman Empire was that–lacking 
> zero–they had no way to indicate successful termination of their C 
> programs.”
> (Robert Firth) 
>

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to