← Select | Context →

Mutex - Go 互斥锁与同步原语

Mutex(互斥锁)是 Go 中最基础的同步原语,用于保护共享数据免受并发访问。sync 包提供了 Mutex、RWMutex、WaitGroup、Cond、Once、Pool 等多种同步工具。

📌 核心概念

🔒

Mutex

互斥锁,独占访问

Lock/Unlock
📖

RWMutex

读写锁,读共享写独占

RLock/Lock

WaitGroup

等待一组 Goroutine 完成

Add/Done/Wait
🔄

Cond

条件变量,等待信号

Wait/Signal
1️⃣

Once

只执行一次

Do(func)
♻️

Pool

对象池,减少 GC 压力

Get/Put

Mutex 互斥锁

📝 Mutex 基础用法

package main

import (
    "fmt"
    "sync"
    "time"
)

// Counter 线程安全的计数器
type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := &Counter{}
    
    var wg sync.WaitGroup
    
    // 启动 100 个 Goroutine 并发增加
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                counter.Increment()
            }
        }()
    }
    
    wg.Wait()
    fmt.Printf("Final count: %d\n", counter.Value())
    // 输出:Final count: 100000
}

💡 Mutex 使用原则

  • 指针传递: Mutex 必须与数据一起传递,使用指针
  • defer Unlock: 使用 defer 确保锁被释放
  • 临界区最小化: 锁住尽可能少的代码
  • 避免死锁: 不要重复加锁,注意锁的顺序
  • 零值可用: sync.Mutex 零值即可使用,无需初始化

Mutex 底层实现

📖 runtime 中的 Mutex 实现

// sync/mutex.go - Mutex 结构
type Mutex struct {
    state int32
    sema  uint32
}

// 状态位定义
const (
    mutexLocked      = 1 << 0 // 锁被持有
    mutexWoken       = 1 << 1 // 有等待者被唤醒
    mutexStarving    = 1 << 2 // 饥饿模式
    mutexWaiterShift = 3       // 等待者数量偏移
)

// Lock 实现 (简化)
func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return // 快速路径:直接获取锁
    }
    
    // 慢速路径:进入等待队列
    m.lockSlow()
}

// Unlock 实现
func (m *Mutex) Unlock() {
    new := atomic.AddInt32(&m.state, -mutexLocked)
    
    if new&mutexWoken != 0 {
        // 唤醒等待者
        runtime.semawakeup(&m.sema)
    }
}

// 两种模式:
// 1. 正常模式:FIFO,等待者按顺序获取锁
// 2. 饥饿模式:防止饿死,直接传递给等待最久的 Goroutine

RWMutex 读写锁

📝 读写锁使用场景

package main

import (
    "fmt"
    "sync"
    "time"
)

// Cache 使用 RWMutex 优化读多写少场景
type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func NewCache() *Cache {
    return &Cache{data: make(map[string]string)}
}

// 读操作 - 使用 RLock,允许多个读并发
func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    val, ok := c.data[key]
    return val, ok
}

// 写操作 - 使用 Lock,独占访问
func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

func main() {
    cache := NewCache()
    
    var wg sync.WaitGroup
    
    // 多个读 Goroutine
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                cache.Get("key")
            }
        }(i)
    }
    
    // 少量写 Goroutine
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 10; j++ {
                cache.Set(fmt.Sprintf("key-%d", j), fmt.Sprintf("value-%d", j))
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Println("Cache operations completed")
}

⚠️ RWMutex 注意事项

  • 读多写少: 只在读远多于写时使用 RWMutex
  • 写锁饥饿: 大量读可能导致写操作饥饿
  • 临界区短: 持有锁的时间要尽可能短
  • 不要升级: RLock 不能升级为 Lock,会死锁

WaitGroup 等待组

