并发编程¶
学习目标¶
学完本章后,学习者应该能够:
- 正确使用 goroutine、channel 和 select。
- 理解 buffered channel、unbuffered channel、Mutex、RWMutex、WaitGroup、Once、Cond、Pool、atomic。
- 能识别数据竞争、死锁、goroutine 泄漏和并发过载。
- 会使用
go test -race检查数据竞争。
goroutine¶
goroutine 是 Go 的轻量并发执行单元。
goroutine 便宜,但不是免费。每个 goroutine 都有栈、调度和资源持有成本。生产代码中要考虑退出条件、错误处理和并发上限。
channel¶
channel 用于 goroutine 之间通信。
无缓冲 channel 要求发送和接收同时准备好:
有缓冲 channel 可以暂存一定数量元素:
channel 适合表达数据流、信号和所有权转移,不适合替代所有锁。
select¶
select 用于等待多个 channel 操作:
并发代码中,ctx.Done() 是重要退出路径。
锁与同步¶
常见同步工具:
sync.Mutex:互斥锁。sync.RWMutex:读写锁。sync.WaitGroup:等待一组 goroutine。sync.Once:只执行一次。sync.Pool:临时对象复用。sync/atomic:原子操作。
锁适合保护共享状态。channel 适合传递数据和信号。不要机械套用一句“不要通过共享内存通信”。
Go 后端实际应用例子¶
例子一:worker pool¶
func RunWorkers(ctx context.Context, jobs <-chan Job, workers int) error {
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
return
}
job.Handle(ctx)
case <-ctx.Done():
return
}
}
}()
}
wg.Wait()
return ctx.Err()
}
worker pool 能限制并发,避免无限 goroutine 打爆数据库、Redis 或第三方 API。
例子二:用锁保护 map¶
type SafeCounter struct {
mu sync.RWMutex
m map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.m[key]++
}
func (c *SafeCounter) Get(key string) int {
c.mu.RLock()
defer c.mu.RUnlock()
return c.m[key]
}
并发读写 map 必须同步。
常见误区¶
- 误区一:goroutine 越多吞吐越高。
超过资源上限后,更多 goroutine 只会增加调度、内存、连接池和下游压力。
- 误区二:channel 可以替代所有锁。
channel 和锁解决的问题不同。简单共享状态用锁可能更清晰。
- 误区三:没有 panic 就没有数据竞争。
数据竞争可能产生偶发错误,必须用 race detector 和代码审查发现。
线上问题案例¶
某批量同步服务每个用户启动一个 goroutine,同步高峰时创建几十万个 goroutine,同时打满数据库连接池。服务内存上涨,请求超时。
修复方式是引入 worker pool、队列背压、context 取消、数据库连接池限制和任务级指标。
实战任务¶
实现一个并发安全的内存计数器:
- 支持
Inc(key)。 - 支持
Get(key)。 - 支持
Snapshot()返回副本。 - 通过
go test -race。
参考答案
可以使用 sync.RWMutex 保护 map:
type Counter struct {
mu sync.RWMutex
m map[string]int64
}
func NewCounter() *Counter {
return &Counter{m: make(map[string]int64)}
}
func (c *Counter) Inc(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.m[key]++
}
func (c *Counter) Get(key string) int64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.m[key]
}
func (c *Counter) Snapshot() map[string]int64 {
c.mu.RLock()
defer c.mu.RUnlock()
result := make(map[string]int64, len(c.m))
for k, v := range c.m {
result[k] = v
}
return result
}
Snapshot 必须返回副本,否则调用方可以绕过锁修改内部 map。
面试题¶
1. channel 关闭后还能读取吗?¶
参考答案
可以。关闭后的 channel 仍然可以读取,读完缓冲区后会返回元素类型零值,并且第二个返回值 ok 为 false。
不能向已关闭的 channel 发送数据,否则会 panic。通常由发送方负责关闭 channel。
2. Mutex 和 RWMutex 如何选择?¶
参考答案
Mutex 简单直接,适合大多数场景。RWMutex 允许多个读者并发读,但写者需要独占,适合读多写少且读操作持锁时间有意义的场景。
RWMutex 不是默认更快。读写比例、锁竞争和代码复杂度都要考虑。
3. race detector 能发现什么?¶
参考答案
race detector 能在测试运行时发现数据竞争,即多个 goroutine 并发访问同一变量,至少一个是写操作,且没有同步保护。
它只能发现测试覆盖到的路径,所以需要配合充分测试和代码审查。