Go 并发模式:Channel 与 Context

Go 语言从诞生之初就以并发编程为核心设计理念。"Don't communicate by sharing memory; share memory by communicating"——这句话不是口号,而是 Go 并发哲学的全部。Goroutine 让并发变得廉价,Channel 让通信变得安全,Context 让取消变得优雅。

本文从底层调度模型到上层并发模式,系统梳理 Go 并发编程的核心知识。

适用版本:本文基于 Go 1.22+ 编写,部分特性(如 range over func)需要 Go 1.23+。代码示例均经过测试。

一、Go 并发模型概述:CSP vs Actor

并发编程领域有两大主流模型:

特性 CSP(Communicating Sequential Processes) Actor Model
代表语言 Go Erlang、Akka(Scala/Java)
通信方式 Channel(通道) 消息邮箱(Mailbox)
耦合度 松耦合——发送方不需要知道接收方是谁 紧耦合——需要知道目标 Actor 的地址
调度单位 Goroutine(M:N 调度) Actor(通常 1:1 线程)
状态管理 共享状态通过 Channel 传递 状态封装在 Actor 内部

Go 选择 CSP 模型的核心原因:简单。Channel 是一等公民,和变量一样可以传递、存储、关闭。你不需要定义消息协议、不需要管理 Actor 生命周期,只需要 make(chan T) 就能开始通信。

// CSP 的核心思想:通过 Channel 通信,而不是共享内存
// ❌ 共享内存方式
var mu sync.Mutex
var counter int
func inc() { mu.Lock(); counter++; mu.Unlock() }

// ✅ Channel 方式
ch := make(chan int)
go func() { ch <- 1 }()  // 发送
val := <-ch              // 接收

二、Goroutine 基础与 GMP 调度模型

Goroutine:轻量级用户态线程

一个 Goroutine 初始栈仅 2KB(可动态扩容到 1GB),创建成本远低于 OS 线程(默认 1MB 栈)。单机轻松跑数十万 Goroutine:

// 启动 10 万个 Goroutine 毫无压力
for i := 0; i < 100000; i++ {
    go func(id int) {
        time.Sleep(10 * time.Second)
    }(i)
}
// 内存占用约 200MB,而 10 万个 OS 线程需要 ~100GB 栈空间

GMP 调度模型

Go 运行时使用 GMP 模型调度 Goroutine,这是理解并发性能的关键:

组件 全称 说明
G Goroutine 用户态协程,包含栈、指令指针、状态等
M Machine 操作系统线程,真正执行代码的载体
P Processor 逻辑处理器,持有本地 G 队列,数量 = GOMAXPROCS

调度流程:

1. 每个 P 有一个本地 G 队列(256 个),减少全局锁竞争
2. M 必须绑定一个 P 才能执行 G
3. P 的本地队列空时,从全局队列或其他 P 偷取(Work Stealing)
4. G 阻塞(如系统调用)时,M 释放 P,P 绑定新的 M 继续执行
5. G 阻塞恢复后,尝试获取空闲 P,或放入全局队列等待

关键优势:
- 本地队列 → 无锁操作,快
- Work Stealing → 负载均衡
- Hand Off → 阻塞不影响其他 G
GOMAXPROCS 该设多少?默认等于 CPU 核数,大多数情况不需要改。runtime.GOMAXPROCS(1) 只在纯 CPU 密集型+需要严格顺序的场景使用。I/O 密集型程序即使单核也能高效运行大量 Goroutine。

三、Channel 模式

无缓冲 Channel:同步通道

无缓冲 Channel 的发送和接收必须同时就绪,否则阻塞。本质是同步握手

ch := make(chan int)  // 无缓冲

// 发送方阻塞,直到有人接收
go func() { ch <- 42 }()

// 接收方阻塞,直到有人发送
val := <-ch  // val == 42
死锁陷阱:在同一个 Goroutine 中对无缓冲 Channel 先发送再接收,必然死锁——发送等接收,接收永远到不了。

ch := make(chan int)
ch <- 1 // 💀 死锁!没有其他 Goroutine 来接收
val := <-ch

有缓冲 Channel:异步通道

有缓冲 Channel 在缓冲区未满时发送不阻塞,未空时接收不阻塞:

ch := make(chan int, 3)  // 缓冲区容量 3

ch <- 1  // 不阻塞,缓冲区:[1]
ch <- 2  // 不阻塞,缓冲区:[1, 2]
ch <- 3  // 不阻塞,缓冲区:[1, 2, 3]
// ch <- 4  // 阻塞!缓冲区已满

val := <-ch  // val == 1,缓冲区:[2, 3]

缓冲区大小选择

容量 适用场景 说明
0 同步信号、严格握手 发送方必须等接收方就绪
1 最新状态、单次通知 只保留最近一个值
N(业务值) 生产者-消费者、Worker Pool 缓解生产消费速度差