📝 WaitGroup 基础用法

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    
    // 启动 5 个 Goroutine
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            fmt.Printf("Goroutine %d starting\n", id)
            time.Sleep(time.Millisecond * 100)
            fmt.Printf("Goroutine %d done\n", id)
        }(i)
    }
    
    // 等待所有 Goroutine 完成
    wg.Wait()
    fmt.Println("All goroutines completed")
}

// WaitGroup 内部实现 (简化)
// type WaitGroup struct {
//     state [3]int32  // 计数器、信号量、状态
// }
//
// Add(delta int): 计数器 += delta
// Done(): 计数器 -= 1
// Wait(): 阻塞直到计数器为 0

WaitGroup 常见模式

📝 WaitGroup 实用模式

// 模式 1: 批处理任务
func batchProcess(items []string, concurrency int) {
    sem := make(chan struct{}, concurrency)
    var wg sync.WaitGroup
    
    for _, item := range items {
        wg.Add(1)
        sem <- struct{}{} // 获取信号量
        
        go func(item string) {
            defer wg.Done()
            defer func() { <-sem }() // 释放信号量
            
            process(item)
        }(item)
    }
    
    wg.Wait()
}

// 模式 2: 带超时的等待
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    done := make(chan struct{})
    
    go func() {
        wg.Wait()
        close(done)
    }()
    
    select {
    case <-done:
        return true
    case <-time.After(timeout):
        return false
    }
}

// 模式 3: 错误收集
type Result struct {
    ID  int
    Err error
}

func parallelWithErrors(ids []int) []Result {
    var wg sync.WaitGroup
    results := make([]Result, len(ids))
    
    for i, id := range ids {
        wg.Add(1)
        go func(idx, id int) {
            defer wg.Done()
            results[idx] = Result{
                ID:  id,
                Err: process(id),
            }
        }(i, id)
    }
    
    wg.Wait()
    return results
}

Once 和 Cond

sync.Once - 只执行一次

📝 单例模式与延迟初始化

package main

import (
    "fmt"
    "sync"
)

// 单例模式
type Singleton struct {
    data string
}

var (
    instance *Singleton
    once     sync.Once
)

func GetInstance() *Singleton {
    once.Do(func() {
        instance = &Singleton{data: "singleton data"}
        fmt.Println("Singleton initialized")
    })
    return instance
}

// 延迟初始化配置
type Config struct {
    mu   sync.Mutex
    once sync.Once
    data map[string]string
}

func (c *Config) init() {
    c.data = loadConfigFromFile()
}

func (c *Config) Get(key string) string {
    c.once.Do(c.init) // 延迟初始化
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.data[key]
}

func loadConfigFromFile() map[string]string {
    fmt.Println("Loading config...")
    return make(map[string]string)
}

func main() {
    var wg sync.WaitGroup
    
    // 多个 Goroutine 并发获取单例
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _ = GetInstance()
        }()
    }
    
    wg.Wait()
    // "Singleton initialized" 只打印一次
}

sync.Cond - 条件变量

📝 条件变量实现通知机制

package main

import (
    "fmt"
    "sync"
    "time"
)

type Queue struct {
    mu    sync.Mutex
    cond  *sync.Cond
    items []int
}

func NewQueue() *Queue {
    q := &Queue{items: make([]int, 0)}
    q.cond = sync.NewCond(&q.mu)
    return q
}

// 入队
func (q *Queue) Push(item int) {
    q.mu.Lock()
    defer q.mu.Unlock()
    
    q.items = append(q.items, item)
    q.cond.Signal() // 通知一个等待者
}

// 出队 (阻塞)
func (q *Queue) Pop() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    
    for len(q.items) == 0 {
        q.cond.Wait() // 等待直到有数据
    }
    
    item := q.items[0]
    q.items = q.items[1:]
    return item
}

// 广播通知所有等待者
func (q *Queue) Close() {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.cond.Broadcast()
}

