← 返回首页

并发编程

Go 语言的并发模型是其最强大的特性之一,通过 Goroutine 和 Channel 实现高效的并发编程。

Goroutine

轻量级线程,由 Go 运行时管理

📨

Channel

Goroutine 之间的通信机制

🔀

Select

多通道多路复用

🔒

Mutex

同步原语保证数据安全

Goroutine

Goroutine 是 Go 语言轻量级的线程实现,由 Go 运行时管理。

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    // 启动 goroutine
    go say("world")
    say("hello")
}

Goroutine 内部实现原理

1. Goroutine 的内部结构:

Goroutine 是 Go 运行时(runtime)管理的轻量级线程,其内部结构包含:

// Goroutine 的内部结构(简化版)
type g struct {
    stack       stack   // 栈内存
    stackguard0 uintptr // 栈溢出检查
    m           *m      // 关联的 M(Machine)
    sched       gobuf   // 调度信息
    goid        int64   // Goroutine ID
    gopc        uintptr // 创建该 goroutine 的 PC
    startpc     uintptr // goroutine 函数的起始 PC
    atomicstatus uint32 // 原子状态
    goid        int64   // Goroutine 唯一标识
}

2. M(Machine)结构:

M 代表操作系统线程,负责执行 goroutine:

// M 的内部结构(简化版)
type m struct {
    g0      *g      // 调度用的 goroutine
    curg    *g      // 当前运行的 goroutine
    p       *p      // 关联的 P(Processor)
    nextp   *p      // 下一个 P
    id      int64   // M 的 ID
    spinning bool   // 是否在自旋寻找工作
    blocked bool   // 是否阻塞
}

3. P(Processor)结构:

P 代表处理器,维护一个本地运行队列:

// P 的内部结构(简化版)
type p struct {
    id          int32
    status      uint32
    link        *p
    schedtick   uint32
    syscalltick uint32
    m           *m      // 关联的 M
    mcache      *mcache // 内存分配缓存
    runqhead    uint32
    runqtail    uint32
    runq        [256]guintptr // 本地运行队列
    runnext     guintptr // 下一个运行的 goroutine
}

4. GMP 调度模型:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d 开始工作\n", id)
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("Worker %d 完成工作\n", id)
}

func main() {
    // 设置使用的 P 数量
    runtime.GOMAXPROCS(4)
    
    fmt.Printf("CPU 核心数: %d\n", runtime.NumCPU())
    fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
    
    var wg sync.WaitGroup
    
    // 启动多个 goroutine
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    
    wg.Wait()
    fmt.Println("所有 worker 完成")
}

5. Goroutine 调度过程:

package main

import (
    "fmt"
    "runtime"
)

func task(id int) {
    fmt.Printf("Task %d 在 P%d 上运行\n", id, runtime.GOMAXPROCS(0))
}

func main() {
    // Goroutine 调度过程:
    // 1. 创建 goroutine:go func() {}
    // 2.  goroutine 放入本地运行队列(P 的 runq)
    // 3. M 从 P 的 runq 中获取 goroutine 执行
    // 4. 如果 P 的 runq 为空,从全局队列或其他 P 窃取 goroutine
    // 5. Goroutine 执行完成或阻塞,M 继续执行下一个 goroutine
    
    for i := 1; i <= 5; i++ {
        go task(i)
    }
    
    // 等待 goroutine 完成
    select {}
}

6. 工作窃取(Work Stealing):

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func heavyTask(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    // 模拟不同负载的任务
    duration := time.Duration(50+id*10) * time.Millisecond
    time.Sleep(duration)
    
    fmt.Printf("Task %d 完成,耗时 %v\n", id, duration)
}

func main() {
    runtime.GOMAXPROCS(2) // 使用 2 个 P
    
    var wg sync.WaitGroup
    
    // 创建不同负载的任务
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go heavyTask(i, &wg)
    }
    
    wg.Wait()
    fmt.Println("所有任务完成")
    
    // 工作窃取机制:
    // 1. P1 的任务较多,P2 的任务较少
    // 2. P2 完成自己的任务后,会从 P1 窃取任务执行
    // 3. 这实现了负载均衡,充分利用 CPU 资源
}

7. Goroutine 栈管理:

package main

import (
    "fmt"
    "runtime"
)

func recursive(depth int) {
    var buf [1024]byte // 分配 1KB 栈空间
    
    if depth < 100 {
        fmt.Printf("递归深度: %d, 栈地址: %p\n", depth, &buf)
        recursive(depth + 1)
    }
}

