前言
介绍
基于c++的分布式网络框架,项目基于muduo高性能网络库+Protobuf开发,实现的主要功能是通过zookeeper实现服务注册以及发现,muduo网络库以及Protobuf实现远程RPC调用,异步日志。
分布式网络通信整个流程框架如下:
使用到的技术栈
集群和分布式概念以及原理
RPC远程过程调用原理以及实现
Protobuf数据序列化和反序列化协议
ZooKeeper分布式一致性协调服务应用以及编程
muduo网络库编程
conf配置文件读取
异步日志
CMake构建项目集成编译环境
一、集群与分布式
集群:每一台服务器独立运行一个工程的所有模块。缺点是:虽然用户的并发量得到了增加但是,有一个api,函数的修改整个项目代码都需要进行重新的编译并且需要进行多次部署。
分布式:一个工程拆分了很多模块,每一个模块独立部署运行在一个服务器主机上,所有服务器协同工作共同提供服务,每一台服务器称作分布式的一个节点,根据节点的并发要求,对一个节点可以再做节点模块集群部署。比如说一些模块使用量少,并发量少,只需要部署到一台服务器就行了。
二、RPC通讯原理
RPC是Remote Procedure Call(远程过程调用)的缩写,该技术可以让一台服务器上的服务通过网络调用另一台服务器上的服务,简单来说就是让不同网络节点上的服务相互调用。.因此 RPC框架会封装网络调用的细节,让调用远程服务看起来像调用本地服务一样简单。由于微服务架构的兴起,RPC的概念得到广泛应用,在消息队列、分布式缓存、分布式数据库等多个领域都有用到。可以将RPC理解为连接两个城市的高速公路,让车辆能够在城市之间自由通行。由于 RPC屏蔽了远程调用和本地调用的区别,因此程序开发者无须过多关注网络通信,可以把更多精力放到业务逻辑的开发上。
rpc工作原理如下:
1、客户端调用本地代理:客户端通过调用本地代理来访问服务器上的方法。这个代理通常是在客户端和服务器之间建立起来的一个桥梁,可以将远程方法调用转化成本地方法调用。
2、代理封装请求:本地代理把对远程对象的调用封装成一种标准格式的消息,然后将消息发送给服务端。
3、消息传输到服务端:消息通过网络传输到服务端。
4、服务端解析请求:服务端接收到消息后,需要进行解析,找到对应的远程方法,并处理请求得到结果。
5、服务端封装响应:服务端将处理的结果打包成一个返回消息,同样按照标准格式发送到客户端。
6、消息传回客户端:消息通过网络返回给本地代理。
7、代理解析响应:本地代理接收响应消息后解析,返回给调用者。
总结:rpc的工作逻辑就是客户端调用代理时,代理封装请求并发送给服务端,服务端处理完成后将结果发送给代理,代理解析响应后返回给调用者,让调用者看上去好像是直接调用了本地方法一样
rpc 协议栈
rpc 协议需要使用以下协议层:
应用层:由 rpc 库或框架提供,支持编程语言各自的数据类型和序列化、反序列化实现
传输层:通常基于 tcp 或 udp 实现,以提供可靠的数据传输
网络层:负责在不同的网络之间传输数据,通常使用 ip 协议
数据链路层和物理层:实现寻址、帧传输和物理接口等功能
工作原理图:
黄色部分:设计rpc方法参数的打包和解析,也就是数据的序列化和反序列化,使用Protobuf。
绿色部分:网络部分,包括寻找rpc服务主机,发起rpc调用请求和响应rpc调用结果。
rpc的主要优势:它可以让不同编程语言的应用程序之间进行通信,只需要客户端和服务器都支持相同的协议即可。另外,无论使用哪种语言,开发人员都可以将远程方法调用看
需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
三、protobuf的编写
1、protobuf简介
protobuf(protocol buffer)是google 的一种数据交换的格式,它独立于平台语言。
google 提供了protobuf多种语言的实现:java、c#、c++、go 和 python,每一种实现都包含了相应语言的编译器以及库文件。
由于它是一种二进制的格式,比使用 xml(20倍) 、json(10倍)进行数据交换快许多。可以把它用于分布式应用之间的数据通信或者异构环境下的数据交换。作为一种效率和兼容性都很优秀的二进制数据传输格式,可以用于诸如网络传输、配置文件、数据存储等诸多领域。
2、protobuf格式介绍
首先是声明protobuf的版本,以及代码所在的包(对于C++来说是namespace),如果需要生成服务类以及rpc方法描述就需要配置cc_generic_services。
syntax = "proto3"; // 声明了protobuf的版本 package fixbug; // 声明了代码所在的包(对于C++来说是namespace) // 定义下面的选项,表示生成service服务类和rpc方法描述, option cc_generic_services = true;
定义数据,可以定义嵌套类型,枚举以及数组等等类型
message ResultCode { int32 errcode = 1; bytes errmsg = 2; } // 数据 列表 映射表 // 定义登录请求消息类型 name pwd message LoginRequest { bytes name = 1; bytes pwd = 2; } // 定义登录响应消息类型 message LoginResponse { ResultCode result = 1; bool success = 2; } message GetFriendListsRequest { uint32 userid = 1; } message User { bytes name = 1; uint32 age = 2; // 枚举 enum Sex { MAN = 0; WOMAN = 1; } Sex sex = 3; } message GetFriendListsResponse { ResultCode result = 1; repeated User friend_list = 2; // 定义了一个列表类型 }
定义描述rpc方法的类型 :
service UserServiceRpc { rpc Login(LoginRequest) returns(LoginResponse); rpc GetFriendLists(GetFriendListsRequest) returns(GetFriendListsResponse); }
生成对应的c++文件命令
protoc test.proto --cpp_out=.
3、protobuf的序列化与反序列化例子
首先要导入生成的头文件以及使用他的作用域
#include "test.pb.h" using namespace fixbug;
序列化的时候对于嵌套类型 应该先要获取其对象的常量引用 :
GetFriendListsResponse rsp; ResultCode* rc = rsp.mutable_result(); rc->set_errcode(1); rc->set_errmsg("asdasdasd"); User* user1 = rsp.add_friend_list(); user1->set_name("zhang san"); user1->set_age(20); user1->set_sex(User::MAN); User* user2 = rsp.add_friend_list(); user2->set_name("li si"); user2->set_age(22); user2->set_sex(User::MAN); // 序列化 string send_str; if (rsp.SerializeToString(&send_str)) { cout << send_str << endl; }
反序列化
GetFriendListsResponse rsp2; // 获取反序列化的 if (rsp2.ParseFromString(send_str)) { const ResultCode& result = rsp2.result(); // 获取 ResultCode 对象 int32_t errcode = result.errcode(); // 获取 errcode 字段值 string errmsg = result.errmsg(); // 获取 errmsg 字段值 cout << errmsg << endl; } for (int i = 0; i < rsp2.friend_list_size(); ++i) { const User& user = rsp2.friend_list(i); std::string name = user.name(); uint32_t age = user.age(); User_Sex sex = user.sex(); cout << name << ":" << age << endl; }
4、头格式protobuf的编写
因为调用者与被调用者都需要知道服务名以及方法名字。所以用一个protobuf作为字节流传输的头部信息。这其中包含了服务名字,方法名字以及参数长度,三个数据。
message RpcHeader{ bytes service_name =1; bytes method_name =2; uint32 args_size=3; }
四、RPC通信框架设计
框架主要功能读取config配置文件数据。数据敦促在m_config这个类中。
MprpcConfig MprpcApplication::m_config; void MprpcApplication::init(int argc, char** argv) { if (argc < 2) { cout << "format command -i" << endl; exit(EXIT_FAILURE); } // 检查格式对不对 int c = 0; string config_file; while ((c = getopt(argc, argv, "i:")) != -1) { switch (c) { case 'i': config_file = optarg; cout << "///get config file:" << config_file << endl; break; case '?': cout << "format command -i" << endl; exit(EXIT_FAILURE); case ':': cout << "format command -i" << endl; exit(EXIT_FAILURE); default: break; } } // 读取配置文件信息 ip port zookeeper——ip zookeeper——port // 存储在这个m_config成员变量中 m_config.LoadConfigFile(config_file.c_str()); } MprpcApplication& MprpcApplication::GetInstance() { static MprpcApplication app; return app; } MprpcConfig& MprpcApplication::GetConfig() { return m_config; }
1、配置文件读取类
首先检测配置文件是否存在,然后逐行读取,最后存储在一个k-v字典中。该类提供读取key以及加载配置文件的功能。Trim是一个去掉字符串前后的空格的函数。
class MprpcConfig { private: unordered_mapm_configMap; public: void LoadConfigFile(const char* confi_file); string Load(const string& key); void Trim(string& src_buf); };
void MprpcConfig::LoadConfigFile(const char* config_file) { LOG_INFO("=====================Load Config File !!====================="); FILE* pf = fopen(config_file, "r"); if (!pf) { LOG_ERR("%s is not exist!!", config_file); LOG_ERR("%s:%s:%d", __FILE__, __FUNCTION__, __LINE__); cout << config_file << "is not exist!!" << endl; } // 每一行进行 读取 // 1.注释 2.正确的配置项 = 3.去掉开头的多余的空格 while (!feof(pf)) { char buf[512] = {0}; fgets(buf, 512, pf); // 去掉字符串前面多余的空格 std::string read_buf(buf); Trim(read_buf); // 判断#的注释 if (read_buf[0] == '#' || read_buf.empty()) { continue; } // 解析配置项 int idx = read_buf.find('='); if (idx == -1) { // 配置项不合法 continue; } std::string key; std::string value; key = read_buf.substr(0, idx); Trim(key); // rpcserverip=127.0.0.1\n int endidx = read_buf.find('\n', idx); value = read_buf.substr(idx + 1, endidx - idx - 1); Trim(value); cout << "get kv==> " << key << ":" << value << endl; LOG_INFO("get kv==> %s:%s", key.c_str(), value.c_str()); m_configMap.insert({key, value}); } fclose(pf); } // 查询配置项信息 std::string MprpcConfig::Load(const std::string& key) { auto it = m_configMap.find(key); if (it == m_configMap.end()) { return ""; } return it->second; }
五、分布式协调
1、zookeeper简介
zookeeper 是一个开源的分布式协调服务框架,由雅虎公司开发并贡献给 apache 基金会,它为分布式应用程序提供了高可用、高性能且一致的工作空间。
zookeeper 可以处理众多的分布式系统问题,例如:
充当命名服务:通过在zookeeper中注册一个节点并将它命名,其他进程就可以使用该名称来找到该节点。
分布式配置管理:应用程序可以使用 zookeeper 来管理其配置信息。因此,在更改配置时,无需重新启动整个应用程序。
处理分布式锁:在分布式环境中,许多进程需要对相同资源进行更新操作。使用 zookeeper,可以实现分布式锁定功能,确保这些更新互斥执行。
分布式队列:zookeeper 还支持分布式队列,并提供了一种轻量级的方法来实现“先进先出”和“后进先出”(lifo)队列等各种队列类型。
2、zookeeper工具类定义
目的就是由服务名以及方法名组成的节点中存储ip以及端口,并且是临时性节点pc节点超时未发送心跳消息Zk会自动删除临时性节点,服务发布者根绝服务名以及方法名形成节点存储开启该服务的ip以及端口,服务调用者就根据这个这个节点找到对饮的ip地址以及端口调用对应的服务以及方法。所以这个类必须包含有启动函数,构造函数,以及创建节点和返回节点数据函数。
class ZkClient { public: ZkClient(); ~ZkClient(); void Start(); void Create(const char* path, const char* data, int datalen, int state = 0); std ::string GetData(const char* path); private: // zk的客户端句柄 zhandle_t* m_zhandle; };
3、zookeeper工具类的实现
1、启动函数
首先根据应用程序的配置文件中的“zookeeperip”和“zookeeperport”加载zookeeper服务器的ip地址和端口号,然后将它们组合成zookeeper连接字符串。
接下来,代码调用zookeeper_init函数来初始化zookeeper客户端。
初始化一个名为sem的信号量,并通过zoo_set_context函数将它与m_zhandle关联起来。然后,代码调用sem_wait函数等待zk server的响应。在等待期间,当前线程会被挂起,当zk client连接到zk server并建立成功时,会触发global_watcher回调函数,在回调函数中会调用sem_post函数,使代码继续执行。
void global_watcher(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx) { if (type == ZOO_SESSION_EVENT) // 回调的消息类型是和会话相关的消息类型 { if (state == ZOO_CONNECTED_STATE) // zkclient和zkserver连接成功 { sem_t* sem = (sem_t*)zoo_get_context(zh); sem_post(sem); } } } // 连接zkserver void ZkClient::Start() { std::string host = MprpcApplication::GetInstance().GetConfig().Load("zookeeperip"); std::string port = MprpcApplication::GetInstance().GetConfig().Load("zookeeperport"); std::string connstr = host + ":" + port; /* zookeeper_mt:多线程版本 zookeeper的API客户端程序提供了三个线程 API调用线程 网络I/O线程 pthread_create poll watcher回调线程 pthread_create */ m_zhandle = zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0); if (nullptr == m_zhandle) { std::cout << "zookeeper_init error!" << std::endl; LOG_ERR("zookeeper_init error! "); exit(EXIT_FAILURE); } sem_t sem; sem_init(&sem, 0, 0); zoo_set_context(m_zhandle, &sem); sem_wait(&sem); // 等 zk server响应的时候 结束等待 std::cout << "zookeeper_init success!" << std::endl; LOG_INFO("zookeeper_init success!"); }
其中global_watcher函数:全局的观察器回调函数,用于处理zkserver给zkclient的通知。
当收到与会话相关的消息(即ZOO_SESSION_EVENT)时,如果状态变为连接状态(ZOO_CONNECTED_STATE),则解除对初始化信号量(semaphore)的等待,完成初始化。
2、创建节点
创建一个指定路径的节点,如果该节点已经存在则不创建。代码首先使用zoo_exists函数判断指定路径对应的节点是否存在(如果不存在会返回znonode错误码),如果不存在则调用zoo_create函数创建一个新的节点。
void ZkClient::Create(const char* path, const char* data, int datalen, int state) { char path_buffer[128]; int bufferlen = sizeof(path_buffer); int flag; // 先判断path表示的znode节点是否存在,如果存在,就不再重复创建了 flag = zoo_exists(m_zhandle, path, 0, nullptr); if (ZNONODE == flag) // 表示path的znode节点不存在 { // 创建指定path的znode节点了 flag = zoo_create(m_zhandle, path, data, datalen, &ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen); if (flag == ZOK) { std::cout << "znode create success... path:" << path << std::endl; LOG_INFO("znode create success... path:%s", path); } else { std::cout << "flag:" << flag << std::endl; std::cout << "znode create error... path:" << path << std::endl; LOG_ERR("znode create error .. path:%s", path); exit(EXIT_FAILURE); } } }
3、返回数据
函数用于获取指定路径对应节点的值。代码通过调用zoo_get函数来实现此功能,并返回节点的值作为字符串。如果获取数据失败(例如节点不存在),则输出错误消息并返回一个空字符串。
std::string ZkClient::GetData(const char* path) { char buffer[64]; int bufferlen = sizeof(buffer); int flag = zoo_get(m_zhandle, path, 0, buffer, &bufferlen, nullptr); if (flag != ZOK) { std::cout << "get znode error... path:" << path << std::endl; LOG_ERR("get znode error .. path:%s", path); return ""; } else { return buffer; } }
4、服务发布
在服务注册的时候,首先定义zookeeper工具类,启动zookeeper服务,然后循环遍历所有的服务以及服务下面的方法,将他们的名字构造层节点路劲,然后这个节点存储的是由ip以及port组成的字符串,这样调用方就可以从zookeeper中根据服务名以及节点名获取到提供该服务的ip以及端口号。
注意:节点必须是临时性的,因为服务挂掉的时候节点会自动去掉 ,为了把当前rpc节点上要发布的服务全部注册到zk上面,并且zkclient 会保留与zkserver的api连接通过zkclient 网络I/O线程 以1/3 * timeout 时间发送心跳ping消息给zkserver以保持维护节点的存在
ZkClient zkCli; zkCli.Start(); // service_name为永久性节点 method_name为临时性节点 for (auto& sp : m_serviceMap) { // /service_name /UserServiceRpc 只能一层一层节点创建 std::string service_path = "/" + sp.first; zkCli.Create(service_path.c_str(), nullptr, 0); for (auto& mp : sp.second.m_methodMap) { // /service_name/method_name /UserServiceRpc/Login // 存储当前这个rpc服务节点主机的ip和port std::string method_path = service_path + "/" + mp.first; char method_path_data[128] = {0}; sprintf(method_path_data, "%s:%d", ip.c_str(), port); // ZOO_EPHEMERAL表示znode是一个临时性节点 zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL); } }
5、服务发现
调用方需要寻找某项服务的ip以及端口,首先构造路劲,然后调用获取数据犯法获取该节点的ip以及端口字符串,最后解析获取该字符串。
// 从zookeeper 获取ip ZkClient zkCli; zkCli.Start(); // /UserServiceRpc/Login std::string method_path = "/" + service_name + "/" + method_name; // 127.0.0.1:8000 std::string host_data = zkCli.GetData(method_path.c_str()); if (host_data == "") { controller->SetFailed(method_path + " is not exist!"); return; } int idx = host_data.find(":"); if (idx == -1) { controller->SetFailed(method_path + " address is invalid!"); return; } std::string ip = host_data.substr(0, idx); uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());
六、异步日志系统
1、为什么需要异步记录日志
因为基于muduo网络库进行网络通讯的,muduo通过多线程来处理并发连接,要添加日志模块那么就会有多个线程写日志信息的情况。这样的话就必须要实现一个保证线程安全的日志队列。时需要启动一个日志线程,专门对日志队列写日志。
2、保证线程安全的日志队列类
模板类 lockqueue,它用于实现异步写日志的日志队列,主要包含 push 和 pop 两个方法。其中,push 方法可以被多个 worker 线程调用以将数据添加到日志队列中,而 pop 方法则只能由一个线程读取队列并将其内容写入日志文件。
具体来说,push 方法首先加锁(使用了 std::lock_guardstd::mutex),然后将数据添加到队列中,最后通过条件变量(std::condition_variable)唤醒 pop 方法所在的线程。pop 方法首先也需要加锁(使用了 std::unique_lockstd::mutex),然后进入一个 while 循环,在循环中检查队列是否为空,如果为空,则调用条件变量的 wait 方法使当前线程阻塞等待。当队列不为空时,将队头元素取出,并从队列中删除。最后释放锁并返回取出的队头元素。
通过这种方式实现日志队列的异步操作,可以让写日志的线程和写文件的线程分别跑在不同的线程中,避免了日志写操作对主程序的性能影响。
templateclass LockQueue { public: // 多个worker线程都会写日志queue void Push(const T& data) { std::lock_guardlock(m_mutex); m_queue.push(data); m_condvariable.notify_one(); } // 一个线程读日志queue,写日志文件 T Pop() { std::unique_locklock(m_mutex); while (m_queue.empty()) { // 日志队列为空,线程进入wait状态 m_condvariable.wait(lock); } T data = m_queue.front(); m_queue.pop(); return data; } private: std::queuem_queue; std::mutex m_mutex; std::condition_variable m_condvariable; };
3、日志类
首先日志类属于是单例模式,确保了整个应用程序中只有一个logger实例。
多线程写日志:在logger的构造函数中,启动了一个带lambda表达式的线程writelogtask,在该线程的主体循环中执行以下操作:
1、获取当前时间,尝试打开当日的日志文件
2、从lockqueue中获取缓存的日志信息
3、根据日志级别,添加前缀(“info"或"error”),并将该条日志写入日志文件中
该线程会一直运行,为整个应用程序提供日志服务,同时除了在构造函数中设置日志级别和启动写日志线程之外,logger还提供了以下两个操作接口:
setloglevel(loglevel level): 设置日志级别
log(std::string msg): 将msg作为一条日志信息写入lockqueue缓冲区。
// 获取日志的单例 Logger& Logger::GetInstance() { static Logger logger; return logger; } Logger::Logger() { // 启动专门的写日志线程 std::thread writeLogTask([&]() { for (;;) { // 获取当前的日期,然后取日志信息,写入相应的日志文件当中 a+ time_t now = time(nullptr); tm* nowtm = localtime(&now); char file_name[128]; sprintf(file_name, "%d-%d-%d-log.txt", nowtm->tm_year + 1900, nowtm->tm_mon + 1, nowtm->tm_mday); FILE* pf = fopen(file_name, "a+"); if (pf == nullptr) { std::cout << "logger file : " << file_name << " open error!" << std::endl; exit(EXIT_FAILURE); } std::string msg = m_lckQue.Pop(); char time_buf[128] = {0}; // std::cout << (m_loglevel == INFO) << std::endl; sprintf(time_buf, "%d:%d:%d =>[%s] ", nowtm->tm_hour, nowtm->tm_min, nowtm->tm_sec, (m_loglevel == INFO ? "info" : "error")); msg.insert(0, time_buf); msg.append("\n"); fputs(msg.c_str(), pf); fclose(pf); } }); // 设置分离线程,守护线程 writeLogTask.detach(); } // 设置日志级别 void Logger::SetLogLevel(LogLevel level) { m_loglevel = level; } // 写日志, 把日志信息写入lockqueue缓冲区当中 void Logger::Log(std::string msg) { m_lckQue.Push(msg); }
4、定义宏记录日志
宏log_info,它接受一个格式化的日志消息和可变数量的参数。在宏内部,获取logger的实例,然后设置日志级别为info。接下来,它创建一个长度为1024的char数组c,使用snprintf函数将格式化字符串(logmsgformat) 和可变参数(va_args)写入这个数组中。最后,它调用logger的log函数将日志消息写入日志文件中。
do-while(0)语法是为了防止宏展开时出现错误。在实际使用过程中,log_info(“xxx %d %s”, 20, “xxxx”) 可以被展开为如下代码:
logger& logger = logger::getinstance(); logger.setloglevel(info); char c[1024] = {0}; snprintf(c, 1024, "xxx %d %s", 20, "xxxx"); logger.log(c);
因此正常信息宏以及错误信息宏的定义如下:
// 定义宏 LOG_INFO("xxx %d %s", 20, "xxxx") #define LOG_INFO(logmsgformat, ...) \ do { \ Logger& logger = Logger::GetInstance(); \ logger.SetLogLevel(INFO); \ char c[1024] = {0}; \ snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \ logger.Log(c); \ } while (0) #define LOG_ERR(logmsgformat, ...) \ do { \ Logger& logger = Logger::GetInstance(); \ logger.SetLogLevel(ERROR); \ char c[1024] = {0}; \ snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \ logger.Log(c); \ } while (0)
七、服务提供者
1、服务提供者整体框架
作为服务提供者,必须进行服务注册,并且将注册好的服务名以及方法名字存储起来,用一个字典存储所有的服务,一个服务包含多个方法。所以用一个结构体存储服务,里面包含有服务以及映射方法的字典。其中由于我们使用了protobuf作为rpc通信协议,所以服务以及方法都必须是**google::protobuf: