diff --git a/buffer.go b/buffer.go index bfe5f9d..31e1d37 100644 --- a/buffer.go +++ b/buffer.go @@ -8,9 +8,9 @@ import ( type buffer struct { // single writer - in chan string + in chan []byte // multiple readers - out []chan string + out []chan []byte progress []chan int64 } @@ -59,7 +59,7 @@ func (bfs *buffers) remove(key string) { delete(bfs.bufferMap, key) } -func (bfs *buffers) appendOutBuffer(key string, o chan string) error { +func (bfs *buffers) appendOutBuffer(key string, o chan []byte) error { bfs.Lock() defer bfs.Unlock() val, ok := bfs.bufferMap[key] @@ -97,12 +97,12 @@ func (bfs *buffers) all() map[string]*buffer { var buffersMap *buffers -func send(pipelineKey string, line string) { +func send(pipelineKey string, data []byte) { // line = fmt.Sprintf(time.Now().Format("2006-01-02 15:04:05")) + " " + line buf, ok := buffersMap.get(pipelineKey) if !ok { return } - buf.in <- line + buf.in <- data } diff --git a/examples/advanced/main.go b/examples/advanced/main.go index eff6e5c..0b04391 100644 --- a/examples/advanced/main.go +++ b/examples/advanced/main.go @@ -31,9 +31,9 @@ func newDownloadStep(fileName string, bytes int64, fail bool) *downloadStep { func (d *downloadStep) Exec(request *pipeline.Request) *pipeline.Result { - d.Status(fmt.Sprintf("%+v", request)) + d.Status([]byte(fmt.Sprintf("%+v", request))) - d.Status(fmt.Sprintf("Started downloading file %s", d.fileName)) + d.Status([]byte(fmt.Sprintf("Started downloading file %s", d.fileName))) client := &http.Client{} @@ -60,19 +60,17 @@ func (d *downloadStep) Exec(request *pipeline.Request) *pipeline.Result { return &pipeline.Result{Error: fmt.Errorf("File download failed %s", d.fileName)} } - d.Status(fmt.Sprintf("Successfully downloaded file %s", d.fileName)) + d.Status([]byte(fmt.Sprintf("Successfully downloaded file %s", d.fileName))) return &pipeline.Result{ - Error: nil, - Data: struct{ bytesDownloaded int64 }{bytesDownloaded: n}, - KeyVal: map[string]interface{}{"bytesDownloaded": n}, + Error: nil, + Data: struct{ bytesDownloaded int64 }{bytesDownloaded: n}, } } -func (d *downloadStep) Cancel() error { - d.Status(fmt.Sprintf("Cancel downloading file %s", d.fileName)) +func (d *downloadStep) Cancel() { + d.Status([]byte(fmt.Sprintf("Cancel downloading file %s", d.fileName))) d.cancel() - return nil } func readPipeline(pipe *pipeline.Pipeline) { @@ -98,13 +96,15 @@ func readPipeline(pipe *pipeline.Pipeline) { func main() { - workflow := pipeline.NewProgress("getfiles", 1000, time.Second*7) + workflow := pipeline.New("getfiles", + pipeline.OutBufferSize(1000), + pipeline.ExpectedDuration(7*time.Second)) //stages - stage := pipeline.NewStage("stage", false, false) + stage := pipeline.NewStage("stage", pipeline.Concurrent(true)) // in this stage, steps will be executed concurrently - concurrentStage := pipeline.NewStage("con_stage", true, false) + concurrentStage := pipeline.NewStage("con_stage", pipeline.Concurrent(true)) // another concurrent stage - concurrentErrStage := pipeline.NewStage("con_err_stage", true, false) + concurrentErrStage := pipeline.NewStage("con_err_stage", pipeline.Concurrent(true)) //steps fileStep1mb := newDownloadStep("1mbfile", 1e6, false) @@ -129,7 +129,7 @@ func main() { go readPipeline(workflow) // execute pipeline - result := workflow.Run() + result := workflow.Run(nil) if result.Error != nil { fmt.Println(result.Error) } diff --git a/examples/functional/main.go b/examples/functional/main.go new file mode 100644 index 0000000..6efc8c9 --- /dev/null +++ b/examples/functional/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "fmt" + + "github.com/myntra/pipeline" +) + +// TransformStep ... +func TransformStep(context context.Context, request *pipeline.Request) *pipeline.Result { + request.Status([]byte("Starting transformstep")) + fmt.Println(request.Data) + // handle cancel step: https://blog.golang.org/context + //<-context.Done() is unblocked when step is cancelled on error returned by sibling concurrent step + return nil +} + +func readPipeline(pipe *pipeline.Pipeline) { + out, err := pipe.Out() + if err != nil { + return + } + + progress, err := pipe.GetProgressPercent() + if err != nil { + return + } + + for { + select { + case line := <-out: + fmt.Println(line) + case p := <-progress: + fmt.Println("percent done: ", p) + } + } +} + +func main() { + + logParserPipe := pipeline.New("") + stageOne := pipeline.NewStage("") + + transfromStep := pipeline.NewStep(TransformStep) + + stageOne.AddStep(transfromStep) + logParserPipe.AddStage(stageOne) + + go readPipeline(logParserPipe) + + data := map[string]string{ + "a": "b", + } + + result := logParserPipe.Run(data) + if result.Error != nil { + fmt.Println(result.Error) + } + + fmt.Println("timeTaken:", logParserPipe.GetDuration()) + + clonedlogParserPipe := logParserPipe.Clone("") + + go readPipeline(clonedlogParserPipe) + + data2 := map[string]string{ + "c": "d", + } + + result = clonedlogParserPipe.Run(data2) + if result.Error != nil { + fmt.Println(result.Error) + } + + fmt.Println("timeTaken:", clonedlogParserPipe.GetDuration()) + +} diff --git a/examples/simple/main.go b/examples/simple/main.go index ed9edcb..583ac1d 100644 --- a/examples/simple/main.go +++ b/examples/simple/main.go @@ -14,20 +14,18 @@ type work struct { func (w *work) Exec(request *pipeline.Request) *pipeline.Result { - w.Status(fmt.Sprintf("%+v", request)) + w.Status([]byte(fmt.Sprintf("%+v", request))) duration := time.Duration(1000 * w.id) time.Sleep(time.Millisecond * duration) msg := fmt.Sprintf("work %d", w.id) return &pipeline.Result{ - Error: nil, - Data: map[string]string{"msg": msg}, - KeyVal: map[string]interface{}{"msg": msg}, + Error: nil, + Data: map[string]string{"msg": msg}, } } -func (w *work) Cancel() error { - w.Status("cancel step") - return nil +func (w *work) Cancel() { + w.Status([]byte("cancel step")) } func readPipeline(pipe *pipeline.Pipeline) { @@ -53,8 +51,10 @@ func readPipeline(pipe *pipeline.Pipeline) { func main() { - workpipe := pipeline.NewProgress("myProgressworkpipe", 1000, time.Second*3) - stage := pipeline.NewStage("mypworkstage", false, false) + workpipe := pipeline.New("myProgressworkpipe", + pipeline.OutBufferSize(1000), + pipeline.ExpectedDuration(1e3*time.Millisecond)) + stage := pipeline.NewStage("mypworkstage", pipeline.Concurrent(true)) step1 := &work{id: 1} step2 := &work{id: 2} @@ -65,7 +65,7 @@ func main() { go readPipeline(workpipe) - result := workpipe.Run() + result := workpipe.Run(nil) if result.Error != nil { fmt.Println(result.Error) } diff --git a/options.go b/options.go new file mode 100644 index 0000000..8980dc1 --- /dev/null +++ b/options.go @@ -0,0 +1,73 @@ +package pipeline + +import "time" + +// DefaultOutDrainTimeout time to wait for all readers to finish consuming output +const DefaultOutDrainTimeout = time.Second * 5 + +// DefaultOutBufferSize size of the pipeline.Out channel +const DefaultOutBufferSize = 1000 + +type pipelineConfig struct { + outBufferSize int + outDrainTimeout time.Duration + expectedDuration time.Duration +} + +// Option represents an option for the pipeline. It must be used as an arg +// to New(...) or Clone(...) +type Option func(*pipelineConfig) + +// OutBufferSize is size of the pipeline.Out channel +func OutBufferSize(size int) Option { + return Option(func(c *pipelineConfig) { + c.outBufferSize = size + }) +} + +// OutDrainTimeout is the time to wait for all readers to finish consuming output +func OutDrainTimeout(timeout time.Duration) Option { + return Option(func(c *pipelineConfig) { + c.outDrainTimeout = timeout + }) +} + +// ExpectedDuration is the expected time for the pipeline to finish +func ExpectedDuration(timeout time.Duration) Option { + return Option(func(c *pipelineConfig) { + c.expectedDuration = timeout + }) +} + +type stageConfig struct { + concurrent bool + disableStrictMode bool + mergeFunc func([]*Result) *Result +} + +// StageOption represents an option for a stage. It must be used as an arg +// to NewStage(...) +type StageOption func(*stageConfig) + +// Concurrent enables concurrent execution of steps +func Concurrent(enable bool) StageOption { + return StageOption(func(c *stageConfig) { + c.concurrent = enable + }) +} + +// DisableStrictMode disables cancellation of concurrently executing steps if +// there is an error +func DisableStrictMode(disable bool) StageOption { + return StageOption(func(c *stageConfig) { + c.disableStrictMode = disable + }) +} + +// MergeFunc is used to merge results from concurrent steps and is only called +// when concurrent steps are enabled. +func MergeFunc(fn func([]*Result) *Result) StageOption { + return StageOption(func(c *stageConfig) { + c.mergeFunc = fn + }) +} diff --git a/pipeline.go b/pipeline.go index 7ca9be6..aca2c18 100644 --- a/pipeline.go +++ b/pipeline.go @@ -8,20 +8,15 @@ import ( "time" "unicode" - "github.com/fatih/color" + uuid "github.com/satori/go.uuid" ) -// DefaultDrainTimeout time to wait for all readers to finish consuming output -const DefaultDrainTimeout = time.Second * 5 - -// DefaultBuffer channel buffer size of the output buffer -const DefaultBuffer = 1000 - // Pipeline is a sequence of stages type Pipeline struct { Name string `json:"name"` Stages []*Stage `json:"stages"` - DrainTimeout time.Duration + uuid string + config *pipelineConfig expectedDuration time.Duration duration time.Duration outsubscribed bool @@ -34,51 +29,77 @@ type Pipeline struct { // New returns a new pipeline // name of the pipeline // outBufferLen is the size of the output buffered channel -func New(name string, outBufferLen int) *Pipeline { +func New(name string, opts ...Option) *Pipeline { - return newPipeline(name, outBufferLen) -} + config := &pipelineConfig{ + outBufferSize: DefaultOutBufferSize, + outDrainTimeout: DefaultOutDrainTimeout, + } -// NewProgress returns a new pipeline which returns progress updates -// name of the pipeline -// outBufferLen is the size of the output buffered channel -// -// expectedDurationInMs is the expected time for the job to finish in milliseconds -// If set, you can get the current time spent from GetDuration()int64 and -// listen on the channel returned by GetProgress() <-chan float64 to get current progress -func NewProgress(name string, outBufferLen int, expectedDuration time.Duration) *Pipeline { - - p := newPipeline(name, outBufferLen) - p.expectedDuration = expectedDuration - p.tick = time.Millisecond * 250 - return p -} + uuid := uuid.NewV1().String() + + if name == "" { + name = uuid + } else { + name = spaceMap(name) + } -func newPipeline(name string, outBufferLen int) *Pipeline { - if outBufferLen < 0 { - outBufferLen = 1 + for _, o := range opts { + o(config) } if buffersMap == nil { buffersMap = &buffers{bufferMap: make(map[string]*buffer)} } - p := &Pipeline{Name: spaceMap(name)} - p.outbufferlen = outBufferLen - - if p.DrainTimeout == 0 { - p.DrainTimeout = DefaultDrainTimeout + p := &Pipeline{ + Name: name, + config: config, + uuid: uuid, } - buf := buffer{in: make(chan string, outBufferLen), out: []chan string{}, progress: []chan int64{}} - buffersMap.set(p.Name, &buf) + buf := buffer{in: make(chan []byte, p.config.outBufferSize), out: []chan []byte{}, progress: []chan int64{}} + buffersMap.set(p.uuid, &buf) return p + } -// SetDrainTimeout sets DrainTimeout -func (p *Pipeline) SetDrainTimeout(timeout time.Duration) { - p.DrainTimeout = timeout +// Clone pipeline +func (p *Pipeline) Clone(name string, opts ...Option) *Pipeline { + + config := &pipelineConfig{ + outBufferSize: p.config.outBufferSize, + outDrainTimeout: p.config.outDrainTimeout, + } + + uuid := uuid.NewV1().String() + + if name == "" { + name = uuid + } else { + name = spaceMap(name) + } + + for _, o := range opts { + o(config) + } + + if buffersMap == nil { + buffersMap = &buffers{bufferMap: make(map[string]*buffer)} + } + + cp := &Pipeline{ + Name: name, + config: config, + Stages: p.Stages, + uuid: uuid, + } + + buf := buffer{in: make(chan []byte, p.config.outBufferSize), out: []chan []byte{}, progress: []chan int64{}} + buffersMap.set(p.uuid, &buf) + + return cp } // AddStage adds a new stage to the pipeline @@ -87,21 +108,21 @@ func (p *Pipeline) AddStage(stage ...*Stage) { for j := range stage[i].Steps { ctx := &stepContextVal{ name: p.Name + "." + stage[i].Name + "." + reflect.TypeOf(stage[i].Steps[j]).String(), - pipelineKey: p.Name, - concurrent: stage[i].Concurrent, + pipelineKey: p.uuid, + concurrent: stage[i].config.concurrent, index: j, } stage[i].Steps[j].setCtx(ctx) } - stage[i].pipelineKey = p.Name + stage[i].pipelineKey = p.uuid } p.Stages = append(p.Stages, stage...) } // Run the pipeline. The stages are executed in sequence while steps may be concurrent or sequential. -func (p *Pipeline) Run() *Result { +func (p *Pipeline) Run(data interface{}) *Result { if len(p.Stages) == 0 { return &Result{Error: fmt.Errorf("No stages to be executed")} @@ -113,7 +134,7 @@ func (p *Pipeline) Run() *Result { ticker = time.NewTicker(p.tick) ctx, cancelProgress := context.WithCancel(context.Background()) p.cancelProgress = cancelProgress - go p.updateProgress(ticker, ctx) + go p.updateProgress(ctx, ticker) } buf, ok := buffersMap.get(p.Name) @@ -130,29 +151,26 @@ func (p *Pipeline) Run() *Result { if p.expectedDuration != 0 && p.tick != 0 { defer ticker.Stop() } - defer p.status("end") - p.status("begin") - request := &Request{} + request := &Request{Data: data} result := &Result{} for i, stage := range p.Stages { stage.index = i result = stage.run(request) if result.Error != nil { - p.status("stage: " + stage.Name + " failed !!! ") + return result } request.Data = result.Data - request.KeyVal = result.KeyVal } return result } // Out collects the status output from the stages and steps -func (p *Pipeline) Out() (<-chan string, error) { +func (p *Pipeline) Out() (<-chan []byte, error) { // add a new listener - out := make(chan string, p.outbufferlen) + out := make(chan []byte, p.outbufferlen) err := buffersMap.appendOutBuffer(p.Name, out) if err != nil { return nil, err @@ -176,7 +194,7 @@ func (p *Pipeline) GetProgressPercent() (<-chan int64, error) { } // started as a goroutine -func (p *Pipeline) updateProgress(ticker *time.Ticker, ctx context.Context) { +func (p *Pipeline) updateProgress(ctx context.Context, ticker *time.Ticker) { start := time.Now() for range ticker.C { p.duration = time.Since(start) @@ -239,7 +257,7 @@ loop: if empty { break loop } - case <-time.After(p.DrainTimeout): + case <-time.After(p.config.outDrainTimeout): break loop } } @@ -251,10 +269,8 @@ loop: } // status writes a line to the out channel -func (p *Pipeline) status(line string) { - red := color.New(color.FgRed).SprintFunc() - line = red("[pipeline]") + "[" + p.Name + "]: " + line - send(p.Name, line) +func (p *Pipeline) status(data []byte) { + send(p.Name, data) } func spaceMap(str string) string { diff --git a/pipeline_test.go b/pipeline_test.go index 7d169fd..f0eda64 100644 --- a/pipeline_test.go +++ b/pipeline_test.go @@ -32,13 +32,13 @@ func TestPipeline(t *testing.T) { testpipe, errorPipe := createPipeline(test) go readPipeline(testpipe) if errorPipe { - result := testpipe.Run() + result := testpipe.Run(nil) if result.Error == nil { log.Fatalf("Test failed: %v", test.pipelineName) } } else { - result := testpipe.Run() + result := testpipe.Run(nil) if result.Error != nil { log.Fatalf("Test failed: %v", test.pipelineName) } @@ -53,13 +53,13 @@ func BenchmarkPipeline(b *testing.B) { i := rand.Intn(len(tests)) testpipe, errorPipe := createPipeline(tests[i]) if errorPipe { - err := testpipe.Run() + err := testpipe.Run(nil) if err == nil { log.Fatalf("Test failed: %v", testpipe.Name) } } else { - err := testpipe.Run() + err := testpipe.Run(nil) if err != nil { log.Fatalf("Test failed: %v", testpipe.Name) } @@ -75,16 +75,15 @@ type TestStep struct { } func (t TestStep) Exec(request *Request) *Result { - t.Status("start step") - t.Status("executing test ") + t.Status([]byte("start step")) + t.Status([]byte("executing test ")) time.Sleep(time.Millisecond * 200) - t.Status("end step") + t.Status([]byte("end step")) return nil } -func (t TestStep) Cancel() error { - t.Status("cancel step") - return nil +func (t TestStep) Cancel() { + t.Status([]byte("cancel step")) } type TestStep2 struct { @@ -93,16 +92,15 @@ type TestStep2 struct { } func (t TestStep2) Exec(request *Request) *Result { - t.Status("start step") - t.Status("executing test 2") + t.Status([]byte("start step")) + t.Status([]byte("executing test 2")) time.Sleep(time.Millisecond * 200) - t.Status("end step") + t.Status([]byte("end step")) return nil } -func (t TestStep2) Cancel() error { - t.Status("cancel step") - return nil +func (t TestStep2) Cancel() { + t.Status([]byte("cancel step")) } type TestStepErr struct { @@ -112,16 +110,15 @@ type TestStepErr struct { func (t TestStepErr) Exec(request *Request) *Result { - t.Status("start step") - t.Status("executing test err") + t.Status([]byte("start step")) + t.Status([]byte("executing test err")) time.Sleep(time.Millisecond * 500) - t.Status("end step") + t.Status([]byte("end step")) return &Result{Error: errors.New("test error 1")} } -func (t TestStepErr) Cancel() error { - t.Status("cancel step") - return nil +func (t TestStepErr) Cancel() { + t.Status([]byte("cancel step")) } type TestStepErr2 struct { @@ -130,16 +127,15 @@ type TestStepErr2 struct { } func (t TestStepErr2) Exec(request *Request) *Result { - t.Status("start step") - t.Status("executing test err 2") + t.Status([]byte("start step")) + t.Status([]byte("executing test err 2")) time.Sleep(time.Millisecond * 200) - t.Status("end step") + t.Status([]byte("end step")) return &Result{Error: errors.New("test error 2")} } -func (t TestStepErr2) Cancel() error { - t.Status("cancel step") - return nil +func (t TestStepErr2) Cancel() { + t.Status([]byte("cancel step")) } type sg struct { @@ -155,14 +151,14 @@ type pipeConfig struct { } func createPipeline(testPipeConfig pipeConfig) (*Pipeline, bool) { - testpipe := New(testPipeConfig.pipelineName, testPipeConfig.pipeLineOutBufferLen) + testpipe := New(testPipeConfig.pipelineName) errorPipeline := false var testStages []*Stage for i, sg := range testPipeConfig.sgArr { // create stage - stage := NewStage(fmt.Sprintf("testStage%d", i), sg.concurrent, false) + stage := NewStage(fmt.Sprintf("testStage%d", i)) // create normal steps for j := 0; j < sg.steps; j++ { diff --git a/result_group.go b/result_group.go index 12c7af2..91a4362 100644 --- a/result_group.go +++ b/result_group.go @@ -24,7 +24,7 @@ type group struct { wg sync.WaitGroup errOnce sync.Once - result *Result + results []*Result } // WithContext returns a new Group and an associated Context derived from ctx. @@ -34,17 +34,17 @@ type group struct { // first. func withContext(ctx context.Context) (*group, context.Context) { ctx, cancel := context.WithCancel(ctx) - return &group{cancel: cancel, result: &Result{}}, ctx + return &group{cancel: cancel}, ctx } // Wait blocks until all function calls from the Go method have returned, then // returns the first non-nil error (if any) from them. -func (g *group) wait() *Result { +func (g *group) wait() []*Result { g.wg.Wait() if g.cancel != nil { g.cancel() } - return g.result + return g.results } // Go calls the given function in a new goroutine. @@ -59,7 +59,7 @@ func (g *group) run(f func() *Result) { if result := f(); result.Error != nil { g.errOnce.Do(func() { - g.result = result + g.results = append(g.results, result) if g.cancel != nil { g.cancel() } diff --git a/stage.go b/stage.go index cd1d725..1c01fdb 100644 --- a/stage.go +++ b/stage.go @@ -3,8 +3,6 @@ package pipeline import ( "context" "fmt" - - "github.com/fatih/color" ) // Stage is a collection of steps executed concurrently or sequentially @@ -13,19 +11,37 @@ import ( // disableStrictMode: In strict mode if a single step fails, all the other concurrent steps are cancelled. // Step.Cancel will be invoked for cancellation of the step. Set disableStrictMode to true to disable strict mode type Stage struct { - Name string `json:"name"` - Steps []Step `json:"steps"` - Concurrent bool `json:"concurrent"` - DisableStrictMode bool `json:"disableStrictMode"` - index int - pipelineKey string + Name string `json:"name"` + Steps []Step `json:"steps"` + config *stageConfig + index int + pipelineKey string +} + +// DefaultMergeFunc merges results from concurrent steps in the form []interface{} +func DefaultMergeFunc(results []*Result) *Result { + var mergedData []interface{} + for _, r := range results { + mergedData = append(mergedData, r.Data) + } + + return &Result{Data: mergedData} } // NewStage returns a new stage // name of the stage // concurrent flag sets whether the steps will be executed concurrently -func NewStage(name string, concurrent bool, disableStrictMode bool) *Stage { - st := &Stage{Name: name, Concurrent: concurrent} +func NewStage(name string, opts ...StageOption) *Stage { + + config := &stageConfig{ + mergeFunc: DefaultMergeFunc, + } + + for _, o := range opts { + o(config) + } + + st := &Stage{Name: name, config: config} return st } @@ -39,20 +55,16 @@ func (st *Stage) run(request *Request) *Result { if len(st.Steps) == 0 { return &Result{Error: fmt.Errorf("No steps to be executed")} } - st.status("begin") - defer st.status("end") - - if st.Concurrent { - st.status("is concurrent") + if st.config.concurrent { g, ctx := withContext(context.Background()) for _, step := range st.Steps { step := step - step.Status("begin") + step.Status([]byte("begin")) g.run(func() *Result { - defer step.Status("end") + defer step.Status([]byte("end")) //disables strict mode. g.run will wait for all steps to finish - if st.DisableStrictMode { + if st.config.disableStrictMode { return step.Exec(request) } @@ -69,9 +81,7 @@ func (st *Stage) run(request *Request) *Result { select { case <-ctx.Done(): - if err := step.Cancel(); err != nil { - st.status("Error Cancelling Step " + step.getCtx().name) - } + step.Cancel() <-resultChan return &Result{Error: ctx.Err()} @@ -86,42 +96,27 @@ func (st *Stage) run(request *Request) *Result { }) } - if result := g.wait(); result != nil && result.Error != nil { - st.status(" >>>failed !!! ") - return result + if results := g.wait(); len(results) != 0 { + return st.config.mergeFunc(results) } } else { - st.status("is not concurrent") res := &Result{} for _, step := range st.Steps { - step.Status("begin") res = step.Exec(request) if res != nil && res.Error != nil { - step.Status(">>>failed !!!") return res } if res == nil { res = &Result{} - step.Status("end") continue } request.Data = res.Data - request.KeyVal = res.KeyVal - step.Status("end") } return res } return &Result{} } - -// status writes a line to the out channel -func (st *Stage) status(line string) { - stageText := fmt.Sprintf("[stage-%d]", st.index) - yellow := color.New(color.FgYellow).SprintFunc() - line = yellow(stageText) + "[" + st.Name + "]: " + line - send(st.pipelineKey, line) -} diff --git a/step.go b/step.go index 5f6d7ed..321a76e 100644 --- a/step.go +++ b/step.go @@ -1,9 +1,7 @@ package pipeline import ( - "fmt" - - "github.com/fatih/color" + "context" ) // Result is returned by a step to dispatch data to the next step or stage @@ -11,14 +9,17 @@ type Result struct { Error error // dispatch any type Data interface{} - // dispatch key value pairs - KeyVal map[string]interface{} } // Request is the result dispatched in a previous step. type Request struct { - Data interface{} - KeyVal map[string]interface{} + stepContext *StepContext + Data interface{} +} + +// Status logs the status line to the out channel +func (r *Request) Status(data []byte) { + r.stepContext.Status(data) } // Step is the unit of work which can be concurrently or sequentially staged with other steps @@ -27,11 +28,11 @@ type Step interface { // Exec is invoked by the pipeline when it is run Exec(*Request) *Result // Cancel is invoked by the pipeline when one of the concurrent steps set Result{Error:err} - Cancel() error + Cancel() } type out interface { - Status(line string) + Status(data []byte) getCtx() *stepContextVal setCtx(ctx *stepContextVal) } @@ -57,9 +58,33 @@ func (sc *StepContext) setCtx(ctx *stepContextVal) { } // Status is used to log status from a step -func (sc *StepContext) Status(line string) { - stepText := fmt.Sprintf("[step-%d]", sc.getCtx().index) - blue := color.New(color.FgBlue).SprintFunc() - line = blue(stepText) + "[" + sc.getCtx().name + "]: " + line - send(sc.getCtx().pipelineKey, line) +func (sc *StepContext) Status(data []byte) { + send(sc.getCtx().pipelineKey, data) +} + +type step struct { + StepContext + execFunc func(context context.Context, r *Request) *Result + cancelFunc context.CancelFunc +} + +func (s *step) Exec(request *Request) *Result { + + request.stepContext = &s.StepContext + var ctx context.Context + ctx, s.cancelFunc = context.WithCancel(context.Background()) + return s.execFunc(ctx, request) + +} + +func (s *step) Cancel() { + s.cancelFunc() +} + +// NewStep creates a new step +func NewStep(exec func(context context.Context, r *Request) *Result) Step { + return &step{ + execFunc: exec, + } + }