You tell me.


https://github.com/clementauger/sta


sta - stream async 

Implements responsive data stream pipelines.

It is a research and concept project, not to be used in production.

<https://github.com/clementauger/sta#install>Install 

go get -u github.com/clementauger/sta

<https://github.com/clementauger/sta#doc>Doc 

godoc.org/github.com/clementauger/sta

<https://github.com/clementauger/sta#example>Example 

The simplest example:

sta.Map([]string{"a", "b", "c"}).
        Map(strings.ToUpper).
        Sink(fmt.Println)

This example demonstrates a pipeline that batches strings into slices of 
length 4. It declares two parallel workers that change the case of the 
strings, and it writes the resulting values to stdout.

sta.Map([]string{"a","b"}).
  Map(sta.Accumulate(make([]string, 4), time.Second)).
  Map(sta.Each(strings.ToUpper), sta.Each(strings.ToUpper)).
  Sink(fmt.Println)

<https://github.com/clementauger/sta#rationale>Rationale 
<https://github.com/clementauger/sta#introduction>Introduction 

sta takes full advantage of Go's CSP-style channels to provide a simple 
way to compose, implement and refactor responsive data stream pipelines.

A data stream pipeline is said to be responsive when it is able to react 
with its downstream at any point in time in response to a variation of its 
input.

An implementation is said to make a data stream pipeline simple to compose, 
implement and refactor if the result expresses the solution with fewer 
lines of code and is easier to understand, rewrite and test.

Does this attempt reach its goal? Yes and no...

<https://github.com/clementauger/sta#concepts>Concepts 

https://blog.golang.org/pipelines
<https://github.com/clementauger/sta#usage>Usage 

sta exposes a Map function, to create stream instances.

  s := sta.Map(src)

src can be a value of several kinds: a slice, a channel, or a function 
that writes to an output channel.

  s := sta.Map([]string{"a","b"})
  s = sta.Map([]int{1,2})
  s = sta.Map(make(chan string))
  s = sta.Map(func(output chan string){ output<-"hello world!" })

sta.Map reads the given input in a separate goroutine and manages the 
required output communication channels for it.

The generated output channels are given to downstream transforms of the 
stream.

  s := sta.Map([]string{"a","b"}).
        Map(func(v string) int { return len(v)})
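
For readers who want to see what this kind of plumbing looks like when 
written by hand, here is a minimal sketch in plain Go. It is not sta's 
implementation; the helper names source, mapLen and drain are hypothetical 
and only illustrate "one goroutine per step, one channel between steps".

```go
package main

import "fmt"

// source is a hypothetical helper: it emits the slice values on a new
// channel from its own goroutine and closes the channel when done.
func source(values []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// mapLen applies fn to every upstream value in its own goroutine and
// forwards the results on a new channel.
func mapLen(in <-chan string, fn func(string) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

// drain collects every value from the final channel until it is closed.
func drain(in <-chan int) []int {
	var got []int
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	lens := mapLen(source([]string{"a", "bb"}), func(v string) int { return len(v) })
	fmt.Println(drain(lens))
}
```

With a single goroutine per step the values keep their input order; that 
is no longer true once steps run in parallel.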

stream.Map transforms a given input to an output, in a separate goroutine. 
It generates the required communication channels and connects them with the 
upstream and downstream automatically.

For fine control of the data flow, stream.Map also accepts functions that 
receive the upstream channel. Such a function must return a processor 
function that implements the loop over the upstream channel, and the output 
channel it writes to. The output channel is closed after the processor 
function has terminated.

  s := sta.Map([]string{"a","b"}).
        Map(func(input chan string) (func() error, chan int) {
          output := make(chan int)
          processor := func() error {
            for v := range input {
              output <- len(v)
            }
            return nil
          }
          return processor, output
        })
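
To make the contract of such a function concrete, here is a sketch in 
plain Go of a caller that runs the processor and closes the output channel 
once the processor has returned. The names lengths and run are 
hypothetical, and this is only an assumption about how a runner could 
honour the contract, not a copy of what sta does internally.

```go
package main

import "fmt"

// lengths is a hypothetical transform in the shape described above: it
// receives the upstream channel and returns a processor function plus the
// output channel that the processor writes to.
func lengths(input chan string) (func() error, chan int) {
	output := make(chan int)
	processor := func() error {
		for v := range input {
			output <- len(v)
		}
		return nil
	}
	return processor, output
}

// run feeds src to lengths, runs the processor in its own goroutine, and
// closes the output channel after the processor has terminated.
func run(src []string) ([]int, error) {
	input := make(chan string)
	go func() {
		defer close(input)
		for _, v := range src {
			input <- v
		}
	}()

	processor, output := lengths(input)
	errc := make(chan error, 1) // buffered so the send never blocks
	go func() {
		errc <- processor()
		close(output)
	}()

	var got []int
	for n := range output {
		got = append(got, n)
	}
	return got, <-errc
}

func main() {
	got, err := run([]string{"a", "bb", "ccc"})
	fmt.Println(got, err)
}
```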

To execute the pipeline, the developer must call the stream.Sink 
function. stream.Sink is really just like stream.Map, except that it closes 
the stream by executing it.

  err := sta.Map([]string{"a","b"}).
        Map(strings.ToUpper).
        Sink(sta.DevNull)

stream.Sink writes to the destination in a separate goroutine.

The given destination value can be of kinds such as slice pointers, 
channels or functions.

  outSlice := []string{}
  sta.Map([]string{"a","b"}).Sink(&outSlice)

  outChan := make(chan string)
  sta.Map([]string{"a","b"}).Sink(outChan)

  outFn := func(v string){}
  sta.Map([]string{"a","b"}).Sink(outFn)

  outChanFn := func(v chan string) func() error {
    return func() error { return nil }
  }
  sta.Map([]string{"a","b"}).Sink(outChanFn)

<https://github.com/clementauger/sta#merge>Merge 

To merge sources, simply add more sources to the stream. Each source runs 
in its own goroutine.

It results in a simple merge operation of the output values.

Sources can be of different kinds, but they should converge to a compatible 
output type.

  sta.Map(
    []string{"a","b"},
    []string{"c","d"},
    func(output chan string) {
      output<-"e"
      output<-"f"
    },
    func(output chan string) {
      output<-"e"
      output<-"f"
    },
  )
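
For comparison, a hand-written merge (fan-in) in plain Go usually looks 
like the sketch below. The helpers merge, fromSlice and collect are 
hypothetical names, and the code only illustrates the pattern; it makes no 
claim about sta's internals.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge is a hypothetical helper: it runs every source in its own
// goroutine, forwards all values to one output channel, and closes that
// channel once every source has returned.
func merge(sources ...func(output chan<- string)) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(src func(chan<- string)) {
			defer wg.Done()
			src(out)
		}(src)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromSlice adapts a slice to the function form of a source.
func fromSlice(values []string) func(chan<- string) {
	return func(output chan<- string) {
		for _, v := range values {
			output <- v
		}
	}
}

// collect drains the merged channel and sorts the result, because the
// arrival order of merged values is not deterministic.
func collect(in <-chan string) []string {
	var got []string
	for v := range in {
		got = append(got, v)
	}
	sort.Strings(got)
	return got
}

func main() {
	merged := merge(
		fromSlice([]string{"a", "b"}),
		fromSlice([]string{"c", "d"}),
		func(output chan<- string) {
			output <- "e"
			output <- "f"
		},
	)
	fmt.Println(collect(merged))
}
```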

<https://github.com/clementauger/sta#parallel>Parallel 

To implement parallel transforms, simply add more transforms to the 
targeted step. Each added transform runs in its own goroutine.

The stream automatically distributes the input data among the transforms 
and merges their output into the downstream.

sta.Map([]string{"a","b"}).
  Map(strings.ToUpper, strings.ToUpper, strings.ToUpper)

sta.Map([]string{"a","b"}).
  Map(sta.Workers(3, strings.ToUpper)...)

sta.Map([]string{"a","b"}).
  Map(sta.Workers(3, func() interface{} {
    // per-goroutine scoped values go here.
    return strings.ToUpper
  })...)

This applies to stream.Sink as well.
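
The distribution and merging described in this section correspond to the 
classic worker-pool pattern. A minimal plain Go sketch follows, assuming 
that all workers read from one shared input channel and write to one shared 
output channel; parallelMap and upperAll are hypothetical names, not part 
of sta.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// parallelMap is a hypothetical helper: it starts n workers that all read
// from the same input channel (distribution) and all write to the same
// output channel (merge). The output is closed once every worker is done.
func parallelMap(n int, in <-chan string, fn func(string) string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// upperAll feeds values through n parallel strings.ToUpper workers and
// returns the results sorted, since the output order is not guaranteed.
func upperAll(n int, values []string) []string {
	in := make(chan string)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()
	var got []string
	for v := range parallelMap(n, in, strings.ToUpper) {
		got = append(got, v)
	}
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(upperAll(3, []string{"a", "b", "c", "d"}))
}
```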
<https://github.com/clementauger/sta#broadcast>Broadcast 

SinkBroadcast applies to sinks; it invokes each destination with every 
received value. Each destination runs in its own goroutine.

sta.Map([]string{"a","b"}).
  Map(strings.ToUpper).
  Sink(sta.SinkBroadcast(sta.DevNull, sta.DevNull, sta.DevNull))
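
As an illustration of the broadcast idea, here is a plain Go sketch in 
which every destination gets its own channel and its own goroutine. The 
functions broadcast and deliveries are hypothetical and do not reflect how 
sta.SinkBroadcast is actually written.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast is a hypothetical helper: every value read from in is sent to
// every destination. Each destination runs in its own goroutine and reads
// from its own channel. It returns once all destinations have finished.
func broadcast(in <-chan string, dests ...func(string)) {
	chans := make([]chan string, len(dests))
	var wg sync.WaitGroup
	for i, dest := range dests {
		chans[i] = make(chan string)
		wg.Add(1)
		go func(c <-chan string, dest func(string)) {
			defer wg.Done()
			for v := range c {
				dest(v)
			}
		}(chans[i], dest)
	}
	for v := range in {
		for _, c := range chans {
			c <- v
		}
	}
	for _, c := range chans {
		close(c)
	}
	wg.Wait()
}

// deliveries broadcasts values to k counting destinations and reports how
// many values each destination received.
func deliveries(values []string, k int) []int {
	in := make(chan string)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()
	counts := make([]int, k)
	dests := make([]func(string), k)
	for i := range dests {
		i := i // each destination updates only its own counter
		dests[i] = func(string) { counts[i]++ }
	}
	broadcast(in, dests...)
	return counts
}

func main() {
	fmt.Println(deliveries([]string{"A", "B"}, 3))
}
```

Note that in this sketch a slow destination delays all the others, because 
the channels are unbuffered.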

<https://github.com/clementauger/sta#bulk-processing>Bulk processing 

To facilitate bulk processing, sta.Accumulate and sta.Each provide 
automatic plumbing.

sta.Accumulate is a responsive data buffer. It reads upstream and 
accumulates every input into slices. When the slice exceeds a maximum 
length, it is copied downstream. The given duration ensures that if the 
upstream starves, pending data is sent downstream as soon as possible.

sta.Each is a helper that returns a function which takes slices as input 
and invokes the given function for each value.

This example demonstrates a pipeline that batches strings into slices of 
length 4. It declares two parallel workers that change the case of the 
strings, and it writes the resulting values to stdout.

sta.Map([]string{"a","b"}).
  Map(sta.Accumulate(make([]string, 4), time.Second)).
  Map(sta.Each(strings.ToUpper), sta.Each(strings.ToUpper)).
  Sink(fmt.Println)
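
The "flush on size or on timeout" behaviour described for sta.Accumulate 
can be sketched in plain Go with a select over the input channel and a 
ticker. This is an assumption about the general technique, not sta's code: 
the functions accumulate and batches are hypothetical, and this version 
flushes when the batch reaches the given size.

```go
package main

import (
	"fmt"
	"time"
)

// accumulate is a hypothetical stand-in for the behaviour described above:
// it groups incoming values into slices of at most size elements and also
// flushes whatever is pending on every tick of the given duration.
func accumulate(in <-chan string, size int, d time.Duration) <-chan []string {
	out := make(chan []string)
	go func() {
		defer close(out)
		batch := make([]string, 0, size)
		flush := func() {
			if len(batch) == 0 {
				return
			}
			// Send a copy so the buffer can be reused safely.
			out <- append([]string(nil), batch...)
			batch = batch[:0]
		}
		ticker := time.NewTicker(d)
		defer ticker.Stop()
		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush() // upstream closed: send what is pending
					return
				}
				batch = append(batch, v)
				if len(batch) == size {
					flush()
				}
			case <-ticker.C:
				flush() // upstream is slow: do not hold pending data
			}
		}
	}()
	return out
}

// batches runs values through accumulate with a long tick, so that only
// the size limit and the end of input trigger a flush.
func batches(values []string, size int) [][]string {
	in := make(chan string)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()
	var got [][]string
	for b := range accumulate(in, size, time.Minute) {
		got = append(got, b)
	}
	return got
}

func main() {
	fmt.Println(batches([]string{"a", "b", "c", "d", "e"}, 2))
}
```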

To implement an alternative logic to handle the data flow see sta.Tick.
<https://github.com/clementauger/sta#author-notes>Author notes 

1 - While this definitely provides faster and somewhat easier programming 
of complex algorithms, the lack of method signatures (it uses 
interface{} everywhere) makes it easy to misuse.

2 - Because it deals with so many kinds of input, under some 
circumstances it has to create additional communication channels and 
goroutines that hand-tailored code would not require.

3 - I like that I don't have to care anymore about closing mechanisms and 
precise slice management.

4 - If you want to improve on this attempt, I suggest you work backward 
from idiomatic patterns to their reflective version. And more generally, 
write a full paper first.

5 - The implementation always tries to converge to a common usable form of 
input to deal with the variety of user input. Sources converge to func() 
(processor func() error, out chan <O>), transforms to func(src chan <I>) 
(processor func() error, out chan <O>), and sinks to func(src chan <I>) 
(func() error). This was handy because I developed this lib in a few 
hours, but I question this decision for a better implementation.

6 - Because this is all async, values are returned in no guaranteed order. 
It should not be too hard to write a buffered accumulator that outputs 
values in order; however, I have not, because it requires boxing all input 
values so that they carry their input order. The added complexity of the 
resulting API was judged unsatisfying to pursue for now.

7 - It is reflective programming, thus it is slower, unoptimized, etc. You 
know the song.

8 - MapBroadcast does not exist because of a lack of interest.
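
To illustrate note 6, here is one possible shape for the boxing approach, 
written in plain Go outside of sta. The type boxed and the functions 
reorder and orderedUpper are hypothetical; the sketch only shows that 
attaching an index to each value lets a small buffer restore input order 
after parallel processing.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// boxed pairs a value with its position in the input (hypothetical type).
type boxed struct {
	index int
	value string
}

// reorder receives boxed values in any order and emits the plain values in
// index order, buffering the ones that arrive too early.
func reorder(in <-chan boxed) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		pending := map[int]string{}
		next := 0
		for b := range in {
			pending[b.index] = b.value
			for {
				v, ok := pending[next]
				if !ok {
					break
				}
				delete(pending, next)
				out <- v
				next++
			}
		}
	}()
	return out
}

// orderedUpper upper-cases values with n parallel workers and still
// returns them in input order.
func orderedUpper(n int, values []string) []string {
	in := make(chan boxed)
	go func() {
		defer close(in)
		for i, v := range values {
			in <- boxed{index: i, value: v}
		}
	}()
	mid := make(chan boxed)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range in {
				mid <- boxed{index: b.index, value: strings.ToUpper(b.value)}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(mid)
	}()
	var got []string
	for v := range reorder(mid) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(orderedUpper(3, []string{"a", "b", "c", "d", "e"}))
}
```

The cost is visible in the signatures: every step between boxing and 
reordering has to work on boxed values instead of plain strings.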
