This repository has been archived by the owner on May 12, 2021. It is now read-only.
/
stream.go
129 lines (109 loc) · 2.18 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"io"
"sync"
"syscall"
"github.com/containerd/fifo"
)
// The buffer size used to specify the buffer for IO streams copy
const bufSize = 32 << 10
var (
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, bufSize)
return &buffer
},
}
)
type ttyIO struct {
Stdin io.ReadCloser
Stdout io.Writer
Stderr io.Writer
}
func (tty *ttyIO) close() {
if tty.Stdin != nil {
tty.Stdin.Close()
}
cf := func(w io.Writer) {
if w == nil {
return
}
if c, ok := w.(io.WriteCloser); ok {
c.Close()
}
}
cf(tty.Stdout)
cf(tty.Stderr)
}
func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (*ttyIO, error) {
var in io.ReadCloser
var outw io.Writer
var errw io.Writer
var err error
if stdin != "" {
in, err = fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return nil, err
}
}
if stdout != "" {
outw, err = fifo.OpenFifo(ctx, stdout, syscall.O_RDWR, 0)
if err != nil {
return nil, err
}
}
if !console && stderr != "" {
errw, err = fifo.OpenFifo(ctx, stderr, syscall.O_RDWR, 0)
if err != nil {
return nil, err
}
}
ttyIO := &ttyIO{
Stdin: in,
Stdout: outw,
Stderr: errw,
}
return ttyIO, nil
}
func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
var wg sync.WaitGroup
var closeOnce sync.Once
if tty.Stdin != nil {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
// notify that we can close process's io safely.
close(stdinCloser)
wg.Done()
}()
}
if tty.Stdout != nil {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(tty.Stdout, stdoutPipe, *p)
wg.Done()
closeOnce.Do(tty.close)
}()
}
if tty.Stderr != nil && stderrPipe != nil {
wg.Add(1)
go func() {
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(tty.Stderr, stderrPipe, *p)
wg.Done()
}()
}
wg.Wait()
closeOnce.Do(tty.close)
close(exitch)
}