Ah, of course, the mutex is not reentrant.

So, one possible way to fix the situation would be to close the owned
channel manually by resetting its "closed" atom to true. Thinking
through all the implications should make for an interesting exercise.
:-) (For example, I think cleanup is performed correctly anyway, but I
haven't put in the time to make sure properly. Clearly this relies on
the implementation details of channels, so even if it is in fact not
broken with the current alpha, it might be in the future, and so it
might still be a better idea to implement a new channel. And so on.)

Here's the implementation, see below for an example from the REPL:

(defn closing-buffer [n]
  (let [b (async/buffer n)
        c (clojure.lang.Box. nil)]
    (reify
      clojure.core.async.impl.protocols/Buffer
      (full? [this]
        false)
      (remove! [this]
        (clojure.core.async.impl.protocols/remove! b))
      (add! [this itm]
        (if (clojure.core.async.impl.protocols/full? b)
          (do
            (reset! (.-closed
^clojure.core.async.impl.channels.ManyToManyChannel (.-val c)) true)
            nil)
          (clojure.core.async.impl.protocols/add! b itm)))
      clojure.core.async.impl.protocols/UnblockingBuffer
      clojure.lang.Counted
      (count [this]
        (count b))
      PBufferThatMightCloseAChannel
      (this-is-your-channel! [this given-c]
        (if (nil? (.-val c))
          (do
            (set! (.-val c) given-c)
            true)
          false)))))

(defn closing-channel [n]
  (let [b (closing-buffer n)
        c (async/chan b)]
    (this-is-your-channel! b c)
    c))

>From the REPL:

user> (def test-chan (closing-channel 2))
#'user/test-chan
user> (async/>!! test-chan :foo)
nil
user> (async/>!! test-chan :foo)
nil
user> (async/>!! test-chan :foo)
nil
user> (async/<!! test-chan)
:foo
user> (async/<!! test-chan)
:foo
user> (async/<!! test-chan)
nil
user> (async/>!! test-chan :foo)
nil
user> (async/<!! test-chan)
nil

Cheers,
Michał


On 14 January 2014 07:41, t x <txrev...@gmail.com> wrote:
> If I understand your strategy correctly, it works as follows:
>
> * write new buffer class
> * extend buffer class with a field/atom that stores the channel that owns
> the buffer
>
> * on overflow, call (async/close!) on the owned channel
>
> However, looking at :
>
> *
> https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L179
> *
> https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L55
>
> isn't this a deadlock by acquiring the mutex while holding the mutex?
>
>
> On Mon, Jan 13, 2014 at 10:27 PM, Michał Marczyk <michal.marc...@gmail.com>
> wrote:
>>
>> Instead of actually implementing a new channel type, you could use the
>> regular MTMCs, but with your own factory function, custom buffers and
>> a sprinkle of mutability:
>>
>> (defprotocol PBufferThatMightCloseAChannel
>>   (this-is-your-channel! [this c]
>>     "Informs the buffer that c is the channel it might need to close"))
>>
>> ;; implement a buffer with an impl for the above protocol;
>> ;; have it close the appropriate channel on overflow;
>> ;; suppose closing-buffer is the factory function
>> ;; producing such channels
>>
>> (defn closing-channel [n]
>>   (let [b (closing-buffer n)
>>         c (chan b)]
>>     (this-is-your-channel! b c)
>>     c))
>>
>> Cheers,
>> Michał
>>
>>
>> On 14 January 2014 05:47, t x <txrev...@gmail.com> wrote:
>> > I am aware of:
>> >
>> >   DroppingBuffer and SliddingBuffer
>> >
>> > I would like to build a channel with different semantics:
>> >
>> >   When I try to put an item on a full channel:
>> >
>> >   * DroppingBuffer drops the new item
>> >   * SliddingBuffer evicts the oldest item
>> >
>> >   * ClosingBuffer should _close the channel_
>> >
>> > (thus, the receiver eventually gets a nil, and realizes "ah, was
>> > disconnected")
>> >
>> > Now, I am looking at the github core.async code:
>> >
>> > DroppingBuffer:
>> >
>> > https://github.com/clojure/core.async/blob/76317035d386ce2a1d98c2c349da9b898b480c55/src/main/clojure/clojure/core/async/impl/buffers.clj#L33-L45
>> >
>> > ManyToManyChannel:
>> >
>> > https://github.com/clojure/core.async/blob/76317035d386ce2a1d98c2c349da9b898b480c55/src/main/clojure/clojure/core/async/impl/channels.clj#L31-L198
>> >
>> > ## Problem:
>> >
>> > DroppingBuffer is easy to understand. However, I don't think I can
>> > implement
>> > "ClosingBuffer" as Buffer.
>> >
>> > I feel taht "ClosingBuffer" can only be implemneted at the Channel
>> > Level.
>> >
>> > However, the Channel Code is complicated.
>> >
>> > Is there a neat hack / trick to do what I described above?
>> >
>> > Thanks!
>> >
>> > --
>> > --
>> > You received this message because you are subscribed to the Google
>> > Groups "Clojure" group.
>> > To post to this group, send email to clojure@googlegroups.com
>> > Note that posts from new members are moderated - please be patient with
>> > your
>> > first post.
>> > To unsubscribe from this group, send email to
>> > clojure+unsubscr...@googlegroups.com
>> > For more options, visit this group at
>> > http://groups.google.com/group/clojure?hl=en
>> > ---
>> > You received this message because you are subscribed to the Google
>> > Groups
>> > "Clojure" group.
>> > To unsubscribe from this group and stop receiving emails from it, send
>> > an
>> > email to clojure+unsubscr...@googlegroups.com.
>> > For more options, visit https://groups.google.com/groups/opt_out.
>>
>> --
>> --
>> You received this message because you are subscribed to the Google
>> Groups "Clojure" group.
>> To post to this group, send email to clojure@googlegroups.com
>> Note that posts from new members are moderated - please be patient with
>> your first post.
>> To unsubscribe from this group, send email to
>> clojure+unsubscr...@googlegroups.com
>> For more options, visit this group at
>> http://groups.google.com/group/clojure?hl=en
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Clojure" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to clojure+unsubscr...@googlegroups.com.
>> For more options, visit https://groups.google.com/groups/opt_out.
>
>
> --
> --
> You received this message because you are subscribed to the Google
> Groups "Clojure" group.
> To post to this group, send email to clojure@googlegroups.com
> Note that posts from new members are moderated - please be patient with your
> first post.
> To unsubscribe from this group, send email to
> clojure+unsubscr...@googlegroups.com
> For more options, visit this group at
> http://groups.google.com/group/clojure?hl=en
> ---
> You received this message because you are subscribed to the Google Groups
> "Clojure" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to clojure+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/groups/opt_out.

-- 
-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Reply via email to