Hi Clement, this is a nice investigation of the benefits and limits of using reflect. I guess it would also be just as interesting to investigate what this library would look like if the current draft for generics were to be accepted :)
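For instance, Map could get a typed signature along these lines; a purely hypothetical sketch, written with square-bracket type parameters for brevity rather than the draft's exact syntax, and none of it taken from sta:

    package main

    import (
        "fmt"
        "strings"
    )

    // Stream is a hypothetical typed stage; the element type is known at
    // compile time instead of flowing through interface{}.
    type Stream[T any] struct {
        out <-chan T
    }

    // From emits the given values on a channel owned by the stream.
    func From[T any](values ...T) Stream[T] {
        out := make(chan T)
        go func() {
            defer close(out)
            for _, v := range values {
                out <- v
            }
        }()
        return Stream[T]{out: out}
    }

    // Map applies f to every element, with both element types checked by
    // the compiler.
    func Map[T, U any](s Stream[T], f func(T) U) Stream[U] {
        out := make(chan U)
        go func() {
            defer close(out)
            for v := range s.out {
                out <- f(v)
            }
        }()
        return Stream[U]{out: out}
    }

    func main() {
        for v := range Map(From("a", "b"), strings.ToUpper).out {
            fmt.Println(v)
        }
    }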
-s

On Sun, Aug 4, 2019 at 4:13 PM clement auger <clementauger...@gmail.com> wrote:

> You tell me.
>
> https://github.com/clementauger/sta
>
> sta - stream async
>
> Implements a responsive data stream pipeline.
>
> It is a research and concept project, not to be used in production.
>
> Install
>
>     go get -u github.com/clementauger/sta
>
> Doc
>
>     godoc.org/github.com/clementauger/sta
>
> Example
>
> The simplest form:
>
>     sta.Map([]string{"a", "b", "c"}).
>         Map(strings.ToUpper).
>         Sink(fmt.Println)
>
> The next example demonstrates a pipeline that bulks strings into slices
> of length 4. It declares two parallel workers to change the string case,
> then outputs the resulting values to stdout:
>
>     sta.Map([]string{"a", "b"}).
>         Map(sta.Accumulate(make([]string, 4), time.Second)).
>         Map(sta.Each(strings.ToUpper), sta.Each(strings.ToUpper)).
>         Sink(fmt.Println)
>
> Rationale
>
> Introduction
>
> sta takes full advantage of Go's CSP, channel-based capabilities to
> provide a simple way to compose, implement and refactor responsive data
> stream pipelines.
>
> A data stream pipeline is said to be responsive when it is able to react
> with its downstream at any point in time in response to a variation of
> its input.
>
> An implementation is said to make it simple to compose, implement and
> refactor a data stream pipeline if the overall result expresses the
> solution in fewer lines of code, with easier understanding, improved
> rewriting capabilities and a better testing experience.
>
> Does this attempt reach its goal? Yes and no...
>
> Concepts
>
> https://blog.golang.org/pipelines
>
> Usage
>
> sta exposes a Map function to create stream instances.
>
>     s := sta.Map(src)
>
> src is a value that can be of several kinds:
>
>     s := sta.Map([]string{"a", "b"})
>     s = sta.Map([]int{1, 2})
>     s = sta.Map(make(chan string))
>     s = sta.Map(func(output chan string) { output <- "hello world!" })
>
> sta.Map reads the given input in a separate goroutine and manages the
> required output communication channels for it.
>
> The generated output channels are given to the downstream transforms of
> the stream.
>
>     s := sta.Map([]string{"a", "b"}).
>         Map(func(v string) int { return len(v) })
>
> stream.Map transforms a given input to an output, in a separate
> goroutine. It generates the required communication channels and connects
> them with the upstream and downstream automatically.
>
> For fine control of the data flow, stream.Map also accepts functions
> that receive the upstream channel. Such a function must return a
> processor function, which implements the loop over the upstream channel,
> and the output channel it writes to. The output channel is closed after
> the processor function has terminated.
>
>     s := sta.Map([]string{"a", "b"}).
>         Map(func(input chan string) (func() error, chan int) {
>             output := make(chan int)
>             processor := func() error {
>                 for v := range input {
>                     output <- len(v)
>                 }
>                 return nil
>             }
>             return processor, output
>         })
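For comparison, here is roughly what that last transform looks like when wired by hand with plain channels and goroutines, in the spirit of the pipelines blog post linked above. This is only a sketch of the plumbing sta generates for you, not code from the library:

    package main

    import "fmt"

    // source emits the given values on its own channel and closes it when done.
    func source(values ...string) <-chan string {
        out := make(chan string)
        go func() {
            defer close(out)
            for _, v := range values {
                out <- v
            }
        }()
        return out
    }

    // lengths reads upstream strings and emits their lengths downstream,
    // closing the output channel once the upstream is drained.
    func lengths(in <-chan string) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for v := range in {
                out <- len(v)
            }
        }()
        return out
    }

    func main() {
        for n := range lengths(source("a", "bb")) {
            fmt.Println(n)
        }
    }

The channel creation, the extra goroutine and the close on completion are exactly the parts the reflective version wires up automatically.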
>
> To execute the pipeline, the developer must call the stream.Sink
> function. stream.Sink is really just like stream.Map, except that it
> closes the stream by executing it.
>
>     err := sta.Map([]string{"a", "b"}).
>         Map(strings.ToUpper).
>         Sink(sta.DevNull)
>
> stream.Sink writes to the destination in a separate goroutine.
>
> The given destination value can be of kinds such as slice pointers,
> channels or functions.
>
>     outSlice := []string{}
>     sta.Map([]string{"a", "b"}).Sink(&outSlice)
>
>     outChan := make(chan string)
>     sta.Map([]string{"a", "b"}).Sink(outChan)
>
>     outFn := func(v string) {}
>     sta.Map([]string{"a", "b"}).Sink(outFn)
>
>     outChanFn := func(v chan string) func() error {
>         return func() error { return nil }
>     }
>     sta.Map([]string{"a", "b"}).Sink(outChanFn)
>
> Merge
>
> To merge sources, simply add more sources to the stream. Each source
> runs in its own goroutine.
>
> The result is a simple merge of the output values.
>
> Sources can be of different kinds, but they should converge to a
> compatible output type.
>
>     sta.Map(
>         []string{"a", "b"},
>         []string{"c", "d"},
>         func(output chan string) {
>             output <- "e"
>             output <- "f"
>         },
>         func(output chan string) {
>             output <- "e"
>             output <- "f"
>         },
>     )
>
> Parallel
>
> To implement parallel transforms, simply add more transforms to the
> targeted step. Each added transform runs in its own goroutine.
>
> The stream automatically distributes the input data and inlines the
> output data to the downstream.
>
>     sta.Map([]string{"a", "b"}).
>         Map(strings.ToUpper, strings.ToUpper, strings.ToUpper)
>
>     sta.Map([]string{"a", "b"}).
>         Map(sta.Workers(3, strings.ToUpper)...)
>
>     sta.Map([]string{"a", "b"}).
>         Map(sta.Workers(3, func() interface{} {
>             // per-goroutine scoped values go here.
>             return strings.ToUpper
>         })...)
>
> This applies to stream.Sink as well.
>
> Broadcast
>
> SinkBroadcast applies to sinks; it invokes each destination with every
> received value. Each destination runs in its own goroutine.
>
>     sta.Map([]string{"a", "b"}).
>         Map(strings.ToUpper).
>         Sink(sta.SinkBroadcast(sta.DevNull, sta.DevNull, sta.DevNull))
>
> Bulk processing
>
> To facilitate bulk processing, sta.Accumulate and sta.Each provide
> automatic plumbing.
>
> sta.Accumulate is a responsive data buffer. It reads the upstream and
> accumulates every input into slices. When a slice exceeds the maximum
> length, it is copied downstream. The given duration ensures that if the
> upstream starves, pending data is sent downstream as soon as possible.
>
> sta.Each is a helper that returns a function which takes slices as input
> and invokes the given function for each value.
>
> This example demonstrates a pipeline that bulks strings into slices of
> length 4. It declares two parallel workers to change the string case,
> then outputs the resulting values to stdout:
>
>     sta.Map([]string{"a", "b"}).
>         Map(sta.Accumulate(make([]string, 4), time.Second)).
>         Map(sta.Each(strings.ToUpper), sta.Each(strings.ToUpper)).
>         Sink(fmt.Println)
>
> To implement alternative logic for handling the data flow, see sta.Tick.
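To illustrate what such a responsive buffer does, here is a hand-written batching stage with the same size-or-timeout behaviour. It is only a sketch of the idea, not sta.Accumulate's actual implementation:

    package main

    import (
        "fmt"
        "time"
    )

    // batch groups upstream values into slices of at most max elements and
    // flushes whatever is pending when no flush happened within d, so a
    // starving upstream does not hold data back indefinitely.
    func batch(in <-chan string, max int, d time.Duration) <-chan []string {
        out := make(chan []string)
        go func() {
            defer close(out)
            buf := make([]string, 0, max)
            flush := func() {
                if len(buf) > 0 {
                    out <- buf
                    buf = make([]string, 0, max)
                }
            }
            timer := time.NewTimer(d)
            defer timer.Stop()
            for {
                select {
                case v, ok := <-in:
                    if !ok {
                        flush()
                        return
                    }
                    buf = append(buf, v)
                    if len(buf) >= max {
                        flush()
                    }
                case <-timer.C:
                    flush()
                    timer.Reset(d)
                }
            }
        }()
        return out
    }

    func main() {
        in := make(chan string)
        go func() {
            defer close(in)
            for _, v := range []string{"a", "b", "c", "d", "e"} {
                in <- v
            }
        }()
        for b := range batch(in, 4, time.Second) {
            fmt.Println(b)
        }
    }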
>
> Author notes
>
> 1 - While this definitely provides faster and somewhat easier
> programming of complex algorithms, the lack of concrete method
> signatures (it uses interface{} everywhere) makes it easy to misuse.
>
> 2 - Because it deals with so many kinds of input, under some
> circumstances it has to create additional communication channels and
> goroutines that would not be required with hand-tailored code.
>
> 3 - I like that I no longer have to care about the closing mechanism or
> the precise management of slices.
>
> 4 - If you want to improve this attempt, I suggest working backward from
> idiomatic patterns to their reflective version. And more generally,
> write a full paper first.
>
> 5 - To deal with the variety of user input, the implementation always
> tries to converge to a common usable form: sources converge to
> func() (processor func() error, out chan <O>), transforms to
> func(src chan <I>) (processor func() error, out chan <O>), and sinks to
> func(src chan <I>) (func() error). This was handy because I developed
> this lib in a few hours, but I question this decision for a better
> implementation. (See the sketch after these notes.)
>
> 6 - Because this is all async, values are returned in arbitrary order.
> It should not be too hard to write a buffered accumulator that outputs
> values in order, but I have not, because it requires boxing all input
> values so that their input order is attached to them. The added
> complexity of the resulting API was judged unsatisfying, so it was not
> pursued for now.
>
> 7 - It is reflective programming, thus it is slower, unoptimized, etc.
> You know the song.
>
> 8 - MapBroadcast does not exist because of a lack of interest.
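To make note 5 concrete, the three canonical forms could be spelled out like this, with string and int standing in for the element types <I> and <O>. The type names are hypothetical, not declarations from the library:

    package sketch

    // A source owns its output channel and returns it together with the
    // processor that fills it.
    type source func() (processor func() error, out chan int)

    // A transform receives the upstream channel and returns its processor
    // and the output channel it writes to.
    type transform func(src chan string) (processor func() error, out chan int)

    // A sink receives the upstream channel and returns only a processor.
    type sink func(src chan string) func() error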