单向 Channel:类型安全

Go 可以将 Channel 声明为只发送或只接收,在函数签名中强制约束:

// 生产者:只能发送
func producer(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)  // 发送方负责关闭
}

// 消费者:只能接收
func consumer(ch <-chan int) {
    for val := range ch {  // range 自动在 close 后退出
        fmt.Println(val)
    }
}

func main() {
    ch := make(chan int, 5)
    go producer(ch)
    consumer(ch)
}
谁关闭 Channel?原则:只由发送方关闭,接收方永远不要关闭。关闭一个已关闭的 Channel 会 panic,向已关闭的 Channel 发送也会 panic。单向 Channel 的类型约束天然保证了这一点。

四、并发模式实战

1. Fan-Out / Fan-In

Fan-Out:将一个 Channel 的数据分发给多个 Goroutine 并行处理。Fan-In:将多个 Channel 的结果合并到一个 Channel。

// Fan-Out:多个 worker 消费同一个 channel
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(time.Millisecond * 100)  // 模拟耗时
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 10)
    results := make(chan int, 10)

    // Fan-Out:3 个 worker 并行消费
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // 发送任务
    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    close(jobs)

    // Fan-In:收集所有结果
    for r := 1; r <= 9; r++ {
        fmt.Printf("Result: %d\n", <-results)
    }
}
Fan-Out 的最佳场景:任务之间无依赖、CPU 密集型计算、需要并行加速。Worker 数量通常设为 runtime.NumCPU()

2. Pipeline

Pipeline 模式将数据处理拆成多个阶段(Stage),每个阶段是一个 Goroutine,通过 Channel 串联:

// Stage 1:生成数据
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// Stage 2:平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// Stage 3:打印
func print(in <-chan int) {
    for n := range in {
        fmt.Println(n)
    }
}

func main() {
    // 组装 Pipeline:generate → square → print
    ch := generate(2, 3, 4)
    out := square(ch)
    print(out)
    // 输出:4, 9, 16
}

Pipeline 可以和 Fan-Out 结合——某个阶段启动多个 Goroutine 并行处理:

// 并行 square 阶段
func merge(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }(ch)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// Fan-Out square + Fan-In merge
in := generate(2, 3, 4, 5, 6)
ch1 := square(in)  // 并行处理
ch2 := square(in)  // 并行处理
for n := range merge(ch1, ch2) {
    fmt.Println(n)
}

3. Worker Pool

当任务量很大且需要限制并发数时,Worker Pool 是首选模式:

type Job struct {
    ID    int
    Data  string
}

type Result struct {
    Job   Job
    Value string
    Err   error
}

func workerPool(jobs <-chan Job, results chan<- Result, workerCount int) {
    var wg sync.WaitGroup
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                val, err := process(job)
                results <- Result{Job: job, Value: val, Err: err}
            }
        }(i)
    }
    go func() {
        wg.Wait()
        close(results)
    }()
}

func process(job Job) (string, error) {
    time.Sleep(100 * time.Millisecond)  // 模拟处理
    return fmt.Sprintf("processed-%d", job.ID), nil
}

func main() {
    jobs := make(chan Job, 100)
    results := make(chan Result, 100)

    // 启动 5 个 worker
    workerPool(jobs, results, 5)

    // 提交 20 个任务
    go func() {
        for i := 0; i < 20; i++ {
            jobs <- Job{ID: i, Data: fmt.Sprintf("task-%d", i)}
        }
        close(jobs)
    }()

    // 收集结果
    for result := range results {
        if result.Err != nil {
            fmt.Printf("Job %d failed: %v\n", result.Job.ID, result.Err)
        } else {
            fmt.Printf("Job %d: %s\n", result.Job.ID, result.Value)
        }
    }
}

4. Select 多路复用

select 同时监听多个 Channel 操作,类似网络编程中的 select/epoll:

// 基本用法:谁先就绪执行谁
select {
case msg := <-ch1:
    fmt.Println("Received from ch1:", msg)
case msg := <-ch2:
    fmt.Println("Received from ch2:", msg)
case ch3 <- 42:
    fmt.Println("Sent to ch3")
default:
    fmt.Println("No channel ready")  // 非阻塞模式
}

经典模式:超时控制

select {
case result := <-ch:
    fmt.Println("Got result:", result)
case <-time.After(3 * time.Second):
    fmt.Println("Timeout!")
}

经典模式:退出信号

func doWork(done <-chan struct{}) {
    for {
        select {
        case <-done:
            fmt.Println("Received stop signal")
            return
        default:
            // 模拟工作
            time.Sleep(500 * time.Millisecond)
            fmt.Println("Working...")
        }
    }
}
select 的随机性:当多个 case 同时就绪时,Go 会随机选择一个执行。这是有意设计——避免某个 Channel 饥饿。如果你需要优先级,需要用额外的逻辑实现。

