← 返回首页

并发

Goroutine

Goroutine 是 Go 语言轻量级的线程实现,由 Go 运行时管理。

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    // 启动 goroutine
    go say("world")
    say("hello")
}

Goroutine 内部实现原理

1. Goroutine 的内部结构:

Goroutine 是 Go 运行时(runtime)管理的轻量级线程,其内部结构包含:

// Goroutine 的内部结构(简化版)
type g struct {
    stack       stack   // 栈内存
    stackguard0 uintptr // 栈溢出检查
    m           *m      // 关联的 M(Machine)
    sched       gobuf   // 调度信息
    goid        int64   // Goroutine ID
    gopc        uintptr // 创建该 goroutine 的 PC
    startpc     uintptr // goroutine 函数的起始 PC
    atomicstatus uint32 // 原子状态
    goid        int64   // Goroutine 唯一标识
}

2. M(Machine)结构:

M 代表操作系统线程,负责执行 goroutine:

// M 的内部结构(简化版)
type m struct {
    g0      *g      // 调度用的 goroutine
    curg    *g      // 当前运行的 goroutine
    p       *p      // 关联的 P(Processor)
    nextp   *p      // 下一个 P
    id      int64   // M 的 ID
    spinning bool   // 是否在自旋寻找工作
    blocked bool   // 是否阻塞
}

3. P(Processor)结构:

P 代表处理器,维护一个本地运行队列:

// P 的内部结构(简化版)
type p struct {
    id          int32
    status      uint32
    link        *p
    schedtick   uint32
    syscalltick uint32
    m           *m      // 关联的 M
    mcache      *mcache // 内存分配缓存
    runqhead    uint32
    runqtail    uint32
    runq        [256]guintptr // 本地运行队列
    runnext     guintptr // 下一个运行的 goroutine
}

4. GMP 调度模型:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d 开始工作\n", id)
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("Worker %d 完成工作\n", id)
}

func main() {
    // 设置使用的 P 数量
    runtime.GOMAXPROCS(4)
    
    fmt.Printf("CPU 核心数: %d\n", runtime.NumCPU())
    fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
    
    var wg sync.WaitGroup
    
    // 启动多个 goroutine
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    
    wg.Wait()
    fmt.Println("所有 worker 完成")
}

5. Goroutine 调度过程:

package main

import (
    "fmt"
    "runtime"
)

func task(id int) {
    fmt.Printf("Task %d 在 P%d 上运行\n", id, runtime.GOMAXPROCS(0))
}

func main() {
    // Goroutine 调度过程:
    // 1. 创建 goroutine:go func() {}
    // 2. 将 goroutine 放入本地运行队列(P 的 runq)
    // 3. M 从 P 的 runq 中获取 goroutine 执行
    // 4. 如果 P 的 runq 为空,从全局队列或其他 P 窃取 goroutine
    // 5. Goroutine 执行完成或阻塞,M 继续执行下一个 goroutine
    
    for i := 1; i <= 5; i++ {
        go task(i)
    }
    
    // 等待 goroutine 完成
    select {}
}

6. 工作窃取(Work Stealing):

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func heavyTask(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    // 模拟不同负载的任务
    duration := time.Duration(50+id*10) * time.Millisecond
    time.Sleep(duration)
    
    fmt.Printf("Task %d 完成,耗时 %v\n", id, duration)
}

func main() {
    runtime.GOMAXPROCS(2) // 使用 2 个 P
    
    var wg sync.WaitGroup
    
    // 创建不同负载的任务
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go heavyTask(i, &wg)
    }
    
    wg.Wait()
    fmt.Println("所有任务完成")
    
    // 工作窃取机制:
    // 1. P1 的任务较多,P2 的任务较少
    // 2. P2 完成自己的任务后,会从 P1 窃取任务执行
    // 3. 这实现了负载均衡,充分利用 CPU 资源
}

7. Goroutine 栈管理:

package main

import (
    "fmt"
    "runtime"
)

func recursive(depth int) {
    var buf [1024]byte // 分配 1KB 栈空间
    
    if depth < 100 {
        fmt.Printf("递归深度: %d, 栈地址: %p\n", depth, &buf)
        recursive(depth + 1)
    }
}

