Hi Clement,

It's a nice investigation of the benefits and limits of using reflect.

I guess it would be just as interesting to investigate what this library
would look like if the current draft for generics were to be accepted :)

-s


On Sun, Aug 4, 2019 at 4:13 PM clement auger <clementauger...@gmail.com>
wrote:

> You tell me.
>
>
> https://github.com/clementauger/sta
>
>
> sta - stream async
>
> Implements responsive data stream pipeline.
>
> It is a research and concept project, not to be used in production.
> <https://github.com/clementauger/sta#install>Install
>
> go get -u github.com/clementauger/sta
>
> <https://github.com/clementauger/sta#doc>Doc
>
> godoc.org/github.com/clementauger/sta
>
> <https://github.com/clementauger/sta#example>Example
>
> The simplest example:
>
> sta.Map([]string{"a", "b", "c"}).
>       Map(strings.ToUpper).
>       Sink(fmt.Println)
>
> This example demonstrates a pipeline that batches strings into slices of
> length 4. It declares two parallel workers to change the case of the
> strings. It writes the resulting values to stdout.
>
> sta.Map([]string{"a","b"}).
>   Map(sta.Accumulate(make([]string, 4), time.Second)).
>   Map(sta.Each(strings.ToUpper), sta.Each(strings.ToUpper)).
>   Sink(fmt.Println)
>
> <https://github.com/clementauger/sta#rationale>Rationale
> <https://github.com/clementauger/sta#introduction>Introduction
>
> sta takes full advantage of Go's CSP-style channels to provide a simple
> way to compose, implement and refactor responsive data stream pipelines.
>
> A data stream pipeline is said to be responsive when it is able to push to
> its downstream at any point in time in response to a change in its input.
>
> An implementation is said to make a data stream pipeline simple to
> compose, implement and refactor if the overall result expresses the
> solution in fewer lines of code and is easier to understand, rewrite and
> test.
>
> Does this attempt reach its goal? Yes and no...
> <https://github.com/clementauger/sta#concepts>Concepts
>
> https://blog.golang.org/pipelines
> <https://github.com/clementauger/sta#usage>Usage
>
> sta exposes a Map function to create stream instances.
>
>   s := sta.Map(src)
>
> src can be a value of several kinds.
>
>   s := sta.Map([]string{"a","b"})
>   s = sta.Map([]int{1,2})
>   s = sta.Map(make(chan string))
>   s = sta.Map(func(output chan string){ output<-"hello world!" })
>
> sta.Map reads the given input in a separate routine and manages the
> required output communication channels for it.
>
> The generated output channels are given to downstream transforms of the
> stream.
>
>   s := sta.Map([]string{"a","b"}).
>         Map(func(v string) int { return len(v)})
>
> stream.Map transforms a given input to an output, in a separate routine.
> It generates the required communication channels and connects them with the
> upstream and downstream automatically.
>
> For fine control of the data flow, stream.Map also accepts functions that
> receive the upstream channel. Such a function must return a processor
> function that implements the loop over the upstream channel, and the
> output channel it writes to. The output channel is closed after the
> processor function has terminated.
>
>   s := sta.Map([]string{"a","b"}).
>         Map(func(input chan string) (func()error, chan int) {
>           output := make(chan int)
>           processor := func()error {
>             for v := range input {
>               output<-len(v)
>             }
>             return nil // the processor must return an error value
>           }
>           return processor, output
>         })
>
> To execute the pipeline, the developer must call the stream.Sink
> function. stream.Sink is really just like stream.Map, except that it
> closes the stream by executing it.
>
>   err := sta.Map([]string{"a","b"}).
>         Map(strings.ToUpper).
>         Sink(sta.DevNull)
>
> stream.Sink writes to the destination in a separate routine.
>
> The given destination value can be of kinds such as slice pointers,
> channels or functions.
>
>   outSlice := []string{}
>   sta.Map([]string{"a","b"}).Sink(&outSlice)
>
>   outChan := make(chan string)
>   sta.Map([]string{"a","b"}).Sink(outChan)
>
>   outFn := func(v string){}
>   sta.Map([]string{"a","b"}).Sink(outFn)
>
>   outChanFn := func(v chan string) (func() error) {
>     return func() error { return nil }
>   }
>   sta.Map([]string{"a","b"}).Sink(outChanFn)
>
> <https://github.com/clementauger/sta#merge>Merge
>
> To merge sources, simply add more sources to the stream. Each source runs
> in its own routine.
>
> It results in a simple merge operation of the output values.
>
> Sources can be of different kinds, but they should converge to a
> compatible output type.
>
>   sta.Map(
>     []string{"a","b"},
>     []string{"c","d"},
>     func(output chan string) {
>       output<-"e"
>       output<-"f"
>     },
>     func(output chan string) {
>       output<-"e"
>       output<-"f"
>     },
>   )
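
For comparison, a hand-written merge in plain Go could look like the sketch
below, which follows the fan-in pattern from the pipelines blog post. The
names merge and fromSlice are made up for this example and are not sta
functions.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fromSlice emits the elements of vals and then closes its channel.
func fromSlice(vals []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// merge is a hypothetical fan-in helper: one goroutine per source forwards
// values onto a shared channel, which is closed once every source is drained.
func merge(sources ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(sources))
	for _, src := range sources {
		go func(src <-chan string) {
			defer wg.Done()
			for v := range src {
				out <- v
			}
		}(src)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	var got []string
	for v := range merge(fromSlice([]string{"a", "b"}), fromSlice([]string{"c", "d"})) {
		got = append(got, v)
	}
	sort.Strings(got) // arrival order is not deterministic, so sort before printing
	fmt.Println(got)
}
```
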
>
> <https://github.com/clementauger/sta#parallel>Parallel
>
> To implement parallel transforms, simply add more transforms to the
> targeted step. Each added transform runs in its own routine.
>
> The stream automatically distributes the input data among the transforms
> and merges their output data into the downstream.
>
> sta.Map([]string{"a","b"}).
>   Map(strings.ToUpper, strings.ToUpper, strings.ToUpper)
>
> sta.Map([]string{"a","b"}).
>   Map(sta.Workers(3, strings.ToUpper)...)
>
> sta.Map([]string{"a","b"}).
>   Map(sta.Workers(3, func() interface{} {
>     // per-routine scoped values go here.
>     return strings.ToUpper
>   })...)
>
> This applies to stream.Sink as well.
> <https://github.com/clementauger/sta#broadcast>Broadcast
>
> SinkBroadcast applies to sinks. It invokes each destination with every
> received value. Each destination runs in its own routine.
>
> sta.Map([]string{"a","b"}).
>   Map(strings.ToUpper).
>   Sink(sta.SinkBroadcast(sta.DevNull, sta.DevNull, sta.DevNull))
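
A plain-Go sketch of the same broadcast idea, assuming each destination is a
func(string): every destination gets its own channel and goroutine, and each
incoming value is sent to all of them. The name broadcast is hypothetical.

```go
package main

import "fmt"
import "sync"

// broadcast is a hypothetical helper: it copies every value from in to each
// destination, running each destination in its own goroutine, and returns
// once all destinations have handled all values.
func broadcast(in <-chan string, dests ...func(string)) {
	chans := make([]chan string, len(dests))
	var wg sync.WaitGroup
	wg.Add(len(dests))
	for i, dest := range dests {
		chans[i] = make(chan string)
		go func(c <-chan string, dest func(string)) {
			defer wg.Done()
			for v := range c {
				dest(v)
			}
		}(chans[i], dest)
	}
	for v := range in {
		for _, c := range chans {
			c <- v
		}
	}
	for _, c := range chans {
		close(c)
	}
	wg.Wait()
}

func main() {
	in := make(chan string)
	go func() {
		defer close(in)
		in <- "A"
		in <- "B"
	}()
	counts := make([]int, 3)
	// Each destination only touches its own slot, so no lock is needed.
	counter := func(i int) func(string) {
		return func(string) { counts[i]++ }
	}
	broadcast(in, counter(0), counter(1), counter(2))
	fmt.Println(counts) // [2 2 2]
}
```

Because the channels are unbuffered, one slow destination delays all the
others. Buffered channels would loosen that coupling.
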
>
> <https://github.com/clementauger/sta#bulk-processing>Bulk processing
>
> To facilitate bulk processing, sta.Accumulate and sta.Each provide
> automatic plumbing.
>
> sta.Accumulate is a responsive data buffer. It reads upstream and
> accumulates every input into slices. When the slice exceeds a maximum
> length, it is copied downstream. The given duration ensures that if the
> upstream starves, pending data is sent downstream as soon as possible.
>
> sta.Each is a helper that returns a function which takes slices as input
> and invokes the given function for each value.
>
> This example demonstrates a pipeline that batches strings into slices of
> length 4. It declares two parallel workers to change the case of the
> strings. It writes the resulting values to stdout.
>
> sta.Map([]string{"a","b"}).
>   Map(sta.Accumulate(make([]string, 4), time.Second)).
>   Map(sta.Each(strings.ToUpper), sta.Each(strings.ToUpper)).
>   Sink(fmt.Println)
>
> To implement alternative logic for handling the data flow, see sta.Tick.
> <https://github.com/clementauger/sta#author-notes>Author notes
>
> 1 - While this definitely makes programming complex algorithms faster and
> somewhat easier, the lack of typed method signatures (it uses interface{}
> everywhere) makes it easy to misuse.
>
> 2 - Because it deals with so many kinds of input, under some
> circumstances it has to create additional communication channels and
> routines that would not be required in hand-tailored code.
>
> 3 - I like that I don't have to care anymore about the closing mechanism
> and precise slice management.
>
> 4 - If you want to improve on this attempt, I suggest you work backward
> from idiomatic patterns to their reflective versions. And more generally,
> write a full paper first.
>
> 5 - The implementation always tries to converge to a common usable form of
> input to deal with the variety of user input. Sources converge to func()
> (processor func() error, out chan <O>), transforms to func(src chan <I>)
> (processor func() error, out chan <O>), and sinks to func(src chan <I>)
> (func() error). This was handy because I developed this lib in a few
> hours, but I question this decision for a better implementation.
>
> 6 - Because this is all async, values are returned in nondeterministic
> order. It should not be too hard to write a buffered accumulator that
> outputs values in order; however, I have not, because it requires boxing
> all input values so that they carry their input order. The added
> complexity of the resulting API was judged not worth pursuing for now.
>
> 7 - It is reflective programming, thus it is slower, unoptimized, etc. You
> know the song.
>
> 8 - MapBroadcast does not exist because of a lack of interest.
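
Regarding note 6, here is a minimal sketch of the boxing approach it
describes: each value carries its input position, and a reordering stage
holds back values that arrive early. The boxed type and the reorder function
are hypothetical. The sketch assumes every sequence number from 0 upward
appears exactly once.

```go
package main

import "fmt"

// boxed is a hypothetical wrapper pairing a value with its input position.
type boxed struct {
	seq int
	val string
}

// reorder emits values in increasing seq order starting at 0. Values that
// arrive ahead of their turn wait in the pending map until the gap is filled.
func reorder(in <-chan boxed) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		pending := make(map[int]string)
		next := 0
		for b := range in {
			pending[b.seq] = b.val
			for {
				v, ok := pending[next]
				if !ok {
					break
				}
				delete(pending, next)
				out <- v
				next++
			}
		}
	}()
	return out
}

func main() {
	in := make(chan boxed)
	go func() {
		defer close(in)
		// Simulate out-of-order arrival from parallel workers.
		for _, b := range []boxed{{2, "C"}, {0, "A"}, {1, "B"}} {
			in <- b
		}
	}()
	for v := range reorder(in) {
		fmt.Println(v) // prints A, B, C on separate lines
	}
}
```

The pending map is unbounded in this sketch, so one very slow value can make
it grow. A real implementation would probably cap it and apply
back-pressure.
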
>
> --
> You received this message because you are subscribed to the Google Groups
> "golang-nuts" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to golang-nuts+unsubscr...@googlegroups.com.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/golang-nuts/cbcc7e32-a9fa-49e3-861f-04b00969ed49%40googlegroups.com
> <https://groups.google.com/d/msgid/golang-nuts/cbcc7e32-a9fa-49e3-861f-04b00969ed49%40googlegroups.com?utm_medium=email&utm_source=footer>
> .
>