func main() {
    fmt.Printf("初始栈大小: %d KB\n", runtime.GoroutineStack()/1024)
    
    // Goroutine 栈管理特点:
    // 1. 初始栈大小:2KB(Go 1.4+)
    // 2. 动态增长:栈空间不足时自动扩容
    // 3. 最大栈大小:1GB(可配置)
    // 4. 栈拷贝:扩容时旧栈内容拷贝到新栈
    
    go recursive(0)
    
    select {}
}

8. Goroutine 状态转换:

package main

import (
    "fmt"
    "runtime"
    "time"
)

func demonstrateStates() {
    fmt.Println("Goroutine 状态转换:")
    fmt.Println("_Gidle: 刚创建,未初始化")
    fmt.Println("_Grunnable: 在运行队列中,等待执行")
    fmt.Println("_Grunning: 正在执行")
    fmt.Println("_Gsyscall: 正在执行系统调用")
    fmt.Println("_Gwaiting: 等待(channel、sleep 等)")
    fmt.Println("_Gdead: 已退出")
}

func main() {
    go demonstrateStates()
    
    // Goroutine 状态转换示例:
    // _Gidle -> _Grunnable: 创建 goroutine
    // _Grunnable -> _Grunning: 被调度执行
    // _Grunning -> _Gwaiting: 等待 channel 或 sleep
    // _Gwaiting -> _Grunnable: 唤醒后重新进入队列
    // _Grunning -> _Gdead: 执行完成
    
    time.Sleep(100 * time.Millisecond)
}

Goroutine 最佳实践

1. 合理控制 Goroutine 数量:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// 使用 worker pool 模式限制 goroutine 数量
type WorkerPool struct {
    tasks   chan int
    workers int
    wg      sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    return &WorkerPool{
        tasks:   make(chan int, 100),
        workers: workers,
    }
}

func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    for task := range wp.tasks {
        fmt.Printf("Worker %d 处理任务 %d\n", id, task)
        time.Sleep(100 * time.Millisecond)
    }
}

func (wp *WorkerPool) Submit(task int) {
    wp.tasks <- task
}

func (wp *WorkerPool) Stop() {
    close(wp.tasks)
    wp.wg.Wait()
}

func main() {
    pool := NewWorkerPool(5) // 限制为 5 个 worker
    pool.Start()
    
    // 提交 20 个任务
    for i := 1; i <= 20; i++ {
        pool.Submit(i)
    }
    
    pool.Stop()
    fmt.Println("所有任务完成")
}

2. 避免 Goroutine 泄露:

package main

import (
    "fmt"
    "time"
)

// 错误示例:goroutine 泄露
func leak() {
    ch := make(chan int)
    go func() {
        val := <-ch // 永远阻塞,goroutine 泄露
        fmt.Println(val)
    }()
}

// 正确示例:使用 context 控制
import "context"

func noLeak(ctx context.Context) {
    ch := make(chan int)
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done():
            fmt.Println("goroutine 被取消")
            return
        }
    }()
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    noLeak(ctx)
    time.Sleep(3 * time.Second)
}

3. 使用 WaitGroup 等待 Goroutine:

package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d 完成\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    
    wg.Wait()
    fmt.Println("所有 worker 完成")
}

4. 使用 ErrGroup 处理错误:

package main

import (
    "errors"
    "fmt"
    "golang.org/x/sync/errgroup"
)

func task(name string, shouldFail bool) error {
    fmt.Printf("Task %s 开始\n", name)
    if shouldFail {
        return errors.New("task failed")
    }
    fmt.Printf("Task %s 完成\n", name)
    return nil
}

func main() {
    var g errgroup.Group
    
    g.Go(func() error {
        return task("A", false)
    })
    
    g.Go(func() error {
        return task("B", true) // 这个会失败
    })
    
    g.Go(func() error {
        return task("C", false)
    })
    
    if err := g.Wait(); err != nil {
        fmt.Println("错误:", err)
    }
}

5. 使用 Channel 进行通信:

package main

import (
    "fmt"
    "time"
)

func producer(ch chan<- int) {
    for i := 1; i <= 5; i++ {
        ch <- i
        fmt.Printf("生产: %d\n", i)
        time.Sleep(100 * time.Millisecond)
    }
    close(ch)
}

func consumer(ch <-chan int) {
    for val := range ch {
        fmt.Printf("消费: %d\n", val)
        time.Sleep(150 * time.Millisecond)
    }
}

