Go 语言中的 io.Pipe 是一个非常强大且常用的工具,它在内存中创建一个同步的管道,可以将一个 io.Writer(写入端)和一个 io.Reader(读取端)连接起来,实现数据的边写边读,而不需要中间的临时文件或巨大的内存缓冲区。
io.Pipe 的核心概念
调用 io.Pipe() 会返回两个对象:
*PipeReader:管道的读取端,实现了 io.Reader 接口。
*PipeWriter:管道的写入端,实现了 io.Writer 接口。
关键特性:
- 同步与阻塞(Synchronous & Blocking):这是最重要的特性。
io.Pipe 内部没有缓冲区(或者说缓冲区极小)。
- 当你往
PipeWriter 写入数据时,写入操作会阻塞,直到有另一个 goroutine 从 PipeReader 把这些数据读走。
- 反之亦然,读取操作也会阻塞,直到有数据被写入。
- 并发安全(Thread-safe):它的设计初衷就是在不同的 Goroutine 之间传递数据。你必须在不同的 Goroutine 中分别进行读和写,否则会导致死锁(Deadlock)。
- 内存高效:因为它不需要把所有数据都缓存起来再读取,所以非常适合处理大数据流(Streaming),能有效防止 OOM 的发生。
示例
Goroutine 通信
这是最简单的用法,展示了如何在两个 Goroutine 之间传递数据。
package main
import (
"fmt"
"io"
"time"
)
func main() {
// 创建一个管道
pr, pw := io.Pipe()
// 开启一个 Goroutine 用于写入数据
go func() {
// 模拟耗时操作
fmt.Println("Writer: 开始写入数据...")
for i := 1; i <= 3; i++ {
msg := fmt.Sprintf("数据块 %d\n", i)
pw.Write([]byte(msg)) // 这里会阻塞,直到 Main Goroutine 读取
fmt.Printf("Writer: 已写入 %d\n", i)
time.Sleep(500 * time.Millisecond)
}
// 写完后必须关闭 Writer,否则 Reader 会一直等待(导致死锁或泄露)
pw.Close()
fmt.Println("Writer: 关闭管道")
}()
// 主 Goroutine 用于读取数据
fmt.Println("Reader: 准备读取...")
// buffer 用于存储读取的数据
buf := make([]byte, 1024)
for {
n, err := pr.Read(buf)
if err == io.EOF {
break // 管道关闭,读取结束
}
if err != nil {
fmt.Println("读取错误:", err)
break
}
fmt.Printf("Reader: 收到 -> %s", string(buf[:n]))
}
fmt.Println("Reader: 读取完成")
}
示将 Response 写入管道
这个示例展示了如何将 HTTP 响应的 Body 导入到 io.Pipe 中。虽然在这个简单的例子中看起来有点多此一举(因为可以直接读 Body),但它是理解数据流向的基础。
数据流向:
HTTP Response -> Goroutine (io.Copy) -> PipeWriter -> Pipe (内存) -> PipeReader -> Main (读取)
package main
import (
"fmt"
"io"
"net/http"
"os"
)
func main() {
// 1. 发起 HTTP 请求
url := "https://httpbin.org/stream/5" // 返回 5 行流式 JSON 数据
resp, err := http.Get(url)
if err != nil {
panic(err)
}
// 注意:这里不要立即关闭 resp.Body,要在读取完后再关,或者在 Goroutine 中关
// 2. 创建管道
pr, pw := io.Pipe()
// 3. 启动 Goroutine:充当 "搬运工"
go func() {
// 任务结束后关闭 Writer,否则 Reader 会死锁
defer pw.Close()
defer resp.Body.Close()
// 将 HTTP Body 的内容拷贝到 PipeWriter
// 这一步会阻塞,直到 Main Goroutine 从 pr 读取数据
_, err := io.Copy(pw, resp.Body)
if err != nil {
// 如果下载中断,将错误传递给 Reader
pw.CloseWithError(err)
}
}()
// 4. 主程:从 PipeReader 读取数据
// 此时 pr 就像是一个本地的数据流,其实数据源来自网络
fmt.Println("开始从管道读取 HTTP 数据...")
if _, err := io.Copy(os.Stdout, pr); err != nil {
fmt.Println("读取出错:", err)
}
fmt.Println("\n读取完成")
}
边下载边压缩
这是 io.Pipe 真正的用武之地。假设你要下载一个大文件,你想把它存成 .gz 压缩包。
- 如果不由
io.Pipe:你需要先下载下来(占磁盘/内存),然后再压缩。
- 使用
io.Pipe:你可以把“压缩”这个动作串联在下载和保存之间。
数据流向:
HTTP Response -> Gzip Writer -> PipeWriter -> Pipe -> PipeReader -> File
package main
import (
"compress/gzip"
"fmt"
"io"
"net/http"
"os"
)
func main() {
// 1. 模拟下载源
url := "https://www.xiexianbin.cn"
resp, err := http.Get(url)
if err != nil {
panic(err)
}
// 2. 创建管道
// 我们希望最终得到一个 Reader,它读出来的就是压缩后的数据
pr, pw := io.Pipe()
// 3. 启动 Goroutine 进行 "下载 + 压缩 + 写入管道"
go func() {
defer resp.Body.Close()
defer pw.Close() // 必须关闭 PipeWriter
// 创建一个 gzip writer,它输出的目标是 PipeWriter
gw := gzip.NewWriter(pw)
defer gw.Close() // 必须关闭 Gzip Writer 以刷新缓冲区
// 核心魔法:
// Copy 读 HTTP Body -> 写入 gw (进行压缩) -> gw 写入 pw -> 管道
_, err := io.Copy(gw, resp.Body)
if err != nil {
pw.CloseWithError(err)
}
}()
// 4. 主程:消费 PipeReader
// 此时 pr 读取到的数据已经是被 Gzip 压缩过的二进制流了
outFile, _ := os.Create("baidu_index.html.gz")
defer outFile.Close()
fmt.Println("开始下载并实时压缩...")
n, err := io.Copy(outFile, pr)
if err != nil {
panic(err)
}
fmt.Printf("处理完成!已写入文件大小: %d 字节 (Gzip格式)\n", n)
}
当你需要从 http.Response 读取数据,但你的后续处理逻辑需要一个 Reader 接口,且中间夹杂着原本输出为 Writer 的转换操作(如压缩、加密、图片转码)时,io.Pipe 是连接这两者的完美桥梁。
连接不兼容的接口 (Reader vs Writer)
这是 io.Pipe 最经典的用途:将一个需要 io.Writer 的函数和一个需要 io.Reader 的函数连接起来。
假设你有一个巨大的结构体要转换成 JSON 并上传到 HTTP 服务器。
json.NewEncoder(w).Encode(obj) 需要一个 Writer。
http.NewRequest("POST", url, body) 的 body 参数需要一个 Reader。
如果不使用 io.Pipe,你可能需要 json.Marshal 得到一个巨大的 []byte(占用大量内存),或者先写到临时文件。使用 io.Pipe 可以实现流式传输。
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
)
func main() {
// 1. 创建一个模拟的 HTTP 服务器接收数据
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// 模拟服务器读取上传的数据
fmt.Println("Server: 开始接收请求体...")
io.Copy(os.Stdout, r.Body) // 将接收到的内容打印到控制台
fmt.Println("\nServer: 接收完毕")
}))
defer ts.Close()
// 2. 准备大量数据
data := map[string]string{
"user": "gopher",
"lang": "golang",
"desc": "This is a streaming upload example",
}
// 3. 使用 io.Pipe 连接 JSON Encoder 和 HTTP Request
pr, pw := io.Pipe()
// 启动一个 Goroutine 进行写入(JSON 编码)
go func() {
// 关闭 Writer 是必须的,通常建议使用 CloseWithError 传递潜在的错误
defer pw.Close()
encoder := json.NewEncoder(pw)
if err := encoder.Encode(data); err != nil {
// 如果编码出错,告诉 Reader 读取时报错
pw.CloseWithError(err)
}
}()
// 4. 创建请求,将 PipeReader 作为 Body 传入
// 注意:这里 pr 是 io.Reader,http.NewRequest 会从中读取数据
req, err := http.NewRequest("POST", ts.URL, pr)
if err != nil {
panic(err)
}
// 5. 发送请求
// http.Client 内部会从 pr 中读取数据,触发上面 Goroutine 的 Encode 写入
fmt.Println("Client: 发送请求...")
resp, err := http.DefaultClient.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
fmt.Printf("Client: 请求完成,状态码: %d\n", resp.StatusCode)
}
常见注意事项
-
必须使用 Goroutine:
因为 Write 会阻塞等待 Read,如果你在同一个 Goroutine 中先写后读(或先读后写),程序会永久卡死(Deadlock)。
错误示范:
pr, pw := io.Pipe()
pw.Write([]byte("foo")) // 永远阻塞在这里,因为没人读
pr.Read(...) // 永远执行不到这里
-
一定要 Close:
- 写入端写完后必须调用
pw.Close()。否则读取端会一直认为还有数据要来,导致 Read 一直阻塞不返回 io.EOF。
- 如果有错误发生,建议使用
pw.CloseWithError(err),这样读取端 Read 时会收到这个具体的错误,而不是默认的 io.EOF。
-
不要在 Pipe 中使用 Seek:
管道是流式的,一旦数据流过去就没了,不能回头(Rewind/Seek)。
总结
io.Pipe 是 Go 语言处理流式数据、桥接 I/O 接口的神器。当你看到一个函数要求 io.Writer 而另一个函数要求 io.Reader,且你想把它们串起来时,io.Pipe 就是最佳选择。