← 返回并发编程 | Channel →

Goroutine - Go 并发编程基石

Goroutine 是 Go 运行时管理的轻量级线程,是 Go 并发模型的核心。理解 Goroutine 的调度机制和生命周期,是编写高效并发程序的基础。

📌 核心概念

🧵

轻量级

初始栈仅 2KB,可动态伸缩

10 万 + Goroutine/进程

高效调度

用户态调度,非抢占式

GMP 模型
🔄

并发执行

多核并行,自动负载均衡

GOMAXPROCS
💰

低成本

创建/切换开销极小

~1μs 创建

基础用法

📝 启动 Goroutine

package main

import (
    "fmt"
    "time"
)

// 1. 启动函数作为 Goroutine
func sayHello() {
    fmt.Println("Hello from Goroutine!")
}

// 2. 启动带参数的函数
func worker(id int, task string) {
    fmt.Printf("Worker %d processing: %s\n", id, task)
}

// 3. 使用匿名函数
func main() {
    // 方式 1: 直接启动函数
    go sayHello()
    
    // 方式 2: 带参数
    go worker(1, "task-1")
    
    // 方式 3: 匿名函数 (常用)
    go func(msg string) {
        fmt.Println(msg)
    }("匿名函数执行")
    
    // 方式 4: 闭包 (注意变量捕获)
    for i := 0; i < 3; i++ {
        go func() {
            fmt.Println(i) // ⚠️ 所有 Goroutine 都输出 3
        }()
    }
    
    // 方式 5: 闭包正确写法 (传参)
    for i := 0; i < 3; i++ {
        go func(n int) {
            fmt.Println(n) // ✅ 正确输出 0, 1, 2
        }(i)
    }
    
    // 等待 Goroutine 完成
    time.Sleep(time.Millisecond * 100)
}

⚠️ 常见错误:变量捕获

// ❌ 错误:所有 Goroutine 共享同一个 i
for i := 0; i < 10; i++ {
    go func() {
        fmt.Println(i) // 输出不确定,可能全是 10
    }()
}

// ✅ 正确:通过参数传递
for i := 0; i < 10; i++ {
    go func(n int) {
        fmt.Println(n) // 正确输出 0-9
    }(i)
}

// ✅ 正确:局部变量
for i := 0; i < 10; i++ {
    i := i // Go 1.22+ 不需要这行
    go func() {
        fmt.Println(i)
    }()
}

GMP 调度模型详解

G (Goroutine)
用户态协程
  • 初始栈:2KB
  • 最大栈:1GB
  • 结构体:g
P (Processor)
逻辑处理器
  • 本地队列:256 槽
  • 数量:GOMAXPROCS
  • 调度上下文
M (Machine)
系统线程
  • OS 线程映射
  • 执行 G
  • 系统调用分离

GMP 核心数据结构

📖 Go 运行时中的 GMP 结构

// runtime/runtime2.go - G (Goroutine) 结构 (简化)
type g struct {
    stack       stack   // 栈的起止地址
    goid        int64   // Goroutine ID
    sched       gobuf   // 调度上下文 (PC, SP 等)
    atomicstatus uint32 // 运行状态
    stackHigh   uintptr // 栈最高水位
    // ... 更多字段
}

// P (Processor) 结构 (简化)
type p struct {
    lock mutex
    status uint32
    link p
    id    int32
    mcache *mcache
    runq    gQueue  // G 队列 (256 个槽)
    runqhead uint32
    runqtail uint32
    // 本地运行队列统计
    schedtick   uint32  // 调度次数
    syscalltick uint32  // 系统调用次数
    m           *m      // 绑定的 M
    // ... 更多字段
}

// M (Machine) 结构 (简化)
type m struct {
    g0      *g     // 执行调度器时用的栈
    gsignal *g     // signal handling stack
    curg    *g     // 当前执行的 G
    p       puintptr // 绑定的 P
    nextp   puintptr
    id      int64
    // ... 更多字段
}

Goroutine 状态转换

_Gidle
已创建
_Grunnable
就绪态
_Grunning
运行态
_Gwaiting
等待态
_Grunnable
被唤醒
_Gdead
已结束
_Grunning
_Gsyscall
系统调用
_Grunnable

💡 GMP 调度要点

  • G 创建: 调用 go func() 时创建,放入 P 的本地队列
  • 调度时机: 函数调用、channel 操作、select、time.Sleep 等
  • 工作窃取: P 的本地队列为空时,从其他 P 或全局队列窃取 G
  • 系统调用: M 执行系统调用时,P 与 M 分离,其他 G 可继续执行
  • 抢占调度: Go 1.14+ 基于信号的抢占,避免长时运行阻塞

调度流程

Goroutine 调度流程

Goroutine 创建
放入 P 本地队列
等待调度
P 有空闲 M?
M 执行 G
G 运行完成/阻塞
本地队列空?
工作窃取
从其他 P 窃取

系统调用处理

G 执行系统调用
M 阻塞
P 与 M 分离
创建新 M
P 绑定新 M
继续执行其他 G
系统调用返回
G 回到就绪队列
等待调度

生产级并发模式

1. Worker Pool 模式

📝 固定大小的 Worker 池

package main

import (
    "fmt"
    "sync"
    "time"
)

// Job 定义任务
type Job struct {
    ID   int
    Data string
}

// Result 定义结果
type Result struct {
    JobID  int
    Output string
}

// Worker 处理任务
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job.ID)
        
        // 模拟处理
        time.Sleep(time.Millisecond * 100)
        
        results <- Result{
            JobID:  job.ID,
            Output: fmt.Sprintf("Processed: %s", job.Data),
        }
    }
}

