【C++框架#3】Etcd 安装使用
Ectd 安装和使用
1. 介绍
Etcd 是一个 golang 编写的分布式、高可用的一致性键值存储系统,用于配置共享和服务发现等。它使用 Raft 一致性算法 来保持集群数据的一致性,且客户端通过长连接 watch
功能,能够及时收到数据变化通知,相较于 Zookeeper 框架更加轻量化。
2. 安装配置
首先,需要在你的系统中安装 Etcd。Etcd 是一个分布式键值存储,通常用于服务发现和配置管理。
以下是在 Linux 系统上安装 Etcd 的基本步骤
-
安装 Etcd
sudo apt-get install etcd
-
启动 Ectd 服务
sudo systemctl start etcd
-
设置 Ecyd 开机自启
sudo systemctl enable etcd
节点配置
lighthouse@VM-8-10-ubuntu:~$ sudo netstat -anptu | grep etcd
tcp 0 0 127.0.0.1:2380 0.0.0.0:* LISTEN 2136420/etcd
tcp 0 0 127.0.0.1:2379 0.0.0.0:* LISTEN 2136420/etcd
tcp 0 0 127.0.0.1:59936 127.0.0.1:2379 ESTABLISHED 2136420/etcd
tcp 0 0 127.0.0.1:2379 127.0.0.1:59936 ESTABLISHED 2136420/etcd
如果是单节点集群其实就可以不用进行配置,默认 etcd 的集群节点通信端口为 2380客户端访问端口为 2379
若需要修改,则可以配置:/etc/default/etcd
# 节点名称,默认为 "default"
ETCD_NAME="etcd1"
# 数据目录,默认为 "${name}.etcd"
ETCD_DATA_DIR="/var/lib/etcd/default.etcd"# 用于客户端连接的 URL。
ETCD_LISTEN_CLIENT_URLS="http://192.168.65.132:2379,http://127.0.0.1:2379"
# 用于客户端访问的公开,也就是提供服务的 URL
ETCD_ADVERTISE_CLIENT_URLS="http://192.168.65.132:2379,http://127.0.0.1:2379"
# 用于集群节点间通信的 URL。
ETCD_LISTEN_PEER_URLS="http://192.168.65.132:2380"
ETCD_INITIAL_ADVERTISE_PEER_URLS="http://192.168.65.132:2380"
# 心跳间隔时间-毫秒
ETCD_HEARTBEAT_INTERVAL=100
# 选举超时时间-毫秒
ETCD_ELECTION_TIMEOUT=1000
集群配置
- 若无集群则需要注销
- 初始集群状态和配置–集群中所有节点
#ETCD_INITIAL_CLUSTER="etcd1=http://192.168.65.132:2380,etcd2=http://192.168.65.132:2381,etcd3=http://192.168.65.132:2382"
# 初始集群令牌-集群的 ID
# ETCD_INITIAL_CLUSTER_TOKEN="etcd-cluster"
# ETCD_INITIAL_CLUSTER_STATE="new"
安全配置:如果要求 SSL 连接 etcd 的话,把下面的配置启用,并修改文件路径
# ETCD_CERT_FILE="/etc/ssl/client.pem"
# ETCD_KEY_FILE="/etc/ssl/client-key.pem"
# ETCD_CLIENT_CERT_AUTH="true"
# ETCD_TRUSTED_CA_FILE="/etc/ssl/ca.pem"
# ETCD_AUTO_TLS="true"
# ETCD_PEER_CERT_FILE="/etc/ssl/member.pem"
# ETCD_PEER_KEY_FILE="/etc/ssl/member-key.pem"
# ETCD_PEER_CLIENT_CERT_AUTH="false"
# ETCD_PEER_TRUSTED_CA_FILE="/etc/ssl/ca.pem"
# ETCD_PEER_AUTO_TLS="true"
上面修改成如下结果
还有对下面进行修改
ETCD_ADVERTISE_CLIENT_URLS="http://1.12.51.69:2379"
然后重启,结果如下
lighthouse@VM-8-10-ubuntu:~$ sudo netstat -anptu | grep etcd
tcp 0 0 127.0.0.1:2380 0.0.0.0:* LISTEN 2143555/etcd
tcp 0 0 127.0.0.1:45232 127.0.0.1:2379 ESTABLISHED 2143555/etcd
tcp6 0 0 :::2379 :::* LISTEN 2143555/etcd
tcp6 0 0 127.0.0.1:2379 127.0.0.1:45232 ESTABLISHED 2143555/etcd
此时外来的服务器也可以访问这个端口
运行验证
lighthouse@VM-8-10-ubuntu:~$ etcdctl put mykey "this is awesome"
No help topic for 'put'
但是上面出现了报错,说明我们没有确定 etcd 版本,操作如下:
# 在 /etc/profile 末尾声明环境变量 EXCDCTL_API = 3
sudo vi /etc/profile
export ETCDCTL_API = 3
修改完毕后,重新加载配置文件,并重新执行测试指令
lighthouse@VM-8-10-ubuntu:~$ source /etc/profile
lighthouse@VM-8-10-ubuntu:~$ etcdctl put mykey "this is awesome"
OKlighthouse@VM-8-10-ubuntu:~$ etcdctl get mykey
mykey
this is awesome# 删除
lighthouse@VM-8-10-ubuntu:~$ etcdctl del mykey
1
3. 搭建服务注册发现中心
使用 Etcd 作为服务注册发现中心,我们需要定义服务的注册和发现逻辑。这通常涉及到以下几个操作:
- 服务注册:服务启动时,向 Etcd 注册自己的地址和端口
- 服务发现:客户端通过 Etcd 获取服务的地址和端口,用于远程调用
- 健康检查:服务定期向 Etcd 发送心跳,以维持其注册信息的有效性
etcd 采用 golang 编写,v3 版本通信采用 grpcAPI(HTTP2+protobuf);但是官方只维护了 go 语言版本的 client 库,因此需要找到 C/C++ 非官方的 client 开发库
3.1 etcd-cpp-apiv3
etcd-cpp-apiv3 是一个 etcd 的 C++版本客户端API。它依赖于 mipsasm
、boost
、protobuf
、gRPC
、cpprestsdk
等库。
etcd-cpp-apiv3 的 GitHub 地址是:https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3
依赖安装
sudo apt-get install libboost-all-dev libssl-dev # ssl: 加密库
sudo apt-get install libprotobuf-dev protobuf-compiler-grpc
sudo apt-get install libgrpc-dev libgrpc++-dev # etcd 所依赖框架
sudo apt-get install libcpprest-dev # 网络通信第三方库
API 框架安装
git clone https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3.git
cd etcd-cpp-apiv3
mkdir build && cd build # 生成 Makefile
cmake .. -DCMAKE_INSTALL_PREFIX=/usr
# 编译安装
make -j$(nproc) && sudo make install
但是在最后一步的时候,我出现了如下问题
/home/lighthouse/code/project/etcd/etcd-cpp-apiv3/build/src/../proto/gen/proto/google/api/http.pb.h:19:2: error: #error regenerate this file with a newer version of protoc.19 | #error regenerate this file with a newer version of protoc.
然后查看我的 protobuf 版本
lighthouse@VM-8-10-ubuntu:build$ sudo find /usr -name protobuf
/usr/include/google/protobuf
/usr/local/protobuf
/usr/local/protobuf/include/google/protobuf
发现是版本冲突,解决如下:
lighthouse@VM-8-10-ubuntu:build$ ls /usr/local/
bin etc games include lib man protobuf qcloud sbin share src# 把 local/protobuf 移过去
lighthouse@VM-8-10-ubuntu:build$ sudo mv /usr/local/protobuf/ /usr/local/protobuf.baklighthouse@VM-8-10-ubuntu:build$ sudo find /usr -name protobuf
/usr/include/google/protobuf
/usr/local/protobuf.bak/include/google/protobuf
此时就可以重新进行 make 了
3.2 客户端类和接口介绍
pplx::task
:并行库异步结果对象- 阻塞方式
get()
:阻塞直到任务执行完成,并获取任务结果 - 非阻塞方式
wait()
:等待任务到达终止状态,然后返回任务状态
namespace etcd {
class Value {bool is_dir(); //判断是否是一个目录std::string const& key() //键值对的 key 值std::string const& as_string()//键值对的 val 值int64_t lease() //用于创建租约的响应中,返回租约 ID
}// etcd 会监控所管理的数据的变化,一旦数据产生变化会通知客户端
// 在通知客户端的时候,会返回改变前的数据和改变后的数据
class Event {enum class EventType {PUT, //键值对新增或数据发生改变DELETE_, //键值对被删除INVALID,};enum EventType event_type()const Value& kv()const Value& prev_kv()
}class Response {bool is_ok()std::string const& error_message()Value const& value()//当前的数值 或者 一个请求的处理结果Value const& prev_value()//之前的数值Value const& value(int index)//std::vector<Event> const& events();//触发的事件
}class KeepAlive {
KeepAlive(Client const& client, int ttl, int64_t lease_id = 0);// 返回租约 IDint64_t Lease();// 停止保活动作void Cancel();
}class Client {// etcd_url: "http://127.0.0.1:2379"Client(std::string const& etcd_url, std::string const& load_balancer = "round_robin");// Put a new key-value pair 新增一个键值对pplx::task<Response> put(std::string const& key, std::string const& value);// 新增带有租约的键值对 (一定时间后,如果没有续租,数据自动删除)pplx::task<Response> put(std::string const& key, std::string const& value, const int64_t leaseId);// 获取一个指定 key 目录下的数据列表pplx::task<Response> ls(std::string const& key);// 创建并获取一个存活 ttl 时间的租约pplx::task<Response> leasegrant(int ttl);// 获取一个租约保活对象,其参数 ttl 表示租约有效时间pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int ttl);// 撤销一个指定的租约pplx::task<Response> leaserevoke(int64_t lease_id);//数据锁pplx::task<Response> lock(std::string const& key);
}class Watcher {Watcher(Client const& client, std::string const& key, // 要监控的键值对 key std::function<void(Response)> callback, //发生改变后的回调 bool recursive = false); //是否递归监控目录下的所有数据改变Watcher(std::string const& address, std::string const& key,std::function<void(Response)> callback,bool recursive = false);// 阻塞等待,直到监控任务被停止bool Wait();bool Cancel();
}
- Client 对象:客户端操作句柄对象,提供了 新增、获取数据的接口,并且还提供了 获取保活对象 和 租约 的接口
- KeepAlive 保活对象:一旦被析构就无法保活,此时租约数据失效被删除
- 本身提供一个获取 租约 ID 的接口
- 作用:针对一个租约可以不断进行续租 – 一直维持租约数据的有效性
- Response 对象:针对请求进行的响应
- Value 对象:存储键值对数据的对象
- Watcher 对象:数据变化通知的类
3.3 代码示例
put.cc
文件
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <iostream>
#include <thread>int main(int argc, char* argv[]){std::string etcd_host = "http://127.0.0.1:2379";// 1. 实例化客户端对象etcd::Client client(etcd_host);// 2. 获取租约保活对象 -- 伴随着创建一个指定有效时长的租约auto keep_alive = client.leasekeepalive(3).get(); // 3 sauto lease_id = keep_alive->Lease(); // 获取租约 ID// 3. 向 ected 新增数据 auto resp1 = client.put("/service/user", "127.0.0.1:8080", lease_id).get();if(resp1.is_ok() == false){std::cout << "新增数据失败: "<< resp1.error_message() << std::endl;return 0;}auto resp2 = client.put("/service/friend", "127.0.0.1:9090", lease_id).get();if(resp2.is_ok() == false){std::cout << "新增数据失败: "<< resp2.error_message() << std::endl;return 0;}std::this_thread::sleep_for(std::chrono::seconds(10));return 0;
}
get.cc
文件
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <iostream>
#include <thread>void callback(const etcd::Response &resp){if(resp.is_ok() == false){std::cout << "收到错误的事件通知: " << resp.error_message() << "\n";return; }for(auto const& ev: resp.events()){if(ev.event_type() == etcd::Event::EventType::PUT){std::cout << "--- 服务信息发生改变 ---\n";std::cout << "现值: " << ev.kv().key() << "-" << ev.kv().as_string() << "\n";std::cout << "原值: " << ev.prev_kv().key() << "-" << ev.prev_kv().as_string() << "\n";}else if(ev.event_type() == etcd::Event::EventType::DELETE_){std::cout << "--- 服务信息下线删除 ---\n";std::cout << "现值: " << ev.kv().key() << "-" << ev.kv().as_string() << "\n";std::cout << "原值: " << ev.prev_kv().key() << "-" << ev.prev_kv().as_string() << "\n";}}
}int main(int argc, char* argv[]){std::string etcd_host = "http://127.0.0.1:2379";// 1. 实例化客户端对象etcd::Client client(etcd_host);// 2. 获取指定键值对消息auto resp = client.ls("/service").get();if(resp.is_ok() == false){std::cout << "获取键值对数据失败: " << resp.error_message() << "\n";return -1;}int sz = resp.keys().size();for(int i = 0; i < sz; ++i){std::cout << resp.value(i).as_string() << "可以提供" << resp.key(i) << "服务\n";}// 3. 实例化一个键值对事件监控对象auto watcher = etcd::Watcher(client, "/service", callback, true);watcher.Wait();return 0;
}
Makefile 文件
all: put get
put: put.ccg++ -o $@ $^ -std=c++17 -letcd-cpp-api -lcpprest
get: get.ccg++ -o $@ $^ -std=c++17 -letcd-cpp-api -lcpprest
.PHONY: clean
clean: rm -f put get
如果我们运行的时候,出现如下报错:
lighthouse@VM-8-10-ubuntu:code$ ./put
./put: error while loading shared libraries: libetcd-cpp-api.so: cannot open shared object file: No such file or directory# 但是这个动态库是存在的
lighthouse@VM-8-10-ubuntu:~$ ls /usr/local/lib
cmake libetcd-cpp-api.so libredis++.a libredis++.so libredis++.so.1 libredis++.so.1.3.15 # 解决办法 -- 更新动态库缓存
sudo ldconfig # 扫描 /etc/ld.so.conf 和 /etc/ld.so.conf.d/ 下的配置
运行结果
lighthouse@VM-8-10-ubuntu:code$ ./putlighthouse@VM-8-10-ubuntu:code$ ./get
127.0.0.1:9090可以提供/service/friend服务
127.0.0.1:8080可以提供/service/user服务
--- 服务信息下线删除 ---
现值: /service/friend-
原值: /service/friend-127.0.0.1:9090
--- 服务信息下线删除 ---
现值: /service/user-
原值: /service/user-127.0.0.1:8080
以上的内容只是一个非常简单的样例,在真正的服务发现实现中,服务发现端,通常需要维护一张服务信息表,根据监控到的服务上线下线事件,对表中数据进行操作。
4. 封装服务发现与注册功能
在服务的注册与发现中,主要基于 etcd 所提供的可以设置有效时间的键值对存储来实现,而不是专门用于 作为注册中心 进行服务注册 和 发现
二次封装:封装 etcd-client-api
,实现两种类型客户端
- 服务注册客户端:向服务器新增服务信息数据,并进行保活
- 服务发现客户端:从服务器查找服务信息数据,并进行改变事件监控
4.1 服务注册
主要是在 etcd 服务器上存储一个租期 ns 的保活键值对,表示所能提供指定服务的节点主机,比如 /service/user/instance-1
的 key,且对应的 val 为提供服务的主机节点地址:
<key, val>--</service/user/instance-1,127.0.0.1:9000>
/service
是主目录,其下会有不同服务的键值对存储/user
是服务名称,表示该键值对是一个用户服务的节点/instance-1
是节点实例名称,提供用户服务可能会有很多节点,每个节点都应该有自己独立且唯一的实例名称
当这个键值对注册之后,服务发现方可以基于目录进行键值对的发现。且一旦注册节点退出,保活失败,则 3s 后租约失效,键值对被删除,etcd 会通知发现方数据的失效,进而实现服务下线通知的功能。
4.2 服务发现
服务发现分为两个过程:
- 刚启动客户端的时候,进行Is目录浏览,进行/service 路径下所有键值对的获取
- 对关心的服务进行 watcher 观测,一旦数值发生变化(新增/删除),收到通知进行节点的管理
如果 ls 的路径为/service,则会获取到 /service/user
,/service/firend
, … 等其路径下的所有能够提供服务的实例节点数据。
客户端可以将发现的所有 <实例-地址> 管理起来,以便于进行节点的管理:
- 收到新增数据通知,则向本地管理添加新增的节点地址 —— 服务上线
- 收到删除数据通知,则从本地管理删除对应的节点地址 —— 服务下线
因为管理了所有的能够提供服务的节点主机的地址,因此当需要进行 rpc 调用的时候则根据服务名称,获取一个能够提供服务的主机节点地址进行访问就可以了,而这里的获取策略,我们采用 RR 轮转策略
4.3 封装思想
将 etcd 的操作全部封装起来,也不需要管理数据,只需要向外四个基础操作接口:
- 封装服务注册客户端类:
- 提供一个接口:向服务器新增数据并进行保活
- 参数:注册中心地址(etcd服务器地址),新增的服务信息(服务名-主机地址键值对)
- 封装服务发现客户端类:
- 提供两个设置回调函数的接口
- 服务上线事件接口(数据新增),
- 服务下线事件接口(数据删除)提供一个设置
这样封装之后,外部的rpc 调用模块,可以先获取所有的当前服务信息,建立通信连接进行rpc调用,也能在有新服务上线的时候新增连接,以及下线的时候移除连接。
代码
测试
registry.cc
#include "etcd.hpp"
#include <gflags/gflags.h>
#include <thread>DEFINE_bool(run_mode, false, "程序运行模式: false-调试; true 发布");
DEFINE_string(log_file, "", "发布模式下指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下指定日志的输出等级");DEFINE_string(etcd_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(base_service, "/service", "服务监控根目录");
DEFINE_string(instance_name, "/usr/instance", "当前实例名称");
DEFINE_string(access_host, "127.0.0.1:8080", "当前实例的外部访问地址");int main(int argc, char* argv[]){google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);Registry::ptr rclient = std::make_shared<Registry>(FLAGS_etcd_host);rclient->registry(FLAGS_base_service + FLAGS_instance_name, FLAGS_access_host);std::this_thread::sleep_for(std::chrono::seconds(600));return 0;
}
discovery.cc
#include "etcd.hpp"
#include <gflags/gflags.h>
#include <thread>DEFINE_bool(run_mode, false, "程序运行模式: false-调试; true 发布");
DEFINE_string(log_file, "", "发布模式下指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下指定日志的输出等级");DEFINE_string(etcd_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(base_service, "/service", "服务监控根目录");void online(const std::string &service_name, const std::string &service_host){LOG_DEBUG("服务上线: {}-{}", service_name, service_host);
}
void offline(const std::string &service_name, const std::string &service_host){LOG_DEBUG("服务下线: {}-{}", service_name, service_host);
}int main(int argc, char* argv[]){google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);Discovery::ptr rclient = std::make_shared<Discovery>(FLAGS_etcd_host, FLAGS_base_service, online, offline);std::this_thread::sleep_for(std::chrono::seconds(600));return 0;
}
Makefile 文件
all: dis reg
dis: discovery.ccg++ -o $@ $^ -std=c++17 -letcd-cpp-api -lcpprest -lgflags -lfmt
reg: registry.ccg++ -o $@ $^ -std=c++17 -letcd-cpp-api -lcpprest -lgflags -lfmt
.PHONY: clean
clean: rm -f dis reg
运行结果
# 先启动服务发现
lighthouse@VM-8-10-ubuntu:test$ ./dis
[default-logger][23:13:03][2547586][debug ][discovery.cc:13] 服务上线: /service/usr/instance-127.0.0.1:8080
[default-logger][23:13:07][2547630][debug ][etcd.hpp:141] 新增服务:/service/usr/instance-127.0.0.1:8080
[default-logger][23:13:17][2547630][debug ][discovery.cc:16] 服务下线: /service/usr/instance-127.0.0.1:8080# 启动服务注册
lighthouse@VM-8-10-ubuntu:test$ ./reg