func main() {
    ch := make(chan int, 3)
    
    go producer(ch)
    go consumer(ch)
    
    time.Sleep(2 * time.Second)
}

6. 使用 Context 取消 Goroutine:

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d 被取消\n", id)
            return
        default:
            fmt.Printf("Worker %d 工作中...\n", id)
            time.Sleep(500 * time.Millisecond)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    
    for i := 1; i <= 3; i++ {
        go worker(ctx, i)
    }
    
    time.Sleep(2 * time.Second)
    cancel() // 取消所有 goroutine
    
    time.Sleep(1 * time.Second)
}

7. 使用 Mutex 保护共享资源:

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    var counter Counter
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }
    
    wg.Wait()
    fmt.Printf("最终值: %d\n", counter.Value())
}

8. 性能优化技巧:

  • 避免过度创建 goroutine:使用 worker pool 模式
  • 合理设置 GOMAXPROCS:根据 CPU 核心数调整
  • 使用缓冲 channel:减少阻塞
  • 避免锁竞争:尽量使用 channel 通信
  • 使用 sync.Pool:重用对象,减少 GC 压力
  • 监控 goroutine 数量:使用 runtime.NumGoroutine()
package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func monitorGoroutines() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        fmt.Printf("当前 goroutine 数量: %d\n", runtime.NumGoroutine())
    }
}

func main() {
    go monitorGoroutines()
    
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(2 * time.Second)
        }()
    }
    
    wg.Wait()
    fmt.Println("所有 goroutine 完成")
}

Channel

Channel 是 goroutine 之间通信的管道。

package main

import "fmt"

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum // 和发送到 channel
}

func main() {
    s := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    x, y := <-c, <-c // 从 channel 接收

    fmt.Println(x, y, x+y)
}

Channel 内部实现原理

1. Channel 的核心数据结构:

Channel 在 Go 运行时中的核心结构是 hchan

// runtime/chan.go - Channel 的核心结构(简化版)
type hchan struct {
    qcount   uint           // 当前队列中元素个数
    dataqsiz uint           // 环形队列大小(容量)
    buf      unsafe.Pointer // 环形队列指针
    elemsize uint16         // 元素大小
    closed   uint32         // channel 是否关闭
    elemtype *_type         // 元素类型
    sendx    uint           // 发送索引
    recvx    uint           // 接收索引
    recvq    waitq          // 接收等待队列
    sendq    waitq          // 发送等待队列
    lock     mutex          // 互斥锁
}

2. 环形队列实现:

Channel 使用环形队列来存储数据,这是一个高效的循环缓冲区:

// 环形队列的工作原理示例
// 假设容量为 5 的 channel

// 初始状态: []
// sendx = 0, recvx = 0

// 发送 1, 2, 3: [1, 2, 3, _, _]
// sendx = 3, recvx = 0

// 接收 2 个值: [_, _, 3, _, _]
// sendx = 3, recvx = 2

// 继续发送 4, 5: [_, _, 3, 4, 5]
// sendx = 0, recvx = 2 (循环回到开头)

// 接收 3 个值: [_, _, _, _, _]
// sendx = 0, recvx = 0 (队列为空)

3. 发送操作流程:

// 发送操作: ch <- value

// 步骤 1: 获取 channel 锁
// lock(&c.lock)

// 步骤 2: 检查是否有接收者在等待
if sg := c.recvq.dequeue(); sg != nil {
    // 直接传递数据给接收者,不经过缓冲区
    send(c, sg, ep)
    return
}

// 步骤 3: 如果缓冲区有空间,放入缓冲区
if c.qcount < c.dataqsiz {
    // 数据放入环形队列
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    return
}

// 步骤 4: 缓冲区已满,阻塞当前 goroutine
// 当前 goroutine 加入发送等待队列
// gopark() - 让出 CPU,等待被唤醒

4. 接收操作流程:

// 接收操作: value := <-ch

// 步骤 1: 获取 channel 锁
// lock(&c.lock)

// 步骤 2: 检查是否有发送者在等待
if sg := c.sendq.dequeue(); sg != nil {
    // 直接从发送者获取数据
    recv(c, sg, ep)
    return
}

