并发编程
Go 语言的并发模型是其最强大的特性之一,通过 Goroutine 和 Channel 实现高效的并发编程。
Goroutine
轻量级线程,由 Go 运行时管理
Channel
Goroutine 之间的通信机制
Select
多通道多路复用
Mutex
同步原语保证数据安全
Goroutine
Goroutine 是 Go 语言轻量级的线程实现,由 Go 运行时管理。
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
// 启动 goroutine
go say("world")
say("hello")
}
Goroutine 内部实现原理
1. Goroutine 的内部结构:
Goroutine 是 Go 运行时(runtime)管理的轻量级线程,其内部结构包含:
// Goroutine 的内部结构(简化版)
type g struct {
stack stack // 栈内存
stackguard0 uintptr // 栈溢出检查
m *m // 关联的 M(Machine)
sched gobuf // 调度信息
goid int64 // Goroutine ID
gopc uintptr // 创建该 goroutine 的 PC
startpc uintptr // goroutine 函数的起始 PC
atomicstatus uint32 // 原子状态
goid int64 // Goroutine 唯一标识
}
2. M(Machine)结构:
M 代表操作系统线程,负责执行 goroutine:
// M 的内部结构(简化版)
type m struct {
g0 *g // 调度用的 goroutine
curg *g // 当前运行的 goroutine
p *p // 关联的 P(Processor)
nextp *p // 下一个 P
id int64 // M 的 ID
spinning bool // 是否在自旋寻找工作
blocked bool // 是否阻塞
}
3. P(Processor)结构:
P 代表处理器,维护一个本地运行队列:
// P 的内部结构(简化版)
type p struct {
id int32
status uint32
link *p
schedtick uint32
syscalltick uint32
m *m // 关联的 M
mcache *mcache // 内存分配缓存
runqhead uint32
runqtail uint32
runq [256]guintptr // 本地运行队列
runnext guintptr // 下一个运行的 goroutine
}
4. GMP 调度模型:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
// 设置使用的 P 数量
runtime.GOMAXPROCS(4)
fmt.Printf("CPU 核心数: %d\n", runtime.NumCPU())
fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
var wg sync.WaitGroup
// 启动多个 goroutine
for i := 1; i <= 10; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("所有 worker 完成")
}
5. Goroutine 调度过程:
package main
import (
"fmt"
"runtime"
)
func task(id int) {
fmt.Printf("Task %d 在 P%d 上运行\n", id, runtime.GOMAXPROCS(0))
}
func main() {
// Goroutine 调度过程:
// 1. 创建 goroutine:go func() {}
// 2. goroutine 放入本地运行队列(P 的 runq)
// 3. M 从 P 的 runq 中获取 goroutine 执行
// 4. 如果 P 的 runq 为空,从全局队列或其他 P 窃取 goroutine
// 5. Goroutine 执行完成或阻塞,M 继续执行下一个 goroutine
for i := 1; i <= 5; i++ {
go task(i)
}
// 等待 goroutine 完成
select {}
}
6. 工作窃取(Work Stealing):
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func heavyTask(id int, wg *sync.WaitGroup) {
defer wg.Done()
// 模拟不同负载的任务
duration := time.Duration(50+id*10) * time.Millisecond
time.Sleep(duration)
fmt.Printf("Task %d 完成,耗时 %v\n", id, duration)
}
func main() {
runtime.GOMAXPROCS(2) // 使用 2 个 P
var wg sync.WaitGroup
// 创建不同负载的任务
for i := 1; i <= 10; i++ {
wg.Add(1)
go heavyTask(i, &wg)
}
wg.Wait()
fmt.Println("所有任务完成")
// 工作窃取机制:
// 1. P1 的任务较多,P2 的任务较少
// 2. P2 完成自己的任务后,会从 P1 窃取任务执行
// 3. 这实现了负载均衡,充分利用 CPU 资源
}
7. Goroutine 栈管理:
package main
import (
"fmt"
"runtime"
)
func recursive(depth int) {
var buf [1024]byte // 分配 1KB 栈空间
if depth < 100 {
fmt.Printf("递归深度: %d, 栈地址: %p\n", depth, &buf)
recursive(depth + 1)
}
}
func main() {
fmt.Printf("初始栈大小: %d KB\n", runtime.GoroutineStack()/1024)
// Goroutine 栈管理特点:
// 1. 初始栈大小:2KB(Go 1.4+)
// 2. 动态增长:栈空间不足时自动扩容
// 3. 最大栈大小:1GB(可配置)
// 4. 栈拷贝:扩容时旧栈内容拷贝到新栈
go recursive(0)
select {}
}
8. Goroutine 状态转换:
package main
import (
"fmt"
"runtime"
"time"
)
func demonstrateStates() {
fmt.Println("Goroutine 状态转换:")
fmt.Println("_Gidle: 刚创建,未初始化")
fmt.Println("_Grunnable: 在运行队列中,等待执行")
fmt.Println("_Grunning: 正在执行")
fmt.Println("_Gsyscall: 正在执行系统调用")
fmt.Println("_Gwaiting: 等待(channel、sleep 等)")
fmt.Println("_Gdead: 已退出")
}
func main() {
go demonstrateStates()
// Goroutine 状态转换示例:
// _Gidle -> _Grunnable: 创建 goroutine
// _Grunnable -> _Grunning: 被调度执行
// _Grunning -> _Gwaiting: 等待 channel 或 sleep
// _Gwaiting -> _Grunnable: 唤醒后重新进入队列
// _Grunning -> _Gdead: 执行完成
time.Sleep(100 * time.Millisecond)
}
Goroutine 最佳实践
1. 合理控制 Goroutine 数量:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// 使用 worker pool 模式限制 goroutine 数量
type WorkerPool struct {
tasks chan int
workers int
wg sync.WaitGroup
}
func NewWorkerPool(workers int) *WorkerPool {
return &WorkerPool{
tasks: make(chan int, 100),
workers: workers,
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workers; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for task := range wp.tasks {
fmt.Printf("Worker %d 处理任务 %d\n", id, task)
time.Sleep(100 * time.Millisecond)
}
}
func (wp *WorkerPool) Submit(task int) {
wp.tasks <- task
}
func (wp *WorkerPool) Stop() {
close(wp.tasks)
wp.wg.Wait()
}
func main() {
pool := NewWorkerPool(5) // 限制为 5 个 worker
pool.Start()
// 提交 20 个任务
for i := 1; i <= 20; i++ {
pool.Submit(i)
}
pool.Stop()
fmt.Println("所有任务完成")
}
2. 避免 Goroutine 泄露:
package main
import (
"fmt"
"time"
)
// 错误示例:goroutine 泄露
func leak() {
ch := make(chan int)
go func() {
val := <-ch // 永远阻塞,goroutine 泄露
fmt.Println(val)
}()
}
// 正确示例:使用 context 控制
import "context"
func noLeak(ctx context.Context) {
ch := make(chan int)
go func() {
select {
case val := <-ch:
fmt.Println(val)
case <-ctx.Done():
fmt.Println("goroutine 被取消")
return
}
}()
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
noLeak(ctx)
time.Sleep(3 * time.Second)
}
3. 使用 WaitGroup 等待 Goroutine:
package main
import (
"fmt"
"sync"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d 完成\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("所有 worker 完成")
}
4. 使用 ErrGroup 处理错误:
package main
import (
"errors"
"fmt"
"golang.org/x/sync/errgroup"
)
func task(name string, shouldFail bool) error {
fmt.Printf("Task %s 开始\n", name)
if shouldFail {
return errors.New("task failed")
}
fmt.Printf("Task %s 完成\n", name)
return nil
}
func main() {
var g errgroup.Group
g.Go(func() error {
return task("A", false)
})
g.Go(func() error {
return task("B", true) // 这个会失败
})
g.Go(func() error {
return task("C", false)
})
if err := g.Wait(); err != nil {
fmt.Println("错误:", err)
}
}
5. 使用 Channel 进行通信:
package main
import (
"fmt"
"time"
)
func producer(ch chan<- int) {
for i := 1; i <= 5; i++ {
ch <- i
fmt.Printf("生产: %d\n", i)
time.Sleep(100 * time.Millisecond)
}
close(ch)
}
func consumer(ch <-chan int) {
for val := range ch {
fmt.Printf("消费: %d\n", val)
time.Sleep(150 * time.Millisecond)
}
}
func main() {
ch := make(chan int, 3)
go producer(ch)
go consumer(ch)
time.Sleep(2 * time.Second)
}
6. 使用 Context 取消 Goroutine:
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 被取消\n", id)
return
default:
fmt.Printf("Worker %d 工作中...\n", id)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
time.Sleep(2 * time.Second)
cancel() // 取消所有 goroutine
time.Sleep(1 * time.Second)
}
7. 使用 Mutex 保护共享资源:
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
var counter Counter
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Printf("最终值: %d\n", counter.Value())
}
8. 性能优化技巧:
- 避免过度创建 goroutine:使用 worker pool 模式
- 合理设置 GOMAXPROCS:根据 CPU 核心数调整
- 使用缓冲 channel:减少阻塞
- 避免锁竞争:尽量使用 channel 通信
- 使用 sync.Pool:重用对象,减少 GC 压力
- 监控 goroutine 数量:使用 runtime.NumGoroutine()
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func monitorGoroutines() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
fmt.Printf("当前 goroutine 数量: %d\n", runtime.NumGoroutine())
}
}
func main() {
go monitorGoroutines()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(2 * time.Second)
}()
}
wg.Wait()
fmt.Println("所有 goroutine 完成")
}
Channel
Channel 是 goroutine 之间通信的管道。
package main
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // 和发送到 channel
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // 从 channel 接收
fmt.Println(x, y, x+y)
}
Channel 内部实现原理
1. Channel 的核心数据结构:
Channel 在 Go 运行时中的核心结构是 hchan:
// runtime/chan.go - Channel 的核心结构(简化版)
type hchan struct {
qcount uint // 当前队列中元素个数
dataqsiz uint // 环形队列大小(容量)
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 元素大小
closed uint32 // channel 是否关闭
elemtype *_type // 元素类型
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 接收等待队列
sendq waitq // 发送等待队列
lock mutex // 互斥锁
}
2. 环形队列实现:
Channel 使用环形队列来存储数据,这是一个高效的循环缓冲区:
// 环形队列的工作原理示例
// 假设容量为 5 的 channel
// 初始状态: []
// sendx = 0, recvx = 0
// 发送 1, 2, 3: [1, 2, 3, _, _]
// sendx = 3, recvx = 0
// 接收 2 个值: [_, _, 3, _, _]
// sendx = 3, recvx = 2
// 继续发送 4, 5: [_, _, 3, 4, 5]
// sendx = 0, recvx = 2 (循环回到开头)
// 接收 3 个值: [_, _, _, _, _]
// sendx = 0, recvx = 0 (队列为空)
3. 发送操作流程:
// 发送操作: ch <- value
// 步骤 1: 获取 channel 锁
// lock(&c.lock)
// 步骤 2: 检查是否有接收者在等待
if sg := c.recvq.dequeue(); sg != nil {
// 直接传递数据给接收者,不经过缓冲区
send(c, sg, ep)
return
}
// 步骤 3: 如果缓冲区有空间,放入缓冲区
if c.qcount < c.dataqsiz {
// 数据放入环形队列
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
return
}
// 步骤 4: 缓冲区已满,阻塞当前 goroutine
// 当前 goroutine 加入发送等待队列
// gopark() - 让出 CPU,等待被唤醒
4. 接收操作流程:
// 接收操作: value := <-ch
// 步骤 1: 获取 channel 锁
// lock(&c.lock)
// 步骤 2: 检查是否有发送者在等待
if sg := c.sendq.dequeue(); sg != nil {
// 直接从发送者获取数据
recv(c, sg, ep)
return
}
// 步骤 3: 如果缓冲区有数据,从缓冲区读取
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
return
}
// 步骤 4: 缓冲区为空且未关闭,阻塞当前 goroutine
// 当前 goroutine 加入接收等待队列
// gopark() - 让出 CPU,等待被唤醒
5. 关闭操作流程:
// 关闭操作: close(ch)
// 步骤 1: 获取 channel 锁
// lock(&c.lock)
// 步骤 2: 标记 channel 为已关闭
c.closed = 1
// 步骤 3: 唤醒所有等待的接收者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
sg.elem = nil // 接收者获得零值
goready(sg.g) // 唤醒 goroutine
}
// 步骤 4: 唤醒所有等待的发送者(它们会 panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
goready(sg.g) // 唤醒后发送者会 panic
}
Channel 最佳实践
1. 选择合适的 Channel 类型:
package main
import "fmt"
// 无缓冲 Channel(同步)- 适用于需要同步的场景
func synchronousExample() {
ch := make(chan int)
go func() {
fmt.Println("发送者: 准备发送")
ch <- 42 // 阻塞直到有消费者接收
fmt.Println("发送者: 数据已发送")
}()
fmt.Println("接收者: 等待数据")
val := <-ch
fmt.Printf("接收者: 收到 %d\n", val)
}
// 缓冲 Channel(异步)- 适用于生产者和消费者速度不匹配
func bufferedExample() {
ch := make(chan int, 3) // 缓冲大小为 3
// 可以发送 3 个数据而不会阻塞
ch <- 1
ch <- 2
ch <- 3
fmt.Println("已发送 3 个数据")
fmt.Println(<-ch)
fmt.Println(<-ch)
fmt.Println(<-ch)
}
2. 避免 Goroutine 泄露:
package main
import (
"context"
"fmt"
"time"
)
// 错误示例:goroutine 泄露
func leakExample() {
ch := make(chan int)
go func() {
val := <-ch // 永远阻塞,goroutine 泄露
fmt.Println(val)
}()
// 如果 ch 永远不发送数据,goroutine 永不退出
}
// 正确示例:使用 context 控制
func noLeakExample() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
ch := make(chan int)
go func() {
select {
case val := <-ch:
fmt.Println(val)
case <-ctx.Done():
fmt.Println("goroutine 被取消")
return
}
}()
}
3. 正确关闭 Channel:
package main
import "fmt"
// 原则:只在发送方关闭 channel,不要在接收方关闭
func producer(ch chan<- int) {
defer close(ch) // 发送完成后关闭
for i := 0; i < 5; i++ {
ch <- i
}
}
func consumer(ch <-chan int) {
for v := range ch { // 自动检测 channel 关闭
fmt.Println(v)
}
fmt.Println("channel 已关闭")
}
// 使用方向性 channel
func directionalExample() {
ch := make(chan int)
go producer(ch)
consumer(ch)
}
4. 处理 Channel 关闭后的零值:
package main
import "fmt"
func safeReceive(ch <-chan int) (int, bool) {
v, ok := <-ch
if !ok {
// channel 已关闭,v 是零值
fmt.Println("channel 已关闭")
return 0, false
}
return v, true
}
5. 使用 Select 实现超时:
package main
import (
"fmt"
"time"
)
func workerWithTimeout(ch <-chan int, timeout time.Duration) {
select {
case v := <-ch:
fmt.Printf("收到: %d\n", v)
case <-time.After(timeout):
fmt.Println("超时")
}
}
6. 扇出模式(Fan-out):
package main
import "fmt"
func worker(id int, input <-chan int, output chan<- int) {
defer close(output)
for v := range input {
result := v * 2 // 处理数据
output <- result
fmt.Printf("Worker %d 处理: %d -> %d\n", id, v, result)
}
}
func fanOut(input <-chan int, numWorkers int) []<-chan int {
outputs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
outputs[i] = make(chan int)
go worker(i, input, outputs[i])
}
return outputs
}
7. 扇入模式(Fan-in):
package main
import (
"sync"
)
func fanIn(inputs ...<-chan int) <-chan int {
output := make(chan int)
var wg sync.WaitGroup
for _, input := range inputs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for v := range ch {
output <- v
}
}(input)
}
go func() {
wg.Wait()
close(output)
}()
return output
}
8. 使用 Channel 实现信号量:
package main
import "fmt"
// 限制并发数
func semaphore(maxConcurrent int) {
sem := make(chan struct{}, maxConcurrent)
for i := 0; i < 100; i++ {
sem <- struct{}{} // 获取信号量
go func(id int) {
defer func() { <-sem }() // 释放信号量
doWork(id)
}(i)
}
}
func doWork(id int) {
fmt.Printf("Task %d 开始\n", id)
}
9. Channel 性能优化技巧:
- 预分配缓冲区:根据实际情况调整缓冲大小
- 避免频繁创建和销毁 channel:重用 channel
- 使用指针传递大对象:减少拷贝开销
- 批量处理数据:减少 channel 操作次数
package main
type LargeData struct {
// 大量字段
}
// 传递指针而不是值
func pointerChannel() {
ch := make(chan *LargeData, 100)
data := &LargeData{}
ch <- data // 只传递指针
}
// 批量处理
func batchProcess(ch <-chan int, batchSize int) {
batch := make([]int, 0, batchSize)
for v := range ch {
batch = append(batch, v)
if len(batch) >= batchSize {
processBatch(batch)
batch = batch[:0]
}
}
if len(batch) > 0 {
processBatch(batch)
}
}
10. Channel 错误处理:
package main
import (
"fmt"
"time"
)
func safeChannelOperations() {
ch := make(chan int)
go func() {
defer close(ch)
for i := 0; i < 5; i++ {
ch <- i
}
}()
for {
select {
case v, ok := <-ch:
if !ok {
fmt.Println("Channel 已关闭")
return
}
fmt.Printf("收到: %d\n", v)
case <-time.After(1 * time.Second):
fmt.Println("超时")
return
}
}
}
总结:
- 内部原理要点:环形队列、等待队列、直接传递、锁保护
- 最佳实践要点:正确关闭、避免泄露、使用方向性、扇出扇入模式
- 性能优化:合理缓冲、批量处理、指针传递、重用 channel
Buffered Channel
package main
import "fmt"
func main() {
// 创建带缓冲的 channel
ch := make(chan int, 2)
ch <- 1
ch <- 2
fmt.Println(<-ch)
fmt.Println(<-ch)
}
Range 和 Close
package main
import "fmt"
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c) // 关闭 channel
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
// range 会持续从 channel 接收,直到 channel 被关闭
for i := range c {
fmt.Println(i)
}
}
Select
Select 语句让 goroutine 可以等待多个通信操作。
package main
import "fmt"
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}
默认选择
package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}
互斥锁(Mutex)
package main
import (
"fmt"
"sync"
"time"
)
// SafeCounter 是并发安全的计数器
type SafeCounter struct {
mu sync.Mutex
v map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
c.v[key]++
c.mu.Unlock()
}
func (c *SafeCounter) Value(key string) int {
c.mu.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
defer c.mu.Unlock()
return c.v[key]
}
func main() {
c := SafeCounter{v: make(map[string]int)}
for i := 0; i < 1000; i++ {
go c.Inc("somekey")
}
time.Sleep(time.Second)
fmt.Println(c.Value("somekey"))
}
WaitGroup
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("所有 worker 完成")
}
Context
package main
import (
"context"
"fmt"
"time"
)
func main() {
// 创建一个可取消的 context
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("goroutine 被取消")
return
default:
fmt.Println("goroutine 运行中")
time.Sleep(500 * time.Millisecond)
}
}
}()
time.Sleep(2 * time.Second)
fmt.Println("取消 goroutine")
cancel()
time.Sleep(time.Second)
}
超时控制
package main
import (
"context"
"fmt"
"time"
)
func longRunningTask(ctx context.Context) {
select {
case <-time.After(5 * time.Second):
fmt.Println("任务完成")
case <-ctx.Done():
fmt.Println("任务超时:", ctx.Err())
}
}
func main() {
// 设置 2 秒超时
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
longRunningTask(ctx)
}