gopipe

public
4 stars
0 forks
0 issues

Commits

List of commits on branch master.
Unverified
fe5133e9de893bbed1b50f3e74c5560e8162d3ea

Convert to go mod

urjitbhatia committed 5 years ago
Verified
797b46e8978e77319cc9eeb586885dbb7a0c1099

Merge pull request #3 from urjitbhatia/v3/FuncAsPipe

urjitbhatia committed 5 years ago
Unverified
dfc25f047b23ebe0faa842e90bfaba1d6811120d

update travis config

urjitbhatia committed 6 years ago
Unverified
de09ce2dd7be46bb41df27624075b3f7f71634ce

make pipeline tail buffered as well

urjitbhatia committed 6 years ago
Unverified
38c4cc8bc5208f8b8a6f8f4843b3866195a26d3d

Simplify Pipe interface - now pipe just takes a fn

urjitbhatia committed 7 years ago
Verified
6eb536c8b559c559d1bde58ae8b69b61d157dd48

Merge pull request #2 from urjitbhatia/add-license-1

urjitbhatia committed 7 years ago

README

gopipe

A stream-filter-like pipeline primitive for Go

Gopipe exposes a simple interface that your "Pipe" must implement:

/*
A single pipe component that processes items. Pipes can be composed to form a pipeline
*/
type Pipe interface {
	Process(in chan interface{}, out chan interface{})
}

Any number of such pipes can then be combined into a pipeline:

// Make a pipeline
pipeline := gopipe.NewPipeline(
  jsonUnmarshalPipe,
  redisWriterPipe,
  logWriterPipe,
)

// Or make a buffered pipeline
// This allows up to bufSize elements to queue at each pipe stage
bufSize := 10
// Buffer size of 10 at every stage of the pipe
bufP := gopipe.NewBufferedPipeline(bufSize, redisWriterPipe, logWriterPipe)

// Attach some source
jobs := make(chan interface{})
pipeline.AttachSource(jobs)

// Attach Sink
processedJobs := make(chan interface{})
pipeline.AttachSink(processedJobs)

// Or Enqueue from somewhere (Blocks if the pipeline has no capacity)
pipeline.Enqueue("foo")

// And Dequeue (Blocks if nothing is flowing)
bar := pipeline.Dequeue()

// Dequeue with timeout
baz := pipeline.DequeueTimeout(10 * time.Millisecond)
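
One way to picture what the calls above do is a chain of channels with one goroutine per pipe. The following is a minimal sketch under that assumption, not gopipe's actual implementation: `chain`, `incPipe` and `dequeueTimeout` are hypothetical names introduced only for illustration, and the `Pipe` interface is copied from this README so the example compiles on its own.

```go
package main

import (
	"fmt"
	"time"
)

// Pipe mirrors the interface from this README so the sketch compiles on its own.
type Pipe interface {
	Process(in chan interface{}, out chan interface{})
}

// incPipe is a hypothetical stage that adds one to every int it receives.
type incPipe struct{}

func (incPipe) Process(in chan interface{}, out chan interface{}) {
	for item := range in {
		if n, ok := item.(int); ok {
			out <- n + 1
		}
	}
	close(out) // propagate shutdown to the next stage
}

// chain wires the pipes head to tail and returns the head and tail channels.
// Every channel gets capacity bufSize; 0 gives unbuffered channels.
func chain(bufSize int, pipes ...Pipe) (head, tail chan interface{}) {
	head = make(chan interface{}, bufSize)
	tail = head
	for _, p := range pipes {
		next := make(chan interface{}, bufSize)
		go p.Process(tail, next) // each stage runs in its own goroutine
		tail = next
	}
	return head, tail
}

// dequeueTimeout waits up to d for one item; ok is false on timeout or close.
func dequeueTimeout(tail chan interface{}, d time.Duration) (item interface{}, ok bool) {
	select {
	case item, ok = <-tail:
		return item, ok
	case <-time.After(d):
		return nil, false
	}
}

func main() {
	head, tail := chain(10, incPipe{}, incPipe{})
	head <- 1
	fmt.Println(dequeueTimeout(tail, time.Second))         // 3 true
	fmt.Println(dequeueTimeout(tail, 10*time.Millisecond)) // <nil> false
	close(head)
}
```

In this sketch, giving every channel the same capacity is what lets up to bufSize items queue at each stage, and the `select` on `time.After` is a common way to put a deadline on a blocking receive.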

Complex pipelining

You can also create a "routing" junction and attach other Pipelines downstream.

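// Create a RoutingFunc func(interface{}) interface{}
routingFn := RoutingFunc(func(val interface{}) interface{} {
  // val is an interface{}, so assert it to int before comparing.
  // Non-int values leave n at 0 and fall through to "eh!".
  n, _ := val.(int)
  if n > 10 && n < 100 {
    return "smallishNumber"
  } else if n >= 100 {
    return "biggishNumber"
  }
  // A RoutingFunc returns a single key, so "dwarfnumber" cannot travel as a second error value.
  return "eh!"
})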

// Create a junction
j := NewJunction(routingFn)
j.AddPipeline("smallishNumber", NewPipeline(smallNumPipe)).AddPipeline("biggishNumber", NewPipeline(bigNumPipe))

// Now attach the junction - as soon as this is attached, data will start flowing
pipeline.AddJunction(j)
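
The routing idea can be sketched without the library as a map from routing key to downstream channel. This is an illustration only: `junction`, `newJunction`, `addOutput`, `dispatch` and `sizeRoute` are hypothetical names, and dropping values whose key has no registered output is an assumption of the sketch, not documented gopipe behaviour.

```go
package main

import "fmt"

// RoutingFunc maps an item to a routing key, as described in this README.
type RoutingFunc func(interface{}) interface{}

// junction is a hypothetical stand-in for the library's junction type:
// it only keeps one output channel per routing key.
type junction struct {
	route   RoutingFunc
	outputs map[interface{}]chan interface{}
}

func newJunction(route RoutingFunc) *junction {
	return &junction{route: route, outputs: make(map[interface{}]chan interface{})}
}

// addOutput registers a downstream channel under key and returns the
// junction so that calls can be chained.
func (j *junction) addOutput(key interface{}, out chan interface{}) *junction {
	j.outputs[key] = out
	return j
}

// dispatch sends val to the channel registered for its key and reports
// whether a channel was found. Unrouted values are dropped (an assumption).
func (j *junction) dispatch(val interface{}) bool {
	out, ok := j.outputs[j.route(val)]
	if !ok {
		return false
	}
	out <- val
	return true
}

// sizeRoute classifies ints by size; anything else gets the key "eh!".
func sizeRoute(val interface{}) interface{} {
	n, _ := val.(int)
	switch {
	case n > 10 && n < 100:
		return "smallishNumber"
	case n >= 100:
		return "biggishNumber"
	}
	return "eh!"
}

func main() {
	small := make(chan interface{}, 4)
	big := make(chan interface{}, 4)
	j := newJunction(sizeRoute).
		addOutput("smallishNumber", small).
		addOutput("biggishNumber", big)

	for _, v := range []interface{}{42, 500, 3} {
		fmt.Println(v, "routed:", j.dispatch(v))
	}
	fmt.Println(len(small), len(big)) // 1 1
}
```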

Example Pipe:

This is a pipe that takes in integers and doubles them. If the input is invalid, the pipe effectively "filters" it out so it does not continue down the pipeline. In a more complex scenario, you could instead mark the incoming struct with an error flag and still propagate it downstream.

To filter an item, simply don't send it on the out channel.

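type doublingPipe struct{}

// Process doubles every int read from in and sends the result on out.
// Non-int items are dropped, which filters them out of the pipeline.
func (dp doublingPipe) Process(in chan interface{}, out chan interface{}) {
	// Ranging over in ends the loop once the channel is closed and drained.
	for item := range in {
		if intval, ok := item.(int); ok {
			out <- intval * 2
		} else {
			log.Println("not ok - filtering...")
		}
	}
	log.Println("Pipe-in closed")
	close(out)
}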
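
For the alternative mentioned above, where invalid items are flagged rather than dropped, a pipe might look roughly like this. It is a sketch under assumptions: the `job` struct, `flaggingDoubler` and the `collect` helper are hypothetical and not part of gopipe.

```go
package main

import (
	"errors"
	"fmt"
)

// job is a hypothetical item type that carries its own error flag.
type job struct {
	Value interface{}
	Err   error
}

// flaggingDoubler doubles int payloads and marks everything else with an
// error instead of dropping it, so later stages can decide what to do.
type flaggingDoubler struct{}

func (flaggingDoubler) Process(in chan interface{}, out chan interface{}) {
	for item := range in {
		j, ok := item.(job)
		if !ok {
			continue // not a job at all: filter it out
		}
		if n, isInt := j.Value.(int); isInt {
			j.Value = n * 2
		} else {
			j.Err = errors.New("value is not an int")
		}
		out <- j // propagate downstream either way
	}
	close(out)
}

// collect runs the pipe over items and gathers the jobs it emits.
// Both channels are buffered for len(items), so Process never blocks here.
func collect(items ...interface{}) []job {
	in := make(chan interface{}, len(items))
	out := make(chan interface{}, len(items))
	for _, it := range items {
		in <- it
	}
	close(in)
	flaggingDoubler{}.Process(in, out)
	var jobs []job
	for v := range out {
		jobs = append(jobs, v.(job))
	}
	return jobs
}

func main() {
	for _, j := range collect(job{Value: 21}, job{Value: "x"}, "stray") {
		fmt.Println(j.Value, j.Err)
	}
}
```

Here the stray string is filtered by never reaching the out channel, while the job with a non-int value is passed along with its Err field set.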

More Examples: