Go io.Pipe 介绍与示例

发布时间: 更新时间: 总字数:2596 阅读时间:6m 作者: IP上海 分享 网址

Go 语言中的 io.Pipe 是一个非常强大且常用的工具,它在内存中创建一个同步的管道,可以将一个 io.Writer(写入端)和一个 io.Reader(读取端)连接起来,实现数据的边写边读,而不需要中间的临时文件或巨大的内存缓冲区。

io.Pipe 的核心概念

调用 io.Pipe() 会返回两个对象:

  • *PipeReader:管道的读取端,实现了 io.Reader 接口。
  • *PipeWriter:管道的写入端,实现了 io.Writer 接口。

关键特性:

  1. 同步与阻塞(Synchronous & Blocking):这是最重要的特性。io.Pipe 内部没有缓冲区(或者说缓冲区极小)。
    • 当你往 PipeWriter 写入数据时,写入操作会阻塞,直到有另一个 goroutine 从 PipeReader 把这些数据读走。
    • 反之亦然,读取操作也会阻塞,直到有数据被写入。
  2. 并发安全(Thread-safe):它的设计初衷就是在不同的 Goroutine 之间传递数据。你必须在不同的 Goroutine 中分别进行读和写,否则会导致死锁(Deadlock)。
  3. 内存高效:因为它不需要把所有数据都缓存起来再读取,所以非常适合处理大数据流(Streaming),能有效防止 OOM 的发生。

示例

Goroutine 通信

这是最简单的用法,展示了如何在两个 Goroutine 之间传递数据。

package main

import (
	"fmt"
	"io"
	"time"
)

func main() {
	// 创建一个管道
	pr, pw := io.Pipe()

	// 开启一个 Goroutine 用于写入数据
	go func() {
		// 模拟耗时操作
		fmt.Println("Writer: 开始写入数据...")
		for i := 1; i <= 3; i++ {
			msg := fmt.Sprintf("数据块 %d\n", i)
			pw.Write([]byte(msg)) // 这里会阻塞,直到 Main Goroutine 读取
			fmt.Printf("Writer: 已写入 %d\n", i)
			time.Sleep(500 * time.Millisecond)
		}

		// 写完后必须关闭 Writer,否则 Reader 会一直等待(导致死锁或泄露)
		pw.Close()
		fmt.Println("Writer: 关闭管道")
	}()

	// 主 Goroutine 用于读取数据
	fmt.Println("Reader: 准备读取...")

	// buffer 用于存储读取的数据
	buf := make([]byte, 1024)
	for {
		n, err := pr.Read(buf)
		if err == io.EOF {
			break // 管道关闭,读取结束
		}
		if err != nil {
			fmt.Println("读取错误:", err)
			break
		}
		fmt.Printf("Reader: 收到 -> %s", string(buf[:n]))
	}

	fmt.Println("Reader: 读取完成")
}

示将 Response 写入管道

这个示例展示了如何将 HTTP 响应的 Body 导入到 io.Pipe 中。虽然在这个简单的例子中看起来有点多此一举(因为可以直接读 Body),但它是理解数据流向的基础。

数据流向: HTTP Response -> Goroutine (io.Copy) -> PipeWriter -> Pipe (内存) -> PipeReader -> Main (读取)

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	// 1. 发起 HTTP 请求
	url := "https://httpbin.org/stream/5" // 返回 5 行流式 JSON 数据
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	// 注意:这里不要立即关闭 resp.Body,要在读取完后再关,或者在 Goroutine 中关

	// 2. 创建管道
	pr, pw := io.Pipe()

	// 3. 启动 Goroutine:充当 "搬运工"
	go func() {
		// 任务结束后关闭 Writer,否则 Reader 会死锁
		defer pw.Close()
		defer resp.Body.Close()

		// 将 HTTP Body 的内容拷贝到 PipeWriter
		// 这一步会阻塞,直到 Main Goroutine 从 pr 读取数据
		_, err := io.Copy(pw, resp.Body)
		if err != nil {
			// 如果下载中断,将错误传递给 Reader
			pw.CloseWithError(err)
		}
	}()

	// 4. 主程:从 PipeReader 读取数据
	// 此时 pr 就像是一个本地的数据流,其实数据源来自网络
	fmt.Println("开始从管道读取 HTTP 数据...")
	if _, err := io.Copy(os.Stdout, pr); err != nil {
		fmt.Println("读取出错:", err)
	}
	fmt.Println("\n读取完成")
}

边下载边压缩

这是 io.Pipe 真正的用武之地。假设你要下载一个大文件,你想把它存成 .gz 压缩包。

  • 如果不由 io.Pipe:你需要先下载下来(占磁盘/内存),然后再压缩。
  • 使用 io.Pipe:你可以把“压缩”这个动作串联在下载和保存之间

数据流向: HTTP Response -> Gzip Writer -> PipeWriter -> Pipe -> PipeReader -> File

