← Channel | Mutex →

Select - Go 多路复用机制

select 是 Go 提供的多路复用机制,允许 Goroutine 同时监听多个 channel 操作。它是构建复杂并发模式的核心工具,类似于网络编程中的 select/poll/epoll。

📌 核心概念

🔀

多路监听

同时监听多个 channel

多个 case
🎲

随机选择

多个就绪时随机选一个

公平调度
⏱️

超时控制

time.After 实现超时

防止阻塞
🚫

非阻塞 IO

default 实现非阻塞

立即返回

基础语法

📝 select 基本结构

package main

import "fmt"

func main() {
    ch1 := make(chan int)
    ch2 := chan<- int(make(chan int))

    go func() { ch1 <- 1 }()
    go func() { ch2 <- 2 }()

    // select 基本结构
    select {
    case v1 := <-ch1:
        fmt.Printf("Received from ch1: %d\n", v1)
    case v2 := <-ch2:
        fmt.Printf("Received from ch2: %d\n", v2)
    default:
        fmt.Println("No channel ready (non-blocking)")
    }
}

💡 select 执行规则

  • 按顺序评估: 从上到下评估每个 case
  • 随机选择: 多个 case 就绪时,随机选择一个执行
  • 阻塞等待: 没有 case 就绪时,阻塞直到有 case 就绪
  • default 非阻塞: 有 default 时,没有 case 就绪立即执行 default
  • nil channel: nil channel 的 case 永远不会就绪

经典并发模式

1. 超时控制

📝 使用 time.After 实现超时

package main

import (
    "fmt"
    "time"
)

// 模拟慢速操作
func slowOperation(ch chan<- string) {
    time.Sleep(time.Second * 2)
    ch <- "Operation completed"
}

func main() {
    ch := make(chan string)

    go slowOperation(ch)

    // 超时控制模式
    select {
    case result := <-ch:
        fmt.Println(result)
    case <-time.After(time.Millisecond * 500):
        fmt.Println("Timeout! Operation took too long")
    }

    // 带 Context 的超时
    select {
    case result := <-ch:
        fmt.Println(result)
    case <-time.AfterFunc(time.Second, func() {
        fmt.Println("Timer fired")
    }).C:
        // 可取消的定时器
    }
}

2. 非阻塞 channel 操作

📝 使用 default 实现非阻塞

package main

import (
    "fmt"
    "time"
)

// 非阻塞发送
func nonBlockingSend(ch chan<- int, value int) bool {
    select {
    case ch <- value:
        fmt.Printf("Sent: %d\n", value)
        return true
    default:
        fmt.Printf("Buffer full, dropped: %d\n", value)
        return false
    }
}

// 非阻塞接收
func nonBlockingRecv(ch <-chan int) (int, bool) {
    select {
    case v := <-ch:
        return v, true
    default:
        return 0, false
    }
}

func main() {
    ch := make(chan int, 2)

    // 发送测试
    nonBlockingSend(ch, 1) // 成功
    nonBlockingSend(ch, 2) // 成功
    nonBlockingSend(ch, 3) // 缓冲区满,丢弃

    // 接收测试
    if v, ok := nonBlockingRecv(ch); ok {
        fmt.Printf("Received: %d\n", v)
    }

    time.Sleep(time.Millisecond * 100)
}

3. 优雅关闭

📝 使用 select 实现优雅关闭

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func worker(id int, jobs <-chan int, done chan<- bool) {
    for {
        select {
        case job, ok := <-jobs:
            if !ok {
                fmt.Printf("Worker %d: channel closed\n", id)
                done <- true
                return
            }
            fmt.Printf("Worker %d processing job %d\n", id, job)
        case <-time.After(time.Millisecond * 100):
            // 心跳检测,防止长时间阻塞
            fmt.Printf("Worker %d: still alive\n", id)
        }
    }
}

func main() {
    jobs := make(chan int, 10)
    done := make(chan bool, 3)

    // 启动 Worker
    for i := 1; i <= 3; i++ {
        go worker(i, jobs, done)
    }

    // 发送任务
    for i := 1; i <= 5; i++ {
        jobs <- i
    }

    // 监听退出信号
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    select {
    case <-sigChan:
        fmt.Println("Received shutdown signal")
    case <-time.After(time.Second * 5):
        fmt.Println("Timeout, shutting down")
    }

    // 关闭 jobs channel,通知 Worker 退出
    close(jobs)

    // 等待所有 Worker 完成
    for i := 0; i < 3; i++ {
        <-done
    }

    fmt.Println("All workers stopped")
}

4. 多路复用器

📝 合并多个 channel

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// 合并多个 channel 到一个
func merge(channels ...<-chan int) <-chan int {
    out := make(chan int)

    go func() {
        var active []<-chan int
        for _, ch := range channels {
            if ch != nil {
                active = append(active, ch)
            }
        }

        for len(active) > 0 {
            select {
            case v := <-active[0]:
                out <- v
                // 移除已关闭的 channel
                active = active[1:]
            case <-time.After(time.Millisecond):
                // 轮询所有 channel
                select {
                case v := <-active[0]:
                    out <- v
                case v := <-active[1]:
                    out <- v
                case v := <-active[2]:
                    out <- v
                }
                // 重新排列
                for i, ch := range active {
                    if ch == nil {
                        active = append(active[:i], active[i+1:]...)
                        break
                    }
                }
            }
        }
        close(out)
    }()

    return out
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)

    go func() {
        for i := 0; i < 5; i++ {
            ch1 <- i
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
        }
        close(ch1)
    }()

    go func() {
        for i := 10; i < 15; i++ {
            ch2 <- i
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
        }
        close(ch2)
    }()

    go func() {
        for i := 20; i < 25; i++ {
            ch3 <- i
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
        }
        close(ch3)
    }()

    merged := merge(ch1, ch2, ch3)
    for v := range merged {
        fmt.Println(v)
    }
}