func main() {
    const (
        workerCount = 5
        jobCount    = 20
    )
    
    jobs := make(chan Job, jobCount)
    results := make(chan Result, jobCount)
    
    var wg sync.WaitGroup
    
    // 启动 Worker 池
    for i := 1; i <= workerCount; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }
    
    // 发送任务
    for i := 1; i <= jobCount; i++ {
        jobs <- Job{ID: i, Data: fmt.Sprintf("task-%d", i)}
    }
    close(jobs)
    
    // 等待所有 Worker 完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    for result := range results {
        fmt.Printf("Result: Job %d → %s\n", result.JobID, result.Output)
    }
}

2. Fan-Out / Fan-In 模式

📝 多阶段并行处理

package main

import (
    "fmt"
    "sync"
)

// stage1: 数据生成 (Fan-Out)
func generate(nums []int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// stage2: 并行计算 (Fan-Out)
func square(in <-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    
    // 启动 3 个计算 Goroutine
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for n := range in {
                out <- n * n
            }
        }()
    }
    
    // 等待所有计算完成
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// stage3: 结果收集 (Fan-In)
func main() {
    nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    
    // 管道式处理
    gen := generate(nums)
    sq := square(gen)
    
    // 收集结果
    for result := range sq {
        fmt.Println(result)
    }
}

3. 优雅关闭 Goroutine

📝 使用 done channel 控制生命周期

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

// Worker 持续运行直到收到停止信号
func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d shutting down\n", id)
            return
        default:
            // 执行任务
            fmt.Printf("Worker %d working...\n", id)
            time.Sleep(time.Second)
        }
    }
}

func main() {
    // 创建可取消的 Context
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // 启动 Worker
    for i := 1; i <= 3; i++ {
        go worker(ctx, i)
    }
    
    // 监听系统信号
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    
    // 等待退出信号
    <-sigChan
    fmt.Println("Received shutdown signal")
    
    // 取消 Context,通知所有 Worker 退出
    cancel()
    
    // 等待清理完成
    time.Sleep(time.Millisecond * 500)
    fmt.Println("Shutdown complete")
}

性能优化

Goroutine 泄漏检测

📊 检测和预防 Goroutine 泄漏

package main

import (
    "fmt"
    "runtime"
    "time"
)

// 获取当前 Goroutine 数量
func countGoroutines() int {
    return runtime.NumGoroutine()
}

// 泄漏示例:阻塞的 Goroutine
func leak() {
    ch := make(chan int)
    go func() {
        <-ch // 永久阻塞,无人发送
        fmt.Println("永远不会执行")
    }()
}

// 修复:使用超时
func noLeak() {
    ch := make(chan int)
    go func() {
        select {
        case <-ch:
            fmt.Println("收到数据")
        case <-time.After(time.Second):
            fmt.Println("超时退出")
        }
    }()
}

func main() {
    fmt.Printf("初始 Goroutine 数:%d\n", countGoroutines())
    
    // 模拟泄漏
    for i := 0; i < 100; i++ {
        leak()
    }
    
    time.Sleep(time.Millisecond * 100)
    fmt.Printf("泄漏后 Goroutine 数:%d\n", countGoroutines())
    
    // 修复后
    for i := 0; i < 100; i++ {
        noLeak()
    }
    
    time.Sleep(time.Second * 2)
    fmt.Printf("修复后 Goroutine 数:%d\n", countGoroutines())
}

📊 Goroutine 泄漏检测工具

  • runtime.NumGoroutine(): 实时监控 Goroutine 数量
  • pprof: go tool pprof http://localhost:6060/debug/pprof/goroutine
  • go tool trace: 可视化调度追踪
  • goleak: 测试包 go.uber.org/goleak

性能基准对比

📈 Goroutine vs 线程性能对比

// 基准测试:创建 10 万个并发单元

// Goroutine 版本
func BenchmarkGoroutine(b *testing.B) {
    for i := 0; i < b.N; i++ {
        done := make(chan struct{})
        go func() {
            // 模拟工作
            done <- struct{}{}
        }()
        <-done
    }
}

// 结果 (参考值):
// BenchmarkGoroutine-8    100000    1200 ns/op    2 KB/op

// 优化技巧:
// 1. 复用 Goroutine (Worker Pool)
// 2. 批量处理减少创建次数
// 3. 使用 sync.Pool 减少内存分配
// 4. 合理设置 GOMAXPROCS

💡 性能优化建议

  • 避免过度创建: 使用 Worker Pool 复用 Goroutine
  • 控制并发度: 限制同时运行的 Goroutine 数量
  • 减少锁竞争: 使用无锁数据结构或细粒度锁
  • 批量处理: 减少 Goroutine 间通信次数
  • 监控泄漏: 生产环境监控 Goroutine 数量

最佳实践总结

✅ Goroutine 使用原则

  • 明确生命周期: 确保 Goroutine 能正常退出
  • 使用 Context: 通过 Context 控制取消和超时
  • 避免泄漏: 确保 channel 有发送者/接收者
  • 错误处理: 使用 errgroup 或错误 channel
  • 资源清理: 使用 defer 确保资源释放
  • 变量捕获: 循环中通过参数传递变量

🚨 常见陷阱

  • 主 Goroutine 提前退出: 使用 WaitGroup 或 channel 等待
  • 共享数据竞争: 使用 mutex 或 channel 保护
  • 无限阻塞: 使用 select + timeout
  • 重复关闭 channel: 确保只关闭一次
  • 向关闭的 channel 发送: 检查 channel 状态