package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	// 1. 模拟下载源
	url := "https://www.xiexianbin.cn"
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}

	// 2. 创建管道
	// 我们希望最终得到一个 Reader,它读出来的就是压缩后的数据
	pr, pw := io.Pipe()

	// 3. 启动 Goroutine 进行 "下载 + 压缩 + 写入管道"
	go func() {
		defer resp.Body.Close()
		defer pw.Close() // 必须关闭 PipeWriter

		// 创建一个 gzip writer,它输出的目标是 PipeWriter
		gw := gzip.NewWriter(pw)
		defer gw.Close() // 必须关闭 Gzip Writer 以刷新缓冲区

		// 核心魔法:
		// Copy 读 HTTP Body -> 写入 gw (进行压缩) -> gw 写入 pw -> 管道
		_, err := io.Copy(gw, resp.Body)
		if err != nil {
			pw.CloseWithError(err)
		}
	}()

	// 4. 主程:消费 PipeReader
	// 此时 pr 读取到的数据已经是被 Gzip 压缩过的二进制流了
	outFile, _ := os.Create("baidu_index.html.gz")
	defer outFile.Close()

	fmt.Println("开始下载并实时压缩...")
	n, err := io.Copy(outFile, pr)
	if err != nil {
		panic(err)
	}

	fmt.Printf("处理完成!已写入文件大小: %d 字节 (Gzip格式)\n", n)
}

当你需要从 http.Response 读取数据,但你的后续处理逻辑需要一个 Reader 接口,且中间夹杂着原本输出为 Writer 的转换操作(如压缩、加密、图片转码)时,io.Pipe 是连接这两者的完美桥梁。

连接不兼容的接口 (Reader vs Writer)

这是 io.Pipe 最经典的用途:将一个需要 io.Writer 的函数和一个需要 io.Reader 的函数连接起来。

假设你有一个巨大的结构体要转换成 JSON 并上传到 HTTP 服务器。

  • json.NewEncoder(w).Encode(obj) 需要一个 Writer
  • http.NewRequest("POST", url, body)body 参数需要一个 Reader

如果不使用 io.Pipe,你可能需要 json.Marshal 得到一个巨大的 []byte(占用大量内存),或者先写到临时文件。使用 io.Pipe 可以实现流式传输。

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"os"
)

func main() {
	// 1. 创建一个模拟的 HTTP 服务器接收数据
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// 模拟服务器读取上传的数据
		fmt.Println("Server: 开始接收请求体...")
		io.Copy(os.Stdout, r.Body) // 将接收到的内容打印到控制台
		fmt.Println("\nServer: 接收完毕")
	}))
	defer ts.Close()

	// 2. 准备大量数据
	data := map[string]string{
		"user": "gopher",
		"lang": "golang",
		"desc": "This is a streaming upload example",
	}

	// 3. 使用 io.Pipe 连接 JSON Encoder 和 HTTP Request
	pr, pw := io.Pipe()

	// 启动一个 Goroutine 进行写入(JSON 编码)
	go func() {
		// 关闭 Writer 是必须的,通常建议使用 CloseWithError 传递潜在的错误
		defer pw.Close()

		encoder := json.NewEncoder(pw)
		if err := encoder.Encode(data); err != nil {
			// 如果编码出错,告诉 Reader 读取时报错
			pw.CloseWithError(err)
		}
	}()

	// 4. 创建请求,将 PipeReader 作为 Body 传入
	// 注意:这里 pr 是 io.Reader,http.NewRequest 会从中读取数据
	req, err := http.NewRequest("POST", ts.URL, pr)
	if err != nil {
		panic(err)
	}

	// 5. 发送请求
	// http.Client 内部会从 pr 中读取数据,触发上面 Goroutine 的 Encode 写入
	fmt.Println("Client: 发送请求...")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Printf("Client: 请求完成,状态码: %d\n", resp.StatusCode)
}

常见注意事项

  1. 必须使用 Goroutine: 因为 Write 会阻塞等待 Read,如果你在同一个 Goroutine 中先写后读(或先读后写),程序会永久卡死(Deadlock)。 错误示范:

    pr, pw := io.Pipe()
    pw.Write([]byte("foo")) // 永远阻塞在这里,因为没人读
    pr.Read(...)            // 永远执行不到这里
    
  2. 一定要 Close

    • 写入端写完后必须调用 pw.Close()。否则读取端会一直认为还有数据要来,导致 Read 一直阻塞不返回 io.EOF
    • 如果有错误发生,建议使用 pw.CloseWithError(err),这样读取端 Read 时会收到这个具体的错误,而不是默认的 io.EOF
  3. 不要在 Pipe 中使用 Seek: 管道是流式的,一旦数据流过去就没了,不能回头(Rewind/Seek)。

总结

io.Pipe 是 Go 语言处理流式数据、桥接 I/O 接口的神器。当你看到一个函数要求 io.Writer 而另一个函数要求 io.Reader,且你想把它们串起来时,io.Pipe 就是最佳选择。

本文总阅读量 次 本站总访问量 次 本站总访客数
Home Archives Categories Tags Statistics