Just to avoid any possibility of giving the impression that I'm
offering the above as a well-tested component: I wouldn't actually use
closing-channel as defined above without giving much more thought to
all implications. The current version is just a quick sketch.

Also, if the alt! / alts! approach works satisfactorily, or if another
approach based on the available public primitives does, I'd definitely
avoid implementing new funky stuff.

Fun problem to think about, though.


On 14 January 2014 09:47, Michał Marczyk <michal.marc...@gmail.com> wrote:
> A sketch of a different approach:
>
> alt!  / alts! on the main channel and an extra channel with :priority
> true when putting. Use an unblocking buffer in the extra channel. If
> you end up putting on the extra channel, close the main channel.
> (core.async channels can be closed multiple times.)
>
> Here a consumer could take something off the channel before we close,
> making it possible to a different producer to put something on the
> channel possibly before we close, meaning that the channel doesn't,
> strictly speaking, get closed as soon as an "overflowing put" arrives.
> Of course if the timing is that close, the end result is no different
> to what would happen had we arrived at a slightly later time.
>
>
> On 14 January 2014 09:37, Michał Marczyk <michal.marc...@gmail.com> wrote:
>> Ah, of course, the mutex is not reentrant.
>>
>> So, one possible way to fix the situation would be to close the owned
>> channel manually by resetting its "closed" atom to true. Thinking
>> through all the implications should make for an interesting exercise.
>> :-) (For example, I think cleanup is performed correctly anyway, but I
>> haven't put in the time to make sure properly. Clearly this relies on
>> the implementation details of channels, so even if it is in fact not
>> broken with the current alpha, it might be in the future, and so it
>> might still be a better idea to implement a new channel. And so on.)
>>
>> Here's the implementation, see below for an example from the REPL:
>>
>> (defn closing-buffer [n]
>>   (let [b (async/buffer n)
>>         c (clojure.lang.Box. nil)]
>>     (reify
>>       clojure.core.async.impl.protocols/Buffer
>>       (full? [this]
>>         false)
>>       (remove! [this]
>>         (clojure.core.async.impl.protocols/remove! b))
>>       (add! [this itm]
>>         (if (clojure.core.async.impl.protocols/full? b)
>>           (do
>>             (reset! (.-closed
>> ^clojure.core.async.impl.channels.ManyToManyChannel (.-val c)) true)
>>             nil)
>>           (clojure.core.async.impl.protocols/add! b itm)))
>>       clojure.core.async.impl.protocols/UnblockingBuffer
>>       clojure.lang.Counted
>>       (count [this]
>>         (count b))
>>       PBufferThatMightCloseAChannel
>>       (this-is-your-channel! [this given-c]
>>         (if (nil? (.-val c))
>>           (do
>>             (set! (.-val c) given-c)
>>             true)
>>           false)))))
>>
>> (defn closing-channel [n]
>>   (let [b (closing-buffer n)
>>         c (async/chan b)]
>>     (this-is-your-channel! b c)
>>     c))
>>
>> From the REPL:
>>
>> user> (def test-chan (closing-channel 2))
>> #'user/test-chan
>> user> (async/>!! test-chan :foo)
>> nil
>> user> (async/>!! test-chan :foo)
>> nil
>> user> (async/>!! test-chan :foo)
>> nil
>> user> (async/<!! test-chan)
>> :foo
>> user> (async/<!! test-chan)
>> :foo
>> user> (async/<!! test-chan)
>> nil
>> user> (async/>!! test-chan :foo)
>> nil
>> user> (async/<!! test-chan)
>> nil
>>
>> Cheers,
>> Michał
>>
>>
>> On 14 January 2014 07:41, t x <txrev...@gmail.com> wrote:
>>> If I understand your strategy correctly, it works as follows:
>>>
>>> * write new buffer class
>>> * extend buffer class with a field/atom that stores the channel that owns
>>> the buffer
>>>
>>> * on overflow, call (async/close!) on the owned channel
>>>
>>> However, looking at :
>>>
>>> *
>>> https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L179
>>> *
>>> https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L55
>>>
>>> isn't this a deadlock by acquiring the mutex while holding the mutex?
>>>
>>>
>>> On Mon, Jan 13, 2014 at 10:27 PM, Michał Marczyk <michal.marc...@gmail.com>
>>> wrote:
>>>>
>>>> Instead of actually implementing a new channel type, you could use the
>>>> regular MTMCs, but with your own factory function, custom buffers and
>>>> a sprinkle of mutability:
>>>>
>>>> (defprotocol PBufferThatMightCloseAChannel
>>>>   (this-is-your-channel! [this c]
>>>>     "Informs the buffer that c is the channel it might need to close"))
>>>>
>>>> ;; implement a buffer with an impl for the above protocol;
>>>> ;; have it close the appropriate channel on overflow;
>>>> ;; suppose closing-buffer is the factory function
>>>> ;; producing such channels
>>>>
>>>> (defn closing-channel [n]
>>>>   (let [b (closing-buffer n)
>>>>         c (chan b)]
>>>>     (this-is-your-channel! b c)
>>>>     c))
>>>>
>>>> Cheers,
>>>> Michał
>>>>
>>>>
>>>> On 14 January 2014 05:47, t x <txrev...@gmail.com> wrote:
>>>> > I am aware of:
>>>> >
>>>> >   DroppingBuffer and SliddingBuffer
>>>> >
>>>> > I would like to build a channel with different semantics:
>>>> >
>>>> >   When I try to put an item on a full channel:
>>>> >
>>>> >   * DroppingBuffer drops the new item
>>>> >   * SliddingBuffer evicts the oldest item
>>>> >
>>>> >   * ClosingBuffer should _close the channel_
>>>> >
>>>> > (thus, the receiver eventually gets a nil, and realizes "ah, was
>>>> > disconnected")
>>>> >
>>>> > Now, I am looking at the github core.async code:
>>>> >
>>>> > DroppingBuffer:
>>>> >
>>>> > https://github.com/clojure/core.async/blob/76317035d386ce2a1d98c2c349da9b898b480c55/src/main/clojure/clojure/core/async/impl/buffers.clj#L33-L45
>>>> >
>>>> > ManyToManyChannel:
>>>> >
>>>> > https://github.com/clojure/core.async/blob/76317035d386ce2a1d98c2c349da9b898b480c55/src/main/clojure/clojure/core/async/impl/channels.clj#L31-L198
>>>> >
>>>> > ## Problem:
>>>> >
>>>> > DroppingBuffer is easy to understand. However, I don't think I can
>>>> > implement
>>>> > "ClosingBuffer" as Buffer.
>>>> >
>>>> > I feel taht "ClosingBuffer" can only be implemneted at the Channel
>>>> > Level.
>>>> >
>>>> > However, the Channel Code is complicated.
>>>> >
>>>> > Is there a neat hack / trick to do what I described above?
>>>> >
>>>> > Thanks!
>>>> >
>>>> > --
>>>> > --
>>>> > You received this message because you are subscribed to the Google
>>>> > Groups "Clojure" group.
>>>> > To post to this group, send email to clojure@googlegroups.com
>>>> > Note that posts from new members are moderated - please be patient with
>>>> > your
>>>> > first post.
>>>> > To unsubscribe from this group, send email to
>>>> > clojure+unsubscr...@googlegroups.com
>>>> > For more options, visit this group at
>>>> > http://groups.google.com/group/clojure?hl=en
>>>> > ---
>>>> > You received this message because you are subscribed to the Google
>>>> > Groups
>>>> > "Clojure" group.
>>>> > To unsubscribe from this group and stop receiving emails from it, send
>>>> > an
>>>> > email to clojure+unsubscr...@googlegroups.com.
>>>> > For more options, visit https://groups.google.com/groups/opt_out.
>>>>
>>>> --
>>>> --
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Clojure" group.
>>>> To post to this group, send email to clojure@googlegroups.com
>>>> Note that posts from new members are moderated - please be patient with
>>>> your first post.
>>>> To unsubscribe from this group, send email to
>>>> clojure+unsubscr...@googlegroups.com
>>>> For more options, visit this group at
>>>> http://groups.google.com/group/clojure?hl=en
>>>> ---
>>>> You received this message because you are subscribed to the Google Groups
>>>> "Clojure" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send an
>>>> email to clojure+unsubscr...@googlegroups.com.
>>>> For more options, visit https://groups.google.com/groups/opt_out.
>>>
>>>
>>> --
>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "Clojure" group.
>>> To post to this group, send email to clojure@googlegroups.com
>>> Note that posts from new members are moderated - please be patient with your
>>> first post.
>>> To unsubscribe from this group, send email to
>>> clojure+unsubscr...@googlegroups.com
>>> For more options, visit this group at
>>> http://groups.google.com/group/clojure?hl=en
>>> ---
>>> You received this message because you are subscribed to the Google Groups
>>> "Clojure" group.
>>> To unsubscribe from this group and stop receiving emails from it, send an
>>> email to clojure+unsubscr...@googlegroups.com.
>>> For more options, visit https://groups.google.com/groups/opt_out.

-- 
-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Reply via email to