← Goroutine | Select →

Channel - Go 并发通信核心

Channel 是 Go 实现 CSP (Communicating Sequential Processes) 模型的核心机制,体现了 Go 的设计哲学:"不要通过共享内存来通信,而要通过通信来共享内存"

📌 核心概念

📬

创建

ch := make(chan int)
ch := make(chan int, 10)
📤

发送

ch <- value
chan<- int // 只发送
📥

接收

value := <-ch
<-chan int // 只接收
🔒

关闭

close(ch)
v, ok := <-ch

基础示例

📝 使用 Channel 进行 Goroutine 通信

package main

import "fmt"

// worker 处理数据并通过 channel 返回结果
func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("Worker %d started job %d\n", id, j)
        results <- j * 2 // 返回处理结果
    }
}

func main() {
    // 创建带缓冲的 channel
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // 启动 3 个 worker
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // 发送 5 个任务
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs) // 关闭 jobs channel,通知 worker 没有更多任务

    // 收集结果
    for r := 1; r <= 5; r++ {
        result := <-results
        fmt.Printf("Result: %d\n", result)
    }
}

💡 代码要点

  • 单向 Channel: <-chan int 只接收,chan<- int 只发送,编译期类型安全
  • range 接收: for j := range jobs 自动检测 channel 关闭
  • 缓冲 Channel: 带缓冲的 channel 允许异步通信,提高吞吐量
  • 关闭 Channel: 只有发送者应该关闭 channel,接收者关闭会导致 panic

Channel 类型详解

Channel 类型对比
类型 声明 特性 使用场景 注意事项
无缓冲 Channel make(chan T) 同步阻塞 需要严格同步的场景 发送/接收都会阻塞,直到对方准备好
有缓冲 Channel make(chan T, N) 异步非阻塞 提高吞吐量,削峰填谷 缓冲区满时发送阻塞,空时接收阻塞
单向 Channel chan<- T
<-chan T
类型安全 限制发送/接收权限 通常作为函数参数,内部可转换
nil Channel var ch chan T 永久阻塞 控制 select 分支禁用 向 nil channel 发送/接收会永久阻塞
关闭 Channel close(ch) 只可关闭一次 通知接收者数据结束 重复关闭 panic,接收者可检测关闭状态

底层实现原理

hchan 数据结构

📖 Go 运行时中的 Channel 实现

// runtime/chan.go - Channel 核心结构 (简化版)
type hchan struct {
    qcount   uint           // 当前队列中元素个数
    dataqsiz uint           // 环形队列大小 (容量)
    buf      unsafe.Pointer // 环形队列指针
    elemsize uint16         // 元素大小 (字节)
    closed   uint32         // channel 是否关闭
    elemtype *_type         // 元素类型信息
    sendx    uint           // 发送索引 (环形队列)
    recvx    uint           // 接收索引 (环形队列)
    recvq    waitq          // 接收等待队列 (goroutine)
    sendq    waitq          // 发送等待队列 (goroutine)
    lock     mutex          // 互斥锁,保护整个结构
}

// 等待队列
type waitq struct {
    first *sudog
    last  *sudog
}

// 等待中的 goroutine
type sudog struct {
    g     *g     // 等待的 goroutine
    elem  unsafe.Pointer // 数据指针
    next  *sudog
    prev  *sudog
}

💡 Channel 核心原理

  • 环形队列: 缓冲 channel 使用环形队列 (ring buffer) 存储数据,高效循环利用内存
  • 等待队列: sendq 和 recvq 分别存储等待发送和接收的 goroutine
  • 互斥锁: 所有操作都通过 lock 保护,保证并发安全
  • 零拷贝: 直接传递数据指针,避免内存拷贝
  • Goroutine 调度: 阻塞时 goroutine 进入等待状态,不占用 CPU

发送操作流程

ch <- value
获取锁 lock(c)
检查 recvq
有接收者等待?
直接传递数据
唤醒接收者
缓冲区满?
加入 sendq
gopark 阻塞

接收操作流程

<-ch
获取锁 lock(c)
检查 sendq
有发送者等待?
直接接收数据
唤醒发送者
缓冲区空?
加入 recvq
gopark 阻塞

高级模式与最佳实践

1. 优雅关闭 Channel

📝 安全关闭 Channel 的模式

package main

import (
    "fmt"
    "sync"
)

// 模式 1: 由发送者关闭 (推荐)
func producerWorker(ch chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    defer close(ch) // 发送完成后关闭
    
    for i := 0; i < 5; i++ {
        ch <- i
    }
}

// 模式 2: 使用 done channel 信号
func gracefulShutdown() {
    done := make(chan struct{})
    dataCh := make(chan int, 10)
    
    // 启动多个发送者
    for i := 0; i < 3; i++ {
        go func(id int) {
            for {
                select {
                case dataCh <- id:
                    // 发送数据
                case <-done:
                    return // 收到关闭信号
                }
            }
        }(i)
    }
    
    // 主 goroutine 决定何时关闭
    close(done) // 广播关闭信号
}

// 模式 3: 使用 context 控制
func contextControlled(ctx context.Context) {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for {
            select {
            case <-ctx.Done():
                return
            case ch <- getData():
            }
        }
    }()
}

func main() {
    var wg sync.WaitGroup
    ch := make(chan int)
    
    wg.Add(1)
    go producerWorker(ch, &wg)
    
    for v := range ch {
        fmt.Println(v)
    }
    wg.Wait()
}

