Ok, I think a got into a reasonable solution :) The code first:
(defn -expand [v f] (if (satisfies? cljs.core.async.impl.protocols/ReadPort v) (take! v f) (f v))) (defn async [t f & args] (fn [f1] (fn ([] (f1)) ([result] (f1 result)) ([result input] (-expand (f input) (fn [v] (((apply t (cons identity args)) f1) result v))))))) (let [value-chan (fn [v] (let [c (chan 1)] ; simple helper to simulate an async operation that just returns the input (put! c v) (close! c) c)) xform (comp (async map value-chan) ; using async! (async filter value-chan) ; same thing with filter ) c (chan 1 xform)] (async/onto-chan c [1 false 2 3]) (go (println (<! (async/into [] c))))) ; [1 2 3] Ok, what I did was create a transducer wrapper that relies into the wrapped transducer to take a function as the first argument (that's usually the function that needs the expanded value) and makes the proper calls for it. It worked great for the most transducers that I needed here, so, I consider a solution to my issue :) Hope it can help others that may got into similar issue. --- Wilker Lúcio http://about.me/wilkerlucio/bio Woboinc Consultant +55 81 82556600 On Sun, Sep 21, 2014 at 11:29 PM, Wilker <wilkerlu...@gmail.com> wrote: > Hi, I did some progress here, I was able to manage to create a custom > filter transducer that works seamlessly with regular values and channels > and can be composed with other transducers: > > ; on JVM I did this using protocols, but in ClojureScript I could not make > it work (some error when I tried to extend the ReadPort type), but this > works as same > (defn -expand [v f] > (if (satisfies? cljs.core.async.impl.protocols/ReadPort v) > (take! v f) > (f v))) > > ; my filter transducer that supports async operations > (defn filter-ext [pred] > (fn [f1] > (fn > ([] (f1)) > ([result] (f1 result)) > ([result input] > (-expand (pred input); here is the trick, so in this case I made > every operation dependent on a callback, so they can be normalized > (fn [v] > (if v > (f1 result input) > result))))))) > > (let [value-chan (fn [v] (let [c (chan 1)] ; simple helper to simulate an > async operation that just returns the input > (put! c v) > (close! c) > c)) > xform (comp (filter-ext value-chan) ; ok, filter async > (filter-ext #(>= % 2))) ; now filter sync > c (chan 1 xform)] > (async/onto-chan c [1 false 2 3]) > (go > (println (<! (async/into [] c))))) ; prints: [2 3] > > I could have my extended versions of every transducer to add the support, > the code is pretty much the same but has the extra step of checking for the > returned value on the operation. > > Not sure how that could ever be integrated into regular transducers (maybe > they could be extended somehow to include this feature selectively to avoid > the overhead), but for now this is the best that I could came up with. > > > --- > Wilker Lúcio > http://about.me/wilkerlucio/bio > Woboinc Consultant > +55 81 82556600 > > On Sun, Sep 21, 2014 at 10:39 PM, Wilker <wilkerlu...@gmail.com> wrote: > >> >> On Sun, Sep 21, 2014 at 10:08 PM, Sean Corfield <s...@corfield.org> >> wrote: >> >>> >> Hi Sean, >> >> Sorry, I don't really understood your suggestion... But let me try to >> make myself more clear of what I'm trying to accomplish: >> >> First let me say that I have a more Javascript background than Java, and >> my issue is more into ClojureScript world >> >> Javascript has a lot of async issues, specially when you are working on >> Node.JS world, every IO operation is async, although they have sync >> versions those versions block the world so I need to keep away from then, >> so, async callbacks is my only real options here. >> >> The first thing that I like to do in my async code is to wrap than into >> something that I can handle generically, in JS I used to do a lot of >> promises code, but now I'm core async, so I think making then "single >> channel value" just fits nice, and wrapping that way I can compose very >> well. >> >> Now I'm at this place, I have all those async functions that I wanna use >> to process my data, I wanna use those async functions, also regular >> functions. >> >> Before transducers, I've used the David Nolen's reactive helpers (on his >> blog source) to do my data processing using those channels, a good example >> on how I used to manage complex async pipelines are like this: >> >> (->> (scan-path "root/path") ; this will produce a channel that will >> output every file/dir path recursive from given path >> (r/filter-async is-file?) ; this requires async call to filter >> (f/filter (match-extension? #{"avi" "mpg"})) ; this will just check >> on the name, no async required >> (r/map-async read-hash-info) ; another async call, will generate a >> hash info from the path >> (r/map #(hash-map :hash %)) ; build a map from the value >> ) >> >> So, I like how the previous code is build, because I can really just >> composed from my simple sync and async functions, but all these use "custom >> transducers" you can say... >> >> Then comes Transducers that can rise the bar on the abstraction level, so >> I now can get rid of all those custom transducers, right? >> >> So, it just would be nice if I could get the same clean way to build my >> pipeline like I did before, but with transducers. >> >> --- >> Wilker Lúcio >> http://about.me/wilkerlucio/bio >> Woboinc Consultant >> +55 81 82556600 >> > > -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.