首先在程序中封装了下面一个函数用来执行系统命令:
// 执行系统命令
func executeCommand(command string, output, outerr io.Writer) error {
cmd := exec.Command("/bin/bash", "-c", command)
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
defer stdout.Close()
stderr, err := cmd.StderrPipe()
if err != nil {
return err
}
defer stderr.Close()
if err := cmd.Start(); err != nil {
return err
}
errRet := make(chan error, 2)
go func() {
if _, err := io.Copy(output, stdout); err != nil {
errRet <- fmt.Errorf("[executeCommand] copy cmd stdout error: %s", err)
} else {
errRet <- nil
}
}()
go func() {
if _, err := io.Copy(outerr, stderr); err != nil {
errRet <- fmt.Errorf("[executeCommand] copy cmd stderr error: %s", err)
} else {
errRet <- nil
}
}()
if err := cmd.Wait(); err != nil {
return err
}
for i := 0; i < 2; i++ {
if err := <-errRet; err != nil {
return err
}
}
return nil
}
我们使用的是 os/exec
包来执行命令,我们这里选择使用异步的方式来执行命令,所以需要通过创建管道来拿到命令的标准输出和错误。上面 cmd.Start
不会阻塞当前的执行,所以我们后面开启两个协程来异步收集错误,最后通过 cmd.Wait
来等待命令执行完成,最后通过信道等待收集错误的协程执行完毕。函数接收两个 io.Writer
参数,用来写入标准输出和标准错误,外部可以读取其中的内容。
这整个函数看似没有任何问题,我们在外部来调用一下:
err := executeCommand("ls", os.Stdout, os.Stderr)
if err != nil {
fmt.Println(err)
}
err = executeCommand("cat /dev/null", os.Stdout, os.Stderr)
if err != nil {
fmt.Println(err)
}
err = executeCommand("cat /dev/null", os.Stdout, os.Stderr)
if err != nil {
fmt.Println(err)
}
我们这里直接使用 os.Stdout
和 os.Stderr
接受命令的输出和错误,这会直接输出到控制台。当我们执行时会发现有时候运行会出现下面的报错:
read |0: file already closed
write /dev/stdout: file already closed
这两种报错都有一定的几率出现,而且不一定同时出现,有些时候甚至不出现,由于我们的方法中存在协程,所以这种存在概率的问题大多是因为并发情况下没有前后固定顺序造成的。
我们首先来看第一个错误:read |0: file already closed
这个错误字面上看是读取文件的时候文件已经关闭了,我们可以看 cmd.StdoutPipe()
这部分的源码:
func (c *Cmd) StdoutPipe() (io.ReadCloser, error) {
if c.Stdout != nil {
return nil, errors.New("exec: Stdout already set")
}
if c.Process != nil {
return nil, errors.New("exec: StdoutPipe after process started")
}
pr, pw, err := os.Pipe()
if err != nil {
return nil, err
}
c.Stdout = pw
c.childIOFiles = append(c.childIOFiles, pw)
c.parentIOPipes = append(c.parentIOPipes, pr)
return pr, nil
}
其中会调用 os.Pipe
源码如下:
func Pipe() (r *File, w *File, err error) {
var p [2]int
e := syscall.Pipe2(p[0:], syscall.O_CLOEXEC)
if e != nil {
return nil, nil, NewSyscallError("pipe2", e)
}
return newFile(p[0], "|0", kindPipe), newFile(p[1], "|1", kindPipe), nil
}
可以看到这里会创建两个文件分别是 |0
还有 |1
,然后把 |1
给到 Stdout,同时放到子进程的 IO 中,|0
是作为父进程的 IO 管道,同时返回到外部,也就是说是使用 |0
作为子进程和父进程通信的管道,根据上面的报错,很明显就是 |0
文件已经关闭,所以我们无法读取。
那么什么时候会关闭呢,实际上是在执行 Wait
时,进程执行完毕后回收资源阶段会关闭:
func (c *Cmd) Wait() error {
if c.Process == nil {
return errors.New("exec: not started")
}
if c.ProcessState != nil {
return errors.New("exec: Wait was already called")
}
state, err := c.Process.Wait()
if err == nil && !state.Success() {
err = &ExitError{ProcessState: state}
}
c.ProcessState = state
// other
// ...
closeDescriptors(c.parentIOPipes)
c.parentIOPipes = nil
return err
}
上面是 Wait
的一部分代码,可以看到最后 closeDescriptors(c.parentIOPipes)
关闭了向父进程通信的管道。
原因就是我们开启协程收集输出时,进程这个时候已经执行完毕,所以就直接调用了 Wait,这个时候输出管道被关闭,而我们还没有读取完,就会出现这个报错,我们可以用简单的代码来验证一下:
// 在 Wait 后面加两行代码
n, err := stdout.Read(make([]byte, 10))
fmt.Println(n, err)
果然我们执行后会输出同样的错误,到这里第一个错误的原因就找到了,然后来看第二个错误:write /dev/stdout: file already closed
,这和错误看上去是往 /dev/stdout
写入的时候文件已经关闭了,但是 /dev/stdout
是标准输出,也不太可能关闭,而且这个错误是 io.Copy
返回的,我们同样在 Wait
后面加两行代码来验证下:
n, err := io.Copy(output, stdout)
fmt.Println(n, err)
然后执行果然出现了这个错误,那么这又是什么原因呢?我们可以借助于工具开启断点调试来定位问题,定位过程如下:
首先进入 io.Copy
:
func Copy(dst Writer, src Reader) (written int64, err error) {
return copyBuffer(dst, src, nil)
}
这里直接进入 copyBuffer
:
func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
// If the reader has a WriteTo method, use it to do the copy.
// Avoids an allocation and a copy.
if wt, ok := src.(WriterTo); ok {
return wt.WriteTo(dst)
}
// Similarly, if the writer has a ReadFrom method, use it to do the copy.
if rt, ok := dst.(ReaderFrom); ok {
return rt.ReadFrom(src)
}
// other...
return written, err
}
这个函数后面的代码省略了,我们只关注前面这几行,首先是 src.(WriterTo)
这个断言是失败的,也就是说管道文件没有这个方法,然后继续走到 dst.(ReaderFrom)
这个是断言成功的,于是就进入 rt.ReadFrom
中,这个 ReadFrom
是接口方法,具体的实现在 os/file
包中:
// ReadFrom implements io.ReaderFrom.
func (f *File) ReadFrom(r io.Reader) (n int64, err error) {
if err := f.checkValid("write"); err != nil {
return 0, err
}
n, handled, e := f.readFrom(r)
if !handled {
return genericReadFrom(f, r) // without wrapping
}
return n, f.wrapErr("write", e)
}
进去之后调用 f.checkValid
这个是正常通过了,然后就来到 f.readFrom(r)
中,接着会来到 copyFileRange 这个过程调用层级比较深,我们就省略直接来到错误发生的地方:
func copyFileRange(dst, src *FD, max int) (written int64, err error) {
// The signature of copy_file_range(2) is:
//
// ssize_t copy_file_range(int fd_in, loff_t *off_in,
// int fd_out, loff_t *off_out,
// size_t len, unsigned int flags);
//
// Note that in the call to unix.CopyFileRange below, we use nil
// values for off_in and off_out. For the system call, this means
// "use and update the file offsets". That is why we must acquire
// locks for both file descriptors (and why this whole machinery is
// in the internal/poll package to begin with).
if err := dst.writeLock(); err != nil {
return 0, err
}
defer dst.writeUnlock()
if err := src.readLock(); err != nil {
return 0, err
}
defer src.readUnlock()
var n int
for {
n, err = unix.CopyFileRange(src.Sysfd, nil, dst.Sysfd, nil, max, 0)
if err != syscall.EINTR {
break
}
}
return int64(n), err
}
上面会先对 dst
加写锁,然后对 src
加读锁,结果是执行 readLock
时出现了问题:
// readLock adds a reference to fd and locks fd for reading.
// It returns an error when fd cannot be used for reading.
func (fd *FD) readLock() error {
if !fd.fdmu.rwlock(true) {
return errClosing(fd.isFile)
}
return nil
}
这里由于文件已经关闭,所以会走到 errClosing
这里:
// ErrFileClosing is returned when a file descriptor is used after it
// has been closed.
var ErrFileClosing = errors.New("use of closed file")
// Return the appropriate closing error based on isFile.
func errClosing(isFile bool) error {
if isFile {
return ErrFileClosing
}
return ErrNetClosing
}
这里会返回错误:use of closed file
然后一层一层返回,最终回到 ReadFrom 方法中走到 f.wrapErr("write", e)
这个地方:
// wrapErr wraps an error that occurred during an operation on an open file.
// It passes io.EOF through unchanged, otherwise converts
// poll.ErrFileClosing to ErrClosed and wraps the error in a PathError.
func (f *File) wrapErr(op string, err error) error {
if err == nil || err == io.EOF {
return err
}
if err == poll.ErrFileClosing {
err = ErrClosed
} else if checkWrapErr && errors.Is(err, poll.ErrFileClosing) {
panic("unexpected error wrapping poll.ErrFileClosing: " + err.Error())
}
return &PathError{Op: op, Path: f.name, Err: err}
}
wrapErr
将 err
重新设置为 ErrClosed
然后包装到 PathError
中返回,结果中 Op
就是 write
,Path
就是目标文件名称,即 /dev/stdout
,err
就是 ErrClosed
也就是 file already closed
,也就得到了最后的结果:write /dev/stdout: file already closed
,这个错误确实具有一定的迷惑性,当写入文件加锁失败时表示的确实是写入的这个文件已关闭,但是当读取文件加锁失败时,这里虽然是 write /dev/stdout
但是表示的是从来源读取失败,前面的标识无论什么情况下都是目标文件而已,这样第二个错误的原因就清楚了。
那么这两个报错有哪些不同呢?
read |0: file already closed
这个报错是正在读取文件时,文件被关闭而出现的报错,也就是说这个是在执行 unix.CopyFileRange
过程中文件被关闭导致的错误。
write /dev/stdout: file already closed
这个错误是先关闭文件后,再进行读取时在 readLock
加锁时就检测出来已关闭而导致的错误。
所以这两个错误都是文件被关闭导致的,关键的区别是读在关闭后还是在关闭过程中。
那么原因清楚了,最原始的代码应该如何修改呢?
首先我们直接让获取输出一定在 Wait
调用之前就可以保证读取没问题,所以将遍历结果信道的操作放在 Wait
前面,这样保证一定读取完成再执行 Wait
操作回收资源:
func executeCommand(command string, output, outerr io.Writer) error {
cmd := exec.Command("/bin/bash", "-c", command)
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return err
}
if err := cmd.Start(); err != nil {
return err
}
errRet := make(chan error, 2)
go func() {
if _, err := io.Copy(output, stdout); err != nil {
errRet <- fmt.Errorf("[executeCommand] copy cmd stdout error: %s", err)
} else {
errRet <- nil
}
}()
go func() {
if _, err := io.Copy(outerr, stderr); err != nil {
errRet <- fmt.Errorf("[executeCommand] copy cmd stderr error: %s", err)
} else {
errRet <- nil
}
}()
for i := 0; i < 2; i++ {
if err := <-errRet; err != nil {
return err
}
}
if err := cmd.Wait(); err != nil {
return err
}
return nil
}
这样就不会出现问题了,而且管道我们也不需要在外面代码中关闭,Wait
回收资源时会自动关闭。
其实上面写法还是太复杂了,对于我们这个场景其实就是同步执行命令,不需要这么麻烦,我们只需要手动设置 cmd 实例的 Stdout
和 Stderr
即可:
func executeCommand(command string, output, outerr io.Writer) error {
cmd := exec.Command("/bin/bash", "-c", command)
cmd.Stdout = output
cmd.Stderr = outerr
if err := cmd.Start(); err != nil {
return err
}
if err := cmd.Wait(); err != nil {
return err
}
return nil
}
这样代码就简单多了,注意设置 cmd.Stdout
和 cmd.Stderr
必须放在 cmd.Start()
之前,之后可能会丢失输出。
既然是同步的,我们可以再进一步,将 cmd.Start()
和 cmd.Wait()
合并成 cmd.Run()
,这样更简单:
func executeCommand(command string, output, outerr io.Writer) error {
cmd := exec.Command("/bin/bash", "-c", command)
cmd.Stdout = output
cmd.Stderr = outerr
if err := cmd.Run(); err != nil {
return err
}
return nil
}
这样代码就更简洁了,而且我们需要的功能也完全可以实现。不过今天这个例子能告诉我们在异步情况下执行命令需要注意什么,以及如何定位问题,如果我们需要在不阻塞当前线程的情况下执行命令,那么也必须使用管道获取执行结果,这时候我们就要注意避免这些报错的情况。
Reference: