← Mutex | json 包 →

sync 包 - 并发同步原语

sync 包提供了基本的并发同步原语,包括 Mutex、RWMutex、WaitGroup、Cond、Once、Pool 等。这些原语是构建安全并发程序的基石。

sync 包概览

🔒

Mutex

互斥锁,保护共享数据

📖

RWMutex

读写锁,读共享写独占

WaitGroup

等待一组任务完成

🔔

Cond

条件变量,通知/等待

1️⃣

Once

确保只执行一次

♻️

Pool

对象池,减少 GC

🔢

Atomic

原子操作 (sync/atomic)

🚧

Map

并发安全的 Map

Mutex 互斥锁

📝 Mutex 核心用法

package main

import (
    "fmt"
    "sync"
)

// 线程安全计数器
type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := &Counter{}
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Inc()
        }()
    }
    
    wg.Wait()
    fmt.Printf("Count: %d\n", counter.Value())
}

💡 Mutex 要点

  • 零值可用: 无需初始化,零值即可使用
  • 不可复制: Mutex 不能复制,会导致未定义行为
  • defer Unlock: 使用 defer 确保锁被释放
  • 指针传递: 结构体包含 Mutex 时必须指针传递
  • 两种模式: 正常模式和饥饿模式自动切换

RWMutex 读写锁

📝 读写锁优化读多写少

package main

import (
    "fmt"
    "sync"
)

type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func NewCache() *Cache {
    return &Cache{data: make(map[string]string)}
}

// 读操作 - RLock 允许多个读并发
func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    val, ok := c.data[key]
    return val, ok
}

// 写操作 - Lock 独占访问
func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

// 删除操作
func (c *Cache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.data, key)
}

WaitGroup 等待组

📝 WaitGroup 等待多个任务

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            time.Sleep(time.Millisecond * 100)
            fmt.Printf("Task %d done\n", id)
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All tasks completed")
}

// WaitGroup API:
// Add(delta int)   - 计数器增加
// Done()           - 计数器减 1
// Wait()           - 阻塞直到计数器为 0

Once 单次执行

📝 单例模式与延迟初始化

package main

import (
    "fmt"
    "sync"
)

type Singleton struct {
    data string
}

var (
    instance *Singleton
    once     sync.Once
)

func GetInstance() *Singleton {
    once.Do(func() {
        instance = &Singleton{data: "singleton"}
        fmt.Println("Singleton initialized")
    })
    return instance
}

func main() {
    var wg sync.WaitGroup
    
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _ = GetInstance()
        }()
    }
    
    wg.Wait()
    // "Singleton initialized" 只打印一次
}

Cond 条件变量

📝 条件变量实现通知机制

package main

import (
    "fmt"
    "sync"
    "time"
)

type Queue struct {
    mu    sync.Mutex
    cond  *sync.Cond
    items []int
}

func NewQueue() *Queue {
    q := &Queue{items: make([]int, 0)}
    q.cond = sync.NewCond(&q.mu)
    return q
}

func (q *Queue) Push(item int) {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.items = append(q.items, item)
    q.cond.Signal() // 通知一个等待者
}

func (q *Queue) Pop() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    
    for len(q.items) == 0 {
        q.cond.Wait() // 等待直到有数据
    }
    
    item := q.items[0]
    q.items = q.items[1:]
    return item
}

func main() {
    queue := NewQueue()
    
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(time.Second)
            queue.Push(i)
        }
    }()
    
    go func() {
        for {
            item := queue.Pop()
            fmt.Printf("Consumed: %d\n", item)
        }
    }()
    
    time.Sleep(time.Second * 6)
}

Pool 对象池

📝 对象池减少 GC 压力

package main

import (
    "bytes"
    "fmt"
    "sync"
)

var bufferPool = sync.Pool{
    New: func() interface{} {
        return &bytes.Buffer{}
    },
}

func getBuffer() *bytes.Buffer {
    return bufferPool.Get().(*bytes.Buffer)
}

func putBuffer(buf *bytes.Buffer) {
    buf.Reset()
    bufferPool.Put(buf)
}

func processData(data []byte) []byte {
    buf := getBuffer()
    defer putBuffer(buf)
    
    buf.Write(data)
    buf.WriteString(" - processed")
    
    result := make([]byte, buf.Len())
    buf.Read(result)
    return result
}

func main() {
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result := processData([]byte(fmt.Sprintf("data-%d", id)))
            _ = result
        }(i)
    }
    
    wg.Wait()
}

sync.Map 并发 Map

📝 并发安全的 Map

package main

import (
    "fmt"
    "sync"
)

func main() {
    var m sync.Map
    
    // 存储
    m.Store("key1", "value1")
    m.Store("key2", 42)
    
    // 读取
    val, ok := m.Load("key1")
    fmt.Printf("key1: %v, ok: %v\n", val, ok)
    
    // 读取或删除
    val, ok = m.LoadAndDelete("key2")
    fmt.Printf("key2: %v, ok: %v\n", val, ok)
    
    // 读取或存储
    val, loaded := m.LoadOrStore("key3", "value3")
    fmt.Printf("key3: %v, loaded: %v\n", val, loaded)
    
    // 遍历
    m.Range(func(key, value interface{}) bool {
        fmt.Printf("%v: %v\n", key, value)
        return true // 继续遍历
    })
}

💡 sync.Map 使用场景

  • 读多写少: 缓存场景,键稳定
  • 键是整数: 每个 Goroutine 有不同 ID
  • 多个 Goroutine 读写同一 Map
  • 注意: 普通场景用 map+Mutex 性能更好

性能对比

📊 同步原语性能对比

// 基准测试结果 (Go 1.26, 参考值)

// Mutex 操作
// BenchmarkMutex-8    10000000    120 ns/op

// RWMutex (读)
// BenchmarkRWMutexRead-8    10000000     80 ns/op

// RWMutex (写)
// BenchmarkRWMutexWrite-8    5000000    200 ns/op

// WaitGroup
// BenchmarkWaitGroup-8    20000000     60 ns/op

// Once
// BenchmarkOnce-8    50000000     25 ns/op (已初始化后)

// Pool
// BenchmarkPool-8    10000000    100 ns/op

// sync.Map vs map+Mutex
// BenchmarkSyncMap-8         5000000    200 ns/op (读多写少)
// BenchmarkMapMutex-8       10000000    100 ns/op (普通场景)

总结

✅ 选择指南

场景 推荐原语
保护共享数据 Mutex
读多写少 RWMutex
等待任务完成 WaitGroup
单次初始化 Once
对象复用 Pool
条件等待 Cond
并发 Map (读多写少) sync.Map

📖 延伸阅读