Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
shimv2: fix the issue of close IO stream
Browse files Browse the repository at this point in the history
It should wait until the stdin io copy
termianted to close the process's io stream,
otherwise, it would miss forwarding some contents
to process stdin.

Fixes: #2884

Signed-off-by: fupan.lfp <fupan.lfp@antgroup.com>
  • Loading branch information
fupan.lfp authored and lifupan committed Aug 11, 2020
1 parent bdcd2dd commit 3a0cd87
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 47 deletions.
66 changes: 35 additions & 31 deletions containerd-shim-v2/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package containerdshim

import (
"io"
"time"

"github.com/containerd/containerd/api/types/task"
Expand All @@ -17,23 +18,25 @@ import (
)

type container struct {
s *service
ttyio *ttyIO
spec *specs.Spec
exitTime time.Time
execs map[string]*exec
exitIOch chan struct{}
exitCh chan uint32
id string
stdin string
stdout string
stderr string
bundle string
cType vc.ContainerType
exit uint32
status task.Status
terminal bool
mounted bool
s *service
ttyio *ttyIO
spec *specs.Spec
exitTime time.Time
execs map[string]*exec
exitIOch chan struct{}
stdinPipe io.WriteCloser
stdinCloser chan struct{}
exitCh chan uint32
id string
stdin string
stdout string
stderr string
bundle string
cType vc.ContainerType
exit uint32
status task.Status
terminal bool
mounted bool
}

func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.ContainerType, spec *specs.Spec, mounted bool) (*container, error) {
Expand All @@ -47,20 +50,21 @@ func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.Con
}

c := &container{
s: s,
spec: spec,
id: r.ID,
bundle: r.Bundle,
stdin: r.Stdin,
stdout: r.Stdout,
stderr: r.Stderr,
terminal: r.Terminal,
cType: containerType,
execs: make(map[string]*exec),
status: task.StatusCreated,
exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1),
mounted: mounted,
s: s,
spec: spec,
id: r.ID,
bundle: r.Bundle,
stdin: r.Stdin,
stdout: r.Stdout,
stderr: r.Stderr,
terminal: r.Terminal,
cType: containerType,
execs: make(map[string]*exec),
status: task.StatusCreated,
exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1),
stdinCloser: make(chan struct{}),
mounted: mounted,
}
return c, nil
}
19 changes: 12 additions & 7 deletions containerd-shim-v2/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package containerdshim

import (
"fmt"
"io"
"strings"
"time"

Expand All @@ -32,6 +33,9 @@ type exec struct {
exitIOch chan struct{}
exitCh chan uint32

stdinCloser chan struct{}
stdinPipe io.WriteCloser

exitTime time.Time
}

Expand Down Expand Up @@ -108,13 +112,14 @@ func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *g
}

exec := &exec{
container: c,
cmds: cmds,
tty: tty,
exitCode: exitCode255,
exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1),
status: task.StatusCreated,
container: c,
cmds: cmds,
tty: tty,
exitCode: exitCode255,
exitIOch: make(chan struct{}),
stdinCloser: make(chan struct{}),
exitCh: make(chan uint32, 1),
status: task.StatusCreated,
}

return exec, nil
Expand Down
16 changes: 10 additions & 6 deletions containerd-shim-v2/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,19 +725,23 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt
return nil, err
}

tty := c.ttyio
stdin := c.stdinPipe
stdinCloser := c.stdinCloser

if r.ExecID != "" {
execs, err := c.getExec(r.ExecID)
if err != nil {
return nil, err
}
tty = execs.ttyio
stdin = execs.stdinPipe
stdinCloser = execs.stdinCloser
}

if tty != nil && tty.Stdin != nil {
if err := tty.Stdin.Close(); err != nil {
return nil, errors.Wrap(err, "close stdin")
}
// wait until the stdin io copy terminated, otherwise
// some contents would not be forwarded to the process.
<-stdinCloser
if err := stdin.Close(); err != nil {
return nil, errors.Wrap(err, "close stdin")
}

return empty, nil
Expand Down
12 changes: 10 additions & 2 deletions containerd-shim-v2/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,22 @@ func startContainer(ctx context.Context, s *service, c *container) error {
return err
}

c.stdinPipe = stdin

if c.stdin != "" || c.stdout != "" || c.stderr != "" {
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
if err != nil {
return err
}
c.ttyio = tty
go ioCopy(c.exitIOch, tty, stdin, stdout, stderr)
go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
} else {
//close the io exit channel, since there is no io for this container,
//otherwise the following wait goroutine will hang on this channel.
close(c.exitIOch)
//close the stdin closer channel to notify that it's safe to close process's
// io.
close(c.stdinCloser)
}

go wait(s, c, "")
Expand Down Expand Up @@ -111,13 +116,16 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (*ex
if err != nil {
return nil, err
}

execs.stdinPipe = stdin

tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
if err != nil {
return nil, err
}
execs.ttyio = tty

go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr)
go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)

go wait(s, c, execID)

Expand Down
4 changes: 3 additions & 1 deletion containerd-shim-v2/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (
return ttyIO, nil
}

func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
var wg sync.WaitGroup
var closeOnce sync.Once

Expand All @@ -95,6 +95,8 @@ func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPi
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
// notify that we can close process's io safely.
close(stdinCloser)
wg.Done()
}()
}
Expand Down

0 comments on commit 3a0cd87

Please sign in to comment.