You tell me.
https://github.com/clementauger/sta

sta - stream async

Implements responsive data stream pipelines. It is a research and concept project, not to be used in production.

<https://github.com/clementauger/sta#install>Install

    go get -u github.com/clementauger/sta

<https://github.com/clementauger/sta#doc>Doc

godoc.org/github.com/clementauger/sta

<https://github.com/clementauger/sta#example>Example

The simplest example:

    sta.Map([]string{"a", "b", "c"}).
        Map(strings.ToUpper).
        Sink(fmt.Println)

This example demonstrates a pipeline that batches strings into slices of length 4. It declares two parallel workers to change the case of the strings, and it writes the resulting values to stdout.

    sta.Map([]string{"a", "b"}).
        Map(sta.Accumulate(make([]string, 4), time.Second)).
        Map(sta.Each(strings.ToUpper), sta.Each(strings.ToUpper)).
        Sink(fmt.Println)

<https://github.com/clementauger/sta#rationale>Rationale

<https://github.com/clementauger/sta#introduction>Introduction

sta takes full advantage of Go's CSP-style channels to provide a simple way to compose, implement and refactor responsive data stream pipelines.

A data stream pipeline is said to be responsive when it can react toward its downstream at any point in time in response to a variation of its input.

An implementation is said to make it simple to compose, implement and refactor a data stream pipeline if the result expresses the solution in fewer lines of code and is easier to understand, rewrite and test.

Does this attempt reach its goal? Yes and no...

<https://github.com/clementauger/sta#concepts>Concepts

https://blog.golang.org/pipelines

<https://github.com/clementauger/sta#usage>Usage

sta exposes a Map function to create stream instances.

    s := sta.Map(src)

src can be one of several kinds of value.

    s := sta.Map([]string{"a", "b"})
    s = sta.Map([]int{1, 2})
    s = sta.Map(make(chan string))
    s = sta.Map(func(output chan string) {
        output <- "hello world!"
    })

sta.Map reads the given input in a separate goroutine and manages the required output channels for it. The generated output channels are passed to the downstream transforms of the stream.

    s := sta.Map([]string{"a", "b"}).
        Map(func(v string) int { return len(v) })

stream.Map transforms a given input into an output, in a separate goroutine. It creates the required channels and connects them to the upstream and downstream automatically.

For fine control over the data flow, stream.Map also accepts functions that receive the upstream channel. Such a function must return a processor function that implements the loop over the upstream channel, and the output channel it writes to. The output channel is closed after the processor function has returned.

    s := sta.Map([]string{"a", "b"}).
        Map(func(input chan string) (func() error, chan int) {
            output := make(chan int)
            processor := func() error {
                for v := range input {
                    output <- len(v)
                }
                return nil
            }
            return processor, output
        })

To execute the pipeline, the developer must call stream.Sink. stream.Sink is really just like stream.Map, except that it closes the stream by executing it.

    err := sta.Map([]string{"a", "b"}).
        Map(strings.ToUpper).
        Sink(sta.DevNull)

stream.Sink writes to the destination in a separate goroutine. The destination can be a slice pointer, a channel or a function.

    outSlice := []string{}
    sta.Map([]string{"a", "b"}).Sink(&outSlice)

    outChan := make(chan string)
    sta.Map([]string{"a", "b"}).Sink(outChan)

    outFn := func(v string) {}
    sta.Map([]string{"a", "b"}).Sink(outFn)

    outChanFn := func(v chan string) func() error {
        return func() error { return nil }
    }
    sta.Map([]string{"a", "b"}).Sink(outChanFn)

<https://github.com/clementauger/sta#merge>Merge

To merge sources, simply add more sources to the stream. Each source runs in its own goroutine. The result is a simple merge of their output values.
Sources can be of different kinds, but they should converge to a compatible output type.

    sta.Map(
        []string{"a", "b"},
        []string{"c", "d"},
        func(output chan string) {
            output <- "e"
            output <- "f"
        },
        func(output chan string) {
            output <- "e"
            output <- "f"
        },
    )

<https://github.com/clementauger/sta#parallel>Parallel

To implement parallel transforms, simply add more transforms to the targeted step. Each added transform runs in its own goroutine. The stream automatically distributes the input data among them and merges their output back into the downstream.

    sta.Map([]string{"a", "b"}).
        Map(strings.ToUpper, strings.ToUpper, strings.ToUpper)

    sta.Map([]string{"a", "b"}).
        Map(sta.Workers(3, strings.ToUpper)...)

    sta.Map([]string{"a", "b"}).
        Map(sta.Workers(3, func() interface{} {
            // Values scoped to each goroutine go here.
            return strings.ToUpper
        })...)

This applies to stream.Sink as well.

<https://github.com/clementauger/sta#broadcast>Broadcast

SinkBroadcast applies to sinks. It invokes each destination with every received value. Each destination runs in its own goroutine.

    sta.Map([]string{"a", "b"}).
        Map(strings.ToUpper).
        Sink(sta.SinkBroadcast(sta.DevNull, sta.DevNull, sta.DevNull))

<https://github.com/clementauger/sta#bulk-processing>Bulk processing

To facilitate bulk processing, sta.Accumulate and sta.Each provide automatic plumbing.

sta.Accumulate is a responsive data buffer. It reads upstream and accumulates every input into slices. When the slice exceeds its maximum length, it is copied downstream. The given duration ensures that if the upstream starves, pending data is sent downstream as soon as possible.

sta.Each is a helper that returns a function which takes slices as input and invokes the given function for each value.

This example demonstrates a pipeline that batches strings into slices of length 4. It declares two parallel workers to change the case of the strings, and it writes the resulting values to stdout.

    sta.Map([]string{"a", "b"}).
        Map(sta.Accumulate(make([]string, 4), time.Second)).
        Map(sta.Each(strings.ToUpper), sta.Each(strings.ToUpper)).
        Sink(fmt.Println)

To implement alternative logic for handling the data flow, see sta.Tick.

<https://github.com/clementauger/sta#author-notes>Author notes

1 - While this definitely makes programming complex algorithms faster and somewhat easier, the lack of method signatures (it uses interface{} everywhere) makes it easy to misuse.

2 - Because it deals with so many kinds of input, under some circumstances it has to create additional channels and goroutines that would not be required in hand-tailored code.

3 - I like that I no longer have to care about closing mechanisms and precise slice management.

4 - If you want to improve on this attempt, I suggest working backward from idiomatic patterns to their reflective versions. And more generally, write a full paper first.

5 - The implementation always tries to converge to a common usable form of input to deal with the variety of user input. Sources converge to func() (processor func() error, out chan <O>), transforms to func(src chan <I>) (processor func() error, out chan <O>), and sinks to func(src chan <I>) (func() error). This was handy because I developed this library in a few hours, but I question this decision for a better implementation.

6 - Because this is all async, values are returned in no guaranteed order. It should not be too hard to write a buffered accumulator that outputs values in order; however, I have not, because it requires boxing all input values so that their input order is attached to them. The added complexity of the resulting API was judged not worth pursuing for now.

7 - It is reflective programming, thus it is slower, unoptimized, etc. You know the song.

8 - MapBroadcast does not exist because of a lack of interest.