// 步骤 3: 如果缓冲区有数据,从缓冲区读取
if c.qcount > 0 {
    qp := chanbuf(c, c.recvx)
    if ep != nil {
        typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    return
}

// 步骤 4: 缓冲区为空且未关闭,阻塞当前 goroutine
// 当前 goroutine 加入接收等待队列
// gopark() - 让出 CPU,等待被唤醒

5. 关闭操作流程:

// 关闭操作: close(ch)

// 步骤 1: 获取 channel 锁
// lock(&c.lock)

// 步骤 2: 标记 channel 为已关闭
c.closed = 1

// 步骤 3: 唤醒所有等待的接收者
for {
    sg := c.recvq.dequeue()
    if sg == nil {
        break
    }
    sg.elem = nil  // 接收者获得零值
    goready(sg.g)  // 唤醒 goroutine
}

// 步骤 4: 唤醒所有等待的发送者(它们会 panic)
for {
    sg := c.sendq.dequeue()
    if sg == nil {
        break
    }
    sg.elem = nil
    goready(sg.g)  // 唤醒后发送者会 panic
}

Channel 最佳实践

1. 选择合适的 Channel 类型:

package main

import "fmt"

// 无缓冲 Channel(同步)- 适用于需要同步的场景
func synchronousExample() {
    ch := make(chan int)
    
    go func() {
        fmt.Println("发送者: 准备发送")
        ch <- 42  // 阻塞直到有消费者接收
        fmt.Println("发送者: 数据已发送")
    }()
    
    fmt.Println("接收者: 等待数据")
    val := <-ch
    fmt.Printf("接收者: 收到 %d\n", val)
}

// 缓冲 Channel(异步)- 适用于生产者和消费者速度不匹配
func bufferedExample() {
    ch := make(chan int, 3)  // 缓冲大小为 3
    
    // 可以发送 3 个数据而不会阻塞
    ch <- 1
    ch <- 2
    ch <- 3
    fmt.Println("已发送 3 个数据")
    
    fmt.Println(<-ch)
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

2. 避免 Goroutine 泄露:

package main

import (
    "context"
    "fmt"
    "time"
)

// 错误示例:goroutine 泄露
func leakExample() {
    ch := make(chan int)
    go func() {
        val := <-ch  // 永远阻塞,goroutine 泄露
        fmt.Println(val)
    }()
    // 如果 ch 永远不发送数据,goroutine 永不退出
}

// 正确示例:使用 context 控制
func noLeakExample() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    ch := make(chan int)
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done():
            fmt.Println("goroutine 被取消")
            return
        }
    }()
}

3. 正确关闭 Channel:

package main

import "fmt"

// 原则:只在发送方关闭 channel,不要在接收方关闭
func producer(ch chan<- int) {
    defer close(ch)  // 发送完成后关闭
    for i := 0; i < 5; i++ {
        ch <- i
    }
}

func consumer(ch <-chan int) {
    for v := range ch {  // 自动检测 channel 关闭
        fmt.Println(v)
    }
    fmt.Println("channel 已关闭")
}

// 使用方向性 channel
func directionalExample() {
    ch := make(chan int)
    go producer(ch)
    consumer(ch)
}

4. 处理 Channel 关闭后的零值:

package main

import "fmt"

func safeReceive(ch <-chan int) (int, bool) {
    v, ok := <-ch
    if !ok {
        // channel 已关闭,v 是零值
        fmt.Println("channel 已关闭")
        return 0, false
    }
    return v, true
}

5. 使用 Select 实现超时:

package main

import (
    "fmt"
    "time"
)

func workerWithTimeout(ch <-chan int, timeout time.Duration) {
    select {
    case v := <-ch:
        fmt.Printf("收到: %d\n", v)
    case <-time.After(timeout):
        fmt.Println("超时")
    }
}

6. 扇出模式(Fan-out):

package main

import "fmt"

func worker(id int, input <-chan int, output chan<- int) {
    defer close(output)
    for v := range input {
        result := v * 2  // 处理数据
        output <- result
        fmt.Printf("Worker %d 处理: %d -> %d\n", id, v, result)
    }
}

func fanOut(input <-chan int, numWorkers int) []<-chan int {
    outputs := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        outputs[i] = make(chan int)
        go worker(i, input, outputs[i])
    }
    return outputs
}

7. 扇入模式(Fan-in):

package main

import (
    "sync"
)

func fanIn(inputs ...<-chan int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup
    
    for _, input := range inputs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for v := range ch {
                output <- v
            }
        }(input)
    }
    
    go func() {
        wg.Wait()
        close(output)
    }()
    
    return output
}

8. 使用 Channel 实现信号量:

package main

import "fmt"

