← Redis | Gateway →

Etcd - Go 分布式存储

Etcd 是高可用的分布式键值存储系统,广泛用于服务发现、配置管理、分布式锁等场景。掌握 Etcd 是开发分布式系统的基础。

📌 核心概念

🗄️

键值存储

强一致性

Raft 协议
👁️

Watch 机制

变更通知

实时监听
🔒

分布式锁

Lease 租约

并发控制
📡

服务发现

服务注册

健康检查

快速开始

📝 连接 Etcd

package main

import (
    "context"
    "fmt"
    "time"
    "go.etcd.io/etcd/client/v3"
)

func main() {
    // 创建客户端
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        panic(err)
    }
    defer cli.Close()
    
    ctx := context.Background()
    
    // 设置值
    _, err = cli.Put(ctx, "key1", "value1")
    if err != nil {
        panic(err)
    }
    
    // 获取值
    resp, err := cli.Get(ctx, "key1")
    if err != nil {
        panic(err)
    }
    
    for _, kv := range resp.Kvs {
        fmt.Printf("%s : %s\n", kv.Key, kv.Value)
    }
}

💡 Etcd 要点

  • Raft 协议: 强一致性保证
  • Lease: 租约机制实现过期
  • Watch: 实时监听键变更
  • 事务: 支持条件操作

Lease 租约

📝 Lease 使用

func leaseExample(ctx context.Context, cli *clientv3.Client) {
    // 创建 Lease
    lease := clientv3.NewLease(cli)
    
    // 授予租约 (10 秒)
    leaseResp, _ := lease.Grant(ctx, 10)
    leaseID := leaseResp.ID
    
    // 带租约设置值
    cli.Put(ctx, "session/key", "value", 
        clientv3.WithLease(leaseID))
    
    // 续租
    keepAliveCh, _ := lease.KeepAlive(ctx, leaseID)
    
    // 监听续租响应
    go func() {
        for range keepAliveCh {
            fmt.Println("Lease kept alive")
        }
    }()
    
    // 撤销租约
    // lease.Revoke(ctx, leaseID)
}

Watch 监听

📝 Watch 键变更

func watchExample(ctx context.Context, cli *clientv3.Client) {
    // 监听单个键
    watchCh := cli.Watch(ctx, "config/app")
    
    go func() {
        for resp := range watchCh {
            for _, ev := range resp.Events {
                switch ev.Type {
                case clientv3.EventTypePut:
                    fmt.Printf("PUT: %s = %s\n", 
                        ev.Kv.Key, ev.Kv.Value)
                case clientv3.EventTypeDelete:
                    fmt.Printf("DELETE: %s\n", ev.Kv.Key)
                }
            }
        }
    }()
    
    // 监听前缀
    prefixWatchCh := cli.Watch(ctx, "services/", 
        clientv3.WithPrefix())
    
    // 监听范围
    rangeCh := cli.Watch(ctx, "from", 
        clientv3.WithRange("to"))
}

分布式锁

📝 实现分布式锁

type DistributedLock struct {
    cli  *clientv3.Client
    key  string
    lease clientv3.LeaseID
}

func NewLock(cli *clientv3.Client, key string) *DistributedLock {
    return &DistributedLock{cli: cli, key: key}
}

func (l *DistributedLock) Lock(ctx context.Context) error {
    // 创建租约
    lease := clientv3.NewLease(l.cli)
    leaseResp, err := lease.Grant(ctx, 10)
    if err != nil {
        return err
    }
    l.lease = leaseResp.ID
    
    // 尝试获取锁 (如果键不存在则创建)
    resp, err := l.cli.Txn(ctx).
        If(clientv3.Compare(clientv3.CreateRevision(l.key), "=", 0)).
        Then(clientv3.OpPut(l.key, "locked", 
            clientv3.WithLease(l.lease))).
        Commit()
    
    if err != nil || !resp.Succeeded {
        return fmt.Errorf("Failed to acquire lock")
    }
    
    // 续租
    _, _ = lease.KeepAlive(ctx, l.lease)
    
    return nil
}

func (l *DistributedLock) Unlock(ctx context.Context) error {
    lease := clientv3.NewLease(l.cli)
    _, err := lease.Revoke(ctx, l.lease)
    return err
}

服务发现

📝 服务注册与发现

type ServiceRegistry struct {
    cli *clientv3.Client
}

// 服务注册
func (r *ServiceRegistry) Register(ctx context.Context, 
    serviceName, addr string, ttl int64) error {
    
    lease := clientv3.NewLease(r.cli)
    leaseResp, _ := lease.Grant(ctx, ttl)
    
    key := fmt.Sprintf("services/%s/%s", serviceName, addr)
    
    _, err := r.cli.Put(ctx, key, addr, 
        clientv3.WithLease(leaseResp.ID))
    
    // 自动续租
    go func() {
        ch, _ := lease.KeepAlive(ctx, leaseResp.ID)
        for range ch {}
    }()
    
    return err
}

// 服务发现
func (r *ServiceRegistry) Discover(ctx context.Context, 
    serviceName string) ([]string, error) {
    
    prefix := fmt.Sprintf("services/%s/", serviceName)
    
    resp, err := r.cli.Get(ctx, prefix, 
        clientv3.WithPrefix())
    
    if err != nil {
        return nil, err
    }
    
    var addrs []string
    for _, kv := range resp.Kvs {
        addrs = append(addrs, string(kv.Value))
    }
    
    return addrs, nil
}

事务操作

📝 Etcd 事务

func transactionExample(ctx context.Context, cli *clientv3.Client) {
    // 条件事务
    resp, err := cli.Txn(ctx).
        // 条件:key1 的值等于 "value1"
        If(clientv3.Compare(clientv3.Value("key1"), "=", "value1")).
        // 条件满足时执行
        Then(clientv3.OpPut("key2", "value2")).
        // 条件不满足时执行
        Else(clientv3.OpPut("key2", "alternative")).
        Commit()
    
    if err != nil {
        panic(err)
    }
    
    fmt.Printf("Transaction succeeded: %v\n", resp.Succeeded)
    
    // 批量操作
    cli.Txn(ctx).
        Then(
            clientv3.OpPut("a", "1"),
            clientv3.OpPut("b", "2"),
            clientv3.OpDelete("c"),
        ).Commit()
}

最佳实践

✅ Etcd 使用建议

  • 连接池: 复用 etcd 客户端
  • 超时设置: 所有操作设置超时
  • Lease 续租: 后台持续续租
  • Watch 重连: 处理 Watch 断开重连
  • 键命名: 使用斜杠分隔命名空间