44 "context"
55 "fmt"
66 "io"
7+ "sync"
78)
89
910type multirun struct {
@@ -29,6 +30,8 @@ func (m multirun) run(ctx context.Context) error {
2930
3031// unboundedExecution execute multiple commands without concurrency limit
3132func (m multirun ) unboundedExecution (ctx context.Context ) error {
33+ ctx , cancel := context .WithCancel (ctx )
34+ defer cancel ()
3235 errs := make (chan error )
3336
3437 for _ , cmd := range m .commands {
@@ -48,7 +51,9 @@ func (m multirun) unboundedExecution(ctx context.Context) error {
4851
4952 var firstError error
5053 for range m .commands {
51- if err := <- errs ; firstError == nil {
54+ err := <- errs
55+ if err != nil && firstError == nil {
56+ cancel ()
5257 firstError = err
5358 }
5459 }
@@ -58,34 +63,50 @@ func (m multirun) unboundedExecution(ctx context.Context) error {
5863
5964// boundedExecution execute multiple commands using a sized worker pool
6065func (m multirun ) boundedExecution (ctx context.Context ) error {
66+ ctx , cancel := context .WithCancel (ctx )
67+ defer cancel ()
6168 // errs should be buffered to avoid blocking
6269 // when len(m.commands) > m.jobs
6370 errs := make (chan error , len (m .commands ))
64- commands := make (chan command )
65-
66- // start worker pool
67- for w := 0 ; w < m .jobs ; w ++ {
68- go m .spawnWorker (ctx , commands , errs )
69- }
71+ commands := make (chan command , len (m .commands ))
7072
7173 // send command to worker pool
7274 for _ , cmd := range m .commands {
7375 commands <- cmd
7476 }
7577 close (commands )
7678
77- var firstError error
78- for range m .commands {
79- if err := <- errs ; firstError == nil {
80- firstError = err
81- }
79+ // start worker pool
80+ var wg sync.WaitGroup
81+ wg .Add (m .jobs )
82+ for w := 0 ; w < m .jobs ; w ++ {
83+ go func () {
84+ defer wg .Done ()
85+ err := m .spawnWorker (ctx , commands )
86+ if err != nil {
87+ errs <- err // first error must go first, cancel after it has been sent.
88+ cancel ()
89+ }
90+ }()
8291 }
8392
84- return firstError
93+ wg .Wait ()
94+ close (errs )
95+
96+ for err := range errs {
97+ return err
98+ }
99+
100+ return nil
85101}
86102
87- func (m multirun ) spawnWorker (ctx context.Context , commands <- chan command , errs chan <- error ) {
103+ func (m multirun ) spawnWorker (ctx context.Context , commands <- chan command ) error {
88104 for cmd := range commands {
105+ select {
106+ case <- ctx .Done ():
107+ return ctx .Err ()
108+ default :
109+ }
89110 p := process {
90111 tag : cmd .Tag ,
91112 path : cmd .Path ,
@@ -103,6 +124,10 @@ func (m multirun) spawnWorker(ctx context.Context, commands <-chan command, errs
103124 fmt .Fprintf (m .stderrSink , "Running %s\n " , cmd .Tag )
104125 }
105126
106- errs <- p .run (ctx )
127+ err := p .run (ctx )
128+ if err != nil {
129+ return err
130+ }
107131 }
132+ return nil
108133}
0 commit comments