// 限制并发数
func semaphore(maxConcurrent int) {
    sem := make(chan struct{}, maxConcurrent)
    
    for i := 0; i < 100; i++ {
        sem <- struct{}{}  // 获取信号量
        go func(id int) {
            defer func() { <-sem }()  // 释放信号量
            doWork(id)
        }(i)
    }
}

func doWork(id int) {
    fmt.Printf("Task %d 开始\n", id)
}

9. Channel 性能优化技巧:

  • 预分配缓冲区:根据实际情况调整缓冲大小
  • 避免频繁创建和销毁 channel:重用 channel
  • 使用指针传递大对象:减少拷贝开销
  • 批量处理数据:减少 channel 操作次数
package main

type LargeData struct {
    // 大量字段
}

// 传递指针而不是值
func pointerChannel() {
    ch := make(chan *LargeData, 100)
    
    data := &LargeData{}
    ch <- data  // 只传递指针
}

// 批量处理
func batchProcess(ch <-chan int, batchSize int) {
    batch := make([]int, 0, batchSize)
    for v := range ch {
        batch = append(batch, v)
        if len(batch) >= batchSize {
            processBatch(batch)
            batch = batch[:0]
        }
    }
    if len(batch) > 0 {
        processBatch(batch)
    }
}

10. Channel 错误处理:

package main

import (
    "fmt"
    "time"
)

func safeChannelOperations() {
    ch := make(chan int)
    
    go func() {
        defer close(ch)
        for i := 0; i < 5; i++ {
            ch <- i
        }
    }()
    
    for {
        select {
        case v, ok := <-ch:
            if !ok {
                fmt.Println("Channel 已关闭")
                return
            }
            fmt.Printf("收到: %d\n", v)
        case <-time.After(1 * time.Second):
            fmt.Println("超时")
            return
        }
    }
}

总结:

  • 内部原理要点:环形队列、等待队列、直接传递、锁保护
  • 最佳实践要点:正确关闭、避免泄露、使用方向性、扇出扇入模式
  • 性能优化:合理缓冲、批量处理、指针传递、重用 channel

Buffered Channel

package main

import "fmt"

func main() {
    // 创建带缓冲的 channel
    ch := make(chan int, 2)

    ch <- 1
    ch <- 2

    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

Range 和 Close

package main

import "fmt"

func fibonacci(n int, c chan int) {
    x, y := 0, 1
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x+y
    }
    close(c) // 关闭 channel
}

func main() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)

    // range 会持续从 channel 接收,直到 channel 被关闭
    for i := range c {
        fmt.Println(i)
    }
}

Select

Select 语句让 goroutine 可以等待多个通信操作。

package main

import "fmt"

func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()

    fibonacci(c, quit)
}

默认选择

package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(100 * time.Millisecond)
    boom := time.After(500 * time.Millisecond)

    for {
        select {
        case <-tick:
            fmt.Println("tick.")
        case <-boom:
            fmt.Println("BOOM!")
            return
        default:
            fmt.Println("    .")
            time.Sleep(50 * time.Millisecond)
        }
    }
}

互斥锁(Mutex)

package main

import (
    "fmt"
    "sync"
    "time"
)

// SafeCounter 是并发安全的计数器
type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v
    c.v[key]++
    c.mu.Unlock()
}

func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v
    defer c.mu.Unlock()
    return c.v[key]
}

func main() {
    c := SafeCounter{v: make(map[string]int)}

    for i := 0; i < 1000; i++ {
        go c.Inc("somekey")
    }

    time.Sleep(time.Second)
    fmt.Println(c.Value("somekey"))
}

WaitGroup

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("Worker %d 开始工作\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d 完成工作\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Println("所有 worker 完成")
}

Context

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // 创建一个可取消的 context
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("goroutine 被取消")
                return
            default:
                fmt.Println("goroutine 运行中")
                time.Sleep(500 * time.Millisecond)
            }
        }
    }()

    time.Sleep(2 * time.Second)
    fmt.Println("取消 goroutine")
    cancel()
    time.Sleep(time.Second)
}

超时控制

package main

import (
    "context"
    "fmt"
    "time"
)

func longRunningTask(ctx context.Context) {
    select {
    case <-time.After(5 * time.Second):
        fmt.Println("任务完成")
    case <-ctx.Done():
        fmt.Println("任务超时:", ctx.Err())
    }
}

func main() {
    // 设置 2 秒超时
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    longRunningTask(ctx)
}