并发
Goroutine
Goroutine 是 Go 语言轻量级的线程实现,由 Go 运行时管理。
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
// 启动 goroutine
go say("world")
say("hello")
}
Goroutine 内部实现原理
1. Goroutine 的内部结构:
Goroutine 是 Go 运行时(runtime)管理的轻量级线程,其内部结构包含:
// Goroutine 的内部结构(简化版)
type g struct {
stack stack // 栈内存
stackguard0 uintptr // 栈溢出检查
m *m // 关联的 M(Machine)
sched gobuf // 调度信息
goid int64 // Goroutine ID
gopc uintptr // 创建该 goroutine 的 PC
startpc uintptr // goroutine 函数的起始 PC
atomicstatus uint32 // 原子状态
goid int64 // Goroutine 唯一标识
}
2. M(Machine)结构:
M 代表操作系统线程,负责执行 goroutine:
// M 的内部结构(简化版)
type m struct {
g0 *g // 调度用的 goroutine
curg *g // 当前运行的 goroutine
p *p // 关联的 P(Processor)
nextp *p // 下一个 P
id int64 // M 的 ID
spinning bool // 是否在自旋寻找工作
blocked bool // 是否阻塞
}
3. P(Processor)结构:
P 代表处理器,维护一个本地运行队列:
// P 的内部结构(简化版)
type p struct {
id int32
status uint32
link *p
schedtick uint32
syscalltick uint32
m *m // 关联的 M
mcache *mcache // 内存分配缓存
runqhead uint32
runqtail uint32
runq [256]guintptr // 本地运行队列
runnext guintptr // 下一个运行的 goroutine
}
4. GMP 调度模型:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
// 设置使用的 P 数量
runtime.GOMAXPROCS(4)
fmt.Printf("CPU 核心数: %d\n", runtime.NumCPU())
fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
var wg sync.WaitGroup
// 启动多个 goroutine
for i := 1; i <= 10; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("所有 worker 完成")
}
5. Goroutine 调度过程:
package main
import (
"fmt"
"runtime"
)
func task(id int) {
fmt.Printf("Task %d 在 P%d 上运行\n", id, runtime.GOMAXPROCS(0))
}
func main() {
// Goroutine 调度过程:
// 1. 创建 goroutine:go func() {}
// 2. 将 goroutine 放入本地运行队列(P 的 runq)
// 3. M 从 P 的 runq 中获取 goroutine 执行
// 4. 如果 P 的 runq 为空,从全局队列或其他 P 窃取 goroutine
// 5. Goroutine 执行完成或阻塞,M 继续执行下一个 goroutine
for i := 1; i <= 5; i++ {
go task(i)
}
// 等待 goroutine 完成
select {}
}
6. 工作窃取(Work Stealing):
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func heavyTask(id int, wg *sync.WaitGroup) {
defer wg.Done()
// 模拟不同负载的任务
duration := time.Duration(50+id*10) * time.Millisecond
time.Sleep(duration)
fmt.Printf("Task %d 完成,耗时 %v\n", id, duration)
}
func main() {
runtime.GOMAXPROCS(2) // 使用 2 个 P
var wg sync.WaitGroup
// 创建不同负载的任务
for i := 1; i <= 10; i++ {
wg.Add(1)
go heavyTask(i, &wg)
}
wg.Wait()
fmt.Println("所有任务完成")
// 工作窃取机制:
// 1. P1 的任务较多,P2 的任务较少
// 2. P2 完成自己的任务后,会从 P1 窃取任务执行
// 3. 这实现了负载均衡,充分利用 CPU 资源
}
7. Goroutine 栈管理:
package main
import (
"fmt"
"runtime"
)
func recursive(depth int) {
var buf [1024]byte // 分配 1KB 栈空间
if depth < 100 {
fmt.Printf("递归深度: %d, 栈地址: %p\n", depth, &buf)
recursive(depth + 1)
}
}
func main() {
fmt.Printf("初始栈大小: %d KB\n", runtime.GoroutineStack()/1024)
// Goroutine 栈管理特点:
// 1. 初始栈大小:2KB(Go 1.4+)
// 2. 动态增长:栈空间不足时自动扩容
// 3. 最大栈大小:1GB(可配置)
// 4. 栈拷贝:扩容时将旧栈内容拷贝到新栈
go recursive(0)
select {}
}
8. Goroutine 状态转换:
package main
import (
"fmt"
"runtime"
"time"
)
func demonstrateStates() {
fmt.Println("Goroutine 状态转换:")
fmt.Println("_Gidle: 刚创建,未初始化")
fmt.Println("_Grunnable: 在运行队列中,等待执行")
fmt.Println("_Grunning: 正在执行")
fmt.Println("_Gsyscall: 正在执行系统调用")
fmt.Println("_Gwaiting: 等待(channel、sleep 等)")
fmt.Println("_Gdead: 已退出")
}
func main() {
go demonstrateStates()
// Goroutine 状态转换示例:
// _Gidle -> _Grunnable: 创建 goroutine
// _Grunnable -> _Grunning: 被调度执行
// _Grunning -> _Gwaiting: 等待 channel 或 sleep
// _Gwaiting -> _Grunnable: 唤醒后重新进入队列
// _Grunning -> _Gdead: 执行完成
time.Sleep(100 * time.Millisecond)
}
Goroutine 最佳实践
1. 合理控制 Goroutine 数量:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// 使用 worker pool 模式限制 goroutine 数量
type WorkerPool struct {
tasks chan int
workers int
wg sync.WaitGroup
}
func NewWorkerPool(workers int) *WorkerPool {
return &WorkerPool{
tasks: make(chan int, 100),
workers: workers,
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workers; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for task := range wp.tasks {
fmt.Printf("Worker %d 处理任务 %d\n", id, task)
time.Sleep(100 * time.Millisecond)
}
}
func (wp *WorkerPool) Submit(task int) {
wp.tasks <- task
}
func (wp *WorkerPool) Stop() {
close(wp.tasks)
wp.wg.Wait()
}
func main() {
pool := NewWorkerPool(5) // 限制为 5 个 worker
pool.Start()
// 提交 20 个任务
for i := 1; i <= 20; i++ {
pool.Submit(i)
}
pool.Stop()
fmt.Println("所有任务完成")
}
2. 避免 Goroutine 泄露:
package main
import (
"fmt"
"time"
)
// 错误示例:goroutine 泄露
func leak() {
ch := make(chan int)
go func() {
val := <-ch // 永远阻塞,goroutine 泄露
fmt.Println(val)
}()
}
// 正确示例:使用 context 控制
import "context"
func noLeak(ctx context.Context) {
ch := make(chan int)
go func() {
select {
case val := <-ch:
fmt.Println(val)
case <-ctx.Done():
fmt.Println("goroutine 被取消")
return
}
}()
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
noLeak(ctx)
time.Sleep(3 * time.Second)
}
3. 使用 WaitGroup 等待 Goroutine:
package main
import (
"fmt"
"sync"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d 完成\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("所有 worker 完成")
}
4. 使用 ErrGroup 处理错误:
package main
import (
"errors"
"fmt"
"golang.org/x/sync/errgroup"
)
func task(name string, shouldFail bool) error {
fmt.Printf("Task %s 开始\n", name)
if shouldFail {
return errors.New("task failed")
}
fmt.Printf("Task %s 完成\n", name)
return nil
}
func main() {
var g errgroup.Group
g.Go(func() error {
return task("A", false)
})
g.Go(func() error {
return task("B", true) // 这个会失败
})
g.Go(func() error {
return task("C", false)
})
if err := g.Wait(); err != nil {
fmt.Println("错误:", err)
}
}
5. 使用 Channel 进行通信:
package main
import (
"fmt"
"time"
)
func producer(ch chan<- int) {
for i := 1; i <= 5; i++ {
ch <- i
fmt.Printf("生产: %d\n", i)
time.Sleep(100 * time.Millisecond)
}
close(ch)
}
func consumer(ch <-chan int) {
for val := range ch {
fmt.Printf("消费: %d\n", val)
time.Sleep(150 * time.Millisecond)
}
}
func main() {
ch := make(chan int, 3)
go producer(ch)
go consumer(ch)
time.Sleep(2 * time.Second)
}
6. 使用 Context 取消 Goroutine:
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 被取消\n", id)
return
default:
fmt.Printf("Worker %d 工作中...\n", id)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
time.Sleep(2 * time.Second)
cancel() // 取消所有 goroutine
time.Sleep(1 * time.Second)
}
7. 使用 Mutex 保护共享资源:
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
var counter Counter
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Printf("最终值: %d\n", counter.Value())
}
8. 性能优化技巧:
- 避免过度创建 goroutine:使用 worker pool 模式
- 合理设置 GOMAXPROCS:根据 CPU 核心数调整
- 使用缓冲 channel:减少阻塞
- 避免锁竞争:尽量使用 channel 通信
- 使用 sync.Pool:重用对象,减少 GC 压力
- 监控 goroutine 数量:使用 runtime.NumGoroutine()
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func monitorGoroutines() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
fmt.Printf("当前 goroutine 数量: %d\n", runtime.NumGoroutine())
}
}
func main() {
go monitorGoroutines()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(2 * time.Second)
}()
}
wg.Wait()
fmt.Println("所有 goroutine 完成")
}
Channel
Channel 是 goroutine 之间通信的管道。
package main
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // 将和发送到 channel
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // 从 channel 接收
fmt.Println(x, y, x+y)
}
Buffered Channel
package main
import "fmt"
func main() {
// 创建带缓冲的 channel
ch := make(chan int, 2)
ch <- 1
ch <- 2
fmt.Println(<-ch)
fmt.Println(<-ch)
}
Range 和 Close
package main
import "fmt"
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c) // 关闭 channel
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
// range 会持续从 channel 接收,直到 channel 被关闭
for i := range c {
fmt.Println(i)
}
}
Select
Select 语句让 goroutine 可以等待多个通信操作。
package main
import "fmt"
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}
默认选择
package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}
互斥锁(Mutex)
package main
import (
"fmt"
"sync"
"time"
)
// SafeCounter 是并发安全的计数器
type SafeCounter struct {
mu sync.Mutex
v map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
c.v[key]++
c.mu.Unlock()
}
func (c *SafeCounter) Value(key string) int {
c.mu.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
defer c.mu.Unlock()
return c.v[key]
}
func main() {
c := SafeCounter{v: make(map[string]int)}
for i := 0; i < 1000; i++ {
go c.Inc("somekey")
}
time.Sleep(time.Second)
fmt.Println(c.Value("somekey"))
}
WaitGroup
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("所有 worker 完成")
}
Context
package main
import (
"context"
"fmt"
"time"
)
func main() {
// 创建一个可取消的 context
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("goroutine 被取消")
return
default:
fmt.Println("goroutine 运行中")
time.Sleep(500 * time.Millisecond)
}
}
}()
time.Sleep(2 * time.Second)
fmt.Println("取消 goroutine")
cancel()
time.Sleep(time.Second)
}
超时控制
package main
import (
"context"
"fmt"
"time"
)
func longRunningTask(ctx context.Context) {
select {
case <-time.After(5 * time.Second):
fmt.Println("任务完成")
case <-ctx.Done():
fmt.Println("任务超时:", ctx.Err())
}
}
func main() {
// 设置 2 秒超时
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
longRunningTask(ctx)
}