func main() {
    fmt.Printf("初始栈大小: %d KB\n", runtime.GoroutineStack()/1024)
    
    // Goroutine 栈管理特点:
    // 1. 初始栈大小:2KB(Go 1.4+)
    // 2. 动态增长:栈空间不足时自动扩容
    // 3. 最大栈大小:1GB(可配置)
    // 4. 栈拷贝:扩容时将旧栈内容拷贝到新栈
    
    go recursive(0)
    
    select {}
}

8. Goroutine 状态转换:

package main

import (
    "fmt"
    "runtime"
    "time"
)

func demonstrateStates() {
    fmt.Println("Goroutine 状态转换:")
    fmt.Println("_Gidle: 刚创建,未初始化")
    fmt.Println("_Grunnable: 在运行队列中,等待执行")
    fmt.Println("_Grunning: 正在执行")
    fmt.Println("_Gsyscall: 正在执行系统调用")
    fmt.Println("_Gwaiting: 等待(channel、sleep 等)")
    fmt.Println("_Gdead: 已退出")
}

func main() {
    go demonstrateStates()
    
    // Goroutine 状态转换示例:
    // _Gidle -> _Grunnable: 创建 goroutine
    // _Grunnable -> _Grunning: 被调度执行
    // _Grunning -> _Gwaiting: 等待 channel 或 sleep
    // _Gwaiting -> _Grunnable: 唤醒后重新进入队列
    // _Grunning -> _Gdead: 执行完成
    
    time.Sleep(100 * time.Millisecond)
}

Goroutine 最佳实践

1. 合理控制 Goroutine 数量:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// 使用 worker pool 模式限制 goroutine 数量
type WorkerPool struct {
    tasks   chan int
    workers int
    wg      sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    return &WorkerPool{
        tasks:   make(chan int, 100),
        workers: workers,
    }
}

func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    for task := range wp.tasks {
        fmt.Printf("Worker %d 处理任务 %d\n", id, task)
        time.Sleep(100 * time.Millisecond)
    }
}

func (wp *WorkerPool) Submit(task int) {
    wp.tasks <- task
}

func (wp *WorkerPool) Stop() {
    close(wp.tasks)
    wp.wg.Wait()
}

func main() {
    pool := NewWorkerPool(5) // 限制为 5 个 worker
    pool.Start()
    
    // 提交 20 个任务
    for i := 1; i <= 20; i++ {
        pool.Submit(i)
    }
    
    pool.Stop()
    fmt.Println("所有任务完成")
}

2. 避免 Goroutine 泄露:

package main

import (
    "fmt"
    "time"
)

// 错误示例:goroutine 泄露
func leak() {
    ch := make(chan int)
    go func() {
        val := <-ch // 永远阻塞,goroutine 泄露
        fmt.Println(val)
    }()
}

// 正确示例:使用 context 控制
import "context"

func noLeak(ctx context.Context) {
    ch := make(chan int)
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done():
            fmt.Println("goroutine 被取消")
            return
        }
    }()
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    noLeak(ctx)
    time.Sleep(3 * time.Second)
}

3. 使用 WaitGroup 等待 Goroutine:

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d 完成\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    
    wg.Wait()
    fmt.Println("所有 worker 完成")
}

4. 使用 ErrGroup 处理错误:

package main

import (
    "errors"
    "fmt"
    "golang.org/x/sync/errgroup"
)

func task(name string, shouldFail bool) error {
    fmt.Printf("Task %s 开始\n", name)
    if shouldFail {
        return errors.New("task failed")
    }
    fmt.Printf("Task %s 完成\n", name)
    return nil
}

func main() {
    var g errgroup.Group
    
    g.Go(func() error {
        return task("A", false)
    })
    
    g.Go(func() error {
        return task("B", true) // 这个会失败
    })
    
    g.Go(func() error {
        return task("C", false)
    })
    
    if err := g.Wait(); err != nil {
        fmt.Println("错误:", err)
    }
}

5. 使用 Channel 进行通信:

package main

import (
    "fmt"
    "time"
)

func producer(ch chan<- int) {
    for i := 1; i <= 5; i++ {
        ch <- i
        fmt.Printf("生产: %d\n", i)
        time.Sleep(100 * time.Millisecond)
    }
    close(ch)
}