五、Context 包详解

Context 是 Go 并发的「取消传播机制」。一个请求链路上的所有 Goroutine 共享同一个 Context,取消时级联通知。

1. WithCancel:手动取消

func operation(ctx context.Context) error {
    for i := 0; i < 10; i++ {
        select {
        case <-ctx.Done():
            return ctx.Err()  // context canceled
        default:
            time.Sleep(500 * time.Millisecond)
            fmt.Printf("Step %d completed\n", i)
        }
    }
    return nil
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        time.Sleep(2 * time.Second)
        cancel()  // 2 秒后取消
    }()

    err := operation(ctx)
    fmt.Println("Operation result:", err)  // context canceled
}

2. WithTimeout:超时自动取消

// 3 秒超时
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()  // 习惯:始终 defer cancel,即使超时也会释放资源

result, err := doSlowOperation(ctx)
if err != nil {
    if ctx.Err() == context.DeadlineExceeded {
        fmt.Println("Operation timed out")
    }
}

3. WithDeadline:绝对时间取消

// 在指定时间点取消
deadline := time.Date(2026, 6, 27, 15, 0, 0, 0, time.Local)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

if deadline.Sub(time.Now()) < 0 {
    fmt.Println("Already past deadline!")
    return
}

4. WithValue:请求级数据传递

type contextKey string

const (
    requestIDKey contextKey = "requestID"
    userIDKey    contextKey = "userID"
)

// 设置值
ctx := context.WithValue(context.Background(), requestIDKey, "req-123")
ctx = context.WithValue(ctx, userIDKey, 42)

// 获取值
if reqID, ok := ctx.Value(requestIDKey).(string); ok {
    fmt.Println("Request ID:", reqID)
}
WithValue 不是全局变量!Context 中的值应该用于请求级别的元数据(Request ID、Trace ID、认证信息),不要用来传递业务参数、数据库连接等。Key 必须用自定义类型,避免包间冲突。

Context 使用规范

规范 说明
Context 作为第一个参数 函数签名 func DoSomething(ctx context.Context, ...)
不要传递 nil Context 不确定时用 context.TODO()
始终 defer cancel() 即使不手动取消,也要释放资源
不要传递可选参数 WithValue 只放请求元数据
派生而非覆盖 子 Context 继承父 Context 的取消和值

六、sync 包实战

1. WaitGroup:等待一组 Goroutine

var wg sync.WaitGroup

for i := 0; i < 5; i++ {
    wg.Add(1)  // 在 goroutine 外 Add
    go func(id int) {
        defer wg.Done()
        fmt.Printf("Worker %d done\n", id)
    }(i)
}

wg.Wait()  // 阻塞直到所有 Done
fmt.Println("All workers finished")
WaitGroup 三条铁律
1. Add 必须在 Goroutine 外调用,否则可能 Wait 先于 Add 执行
2. AddDone 必须配对,否则 Wait 永远阻塞或 panic
3. 不能复制 WaitGroup——传指针或用引用

2. Once:只执行一次

var once sync.Once
var instance *Config

func GetConfig() *Config {
    once.Do(func() {
        fmt.Println("Initializing config...")
        instance = loadConfig()
    })
    return instance
}

// 无论多少 Goroutine 并发调用 GetConfig,初始化只执行一次

3. Mutex / RWMutex:互斥锁

type SafeCounter struct {
    mu sync.RWMutex
    m  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()         // 写锁
    defer c.mu.Unlock()
    c.m[key]++
}

func (c *SafeCounter) Get(key string) int {
    c.mu.RLock()        // 读锁,多个读者可并发
    defer c.mu.RUnlock()
    return c.m[key]
}
选 Mutex 还是 RWMutex?如果读操作远多于写操作(>90%),用 RWMutex 可以显著提升并发读性能。否则 Mutex 更简单、开销更低。

4. Pool:对象池

var bufPool = sync.Pool{
    New: func() any {
        return new(bytes.Buffer)
    },
}

func processRequest(data []byte) {
    buf := bufPool.Get().(*bytes.Buffer)
    defer func() {
        buf.Reset()
        bufPool.Put(buf)  // 用完归还
    }()

    buf.Write(data)
    // ... 使用 buf 处理数据
}
Pool 不保证对象存活:GC 时 Pool 中的对象可能被回收。Pool 适用于临时对象复用(如 Buffer、临时切片),不适合做连接池或缓存。

七、常见并发陷阱

陷阱 1:Goroutine 泄漏

Goroutine 如果永远无法退出,就会泄漏——占用的栈空间、Channel 引用、闭包变量都不会被 GC。

