Humm... actually, I was happy too soon...

Ends up this following example doesn't work:

(let [value-chan (fn [v] (let [c (chan 1)] ; simple helper to simulate an
async operation that just returns the input
                           (go
                             (>! c v)
                             (close! c))
                           c))
      xform (comp (async filter value-chan)                 ; can be used
                  ) ; now filter sync
      c (chan 1 xform)]
  (async/onto-chan c [1 false 2 3])
  (go
    (println (<! (async/into [] c))))) ; got an empty list instead of [1 2
3]

It seems that it only works if the channel had the value before the
transducer call the `take!`, seems that unless it has the value immediately
available, it doesn't works...

---
Wilker Lúcio
http://about.me/wilkerlucio/bio
Woboinc Consultant
+55 81 82556600

On Mon, Sep 22, 2014 at 12:00 AM, Wilker <wilkerlu...@gmail.com> wrote:

> Ok, I think a got into a reasonable solution :)
>
> The code first:
>
> (defn -expand [v f]
>   (if (satisfies? cljs.core.async.impl.protocols/ReadPort v)
>     (take! v f)
>     (f v)))
>
> (defn async [t f & args]
>   (fn [f1]
>     (fn
>       ([] (f1))
>       ([result] (f1 result))
>       ([result input]
>        (-expand (f input)
>                 (fn [v]
>                   (((apply t (cons identity args)) f1) result v)))))))
>
> (let [value-chan (fn [v] (let [c (chan 1)] ; simple helper to simulate an
> async operation that just returns the input
>                            (put! c v)
>                            (close! c)
>                            c))
>       xform (comp (async map value-chan)  ; using async!
>                   (async filter value-chan) ; same thing with filter
>                   )
>       c (chan 1 xform)]
>   (async/onto-chan c [1 false 2 3])
>   (go
>     (println (<! (async/into [] c))))) ; [1 2 3]
>
> Ok, what I did was create a transducer wrapper that relies into the
> wrapped transducer to take a function as the first argument (that's usually
> the function that needs the expanded value) and makes the proper calls for
> it.
>
> It worked great for the most transducers that I needed here, so, I
> consider a solution to my issue :)
>
> Hope it can help others that may got into similar issue.
>
> ---
> Wilker Lúcio
> http://about.me/wilkerlucio/bio
> Woboinc Consultant
> +55 81 82556600
>
> On Sun, Sep 21, 2014 at 11:29 PM, Wilker <wilkerlu...@gmail.com> wrote:
>
>> Hi, I did some progress here, I was able to manage to create a custom
>> filter transducer that works seamlessly with regular values and channels
>> and can be composed with other transducers:
>>
>> ; on JVM I did this using protocols, but in ClojureScript I could not
>> make it work (some error when I tried to extend the ReadPort type), but
>> this works as same
>> (defn -expand [v f]
>>   (if (satisfies? cljs.core.async.impl.protocols/ReadPort v)
>>     (take! v f)
>>     (f v)))
>>
>> ; my filter transducer that supports async operations
>> (defn filter-ext [pred]
>>   (fn [f1]
>>     (fn
>>       ([] (f1))
>>       ([result] (f1 result))
>>       ([result input]
>>        (-expand (pred input); here is the trick, so in this case I made
>> every operation dependent on a callback, so they can be normalized
>>                 (fn [v]
>>                   (if v
>>                     (f1 result input)
>>                     result)))))))
>>
>> (let [value-chan (fn [v] (let [c (chan 1)] ; simple helper to simulate an
>> async operation that just returns the input
>>                            (put! c v)
>>                            (close! c)
>>                            c))
>>       xform (comp (filter-ext value-chan) ; ok, filter async
>>                   (filter-ext #(>= % 2))) ; now filter sync
>>       c (chan 1 xform)]
>>   (async/onto-chan c [1 false 2 3])
>>   (go
>>     (println (<! (async/into [] c))))) ; prints: [2 3]
>>
>> I could have my extended versions of every transducer to add the support,
>> the code is pretty much the same but has the extra step of checking for the
>> returned value on the operation.
>>
>> Not sure how that could ever be integrated into regular transducers
>> (maybe they could be extended somehow to include this feature selectively
>> to avoid the overhead), but for now this is the best that I could came up
>> with.
>>
>>
>> ---
>> Wilker Lúcio
>> http://about.me/wilkerlucio/bio
>> Woboinc Consultant
>> +55 81 82556600
>>
>> On Sun, Sep 21, 2014 at 10:39 PM, Wilker <wilkerlu...@gmail.com> wrote:
>>
>>>
>>> On Sun, Sep 21, 2014 at 10:08 PM, Sean Corfield <s...@corfield.org>
>>> wrote:
>>>
>>>>
>>> Hi Sean,
>>>
>>> Sorry, I don't really understood your suggestion... But let me try to
>>> make myself more clear of what I'm trying to accomplish:
>>>
>>> First let me say that I have a more Javascript background than Java, and
>>> my issue is more into ClojureScript world
>>>
>>> Javascript has a lot of async issues, specially when you are working on
>>> Node.JS world, every IO operation is async, although they have sync
>>> versions those versions block the world so I need to keep away from then,
>>> so, async callbacks is my only real options here.
>>>
>>> The first thing that I like to do in my async code is to wrap than into
>>> something that I can handle generically, in JS I used to do a lot of
>>> promises code, but now I'm core async, so I think making then "single
>>> channel value" just fits nice, and wrapping that way I can compose very
>>> well.
>>>
>>> Now I'm at this place, I have all those async functions that I wanna use
>>> to process my data, I wanna use those async functions, also regular
>>> functions.
>>>
>>> Before transducers, I've used the David Nolen's reactive helpers (on his
>>> blog source) to do my data processing using those channels, a good example
>>> on how I used to manage complex async pipelines are like this:
>>>
>>> (->> (scan-path "root/path") ; this will produce a channel that will
>>> output every file/dir path recursive from given path
>>>      (r/filter-async is-file?)  ; this requires async call to filter
>>>      (f/filter (match-extension? #{"avi" "mpg"})) ; this will just check
>>> on the name, no async required
>>>      (r/map-async read-hash-info) ; another async call, will generate a
>>> hash info from the path
>>>      (r/map #(hash-map :hash %)) ; build a map from the value
>>>      )
>>>
>>> So, I like how the previous code is build, because I can really just
>>> composed from my simple sync and async functions, but all these use "custom
>>> transducers" you can say...
>>>
>>> Then comes Transducers that can rise the bar on the abstraction level,
>>> so I now can get rid of all those custom transducers, right?
>>>
>>> So, it just would be nice if I could get the same clean way to build my
>>> pipeline like I did before, but with transducers.
>>>
>>> ---
>>> Wilker Lúcio
>>> http://about.me/wilkerlucio/bio
>>> Woboinc Consultant
>>> +55 81 82556600
>>>
>>
>>
>

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to