func consumer(ch <-chan int) {
    for val := range ch {
        fmt.Printf("消费: %d\n", val)
        time.Sleep(150 * time.Millisecond)
    }
}

func main() {
    ch := make(chan int, 3)
    
    go producer(ch)
    go consumer(ch)
    
    time.Sleep(2 * time.Second)
}

6. 使用 Context 取消 Goroutine:

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d 被取消\n", id)
            return
        default:
            fmt.Printf("Worker %d 工作中...\n", id)
            time.Sleep(500 * time.Millisecond)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    
    for i := 1; i <= 3; i++ {
        go worker(ctx, i)
    }
    
    time.Sleep(2 * time.Second)
    cancel() // 取消所有 goroutine
    
    time.Sleep(1 * time.Second)
}

7. 使用 Mutex 保护共享资源:

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    var counter Counter
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }
    
    wg.Wait()
    fmt.Printf("最终值: %d\n", counter.Value())
}

8. 性能优化技巧:

  • 避免过度创建 goroutine:使用 worker pool 模式
  • 合理设置 GOMAXPROCS:根据 CPU 核心数调整
  • 使用缓冲 channel:减少阻塞
  • 避免锁竞争:尽量使用 channel 通信
  • 使用 sync.Pool:重用对象,减少 GC 压力
  • 监控 goroutine 数量:使用 runtime.NumGoroutine()
package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func monitorGoroutines() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        fmt.Printf("当前 goroutine 数量: %d\n", runtime.NumGoroutine())
    }
}

func main() {
    go monitorGoroutines()
    
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(2 * time.Second)
        }()
    }
    
    wg.Wait()
    fmt.Println("所有 goroutine 完成")
}

Channel

Channel 是 goroutine 之间通信的管道。

package main

import "fmt"

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum // 将和发送到 channel
}

func main() {
    s := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    x, y := <-c, <-c // 从 channel 接收

    fmt.Println(x, y, x+y)
}

Buffered Channel

package main

import "fmt"

func main() {
    // 创建带缓冲的 channel
    ch := make(chan int, 2)

    ch <- 1
    ch <- 2

    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

Range 和 Close

package main

import "fmt"

func fibonacci(n int, c chan int) {
    x, y := 0, 1
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x+y
    }
    close(c) // 关闭 channel
}

func main() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)

    // range 会持续从 channel 接收,直到 channel 被关闭
    for i := range c {
        fmt.Println(i)
    }
}

Select

Select 语句让 goroutine 可以等待多个通信操作。

package main

import "fmt"

func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()

    fibonacci(c, quit)
}

默认选择

package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(100 * time.Millisecond)
    boom := time.After(500 * time.Millisecond)

    for {
        select {
        case <-tick:
            fmt.Println("tick.")
        case <-boom:
            fmt.Println("BOOM!")
            return
        default:
            fmt.Println("    .")
            time.Sleep(50 * time.Millisecond)
        }
    }
}

互斥锁(Mutex)

package main

import (
    "fmt"
    "sync"
    "time"
)

// SafeCounter 是并发安全的计数器
type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v
    c.v[key]++
    c.mu.Unlock()
}

func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v
    defer c.mu.Unlock()
    return c.v[key]
}

func main() {
    c := SafeCounter{v: make(map[string]int)}

    for i := 0; i < 1000; i++ {
        go c.Inc("somekey")
    }

    time.Sleep(time.Second)
    fmt.Println(c.Value("somekey"))
}

WaitGroup

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("Worker %d 开始工作\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d 完成工作\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("所有 worker 完成")
}

Context

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // 创建一个可取消的 context
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("goroutine 被取消")
                return
            default:
                fmt.Println("goroutine 运行中")
                time.Sleep(500 * time.Millisecond)
            }
        }
    }()

    time.Sleep(2 * time.Second)
    fmt.Println("取消 goroutine")
    cancel()
    time.Sleep(time.Second)
}

超时控制

package main

import (
    "context"
    "fmt"
    "time"
)

func longRunningTask(ctx context.Context) {
    select {
    case <-time.After(5 * time.Second):
        fmt.Println("任务完成")
    case <-ctx.Done():
        fmt.Println("任务超时:", ctx.Err())
    }
}

func main() {
    // 设置 2 秒超时
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    longRunningTask(ctx)
}