Etcd - Go 分布式存储
Etcd 是高可用的分布式键值存储系统,广泛用于服务发现、配置管理、分布式锁等场景。掌握 Etcd 是开发分布式系统的基础。
📌 核心概念
🗄️
键值存储
强一致性
Raft 协议
👁️
Watch 机制
变更通知
实时监听
🔒
分布式锁
Lease 租约
并发控制
📡
服务发现
服务注册
健康检查
快速开始
📝 连接 Etcd
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/client/v3"
)
func main() {
// 创建客户端
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
defer cli.Close()
ctx := context.Background()
// 设置值
_, err = cli.Put(ctx, "key1", "value1")
if err != nil {
panic(err)
}
// 获取值
resp, err := cli.Get(ctx, "key1")
if err != nil {
panic(err)
}
for _, kv := range resp.Kvs {
fmt.Printf("%s : %s\n", kv.Key, kv.Value)
}
}
💡 Etcd 要点
- Raft 协议: 强一致性保证
- Lease: 租约机制实现过期
- Watch: 实时监听键变更
- 事务: 支持条件操作
Lease 租约
📝 Lease 使用
func leaseExample(ctx context.Context, cli *clientv3.Client) {
// 创建 Lease
lease := clientv3.NewLease(cli)
// 授予租约 (10 秒)
leaseResp, _ := lease.Grant(ctx, 10)
leaseID := leaseResp.ID
// 带租约设置值
cli.Put(ctx, "session/key", "value",
clientv3.WithLease(leaseID))
// 续租
keepAliveCh, _ := lease.KeepAlive(ctx, leaseID)
// 监听续租响应
go func() {
for range keepAliveCh {
fmt.Println("Lease kept alive")
}
}()
// 撤销租约
// lease.Revoke(ctx, leaseID)
}
Watch 监听
📝 Watch 键变更
func watchExample(ctx context.Context, cli *clientv3.Client) {
// 监听单个键
watchCh := cli.Watch(ctx, "config/app")
go func() {
for resp := range watchCh {
for _, ev := range resp.Events {
switch ev.Type {
case clientv3.EventTypePut:
fmt.Printf("PUT: %s = %s\n",
ev.Kv.Key, ev.Kv.Value)
case clientv3.EventTypeDelete:
fmt.Printf("DELETE: %s\n", ev.Kv.Key)
}
}
}
}()
// 监听前缀
prefixWatchCh := cli.Watch(ctx, "services/",
clientv3.WithPrefix())
// 监听范围
rangeCh := cli.Watch(ctx, "from",
clientv3.WithRange("to"))
}
分布式锁
📝 实现分布式锁
type DistributedLock struct {
cli *clientv3.Client
key string
lease clientv3.LeaseID
}
func NewLock(cli *clientv3.Client, key string) *DistributedLock {
return &DistributedLock{cli: cli, key: key}
}
func (l *DistributedLock) Lock(ctx context.Context) error {
// 创建租约
lease := clientv3.NewLease(l.cli)
leaseResp, err := lease.Grant(ctx, 10)
if err != nil {
return err
}
l.lease = leaseResp.ID
// 尝试获取锁 (如果键不存在则创建)
resp, err := l.cli.Txn(ctx).
If(clientv3.Compare(clientv3.CreateRevision(l.key), "=", 0)).
Then(clientv3.OpPut(l.key, "locked",
clientv3.WithLease(l.lease))).
Commit()
if err != nil || !resp.Succeeded {
return fmt.Errorf("Failed to acquire lock")
}
// 续租
_, _ = lease.KeepAlive(ctx, l.lease)
return nil
}
func (l *DistributedLock) Unlock(ctx context.Context) error {
lease := clientv3.NewLease(l.cli)
_, err := lease.Revoke(ctx, l.lease)
return err
}
服务发现
📝 服务注册与发现
type ServiceRegistry struct {
cli *clientv3.Client
}
// 服务注册
func (r *ServiceRegistry) Register(ctx context.Context,
serviceName, addr string, ttl int64) error {
lease := clientv3.NewLease(r.cli)
leaseResp, _ := lease.Grant(ctx, ttl)
key := fmt.Sprintf("services/%s/%s", serviceName, addr)
_, err := r.cli.Put(ctx, key, addr,
clientv3.WithLease(leaseResp.ID))
// 自动续租
go func() {
ch, _ := lease.KeepAlive(ctx, leaseResp.ID)
for range ch {}
}()
return err
}
// 服务发现
func (r *ServiceRegistry) Discover(ctx context.Context,
serviceName string) ([]string, error) {
prefix := fmt.Sprintf("services/%s/", serviceName)
resp, err := r.cli.Get(ctx, prefix,
clientv3.WithPrefix())
if err != nil {
return nil, err
}
var addrs []string
for _, kv := range resp.Kvs {
addrs = append(addrs, string(kv.Value))
}
return addrs, nil
}
事务操作
📝 Etcd 事务
func transactionExample(ctx context.Context, cli *clientv3.Client) {
// 条件事务
resp, err := cli.Txn(ctx).
// 条件:key1 的值等于 "value1"
If(clientv3.Compare(clientv3.Value("key1"), "=", "value1")).
// 条件满足时执行
Then(clientv3.OpPut("key2", "value2")).
// 条件不满足时执行
Else(clientv3.OpPut("key2", "alternative")).
Commit()
if err != nil {
panic(err)
}
fmt.Printf("Transaction succeeded: %v\n", resp.Succeeded)
// 批量操作
cli.Txn(ctx).
Then(
clientv3.OpPut("a", "1"),
clientv3.OpPut("b", "2"),
clientv3.OpDelete("c"),
).Commit()
}
最佳实践
✅ Etcd 使用建议
- 连接池: 复用 etcd 客户端
- 超时设置: 所有操作设置超时
- Lease 续租: 后台持续续租
- Watch 重连: 处理 Watch 断开重连
- 键命名: 使用斜杠分隔命名空间