// ❌ 泄漏:如果没有人读取 ch,goroutine 永远阻塞在发送
func leak() <-chan int {
    ch := make(chan int)
    go func() {
        result := expensiveComputation()
        ch <- result  // 如果没人接收,永远阻塞!
    }()
    return ch
}

// ✅ 修复:用带缓冲的 channel 或 context 取消
func noLeak(ctx context.Context) <-chan int {
    ch := make(chan int, 1)  // 缓冲 1,发送不阻塞
    go func() {
        select {
        case ch <- expensiveComputation():
        case <-ctx.Done():  // 超时或取消时退出
        }
    }()
    return ch
}
检测 Goroutine 泄漏:使用 runtime.NumGoroutine() 在测试前后对比,或用 uber-go/goleak 库自动检测。

陷阱 2:Channel 死锁

// ❌ 死锁:无缓冲 channel 在同一 goroutine 发送和接收
func main() {
    ch := make(chan int)
    ch <- 1    // 阻塞,等待接收
    <-ch       // 永远执行不到
}

// ❌ 死锁:所有 goroutine 都阻塞
func main() {
    ch := make(chan int)
    go func() { ch <- 1 }()
    go func() { ch <- 2 }()  // 如果没人接收,2 个 goroutine 都阻塞
    // main goroutine 退出 → 程序结束,但如果没有退出就是死锁
}

// ✅ 修复:使用缓冲 channel 或确保有接收方
func main() {
    ch := make(chan int, 2)  // 缓冲区够用
    ch <- 1
    ch <- 2
    fmt.Println(<-ch, <-ch)
}

陷阱 3:闭包捕获问题

// ❌ 所有 goroutine 输出 10——循环变量 i 被闭包共享
for i := 0; i < 10; i++ {
    go func() {
        fmt.Println(i)  // 捕获的是变量 i 的引用
    }()
}

// ✅ 修复方式 1:参数传递(Go 1.21 之前推荐)
for i := 0; i < 10; i++ {
    go func(n int) {
        fmt.Println(n)  // n 是值拷贝
    }(i)
}

// ✅ 修复方式 2:Go 1.22+ 循环变量每次迭代重新创建
// Go 1.22 改变了 for 循环语义:每次迭代创建新变量
// 以下代码在 Go 1.22+ 中正确输出 0-9
for i := 0; i < 10; i++ {
    go func() {
        fmt.Println(i)  // Go 1.22+: 每个 i 是独立变量
    }()
}
Go 1.22 破坏性变更for 循环变量从「每次迭代复用」改为「每次迭代新建」。如果你还在用 Go 1.21 或更早版本,必须用参数传递方式避免闭包捕获问题。

八、并发测试

1. 用 race detector 检测数据竞争

// 运行测试时加 -race 标志
// $ go test -race ./...

// 典型的数据竞争
var counter int
func TestRace(t *testing.T) {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++  // ⚠️ 数据竞争!多个 goroutine 并发写
        }()
    }
    wg.Wait()
    // -race 会报:DATA RACE at counter++
}

2. 测试并发安全性

func TestConcurrentMapAccess(t *testing.T) {
    var mu sync.Mutex
    m := make(map[int]int)
    var wg sync.WaitGroup

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            mu.Lock()
            m[n] = n * 2
            mu.Unlock()
        }(i)
    }
    wg.Wait()

    if len(m) != 100 {
        t.Errorf("Expected 100 entries, got %d", len(m))
    }
}

3. 测试 Goroutine 泄漏

// 使用 goleak 检测
import "go.uber.org/goleak"

func TestMain(m *testing.M) {
    goleak.VerifyTestMain(m)
}

func TestNoLeak(t *testing.T) {
    defer goleak.VerifyNone(t)

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    result := doWork(ctx)
    _ = result
}

4. 压力测试并发性能

func BenchmarkWorkerPool(b *testing.B) {
    jobs := make(chan int, b.N)
    results := make(chan int, b.N)

    // 启动 worker pool
    var wg sync.WaitGroup
    for i := 0; i < runtime.NumCPU(); i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                results <- job * 2
            }
        }()
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        jobs <- i
    }
    close(jobs)

    go func() {
        wg.Wait()
        close(results)
    }()

    for range results {
        // drain
    }
}

并发测试速查

测试目标 工具/方法 命令
数据竞争 race detector go test -race ./...
Goroutine 泄漏 uber-go/goleak goleak.VerifyNone(t)
死锁检测 runtime + timeout 结合 Context 超时
并发性能 benchmark go test -bench=. -benchmem
覆盖率 cover go test -cover ./...
一句话总结:Go 并发三件套——Goroutine 轻量并发、Channel 安全通信、Context 优雅取消。四类模式——Fan-Out/Fan-In 扇分扇合、Pipeline 流水线、Worker Pool 工作池、Select 多路复用。测试必加 -race,始终 defer cancel(),闭包传参不传变量。