Goroutine - Go 并发编程基石
Goroutine 是 Go 运行时管理的轻量级线程,是 Go 并发模型的核心。理解 Goroutine 的调度机制和生命周期,是编写高效并发程序的基础。
📌 核心概念
🧵
轻量级
初始栈仅 2KB,可动态伸缩
10 万 + Goroutine/进程
⚡
高效调度
用户态调度,非抢占式
GMP 模型
🔄
并发执行
多核并行,自动负载均衡
GOMAXPROCS
💰
低成本
创建/切换开销极小
~1μs 创建
基础用法
📝 启动 Goroutine
package main
import (
"fmt"
"time"
)
// 1. 启动函数作为 Goroutine
func sayHello() {
fmt.Println("Hello from Goroutine!")
}
// 2. 启动带参数的函数
func worker(id int, task string) {
fmt.Printf("Worker %d processing: %s\n", id, task)
}
// 3. 使用匿名函数
func main() {
// 方式 1: 直接启动函数
go sayHello()
// 方式 2: 带参数
go worker(1, "task-1")
// 方式 3: 匿名函数 (常用)
go func(msg string) {
fmt.Println(msg)
}("匿名函数执行")
// 方式 4: 闭包 (注意变量捕获)
for i := 0; i < 3; i++ {
go func() {
fmt.Println(i) // ⚠️ 所有 Goroutine 都输出 3
}()
}
// 方式 5: 闭包正确写法 (传参)
for i := 0; i < 3; i++ {
go func(n int) {
fmt.Println(n) // ✅ 正确输出 0, 1, 2
}(i)
}
// 等待 Goroutine 完成
time.Sleep(time.Millisecond * 100)
}
⚠️ 常见错误:变量捕获
// ❌ 错误:所有 Goroutine 共享同一个 i
for i := 0; i < 10; i++ {
go func() {
fmt.Println(i) // 输出不确定,可能全是 10
}()
}
// ✅ 正确:通过参数传递
for i := 0; i < 10; i++ {
go func(n int) {
fmt.Println(n) // 正确输出 0-9
}(i)
}
// ✅ 正确:局部变量
for i := 0; i < 10; i++ {
i := i // Go 1.22+ 不需要这行
go func() {
fmt.Println(i)
}()
}
GMP 调度模型详解
G (Goroutine)
用户态协程
- 初始栈:2KB
- 最大栈:1GB
- 结构体:g
→
P (Processor)
逻辑处理器
- 本地队列:256 槽
- 数量:GOMAXPROCS
- 调度上下文
→
M (Machine)
系统线程
- OS 线程映射
- 执行 G
- 系统调用分离
GMP 核心数据结构
📖 Go 运行时中的 GMP 结构
// runtime/runtime2.go - G (Goroutine) 结构 (简化)
type g struct {
stack stack // 栈的起止地址
goid int64 // Goroutine ID
sched gobuf // 调度上下文 (PC, SP 等)
atomicstatus uint32 // 运行状态
stackHigh uintptr // 栈最高水位
// ... 更多字段
}
// P (Processor) 结构 (简化)
type p struct {
lock mutex
status uint32
link p
id int32
mcache *mcache
runq gQueue // G 队列 (256 个槽)
runqhead uint32
runqtail uint32
// 本地运行队列统计
schedtick uint32 // 调度次数
syscalltick uint32 // 系统调用次数
m *m // 绑定的 M
// ... 更多字段
}
// M (Machine) 结构 (简化)
type m struct {
g0 *g // 执行调度器时用的栈
gsignal *g // signal handling stack
curg *g // 当前执行的 G
p puintptr // 绑定的 P
nextp puintptr
id int64
// ... 更多字段
}
Goroutine 状态转换
_Gidle
已创建
已创建
→
_Grunnable
就绪态
就绪态
→
_Grunning
运行态
运行态
→
_Gwaiting
等待态
等待态
↔
_Grunnable
被唤醒
被唤醒
→
_Gdead
已结束
已结束
_Grunning
→
_Gsyscall
系统调用
系统调用
→
_Grunnable
💡 GMP 调度要点
- G 创建: 调用
go func()时创建,放入 P 的本地队列 - 调度时机: 函数调用、channel 操作、select、time.Sleep 等
- 工作窃取: P 的本地队列为空时,从其他 P 或全局队列窃取 G
- 系统调用: M 执行系统调用时,P 与 M 分离,其他 G 可继续执行
- 抢占调度: Go 1.14+ 基于信号的抢占,避免长时运行阻塞
调度流程
Goroutine 调度流程
Goroutine 创建
→
放入 P 本地队列
→
等待调度
P 有空闲 M?
→
M 执行 G
→
G 运行完成/阻塞
本地队列空?
→
工作窃取
→
从其他 P 窃取
系统调用处理
G 执行系统调用
→
M 阻塞
→
P 与 M 分离
创建新 M
→
P 绑定新 M
→
继续执行其他 G
系统调用返回
→
G 回到就绪队列
→
等待调度
生产级并发模式
1. Worker Pool 模式
📝 固定大小的 Worker 池
package main
import (
"fmt"
"sync"
"time"
)
// Job 定义任务
type Job struct {
ID int
Data string
}
// Result 定义结果
type Result struct {
JobID int
Output string
}
// Worker 处理任务
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job.ID)
// 模拟处理
time.Sleep(time.Millisecond * 100)
results <- Result{
JobID: job.ID,
Output: fmt.Sprintf("Processed: %s", job.Data),
}
}
}
func main() {
const (
workerCount = 5
jobCount = 20
)
jobs := make(chan Job, jobCount)
results := make(chan Result, jobCount)
var wg sync.WaitGroup
// 启动 Worker 池
for i := 1; i <= workerCount; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// 发送任务
for i := 1; i <= jobCount; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("task-%d", i)}
}
close(jobs)
// 等待所有 Worker 完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("Result: Job %d → %s\n", result.JobID, result.Output)
}
}
2. Fan-Out / Fan-In 模式
📝 多阶段并行处理
package main
import (
"fmt"
"sync"
)
// stage1: 数据生成 (Fan-Out)
func generate(nums []int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// stage2: 并行计算 (Fan-Out)
func square(in <-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
// 启动 3 个计算 Goroutine
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for n := range in {
out <- n * n
}
}()
}
// 等待所有计算完成
go func() {
wg.Wait()
close(out)
}()
return out
}
// stage3: 结果收集 (Fan-In)
func main() {
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
// 管道式处理
gen := generate(nums)
sq := square(gen)
// 收集结果
for result := range sq {
fmt.Println(result)
}
}
3. 优雅关闭 Goroutine
📝 使用 done channel 控制生命周期
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
// Worker 持续运行直到收到停止信号
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d shutting down\n", id)
return
default:
// 执行任务
fmt.Printf("Worker %d working...\n", id)
time.Sleep(time.Second)
}
}
}
func main() {
// 创建可取消的 Context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 启动 Worker
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
// 监听系统信号
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// 等待退出信号
<-sigChan
fmt.Println("Received shutdown signal")
// 取消 Context,通知所有 Worker 退出
cancel()
// 等待清理完成
time.Sleep(time.Millisecond * 500)
fmt.Println("Shutdown complete")
}
性能优化
Goroutine 泄漏检测
📊 检测和预防 Goroutine 泄漏
package main
import (
"fmt"
"runtime"
"time"
)
// 获取当前 Goroutine 数量
func countGoroutines() int {
return runtime.NumGoroutine()
}
// 泄漏示例:阻塞的 Goroutine
func leak() {
ch := make(chan int)
go func() {
<-ch // 永久阻塞,无人发送
fmt.Println("永远不会执行")
}()
}
// 修复:使用超时
func noLeak() {
ch := make(chan int)
go func() {
select {
case <-ch:
fmt.Println("收到数据")
case <-time.After(time.Second):
fmt.Println("超时退出")
}
}()
}
func main() {
fmt.Printf("初始 Goroutine 数:%d\n", countGoroutines())
// 模拟泄漏
for i := 0; i < 100; i++ {
leak()
}
time.Sleep(time.Millisecond * 100)
fmt.Printf("泄漏后 Goroutine 数:%d\n", countGoroutines())
// 修复后
for i := 0; i < 100; i++ {
noLeak()
}
time.Sleep(time.Second * 2)
fmt.Printf("修复后 Goroutine 数:%d\n", countGoroutines())
}
📊 Goroutine 泄漏检测工具
- runtime.NumGoroutine(): 实时监控 Goroutine 数量
- pprof:
go tool pprof http://localhost:6060/debug/pprof/goroutine - go tool trace: 可视化调度追踪
- goleak: 测试包
go.uber.org/goleak
性能基准对比
📈 Goroutine vs 线程性能对比
// 基准测试:创建 10 万个并发单元
// Goroutine 版本
func BenchmarkGoroutine(b *testing.B) {
for i := 0; i < b.N; i++ {
done := make(chan struct{})
go func() {
// 模拟工作
done <- struct{}{}
}()
<-done
}
}
// 结果 (参考值):
// BenchmarkGoroutine-8 100000 1200 ns/op 2 KB/op
// 优化技巧:
// 1. 复用 Goroutine (Worker Pool)
// 2. 批量处理减少创建次数
// 3. 使用 sync.Pool 减少内存分配
// 4. 合理设置 GOMAXPROCS
💡 性能优化建议
- 避免过度创建: 使用 Worker Pool 复用 Goroutine
- 控制并发度: 限制同时运行的 Goroutine 数量
- 减少锁竞争: 使用无锁数据结构或细粒度锁
- 批量处理: 减少 Goroutine 间通信次数
- 监控泄漏: 生产环境监控 Goroutine 数量
最佳实践总结
✅ Goroutine 使用原则
- 明确生命周期: 确保 Goroutine 能正常退出
- 使用 Context: 通过 Context 控制取消和超时
- 避免泄漏: 确保 channel 有发送者/接收者
- 错误处理: 使用 errgroup 或错误 channel
- 资源清理: 使用 defer 确保资源释放
- 变量捕获: 循环中通过参数传递变量
🚨 常见陷阱
- 主 Goroutine 提前退出: 使用 WaitGroup 或 channel 等待
- 共享数据竞争: 使用 mutex 或 channel 保护
- 无限阻塞: 使用 select + timeout
- 重复关闭 channel: 确保只关闭一次
- 向关闭的 channel 发送: 检查 channel 状态