Skip to content

Commit 326a1a5

Browse files
authored
Merge pull request #62 from github/znull/v2-review-fixes
v2 review fixes: lifecycle cleanup and docs
2 parents 2de883d + 20b8064 commit 326a1a5

3 files changed

Lines changed: 96 additions & 11 deletions

File tree

pipe/function.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,32 @@ type StageFunc func(ctx context.Context, env Env, stdin io.Reader, stdout io.Wri
3333
// FunctionOption configures a Function stage.
3434
type FunctionOption func(*goStage)
3535

36+
// WithStdinRequirement returns a FunctionOption declaring the stage's stdin
37+
// requirement.
38+
func WithStdinRequirement(requirement StreamRequirement) FunctionOption {
39+
return func(s *goStage) {
40+
s.requirements.Stdin = requirement
41+
}
42+
}
43+
44+
// WithStdoutRequirement returns a FunctionOption declaring the stage's stdout
45+
// requirement.
46+
func WithStdoutRequirement(requirement StreamRequirement) FunctionOption {
47+
return func(s *goStage) {
48+
s.requirements.Stdout = requirement
49+
}
50+
}
51+
3652
// ForbidStdin returns a FunctionOption declaring that the stage must not be
3753
// connected to stdin.
3854
func ForbidStdin() FunctionOption {
39-
return func(s *goStage) {
40-
s.requirements.Stdin = StreamForbidden
41-
}
55+
return WithStdinRequirement(StreamForbidden)
4256
}
4357

4458
// ForbidStdout returns a FunctionOption declaring that the stage must not be
4559
// connected to stdout.
4660
func ForbidStdout() FunctionOption {
47-
return func(s *goStage) {
48-
s.requirements.Stdout = StreamForbidden
49-
}
61+
return WithStdoutRequirement(StreamForbidden)
5062
}
5163

5264
// Function returns a pipeline `Stage` that will run a `StageFunc` in

pipe/pipeline.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,16 @@ func WithDir(dir string) Option {
9999
// WithStdin assigns stdin to the first command in the pipeline. The
100100
// caller retains ownership of stdin; the pipeline will not close it,
101101
// even if `Start()` returns an error.
102+
//
103+
// If the first stage is a `Command` and stdin is not an `*os.File`,
104+
// `exec.Cmd` has to copy stdin through an internal goroutine, and
105+
// `Cmd.Wait()` waits for that copy to finish. This is fine for bounded
106+
// readers such as `strings.Reader` and `bytes.Reader`, and for
107+
// `*os.File` values, which are passed to the command directly. But a
108+
// borrowed, non-file reader that can block forever can also block the
109+
// pipeline forever if the command exits without consuming all of its
110+
// stdin. See `TestPipelineIOPipeStdinThatIsNeverClosed` for the known
111+
// limitation.
102112
func WithStdin(stdin io.Reader) Option {
103113
return func(p *Pipeline) {
104114
p.stdin = Input(stdin)
@@ -238,6 +248,12 @@ func (p *Pipeline) Start(ctx context.Context) error {
238248

239249
atomic.StoreUint32(&p.started, 1)
240250
ctx, p.cancel = context.WithCancel(ctx)
251+
startedOK := false
252+
defer func() {
253+
if !startedOK {
254+
p.cancel()
255+
}
256+
}()
241257

242258
if len(p.stages) == 0 {
243259
if p.stdout == nil {
@@ -290,11 +306,15 @@ func (p *Pipeline) Start(ctx context.Context) error {
290306
requirements := s.Requirements()
291307
if err := requirements.Stdin.Validate(); err != nil {
292308
closePipes()
293-
return fmt.Errorf("stdin: %w", err)
309+
return fmt.Errorf(
310+
"stage %q has invalid stdin requirement: %w", s.Name(), err,
311+
)
294312
}
295313
if err := requirements.Stdout.Validate(); err != nil {
296314
closePipes()
297-
return fmt.Errorf("stdout: %w", err)
315+
return fmt.Errorf(
316+
"stage %q has invalid stdout requirement: %w", s.Name(), err,
317+
)
298318
}
299319

300320
stageJoiners[i].nextStage = s
@@ -362,10 +382,19 @@ func (p *Pipeline) Start(ctx context.Context) error {
362382
}
363383
}
364384

385+
startedOK = true
365386
return nil
366387
}
367388

368389
func (p *Pipeline) Output(ctx context.Context) ([]byte, error) {
390+
if p.hasStarted() {
391+
panic("attempt to get output from a pipeline that has already started")
392+
}
393+
394+
if err := p.stdout.Close(); err != nil {
395+
return nil, fmt.Errorf("closing previous stdout: %w", err)
396+
}
397+
369398
var buf bytes.Buffer
370399
p.stdout = Output(&buf)
371400
err := p.Run(ctx)

pipe/pipeline_test.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,23 @@ func TestPipelineEmptyOutput(t *testing.T) {
5454
}
5555
}
5656

57+
func TestPipelineOutputClosesConfiguredStdoutCloser(t *testing.T) {
58+
t.Parallel()
59+
ctx := context.Background()
60+
stdout := &closeTrackingWriter{}
61+
p := pipe.New(
62+
pipe.WithStdin(strings.NewReader("hello world\n")),
63+
pipe.WithStdoutCloser(stdout),
64+
)
65+
66+
out, err := p.Output(ctx)
67+
if assert.NoError(t, err) {
68+
assert.Equal(t, "hello world\n", string(out))
69+
assert.Equal(t, "", stdout.buf.String())
70+
assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed")
71+
}
72+
}
73+
5774
func TestPipelineEmptyWithStdoutCloser(t *testing.T) {
5875
t.Parallel()
5976
ctx := context.Background()
@@ -951,6 +968,24 @@ func TestFunctionOptionsForbidStreams(t *testing.T) {
951968
})
952969
}
953970

971+
func TestFunctionOptionsSetStreamRequirements(t *testing.T) {
972+
t.Parallel()
973+
974+
stage := pipe.Function(
975+
"file-preferring",
976+
func(_ context.Context, _ pipe.Env, _ io.Reader, _ io.Writer) error {
977+
return nil
978+
},
979+
pipe.WithStdinRequirement(pipe.StreamPreferFile),
980+
pipe.WithStdoutRequirement(pipe.StreamPreferFile),
981+
)
982+
983+
assert.Equal(t, pipe.StageRequirements{
984+
Stdin: pipe.StreamPreferFile,
985+
Stdout: pipe.StreamPreferFile,
986+
}, stage.Requirements())
987+
}
988+
954989
func TestStreamForbiddenStdin(t *testing.T) {
955990
t.Parallel()
956991
ctx := context.Background()
@@ -1024,7 +1059,10 @@ func TestInvalidStreamRequirements(t *testing.T) {
10241059
Stdin: pipe.StreamRequirement(123),
10251060
},
10261061
})
1027-
require.ErrorContains(t, p.Run(ctx), `stdin: invalid stream requirement 123`)
1062+
require.ErrorContains(
1063+
t, p.Run(ctx),
1064+
`stage "source" has invalid stdin requirement: invalid stream requirement 123`,
1065+
)
10281066
assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed")
10291067
})
10301068

@@ -1038,7 +1076,10 @@ func TestInvalidStreamRequirements(t *testing.T) {
10381076
Stdout: pipe.StreamRequirement(123),
10391077
},
10401078
})
1041-
require.ErrorContains(t, p.Run(ctx), `stdout: invalid stream requirement 123`)
1079+
require.ErrorContains(
1080+
t, p.Run(ctx),
1081+
`stage "sink" has invalid stdout requirement: invalid stream requirement 123`,
1082+
)
10421083
assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed")
10431084
})
10441085
}
@@ -1071,7 +1112,10 @@ func TestInvalidStreamRequirement(t *testing.T) {
10711112
Stdin: pipe.StreamRequirement(99),
10721113
},
10731114
})
1074-
require.ErrorContains(t, p.Run(ctx), `stdin: invalid stream requirement 99`)
1115+
require.ErrorContains(
1116+
t, p.Run(ctx),
1117+
`stage "invalid" has invalid stdin requirement: invalid stream requirement 99`,
1118+
)
10751119
}
10761120

10771121
func TestFunctionNoInput(t *testing.T) {

0 commit comments

Comments
 (0)