func main() {
    queue := NewQueue()
    
    // 消费者
    go func() {
        for {
            item := queue.Pop()
            fmt.Printf("Consumed: %d\n", item)
        }
    }()
    
    // 生产者
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(time.Second)
            queue.Push(i)
        }
    }()
    
    time.Sleep(time.Second * 6)
}

sync.Pool 对象池

📝 对象池减少 GC 压力

package main

import (
    "bytes"
    "fmt"
    "sync"
)

// 创建 bytes.Buffer 对象池
var bufferPool = sync.Pool{
    New: func() interface{} {
        return &bytes.Buffer{}
    },
}

// 从池中获取 Buffer
func getBuffer() *bytes.Buffer {
    return bufferPool.Get().(*bytes.Buffer)
}

// 归还 Buffer 到池中
func putBuffer(buf *bytes.Buffer) {
    buf.Reset() // 重置后再归还
    bufferPool.Put(buf)
}

func processData(data []byte) []byte {
    buf := getBuffer()
    defer putBuffer(buf)
    
    buf.Write(data)
    buf.WriteString(" - processed")
    
    // 返回副本,因为 Buffer 要归还
    result := make([]byte, buf.Len())
    buf.Read(result)
    return result
}

func main() {
    var wg sync.WaitGroup
    
    // 高并发场景使用对象池
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result := processData([]byte(fmt.Sprintf("data-%d", id)))
            _ = result
        }(i)
    }
    
    wg.Wait()
    fmt.Println("Processing complete")
}

💡 sync.Pool 使用场景

  • 频繁分配: 频繁创建和销毁的对象
  • 大对象: 占用大量内存的对象
  • 高并发: 高并发场景减少 GC 压力
  • 注意: Pool 中的对象可能被 GC 随时回收

性能对比与最佳实践

锁性能对比

📊 不同同步原语性能对比

// 基准测试结果 (参考值,Go 1.26)

// 1. Mutex vs RWMutex (读多写少场景)
// BenchmarkMutex-8          10000000    120 ns/op
// BenchmarkRWMutex-8        10000000     80 ns/op  (读操作)
// RWMutex 读性能提升 ~33%

// 2. Mutex vs Channel (计数器场景)
// BenchmarkMutex-8          10000000    120 ns/op
// BenchmarkChannel-8         5000000    250 ns/op
// Mutex 性能提升 ~50%

// 3. WaitGroup vs Channel (等待完成)
// BenchmarkWaitGroup-8      20000000     60 ns/op
// BenchmarkChannelWait-8     5000000    200 ns/op
// WaitGroup 性能提升 ~70%

// 4. sync.Pool 效果
// BenchmarkNoPool-8          1000000   1200 ns/op    500 B/op
// BenchmarkWithPool-8        5000000    200 ns/op     50 B/op
// 性能提升 83%,内存减少 90%

✅ 同步原语选择指南

场景 推荐原语 理由
保护共享数据 Mutex 简单高效
读多写少 RWMutex 读并发,提升性能
等待 Goroutine 完成 WaitGroup 专为等待设计
单次初始化 Once 线程安全,简单
对象复用 Pool 减少 GC 压力
条件等待 Cond 通知/等待模式
Goroutine 通信 Channel CSP 模型,数据传递

🚨 常见陷阱

  • 复制 Mutex: Mutex 不能复制,会导致未定义行为
  • 死锁: 重复加锁或锁顺序不一致
  • 锁竞争: 临界区过大导致性能下降
  • 忘记 Unlock: 使用 defer 确保释放
  • Pool 对象被回收: 不要依赖 Pool 长期保存对象

总结

✅ 核心要点

  • Mutex: 基础互斥锁,保护共享数据
  • RWMutex: 读多写少场景,读并发写独占
  • WaitGroup: 等待一组 Goroutine 完成
  • Once: 确保只执行一次,单例模式
  • Cond: 条件变量,通知/等待模式
  • Pool: 对象池,减少 GC 压力
  • 选择原则: 简单场景用 Mutex,通信用 Channel