⚠️ Channel 关闭规则

  • 只关闭一次: 重复关闭会 panic
  • 发送者关闭: 只有发送者应该关闭 channel
  • 接收者不关闭: 接收者关闭会导致其他接收者 panic
  • 多发送者场景: 使用 done channel 或 context,不要直接关闭数据 channel
  • 不关闭也可以: 如果 goroutine 会自然退出,channel 可以不关闭,由 GC 回收

2. Channel 泄漏检测与预防

📝 常见 Channel 泄漏场景

// 泄漏场景 1: 接收者提前退出,发送者阻塞
func leak1() {
    ch := make(chan int)
    go func() {
        // 如果接收者退出,这里永久阻塞
        ch <- 1
    }()
    // 忘记接收...
}

// 修复:使用缓冲或 select
func fix1() {
    ch := make(chan int, 1) // 缓冲
    go func() {
        select {
        case ch <- 1:
        default:
            // 非阻塞,丢弃数据
        }
    }()
}

// 泄漏场景 2: range 等待关闭,但发送者永不关闭
func leak2() {
    ch := make(chan int)
    go func() {
        for v := range ch { // 等待 channel 关闭
            fmt.Println(v)
        }
    }()
    ch <- 1
    // 忘记 close(ch)...
}

// 修复:确保关闭
func fix2() {
    ch := make(chan int)
    go func() {
        defer close(ch) // 确保关闭
        ch <- 1
    }()
    for v := range ch {
        fmt.Println(v)
    }
}

// 泄漏场景 3: 未消费完数据,goroutine 退出
func leak3() {
    ch := make(chan int, 10)
    go func() {
        for i := 0; i < 10; i++ {
            ch <- i
        }
    }()
    // 只接收部分数据
    fmt.Println(<-ch)
    // 发送者阻塞在 ch <- 9,goroutine 泄漏
}

3. 性能优化技巧

📊 Channel 性能对比与优化

// 场景 1: 无缓冲 vs 有缓冲 (吞吐量对比)
func benchmarkUnbuffered() {
    ch := make(chan int)
    go func() {
        for i := 0; i < 1000000; i++ {
            ch <- i // 每次发送都阻塞
        }
        close(ch)
    }()
    for range ch {}
}

func benchmarkBuffered() {
    ch := make(chan int, 1000) // 缓冲 1000
    go func() {
        for i := 0; i < 1000000; i++ {
            ch <- i // 缓冲区未满时不阻塞
        }
        close(ch)
    }()
    for range ch {}
}
// 结果:缓冲 channel 吞吐量提升 5-10 倍

// 场景 2: 批量处理减少 channel 通信次数
func batchProcessing() {
    ch := make(chan []int, 100) // 传递切片而非单个值
    
    go func() {
        batch := make([]int, 0, 100)
        for i := 0; i < 1000000; i++ {
            batch = append(batch, i)
            if len(batch) >= 100 {
                ch <- batch
                batch = make([]int, 0, 100)
            }
        }
        if len(batch) > 0 {
            ch <- batch
        }
        close(ch)
    }()
}
// 结果:减少 channel 通信次数,降低锁竞争

// 场景 3: 使用 sync.Pool 减少内存分配
var bufferPool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 4096)
    },
}

func processWithPool(ch chan []byte) {
    buf := bufferPool.Get().([]byte)
    defer bufferPool.Put(buf)
    // 使用 buf 处理数据...
}

📊 Channel 性能建议

  • 选择合适缓冲: 根据生产者/消费者速度差设置缓冲大小
  • 批量处理: 传递批量数据而非单个值,减少通信次数
  • 避免过度设计: 简单场景用无缓冲 channel 更清晰
  • 注意内存分配: 高频场景使用 sync.Pool 复用缓冲区
  • 监控等待队列: 使用 runtime 包监控 sendq/recvq 长度

常见陷阱与解决方案

Channel 常见陷阱
陷阱 错误示例 正确做法
重复关闭 close(ch) 多次调用 使用 sync.Once 或确保只关闭一次
向关闭的 channel 发送 close 后继续 ch <- v 使用 recover 或确保逻辑正确
Goroutine 泄漏 发送者阻塞无人接收 使用 select + default 或缓冲
死锁 循环依赖等待 使用 select 设置超时
nil channel 阻塞 var ch chan T; <-ch 确保 channel 已初始化

🚨 死锁示例与预防

// 死锁示例:goroutine 互相等待
func deadlock() {
    a := make(chan int)
    b := make(chan int)
    
    go func() {
        <-a // 等待 b 发送
        b <- 1
    }()
    
    go func() {
        <-b // 等待 a 发送
        a <- 1
    }()
    
    // 两个 goroutine 都阻塞,死锁!
}

// 预防:使用 select 设置超时
func withTimeout() {
    ch := make(chan int)
    select {
    case <-ch:
        // 成功接收
    case <-time.After(time.Second):
        fmt.Println("Timeout!")
    }
}

总结

✅ Channel 核心要点

  • CSP 模型: Go 通过 channel 实现"通过通信共享内存"
  • 类型安全: 单向 channel 在编译期保证发送/接收权限
  • 同步原语: 无缓冲 channel 是同步原语,有缓冲 channel 是队列
  • 关闭规则: 只有发送者关闭,且只关闭一次
  • 泄漏预防: 确保发送/接收配对,使用 select 设置超时
  • 性能优化: 根据场景选择缓冲大小,批量处理减少通信次数