底层实现原理

select 数据结构

📖 runtime 中的 select 实现

// runtime/select.go - select 实现 (简化)

// hselect - select 状态结构
type hselect struct {
    tcase       uint16   // case 数量
    ncases      uint16   // 已评估的 case 数量
    cases       []scase  // case 数组
    pollindex   int32    // poll 索引
    lock        mutex    // 锁
}

// scase - 单个 case 的信息
type scase struct {
    elem        unsafe.Pointer  // 数据元素指针
    pc          uintptr         // 程序计数器
    releasetime int64           // 释放时间
    kind        uint16          // case 类型
    chanv       *hchan          // channel 指针
}

// select 执行流程 (伪代码)
func selectgo(sel *hselect) {
    // 1. 获取锁
    lock(&sel.lock)

    // 2. 评估所有 case
    for i := 0; i < sel.tcase; i++ {
        cas := &sel.cases[i]
        
        switch cas.kind {
        case 0: // recv
            if sg := cas.chanv.recvq.dequeue(); sg != nil {
                // 有发送者等待,直接接收
                recv(cas.chanv, sg, cas.elem)
                unlock(&sel.lock)
                return i
            }
            if cas.chanv.qcount > 0 {
                // 缓冲区有数据
                recv(cas.chanv, nil, cas.elem)
                unlock(&sel.lock)
                return i
            }
        
        case 1: // send
            if sg := cas.chanv.recvq.dequeue(); sg != nil {
                // 有接收者等待,直接发送
                send(cas.chanv, sg, cas.elem)
                unlock(&sel.lock)
                return i
            }
            if cas.chanv.qcount < cas.chanv.dataqsiz {
                // 缓冲区有空间
                send(cas.chanv, nil, cas.elem)
                unlock(&sel.lock)
                return i
            }
        }
    }

    // 3. 没有 case 就绪,阻塞等待
    gopark(...)
}

💡 select 核心原理

  • 轮询评估: 按顺序评估每个 case 的 channel 状态
  • 随机选择: 多个就绪时使用 fastrand 随机选择
  • 阻塞机制: 没有就绪时将 Goroutine 加入所有 channel 的等待队列
  • 唤醒机制: 任一 channel 就绪时唤醒 Goroutine
  • default 处理: 有 default 时不阻塞,立即返回

性能优化与最佳实践

性能考虑

📊 select 性能分析

// 1. case 数量影响性能
// select 会按顺序评估所有 case
// case 越多,评估开销越大

// ❌ 不推荐:case 过多
select {
case <-ch1:
case <-ch2:
case <-ch3:
case <-ch4:
case <-ch5:
case <-ch6:
case <-ch7:
case <-ch8:
case <-ch9:
case <-ch10:
}

// ✅ 推荐:使用循环 + map
channels := map[int]<-chan int{
    1: ch1, 2: ch2, 3: ch3,
}

for len(channels) > 0 {
    select {
    case <-channels[1]:
        delete(channels, 1)
    case <-channels[2]:
        delete(channels, 2)
    case <-channels[3]:
        delete(channels, 3)
    }
}

// 2. 避免在 select 中进行复杂计算
// select 应该只用于 channel 操作

// ❌ 不推荐
select {
case v := <-ch:
    result := complexCalculation(v) // 阻塞其他 case
    process(result)
}

// ✅ 推荐
select {
case v := <-ch:
    go func() {
        result := complexCalculation(v)
        process(result)
    }()
}

⚠️ 常见陷阱

  • 忘记 default: 可能导致永久阻塞
  • nil channel 陷阱: nil channel 的 case 永不就绪
  • 重复读取: 同一个 channel 在多个 case 中
  • 资源泄漏: select 阻塞时未清理资源
  • 顺序依赖: 依赖 case 的评估顺序

实用模式

📝 select 实用技巧

// 1. 禁用 case (使用 nil channel)
func processWithDisable(ch1, ch2 <-chan int) {
    var ch1Copy <-chan int = ch1
    
    for {
        select {
        case <-ch1Copy:
            // 处理 ch1
            ch1Copy = nil // 禁用 ch1 case
        case <-ch2:
            // ch2 始终可用
        }
        
        if ch1Copy == nil && ch2 == nil {
            break
        }
    }
}

// 2. 优先级控制
func prioritySelect(high, low <-chan int) {
    select {
    case v := <-high:
        fmt.Println("High priority:", v)
    default:
        select {
        case v := <-low:
            fmt.Println("Low priority:", v)
        default:
            fmt.Println("No data")
        }
    }
}

// 3. 批量接收
func batchRecv(ch <-chan int, batchSize int) []int {
    batch := make([]int, 0, batchSize)
    
    for len(batch) < batchSize {
        select {
        case v, ok := <-ch:
            if !ok {
                return batch // channel 关闭
            }
            batch = append(batch, v)
        case <-time.After(time.Millisecond * 100):
            return batch // 超时返回
        }
    }
    
    return batch
}

总结

✅ select 核心要点

  • 多路复用: 同时监听多个 channel 操作
  • 随机选择: 多个就绪时公平随机选择
  • 超时控制: 使用 time.After 实现超时
  • 非阻塞 IO: default 实现非阻塞操作
  • 优雅关闭: 结合 done channel 实现
  • nil channel: 可用于动态禁用 case