sync 包 - 并发同步原语
sync 包提供了基本的并发同步原语,包括 Mutex、RWMutex、WaitGroup、Cond、Once、Pool 等。这些原语是构建安全并发程序的基石。
sync 包概览
🔒
Mutex
互斥锁,保护共享数据
📖
RWMutex
读写锁,读共享写独占
⏳
WaitGroup
等待一组任务完成
🔔
Cond
条件变量,通知/等待
1️⃣
Once
确保只执行一次
♻️
Pool
对象池,减少 GC
🔢
Atomic
原子操作 (sync/atomic)
🚧
Map
并发安全的 Map
Mutex 互斥锁
📝 Mutex 核心用法
package main
import (
"fmt"
"sync"
)
// 线程安全计数器
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := &Counter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Inc()
}()
}
wg.Wait()
fmt.Printf("Count: %d\n", counter.Value())
}
💡 Mutex 要点
- 零值可用: 无需初始化,零值即可使用
- 不可复制: Mutex 不能复制,会导致未定义行为
- defer Unlock: 使用 defer 确保锁被释放
- 指针传递: 结构体包含 Mutex 时必须指针传递
- 两种模式: 正常模式和饥饿模式自动切换
RWMutex 读写锁
📝 读写锁优化读多写少
package main
import (
"fmt"
"sync"
)
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func NewCache() *Cache {
return &Cache{data: make(map[string]string)}
}
// 读操作 - RLock 允许多个读并发
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
val, ok := c.data[key]
return val, ok
}
// 写操作 - Lock 独占访问
func (c *Cache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
// 删除操作
func (c *Cache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.data, key)
}
WaitGroup 等待组
📝 WaitGroup 等待多个任务
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(time.Millisecond * 100)
fmt.Printf("Task %d done\n", id)
}(i)
}
wg.Wait()
fmt.Println("All tasks completed")
}
// WaitGroup API:
// Add(delta int) - 计数器增加
// Done() - 计数器减 1
// Wait() - 阻塞直到计数器为 0
Once 单次执行
📝 单例模式与延迟初始化
package main
import (
"fmt"
"sync"
)
type Singleton struct {
data string
}
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{data: "singleton"}
fmt.Println("Singleton initialized")
})
return instance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = GetInstance()
}()
}
wg.Wait()
// "Singleton initialized" 只打印一次
}
Cond 条件变量
📝 条件变量实现通知机制
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
mu sync.Mutex
cond *sync.Cond
items []int
}
func NewQueue() *Queue {
q := &Queue{items: make([]int, 0)}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *Queue) Push(item int) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, item)
q.cond.Signal() // 通知一个等待者
}
func (q *Queue) Pop() int {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // 等待直到有数据
}
item := q.items[0]
q.items = q.items[1:]
return item
}
func main() {
queue := NewQueue()
go func() {
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
queue.Push(i)
}
}()
go func() {
for {
item := queue.Pop()
fmt.Printf("Consumed: %d\n", item)
}
}()
time.Sleep(time.Second * 6)
}
Pool 对象池
📝 对象池减少 GC 压力
package main
import (
"bytes"
"fmt"
"sync"
)
var bufferPool = sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
}
func getBuffer() *bytes.Buffer {
return bufferPool.Get().(*bytes.Buffer)
}
func putBuffer(buf *bytes.Buffer) {
buf.Reset()
bufferPool.Put(buf)
}
func processData(data []byte) []byte {
buf := getBuffer()
defer putBuffer(buf)
buf.Write(data)
buf.WriteString(" - processed")
result := make([]byte, buf.Len())
buf.Read(result)
return result
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
result := processData([]byte(fmt.Sprintf("data-%d", id)))
_ = result
}(i)
}
wg.Wait()
}
sync.Map 并发 Map
📝 并发安全的 Map
package main
import (
"fmt"
"sync"
)
func main() {
var m sync.Map
// 存储
m.Store("key1", "value1")
m.Store("key2", 42)
// 读取
val, ok := m.Load("key1")
fmt.Printf("key1: %v, ok: %v\n", val, ok)
// 读取或删除
val, ok = m.LoadAndDelete("key2")
fmt.Printf("key2: %v, ok: %v\n", val, ok)
// 读取或存储
val, loaded := m.LoadOrStore("key3", "value3")
fmt.Printf("key3: %v, loaded: %v\n", val, loaded)
// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true // 继续遍历
})
}
💡 sync.Map 使用场景
- 读多写少: 缓存场景,键稳定
- 键是整数: 每个 Goroutine 有不同 ID
- 多个 Goroutine 读写同一 Map
- 注意: 普通场景用 map+Mutex 性能更好
性能对比
📊 同步原语性能对比
// 基准测试结果 (Go 1.26, 参考值)
// Mutex 操作
// BenchmarkMutex-8 10000000 120 ns/op
// RWMutex (读)
// BenchmarkRWMutexRead-8 10000000 80 ns/op
// RWMutex (写)
// BenchmarkRWMutexWrite-8 5000000 200 ns/op
// WaitGroup
// BenchmarkWaitGroup-8 20000000 60 ns/op
// Once
// BenchmarkOnce-8 50000000 25 ns/op (已初始化后)
// Pool
// BenchmarkPool-8 10000000 100 ns/op
// sync.Map vs map+Mutex
// BenchmarkSyncMap-8 5000000 200 ns/op (读多写少)
// BenchmarkMapMutex-8 10000000 100 ns/op (普通场景)
总结
✅ 选择指南
| 场景 | 推荐原语 |
|---|---|
| 保护共享数据 | Mutex |
| 读多写少 | RWMutex |
| 等待任务完成 | WaitGroup |
| 单次初始化 | Once |
| 对象复用 | Pool |
| 条件等待 | Cond |
| 并发 Map (读多写少) | sync.Map |