Go 并发模式:Channel 与 Context
Go 语言从诞生之初就以并发编程为核心设计理念。"Don't communicate by sharing memory; share memory by communicating"——这句话不是口号,而是 Go 并发哲学的全部。Goroutine 让并发变得廉价,Channel 让通信变得安全,Context 让取消变得优雅。
本文从底层调度模型到上层并发模式,系统梳理 Go 并发编程的核心知识。
range over func)需要 Go 1.23+。代码示例均经过测试。
一、Go 并发模型概述:CSP vs Actor
并发编程领域有两大主流模型:
| 特性 | CSP(Communicating Sequential Processes) | Actor Model |
|---|---|---|
| 代表语言 | Go | Erlang、Akka(Scala/Java) |
| 通信方式 | Channel(通道) | 消息邮箱(Mailbox) |
| 耦合度 | 松耦合——发送方不需要知道接收方是谁 | 紧耦合——需要知道目标 Actor 的地址 |
| 调度单位 | Goroutine(M:N 调度) | Actor(通常 1:1 线程) |
| 状态管理 | 共享状态通过 Channel 传递 | 状态封装在 Actor 内部 |
Go 选择 CSP 模型的核心原因:简单。Channel 是一等公民,和变量一样可以传递、存储、关闭。你不需要定义消息协议、不需要管理 Actor 生命周期,只需要 make(chan T) 就能开始通信。
// CSP 的核心思想:通过 Channel 通信,而不是共享内存
// ❌ 共享内存方式
var mu sync.Mutex
var counter int
func inc() { mu.Lock(); counter++; mu.Unlock() }
// ✅ Channel 方式
ch := make(chan int)
go func() { ch <- 1 }() // 发送
val := <-ch // 接收
二、Goroutine 基础与 GMP 调度模型
Goroutine:轻量级用户态线程
一个 Goroutine 初始栈仅 2KB(可动态扩容到 1GB),创建成本远低于 OS 线程(默认 1MB 栈)。单机轻松跑数十万 Goroutine:
// 启动 10 万个 Goroutine 毫无压力
for i := 0; i < 100000; i++ {
go func(id int) {
time.Sleep(10 * time.Second)
}(i)
}
// 内存占用约 200MB,而 10 万个 OS 线程需要 ~100GB 栈空间
GMP 调度模型
Go 运行时使用 GMP 模型调度 Goroutine,这是理解并发性能的关键:
| 组件 | 全称 | 说明 |
|---|---|---|
| G | Goroutine | 用户态协程,包含栈、指令指针、状态等 |
| M | Machine | 操作系统线程,真正执行代码的载体 |
| P | Processor | 逻辑处理器,持有本地 G 队列,数量 = GOMAXPROCS |
调度流程:
1. 每个 P 有一个本地 G 队列(256 个),减少全局锁竞争
2. M 必须绑定一个 P 才能执行 G
3. P 的本地队列空时,从全局队列或其他 P 偷取(Work Stealing)
4. G 阻塞(如系统调用)时,M 释放 P,P 绑定新的 M 继续执行
5. G 阻塞恢复后,尝试获取空闲 P,或放入全局队列等待
关键优势:
- 本地队列 → 无锁操作,快
- Work Stealing → 负载均衡
- Hand Off → 阻塞不影响其他 G
runtime.GOMAXPROCS(1) 只在纯 CPU 密集型+需要严格顺序的场景使用。I/O 密集型程序即使单核也能高效运行大量 Goroutine。
三、Channel 模式
无缓冲 Channel:同步通道
无缓冲 Channel 的发送和接收必须同时就绪,否则阻塞。本质是同步握手:
ch := make(chan int) // 无缓冲
// 发送方阻塞,直到有人接收
go func() { ch <- 42 }()
// 接收方阻塞,直到有人发送
val := <-ch // val == 42
ch := make(chan int)
ch <- 1 // 💀 死锁!没有其他 Goroutine 来接收
val := <-ch
有缓冲 Channel:异步通道
有缓冲 Channel 在缓冲区未满时发送不阻塞,未空时接收不阻塞:
ch := make(chan int, 3) // 缓冲区容量 3
ch <- 1 // 不阻塞,缓冲区:[1]
ch <- 2 // 不阻塞,缓冲区:[1, 2]
ch <- 3 // 不阻塞,缓冲区:[1, 2, 3]
// ch <- 4 // 阻塞!缓冲区已满
val := <-ch // val == 1,缓冲区:[2, 3]
缓冲区大小选择
| 容量 | 适用场景 | 说明 |
|---|---|---|
0 |
同步信号、严格握手 | 发送方必须等接收方就绪 |
1 |
最新状态、单次通知 | 只保留最近一个值 |
N(业务值) |
生产者-消费者、Worker Pool | 缓解生产消费速度差 |
单向 Channel:类型安全
Go 可以将 Channel 声明为只发送或只接收,在函数签名中强制约束:
// 生产者:只能发送
func producer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch) // 发送方负责关闭
}
// 消费者:只能接收
func consumer(ch <-chan int) {
for val := range ch { // range 自动在 close 后退出
fmt.Println(val)
}
}
func main() {
ch := make(chan int, 5)
go producer(ch)
consumer(ch)
}
四、并发模式实战
1. Fan-Out / Fan-In
Fan-Out:将一个 Channel 的数据分发给多个 Goroutine 并行处理。Fan-In:将多个 Channel 的结果合并到一个 Channel。
// Fan-Out:多个 worker 消费同一个 channel
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(time.Millisecond * 100) // 模拟耗时
results <- job * 2
}
}
func main() {
jobs := make(chan int, 10)
results := make(chan int, 10)
// Fan-Out:3 个 worker 并行消费
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 发送任务
for j := 1; j <= 9; j++ {
jobs <- j
}
close(jobs)
// Fan-In:收集所有结果
for r := 1; r <= 9; r++ {
fmt.Printf("Result: %d\n", <-results)
}
}
runtime.NumCPU()。
2. Pipeline
Pipeline 模式将数据处理拆成多个阶段(Stage),每个阶段是一个 Goroutine,通过 Channel 串联:
// Stage 1:生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// Stage 2:平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// Stage 3:打印
func print(in <-chan int) {
for n := range in {
fmt.Println(n)
}
}
func main() {
// 组装 Pipeline:generate → square → print
ch := generate(2, 3, 4)
out := square(ch)
print(out)
// 输出:4, 9, 16
}
Pipeline 可以和 Fan-Out 结合——某个阶段启动多个 Goroutine 并行处理:
// 并行 square 阶段
func merge(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// Fan-Out square + Fan-In merge
in := generate(2, 3, 4, 5, 6)
ch1 := square(in) // 并行处理
ch2 := square(in) // 并行处理
for n := range merge(ch1, ch2) {
fmt.Println(n)
}
3. Worker Pool
当任务量很大且需要限制并发数时,Worker Pool 是首选模式:
type Job struct {
ID int
Data string
}
type Result struct {
Job Job
Value string
Err error
}
func workerPool(jobs <-chan Job, results chan<- Result, workerCount int) {
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
val, err := process(job)
results <- Result{Job: job, Value: val, Err: err}
}
}(i)
}
go func() {
wg.Wait()
close(results)
}()
}
func process(job Job) (string, error) {
time.Sleep(100 * time.Millisecond) // 模拟处理
return fmt.Sprintf("processed-%d", job.ID), nil
}
func main() {
jobs := make(chan Job, 100)
results := make(chan Result, 100)
// 启动 5 个 worker
workerPool(jobs, results, 5)
// 提交 20 个任务
go func() {
for i := 0; i < 20; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("task-%d", i)}
}
close(jobs)
}()
// 收集结果
for result := range results {
if result.Err != nil {
fmt.Printf("Job %d failed: %v\n", result.Job.ID, result.Err)
} else {
fmt.Printf("Job %d: %s\n", result.Job.ID, result.Value)
}
}
}
4. Select 多路复用
select 同时监听多个 Channel 操作,类似网络编程中的 select/epoll:
// 基本用法:谁先就绪执行谁
select {
case msg := <-ch1:
fmt.Println("Received from ch1:", msg)
case msg := <-ch2:
fmt.Println("Received from ch2:", msg)
case ch3 <- 42:
fmt.Println("Sent to ch3")
default:
fmt.Println("No channel ready") // 非阻塞模式
}
经典模式:超时控制
select {
case result := <-ch:
fmt.Println("Got result:", result)
case <-time.After(3 * time.Second):
fmt.Println("Timeout!")
}
经典模式:退出信号
func doWork(done <-chan struct{}) {
for {
select {
case <-done:
fmt.Println("Received stop signal")
return
default:
// 模拟工作
time.Sleep(500 * time.Millisecond)
fmt.Println("Working...")
}
}
}
五、Context 包详解
Context 是 Go 并发的「取消传播机制」。一个请求链路上的所有 Goroutine 共享同一个 Context,取消时级联通知。
1. WithCancel:手动取消
func operation(ctx context.Context) error {
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
return ctx.Err() // context canceled
default:
time.Sleep(500 * time.Millisecond)
fmt.Printf("Step %d completed\n", i)
}
}
return nil
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(2 * time.Second)
cancel() // 2 秒后取消
}()
err := operation(ctx)
fmt.Println("Operation result:", err) // context canceled
}
2. WithTimeout:超时自动取消
// 3 秒超时
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() // 习惯:始终 defer cancel,即使超时也会释放资源
result, err := doSlowOperation(ctx)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
fmt.Println("Operation timed out")
}
}
3. WithDeadline:绝对时间取消
// 在指定时间点取消
deadline := time.Date(2026, 6, 27, 15, 0, 0, 0, time.Local)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
if deadline.Sub(time.Now()) < 0 {
fmt.Println("Already past deadline!")
return
}
4. WithValue:请求级数据传递
type contextKey string
const (
requestIDKey contextKey = "requestID"
userIDKey contextKey = "userID"
)
// 设置值
ctx := context.WithValue(context.Background(), requestIDKey, "req-123")
ctx = context.WithValue(ctx, userIDKey, 42)
// 获取值
if reqID, ok := ctx.Value(requestIDKey).(string); ok {
fmt.Println("Request ID:", reqID)
}
Context 使用规范
| 规范 | 说明 |
|---|---|
| Context 作为第一个参数 | 函数签名 func DoSomething(ctx context.Context, ...) |
| 不要传递 nil Context | 不确定时用 context.TODO() |
| 始终 defer cancel() | 即使不手动取消,也要释放资源 |
| 不要传递可选参数 | WithValue 只放请求元数据 |
| 派生而非覆盖 | 子 Context 继承父 Context 的取消和值 |
六、sync 包实战
1. WaitGroup:等待一组 Goroutine
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1) // 在 goroutine 外 Add
go func(id int) {
defer wg.Done()
fmt.Printf("Worker %d done\n", id)
}(i)
}
wg.Wait() // 阻塞直到所有 Done
fmt.Println("All workers finished")
1.
Add 必须在 Goroutine 外调用,否则可能 Wait 先于 Add 执行2.
Add 和 Done 必须配对,否则 Wait 永远阻塞或 panic3. 不能复制 WaitGroup——传指针或用引用
2. Once:只执行一次
var once sync.Once
var instance *Config
func GetConfig() *Config {
once.Do(func() {
fmt.Println("Initializing config...")
instance = loadConfig()
})
return instance
}
// 无论多少 Goroutine 并发调用 GetConfig,初始化只执行一次
3. Mutex / RWMutex:互斥锁
type SafeCounter struct {
mu sync.RWMutex
m map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock() // 写锁
defer c.mu.Unlock()
c.m[key]++
}
func (c *SafeCounter) Get(key string) int {
c.mu.RLock() // 读锁,多个读者可并发
defer c.mu.RUnlock()
return c.m[key]
}
RWMutex 可以显著提升并发读性能。否则 Mutex 更简单、开销更低。
4. Pool:对象池
var bufPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
func processRequest(data []byte) {
buf := bufPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
bufPool.Put(buf) // 用完归还
}()
buf.Write(data)
// ... 使用 buf 处理数据
}
七、常见并发陷阱
陷阱 1:Goroutine 泄漏
Goroutine 如果永远无法退出,就会泄漏——占用的栈空间、Channel 引用、闭包变量都不会被 GC。
// ❌ 泄漏:如果没有人读取 ch,goroutine 永远阻塞在发送
func leak() <-chan int {
ch := make(chan int)
go func() {
result := expensiveComputation()
ch <- result // 如果没人接收,永远阻塞!
}()
return ch
}
// ✅ 修复:用带缓冲的 channel 或 context 取消
func noLeak(ctx context.Context) <-chan int {
ch := make(chan int, 1) // 缓冲 1,发送不阻塞
go func() {
select {
case ch <- expensiveComputation():
case <-ctx.Done(): // 超时或取消时退出
}
}()
return ch
}
runtime.NumGoroutine() 在测试前后对比,或用 uber-go/goleak 库自动检测。
陷阱 2:Channel 死锁
// ❌ 死锁:无缓冲 channel 在同一 goroutine 发送和接收
func main() {
ch := make(chan int)
ch <- 1 // 阻塞,等待接收
<-ch // 永远执行不到
}
// ❌ 死锁:所有 goroutine 都阻塞
func main() {
ch := make(chan int)
go func() { ch <- 1 }()
go func() { ch <- 2 }() // 如果没人接收,2 个 goroutine 都阻塞
// main goroutine 退出 → 程序结束,但如果没有退出就是死锁
}
// ✅ 修复:使用缓冲 channel 或确保有接收方
func main() {
ch := make(chan int, 2) // 缓冲区够用
ch <- 1
ch <- 2
fmt.Println(<-ch, <-ch)
}
陷阱 3:闭包捕获问题
// ❌ 所有 goroutine 输出 10——循环变量 i 被闭包共享
for i := 0; i < 10; i++ {
go func() {
fmt.Println(i) // 捕获的是变量 i 的引用
}()
}
// ✅ 修复方式 1:参数传递(Go 1.21 之前推荐)
for i := 0; i < 10; i++ {
go func(n int) {
fmt.Println(n) // n 是值拷贝
}(i)
}
// ✅ 修复方式 2:Go 1.22+ 循环变量每次迭代重新创建
// Go 1.22 改变了 for 循环语义:每次迭代创建新变量
// 以下代码在 Go 1.22+ 中正确输出 0-9
for i := 0; i < 10; i++ {
go func() {
fmt.Println(i) // Go 1.22+: 每个 i 是独立变量
}()
}
for 循环变量从「每次迭代复用」改为「每次迭代新建」。如果你还在用 Go 1.21 或更早版本,必须用参数传递方式避免闭包捕获问题。
八、并发测试
1. 用 race detector 检测数据竞争
// 运行测试时加 -race 标志
// $ go test -race ./...
// 典型的数据竞争
var counter int
func TestRace(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // ⚠️ 数据竞争!多个 goroutine 并发写
}()
}
wg.Wait()
// -race 会报:DATA RACE at counter++
}
2. 测试并发安全性
func TestConcurrentMapAccess(t *testing.T) {
var mu sync.Mutex
m := make(map[int]int)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
mu.Lock()
m[n] = n * 2
mu.Unlock()
}(i)
}
wg.Wait()
if len(m) != 100 {
t.Errorf("Expected 100 entries, got %d", len(m))
}
}
3. 测试 Goroutine 泄漏
// 使用 goleak 检测
import "go.uber.org/goleak"
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestNoLeak(t *testing.T) {
defer goleak.VerifyNone(t)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
result := doWork(ctx)
_ = result
}
4. 压力测试并发性能
func BenchmarkWorkerPool(b *testing.B) {
jobs := make(chan int, b.N)
results := make(chan int, b.N)
// 启动 worker pool
var wg sync.WaitGroup
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
results <- job * 2
}
}()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
jobs <- i
}
close(jobs)
go func() {
wg.Wait()
close(results)
}()
for range results {
// drain
}
}
并发测试速查
| 测试目标 | 工具/方法 | 命令 |
|---|---|---|
| 数据竞争 | race detector | go test -race ./... |
| Goroutine 泄漏 | uber-go/goleak | goleak.VerifyNone(t) |
| 死锁检测 | runtime + timeout | 结合 Context 超时 |
| 并发性能 | benchmark | go test -bench=. -benchmem |
| 覆盖率 | cover | go test -cover ./... |
Goroutine 轻量并发、Channel 安全通信、Context 优雅取消。四类模式——Fan-Out/Fan-In 扇分扇合、Pipeline 流水线、Worker Pool 工作池、Select 多路复用。测试必加 -race,始终 defer cancel(),闭包传参不传变量。