ETCD学习笔记
etcd是什么
etcd是一个分布式、高可用的键值存储系统,常用于配置中心、服务注册、Leader选举等场景。
分布式系统
采用etcd作为分布式一致性KV存储,它基于Raft算法,可以保证在主从节点之间数据一致性。我们常把服务注册信息、配置中心的数据保存在etcd中,多个服务节点通过watch等机制保持一致状态。
分布式锁
数据一致性
Raft算法
代码实例
package mainimport ("context""log""time"clientv3 "go.etcd.io/etcd/client/v3"
)// 服务注册
func registerService(client *clientv3.Client, serviceName, serviceAddr string, ttl int64) error {// 创建租约leaseResp, err := client.Grant(context.TODO(), ttl)if err != nil {return err}// 服务路径key := "/services/" + serviceName + "/" + serviceAddr// 将服务地址注册到etcd,并与租约关联_, err = client.Put(context.TODO(), key, serviceAddr, clientv3.WithLease(leaseResp.ID))if err != nil {return err}// 自动续约ch, err := client.KeepAlive(context.TODO(), leaseResp.ID)if err != nil {return err}// 处理续约应答go func() {for range ch {// 续约成功log.Printf("Service %s renewed", serviceAddr)}log.Printf("Service %s lease expired", serviceAddr)}()return nil
}// 服务发现
func discoverServices(client *clientv3.Client, serviceName string) ([]string, error) {// 前缀查询resp, err := client.Get(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())if err != nil {return nil, err}var services []stringfor _, kv := range resp.Kvs {services = append(services, string(kv.Value))}return services, nil
}// 监听服务变化
func watchServices(client *clientv3.Client, serviceName string) {rch := client.Watch(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())for wresp := range rch {for _, ev := range wresp.Events {switch ev.Type {case clientv3.EventTypePut:log.Printf("Service added: %s", ev.Kv.Value)case clientv3.EventTypeDelete:log.Printf("Service removed: %s", ev.Kv.Key)}}}
}func main() {// 连接etcdclient, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}, // etcd服务器地址DialTimeout: 5 * time.Second,})if err != nil {log.Fatalf("Failed to connect to etcd: %v", err)}defer client.Close()// 注册服务serviceName := "user-service"serviceAddr := "127.0.0.1:8080"err = registerService(client, serviceName, serviceAddr, 10) // 10秒租约if err != nil {log.Fatalf("Failed to register service: %v", err)}log.Printf("Service %s registered successfully", serviceAddr)// 启动服务监听go watchServices(client, serviceName)// 发现服务services, err := discoverServices(client, serviceName)if err != nil {log.Fatalf("Failed to discover services: %v", err)}log.Printf("Discovered services: %v", services)// 保持程序运行select {}
}
代码功能
- 服务注册
1. 使用etcd的租约机制注册(为了避免死锁,使用租约)
2. 设置服务的存活时间(租约有效时间,到期前如果未续费就自动断开)
3. 通过自动续约机制保持服务在线状态
4. 服务路径格式为/services/服务名/服务地址 - 服务发现
- 通过前缀查询获取指定服务名的所有实例
- 可以获取当前可用的服务列表
- 服务监听
- 实时监控服务的变化(新增或移除)
- 当服务上下线时能及时收到通知
上线与注册流程说明
- 连接etcd集群
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}, // etcd服务器地址DialTimeout: 5 * time.Second,
})
创建了一个 etcd 客户端,通过指定的端点 (Endpoints) 连接到 etcd 集群。
- 服务注册实现
// 创建租约
leaseResp, err := client.Grant(context.TODO(), ttl)// 写入服务信息并关联租约
_, err = client.Put(context.TODO(), key, serviceAddr, clientv3.WithLease(leaseResp.ID))// 自动续约
ch, err := client.KeepAlive(context.TODO(), leaseResp.ID)// 处理续约应答
go func() {for range ch {// 续约成功}
}()
- Grant() 创建租约,设置服务存活时间
- Put() 将服务信息写入 etcd,并通过WithLease()关联租约
- KeepAlive() 启动自动续约机制
- 用 goroutine 处理续约响应,保持服务活跃
- 服务发现实现
// 前缀查询获取服务实例
resp, err := client.Get(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())// 提取服务地址
var services []string
for _, kv := range resp.Kvs {services = append(services, string(kv.Value))
}
- Get() 方法配合WithPrefix()参数实现前缀查询
- 遍历查询结果,提取所有服务实例地址
- 监听服务变化
rch := client.Watch(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())for wresp := range rch {for _, ev := range wresp.Events {switch ev.Type {case clientv3.EventTypePut: // 服务新增case clientv3.EventTypeDelete: // 服务移除}}
}
- Watch() 方法建立对指定前缀路径的监听
- 通过循环读取事件通道,处理服务新增和移除事件
- 服务下线