Mutex - Go 互斥锁与同步原语
Mutex(互斥锁)是 Go 中最基础的同步原语,用于保护共享数据免受并发访问。sync 包提供了 Mutex、RWMutex、WaitGroup、Cond、Once、Pool 等多种同步工具。
📌 核心概念
🔒
Mutex
互斥锁,独占访问
Lock/Unlock
📖
RWMutex
读写锁,读共享写独占
RLock/Lock
⏳
WaitGroup
等待一组 Goroutine 完成
Add/Done/Wait
🔄
Cond
条件变量,等待信号
Wait/Signal
1️⃣
Once
只执行一次
Do(func)
♻️
Pool
对象池,减少 GC 压力
Get/Put
Mutex 互斥锁
📝 Mutex 基础用法
package main
import (
"fmt"
"sync"
"time"
)
// Counter 线程安全的计数器
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := &Counter{}
var wg sync.WaitGroup
// 启动 100 个 Goroutine 并发增加
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
counter.Increment()
}
}()
}
wg.Wait()
fmt.Printf("Final count: %d\n", counter.Value())
// 输出:Final count: 100000
}
💡 Mutex 使用原则
- 指针传递: Mutex 必须与数据一起传递,使用指针
- defer Unlock: 使用 defer 确保锁被释放
- 临界区最小化: 锁住尽可能少的代码
- 避免死锁: 不要重复加锁,注意锁的顺序
- 零值可用: sync.Mutex 零值即可使用,无需初始化
Mutex 底层实现
📖 runtime 中的 Mutex 实现
// sync/mutex.go - Mutex 结构
type Mutex struct {
state int32
sema uint32
}
// 状态位定义
const (
mutexLocked = 1 << 0 // 锁被持有
mutexWoken = 1 << 1 // 有等待者被唤醒
mutexStarving = 1 << 2 // 饥饿模式
mutexWaiterShift = 3 // 等待者数量偏移
)
// Lock 实现 (简化)
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return // 快速路径:直接获取锁
}
// 慢速路径:进入等待队列
m.lockSlow()
}
// Unlock 实现
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new&mutexWoken != 0 {
// 唤醒等待者
runtime.semawakeup(&m.sema)
}
}
// 两种模式:
// 1. 正常模式:FIFO,等待者按顺序获取锁
// 2. 饥饿模式:防止饿死,直接传递给等待最久的 Goroutine
RWMutex 读写锁
📝 读写锁使用场景
package main
import (
"fmt"
"sync"
"time"
)
// Cache 使用 RWMutex 优化读多写少场景
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func NewCache() *Cache {
return &Cache{data: make(map[string]string)}
}
// 读操作 - 使用 RLock,允许多个读并发
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
val, ok := c.data[key]
return val, ok
}
// 写操作 - 使用 Lock,独占访问
func (c *Cache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
func main() {
cache := NewCache()
var wg sync.WaitGroup
// 多个读 Goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 100; j++ {
cache.Get("key")
}
}(i)
}
// 少量写 Goroutine
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 10; j++ {
cache.Set(fmt.Sprintf("key-%d", j), fmt.Sprintf("value-%d", j))
}
}(i)
}
wg.Wait()
fmt.Println("Cache operations completed")
}
⚠️ RWMutex 注意事项
- 读多写少: 只在读远多于写时使用 RWMutex
- 写锁饥饿: 大量读可能导致写操作饥饿
- 临界区短: 持有锁的时间要尽可能短
- 不要升级: RLock 不能升级为 Lock,会死锁
WaitGroup 等待组
📝 WaitGroup 基础用法
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
// 启动 5 个 Goroutine
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Goroutine %d starting\n", id)
time.Sleep(time.Millisecond * 100)
fmt.Printf("Goroutine %d done\n", id)
}(i)
}
// 等待所有 Goroutine 完成
wg.Wait()
fmt.Println("All goroutines completed")
}
// WaitGroup 内部实现 (简化)
// type WaitGroup struct {
// state [3]int32 // 计数器、信号量、状态
// }
//
// Add(delta int): 计数器 += delta
// Done(): 计数器 -= 1
// Wait(): 阻塞直到计数器为 0
WaitGroup 常见模式
📝 WaitGroup 实用模式
// 模式 1: 批处理任务
func batchProcess(items []string, concurrency int) {
sem := make(chan struct{}, concurrency)
var wg sync.WaitGroup
for _, item := range items {
wg.Add(1)
sem <- struct{}{} // 获取信号量
go func(item string) {
defer wg.Done()
defer func() { <-sem }() // 释放信号量
process(item)
}(item)
}
wg.Wait()
}
// 模式 2: 带超时的等待
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
return true
case <-time.After(timeout):
return false
}
}
// 模式 3: 错误收集
type Result struct {
ID int
Err error
}
func parallelWithErrors(ids []int) []Result {
var wg sync.WaitGroup
results := make([]Result, len(ids))
for i, id := range ids {
wg.Add(1)
go func(idx, id int) {
defer wg.Done()
results[idx] = Result{
ID: id,
Err: process(id),
}
}(i, id)
}
wg.Wait()
return results
}
Once 和 Cond
sync.Once - 只执行一次
📝 单例模式与延迟初始化
package main
import (
"fmt"
"sync"
)
// 单例模式
type Singleton struct {
data string
}
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{data: "singleton data"}
fmt.Println("Singleton initialized")
})
return instance
}
// 延迟初始化配置
type Config struct {
mu sync.Mutex
once sync.Once
data map[string]string
}
func (c *Config) init() {
c.data = loadConfigFromFile()
}
func (c *Config) Get(key string) string {
c.once.Do(c.init) // 延迟初始化
c.mu.Lock()
defer c.mu.Unlock()
return c.data[key]
}
func loadConfigFromFile() map[string]string {
fmt.Println("Loading config...")
return make(map[string]string)
}
func main() {
var wg sync.WaitGroup
// 多个 Goroutine 并发获取单例
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = GetInstance()
}()
}
wg.Wait()
// "Singleton initialized" 只打印一次
}
sync.Cond - 条件变量
📝 条件变量实现通知机制
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
mu sync.Mutex
cond *sync.Cond
items []int
}
func NewQueue() *Queue {
q := &Queue{items: make([]int, 0)}
q.cond = sync.NewCond(&q.mu)
return q
}
// 入队
func (q *Queue) Push(item int) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, item)
q.cond.Signal() // 通知一个等待者
}
// 出队 (阻塞)
func (q *Queue) Pop() int {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // 等待直到有数据
}
item := q.items[0]
q.items = q.items[1:]
return item
}
// 广播通知所有等待者
func (q *Queue) Close() {
q.mu.Lock()
defer q.mu.Unlock()
q.cond.Broadcast()
}
func main() {
queue := NewQueue()
// 消费者
go func() {
for {
item := queue.Pop()
fmt.Printf("Consumed: %d\n", item)
}
}()
// 生产者
go func() {
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
queue.Push(i)
}
}()
time.Sleep(time.Second * 6)
}
sync.Pool 对象池
📝 对象池减少 GC 压力
package main
import (
"bytes"
"fmt"
"sync"
)
// 创建 bytes.Buffer 对象池
var bufferPool = sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
}
// 从池中获取 Buffer
func getBuffer() *bytes.Buffer {
return bufferPool.Get().(*bytes.Buffer)
}
// 归还 Buffer 到池中
func putBuffer(buf *bytes.Buffer) {
buf.Reset() // 重置后再归还
bufferPool.Put(buf)
}
func processData(data []byte) []byte {
buf := getBuffer()
defer putBuffer(buf)
buf.Write(data)
buf.WriteString(" - processed")
// 返回副本,因为 Buffer 要归还
result := make([]byte, buf.Len())
buf.Read(result)
return result
}
func main() {
var wg sync.WaitGroup
// 高并发场景使用对象池
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
result := processData([]byte(fmt.Sprintf("data-%d", id)))
_ = result
}(i)
}
wg.Wait()
fmt.Println("Processing complete")
}
💡 sync.Pool 使用场景
- 频繁分配: 频繁创建和销毁的对象
- 大对象: 占用大量内存的对象
- 高并发: 高并发场景减少 GC 压力
- 注意: Pool 中的对象可能被 GC 随时回收
性能对比与最佳实践
锁性能对比
📊 不同同步原语性能对比
// 基准测试结果 (参考值,Go 1.26)
// 1. Mutex vs RWMutex (读多写少场景)
// BenchmarkMutex-8 10000000 120 ns/op
// BenchmarkRWMutex-8 10000000 80 ns/op (读操作)
// RWMutex 读性能提升 ~33%
// 2. Mutex vs Channel (计数器场景)
// BenchmarkMutex-8 10000000 120 ns/op
// BenchmarkChannel-8 5000000 250 ns/op
// Mutex 性能提升 ~50%
// 3. WaitGroup vs Channel (等待完成)
// BenchmarkWaitGroup-8 20000000 60 ns/op
// BenchmarkChannelWait-8 5000000 200 ns/op
// WaitGroup 性能提升 ~70%
// 4. sync.Pool 效果
// BenchmarkNoPool-8 1000000 1200 ns/op 500 B/op
// BenchmarkWithPool-8 5000000 200 ns/op 50 B/op
// 性能提升 83%,内存减少 90%
✅ 同步原语选择指南
| 场景 | 推荐原语 | 理由 |
|---|---|---|
| 保护共享数据 | Mutex | 简单高效 |
| 读多写少 | RWMutex | 读并发,提升性能 |
| 等待 Goroutine 完成 | WaitGroup | 专为等待设计 |
| 单次初始化 | Once | 线程安全,简单 |
| 对象复用 | Pool | 减少 GC 压力 |
| 条件等待 | Cond | 通知/等待模式 |
| Goroutine 通信 | Channel | CSP 模型,数据传递 |
🚨 常见陷阱
- 复制 Mutex: Mutex 不能复制,会导致未定义行为
- 死锁: 重复加锁或锁顺序不一致
- 锁竞争: 临界区过大导致性能下降
- 忘记 Unlock: 使用 defer 确保释放
- Pool 对象被回收: 不要依赖 Pool 长期保存对象
总结
✅ 核心要点
- Mutex: 基础互斥锁,保护共享数据
- RWMutex: 读多写少场景,读并发写独占
- WaitGroup: 等待一组 Goroutine 完成
- Once: 确保只执行一次,单例模式
- Cond: 条件变量,通知/等待模式
- Pool: 对象池,减少 GC 压力
- 选择原则: 简单场景用 Mutex,通信用 Channel