Cool project; Thanks for working on and sharing this. Worth mentioning that Christian Weilbach built a thing called superv (based on the supervisor pattern in Erlang) which solves some similar problems using macros with some of the other core.async api, but I don't think implemented a version of either pipeline or pipeline blocking as you have here. So maybe something to look at either for complementary tooling, or for other ideas about how to approach this problem.
https://github.com/replikativ/superv.async Chris On Sunday, September 12, 2021 at 9:38:51 PM UTC-7 Tsutomu YANO wrote: > (Sorry, I send a response before writing text fully. This is a repost) > > The most large difference is safe exception-handling and easy per-request > result-handling. > > 'concurrently' is built on top of pipeline/pipeline-blocking of > core.async. It have functions like > `concurrent-process` and `concurrent-process-blocking` that depend on > `pipeline` and `pipeline-blocking`, > so you can pass input and output channels and parallel-count to the > functions same as core.async, > 'concurrently' internally create pipeline and use it for execution of a > supplied transducer. > > But APIs of core.async are very primitive. Programmers must handle > channels very carefully for protecting > program from accidental exceptions by passing exception-handlers for ALL > transducers (if you forget it, > the exceptions thrown by the exception-handler never be caught and just a > stack-trace is printed to stdout. > Application easily lost a chance to handle exceptions). > > And in a usecase where you are building a web application that have a > shared single pipeline, and many > requests use the same shared pipeline for calculation, you must carefully > handle the output of the shared > pipeline for retrieving only results for a request (because the pipeline > is shared, the output contains results of > other requester-threads). If some requester-thread mishandling the > output-channel and stop reading their own > results, the data will remain in the output-channel eternally and the the > pipeline will stop working. > It means that a thing similar to DEAD-LOCK of multithread programming > occurs easily. > > 'concurrently' is useful for such usecase. Most of verbose > exception-handlings and per-thread result-handlings are > handled by 'concurrently'. Things programmers must to do is just passing > input data to 'concurrently' function, > and read the result-channel returned by the function by calling > 'get-results'. > > 'concurrently' is a kind of a high-level API for core.async. > > > Tsutomu YANO > > > > 2021/09/13 13:09、'Tsutomu YANO' via Clojure <clo...@googlegroups.com > >のメール: > > > > The most large difference is safe exception-handling and per-thread > > > > 'concurrently' is built on top of pipeline/pipeline-blocking of > core.async. It have functions like > > `concurrent-process` and `concurrent-process-blocking` that depend on > `pipeline` and `pipeline-blocking`, > > so you can pass input and output channels and parallel-count to the > functions. It is same with core.async. > > > > But 'concurrently' wraps the created pipeline for protecting the > pipeline > > > > > > > >> 2021/09/13 0:36、Rangel <rasp...@gmail.com>のメール: > >> > >> Interesting project. > >> > >> Can you expand on any differences or similarities with core.async's > pipeline, pipeline-async, etc ? > >> > >> https://clojuredocs.org/clojure.core.async/pipeline > >> > >> On Sun, Sep 12, 2021 at 5:30 AM 'Tsutomu YANO' via Clojure < > clo...@googlegroups.com> wrote: > >> Hi clojurians, > >> > >> We publish our library for concurrent processing of data with > core.async. named 'concurrently' > >> > >> https://github.com/uzabase/concurrently > >> > >> > >> With 'concurrently', programmers can create shared process-pipelines > backed by core.async and > >> can share the pipelines safely/easily from multiple requester-threads. > >> > >> Shared pipeline can accepts requests from many requester-threads and > handle the requests in > >> shared concurrent pipeline of core.async and then split the calculated > results to each requesters. > >> > >> Using core.async for creating shared pipeline causes many difficulties > for programmers because: > >> > >> * Shared channels easily stack, because if some requester threads stop > reading a channel for > >> accidental exceptions, some data remain unread and nobody can write the > channel. it might > >> causes a shared pipeline stops (stacks) for waiting to write data to > the unread channel. > >> * You must carefully handle channels as they NEVER stack. > >> * All input-data are put onto a same pipeline and you must SPLIT data > from output of the shared pipeline > >> in any way for returning the processed results to each > requester-threads. > >> > >> > >> 'concurrently' handles all the problems described above. > >> Each requster-threads just pass input-data by calling a 'concurrently' > functions. Shared pipeline will return a 'Job', > >> then Each requster-threads can get all calculated results for the > input-data by calling 'get-results' function on the job. > >> No verbose tasks for protecting channels from accidental stack or > splitting calculated results for each requesters. > >> > >> > >> UZABASE is a company based on Japan that is processing/publishing > financial data on SaaS applications, > >> and uses Clojure language for many tasks. > >> > >> This 'concurrently' library is made for our streaming-data-processing > tasks and already is used for over 1 year. > >> Problems of this library are found and already are fixed while the long > application on real tasks, > >> So we decided to publish this library for many programmers. > >> > >> We hope that it is useful for someone. > >> > >> Thank you. > >> > >> > >> > >> Tsutomu YANO > >> > >> > >> -- > >> You received this message because you are subscribed to the Google > >> Groups "Clojure" group. > >> To post to this group, send email to clo...@googlegroups.com > >> Note that posts from new members are moderated - please be patient with > your first post. > >> To unsubscribe from this group, send email to > >> clojure+u...@googlegroups.com > >> For more options, visit this group at > >> http://groups.google.com/group/clojure?hl=en > >> --- > >> You received this message because you are subscribed to the Google > Groups "Clojure" group. > >> To unsubscribe from this group and stop receiving emails from it, send > an email to clojure+u...@googlegroups.com. > >> To view this discussion on the web visit > https://groups.google.com/d/msgid/clojure/47D5CA02-79AA-4267-9EFA-4B63D415957D%40me.com > . > >> > >> -- > >> You received this message because you are subscribed to the Google > >> Groups "Clojure" group. > >> To post to this group, send email to clo...@googlegroups.com > >> Note that posts from new members are moderated - please be patient with > your first post. > >> To unsubscribe from this group, send email to > >> clojure+u...@googlegroups.com > >> For more options, visit this group at > >> http://groups.google.com/group/clojure?hl=en > >> --- > >> You received this message because you are subscribed to the Google > Groups "Clojure" group. > >> To unsubscribe from this group and stop receiving emails from it, send > an email to clojure+u...@googlegroups.com. > >> To view this discussion on the web visit > https://groups.google.com/d/msgid/clojure/CAPo-QOsZ8WQRZkKvEsMzx1_c68a93e9eMZRaZgzuf_0i_ziQDQ%40mail.gmail.com > . > > > > -- > > You received this message because you are subscribed to the Google > > Groups "Clojure" group. > > To post to this group, send email to clo...@googlegroups.com > > Note that posts from new members are moderated - please be patient with > your first post. > > To unsubscribe from this group, send email to > > clojure+u...@googlegroups.com > > For more options, visit this group at > > http://groups.google.com/group/clojure?hl=en > > --- > > You received this message because you are subscribed to the Google > Groups "Clojure" group. > > To unsubscribe from this group and stop receiving emails from it, send > an email to clojure+u...@googlegroups.com. > > To view this discussion on the web visit > https://groups.google.com/d/msgid/clojure/9D6F96FC-171F-4E0B-9CCA-5CC252C5FD37%40me.com > . > > -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/clojure/4aee9510-4b93-4741-9154-a8130a6bbe86n%40googlegroups.com.