跳转至

并发编程

学习目标

学完本章后,学习者应该能够:

  1. 正确使用 goroutine、channel 和 select。
  2. 理解 buffered channel、unbuffered channel、Mutex、RWMutex、WaitGroup、Once、Cond、Pool、atomic。
  3. 能识别数据竞争、死锁、goroutine 泄漏和并发过载。
  4. 会使用 go test -race 检查数据竞争。

goroutine

goroutine 是 Go 的轻量并发执行单元。

go func() {
    log.Println("background job")
}()

goroutine 便宜,但不是免费。每个 goroutine 都有栈、调度和资源持有成本。生产代码中要考虑退出条件、错误处理和并发上限。

channel

channel 用于 goroutine 之间通信。

无缓冲 channel 要求发送和接收同时准备好:

ch := make(chan int)

有缓冲 channel 可以暂存一定数量元素:

ch := make(chan int, 10)

channel 适合表达数据流、信号和所有权转移,不适合替代所有锁。

select

select 用于等待多个 channel 操作:

select {
case job := <-jobs:
    handle(job)
case <-ctx.Done():
    return ctx.Err()
}

并发代码中,ctx.Done() 是重要退出路径。

锁与同步

常见同步工具:

  • sync.Mutex:互斥锁。
  • sync.RWMutex:读写锁。
  • sync.WaitGroup:等待一组 goroutine。
  • sync.Once:只执行一次。
  • sync.Pool:临时对象复用。
  • sync/atomic:原子操作。

锁适合保护共享状态。channel 适合传递数据和信号。不要机械套用一句“不要通过共享内存通信”。

Go 后端实际应用例子

例子一:worker pool

func RunWorkers(ctx context.Context, jobs <-chan Job, workers int) error {
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case job, ok := <-jobs:
                    if !ok {
                        return
                    }
                    job.Handle(ctx)
                case <-ctx.Done():
                    return
                }
            }
        }()
    }
    wg.Wait()
    return ctx.Err()
}

worker pool 能限制并发,避免无限 goroutine 打爆数据库、Redis 或第三方 API。

例子二:用锁保护 map

type SafeCounter struct {
    mu sync.RWMutex
    m  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.m[key]++
}

func (c *SafeCounter) Get(key string) int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.m[key]
}

并发读写 map 必须同步。

常见误区

  • 误区一:goroutine 越多吞吐越高。

超过资源上限后,更多 goroutine 只会增加调度、内存、连接池和下游压力。

  • 误区二:channel 可以替代所有锁。

channel 和锁解决的问题不同。简单共享状态用锁可能更清晰。

  • 误区三:没有 panic 就没有数据竞争。

数据竞争可能产生偶发错误,必须用 race detector 和代码审查发现。

线上问题案例

某批量同步服务每个用户启动一个 goroutine,同步高峰时创建几十万个 goroutine,同时打满数据库连接池。服务内存上涨,请求超时。

修复方式是引入 worker pool、队列背压、context 取消、数据库连接池限制和任务级指标。

实战任务

实现一个并发安全的内存计数器:

  1. 支持 Inc(key)
  2. 支持 Get(key)
  3. 支持 Snapshot() 返回副本。
  4. 通过 go test -race
参考答案

可以使用 sync.RWMutex 保护 map:

type Counter struct {
    mu sync.RWMutex
    m  map[string]int64
}

func NewCounter() *Counter {
    return &Counter{m: make(map[string]int64)}
}

func (c *Counter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.m[key]++
}

func (c *Counter) Get(key string) int64 {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.m[key]
}

func (c *Counter) Snapshot() map[string]int64 {
    c.mu.RLock()
    defer c.mu.RUnlock()
    result := make(map[string]int64, len(c.m))
    for k, v := range c.m {
        result[k] = v
    }
    return result
}

Snapshot 必须返回副本,否则调用方可以绕过锁修改内部 map。

面试题

1. channel 关闭后还能读取吗?

参考答案

可以。关闭后的 channel 仍然可以读取,读完缓冲区后会返回元素类型零值,并且第二个返回值 ok 为 false。

不能向已关闭的 channel 发送数据,否则会 panic。通常由发送方负责关闭 channel。

2. Mutex 和 RWMutex 如何选择?

参考答案

Mutex 简单直接,适合大多数场景。RWMutex 允许多个读者并发读,但写者需要独占,适合读多写少且读操作持锁时间有意义的场景。

RWMutex 不是默认更快。读写比例、锁竞争和代码复杂度都要考虑。

3. race detector 能发现什么?

参考答案

race detector 能在测试运行时发现数据竞争,即多个 goroutine 并发访问同一变量,至少一个是写操作,且没有同步保护。

它只能发现测试覆盖到的路径,所以需要配合充分测试和代码审查。