Reactor 反应堆模式
代码整体围绕 “Reactor 模式” 实现一个网络服务器(以计算器为例),各模块按职责可分为基础工具层、网络通信层、事件驱动层、业务逻辑层四类,具体作用如下:
Listener实例化 → 创建TcpSocket(监听套接字) → 注册到Reactor的Epoller(监控新连接) → (新连接到来时) 创建Channel实例(封装客户端连接) → 注册到Reactor的Epoller(监控客户端I/O) → (客户端发数据) Reactor分发事件到Channel → Channel读取/处理/响应数据
Listener和Channel都继承于connection, Listener
负责监听和接受新连接,Channel
负责已建立连接的数据读写,在listener监听到有新连接到来时创建Channel,将业务cal绑定到protocol的回调事件上,用智能指针共有该匿名函数回调处理,并将fd给Rwactor,Rwactor然后将事件派发给map中事件就绪的channel I/O处理
项目总览
Channel.hpp
#pragma once#include <iostream>
#include <string>
#include "Connection.hpp"
#include "Common.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"
#include <sys/types.h>
#include <sys/socket.h>
#include<functional>
#include<memory>using namespace LogModule;#define SIZE 1024// 普通sockfd的封装
class Channel : public Connection
{
public:Channel(int sockfd, const InetAddr &client): _sockfd(sockfd), _client_addr(client){SetNonBlock(sockfd);}~Channel() {}int GetSockFd() override{return _sockfd;}public:void Recver() override{ //1.while保证本轮数据读完,channel只解决读问题//2.保证完整报文,解决粘包问题 ---反序列化,引入协议// LOG(Loglevel::DEBUG) << "事件被派发到了channel模块";// 读到是字符串char buffer[SIZE];while (true){buffer[0] = 0;//清空字符串ssize_t n = recv(_sockfd, &buffer, sizeof(buffer)-1, 0);if( n > 0){//读成功buffer[n]= 0;_inbuffer+=buffer;//入队列}else if (n == 0){ // 异常Excepter();return;}else{//可能本轮读完了if(errno ==EAGAIN || errno == EWOULDBLOCK){break;}else if(errno == EINTR)//被信号中断{continue;}else{ //真的报错了Excepter();return;}}}LOG(Loglevel::DEBUG)<<"Channel :inbuffer:"<<_inbuffer;if(!_inbuffer.empty()){_outbuffer+=_handler(_inbuffer);}if (!_outbuffer.empty()){Sender(); // 最佳实践//GetOwner()->EnableReadWrite(_sockfd, true, true);}}void Sender() override{while (true){ssize_t n = send(_sockfd, _outbuffer.c_str(), _outbuffer.size(), 0);if (n > 0){_outbuffer.erase(0, n);if (_outbuffer.empty())break;}else if (n == 0){break;}else{if (errno == EAGAIN || errno == EWOULDBLOCK)break;if (errno == EINTR)continue;else{Excepter();return;}}}
}void Excepter() override{}std::string & Inbuffer() {return _inbuffer;}void AppendOutBuffer(const std::string &out) {_outbuffer+=out;}
private:int _sockfd;std::string _inbuffer; // 缓冲区std::string _outbuffer;// client infoInetAddr _client_addr;//handler_t _handler;
};
Common.hpp
#pragma once#include <iostream>
#include<functional>
#include <string>
#include <cstring>
#include <memory>
#include<unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include<fcntl.h>
#include"Log.hpp"
using namespace LogModule;
enum ExitCode
{OK = 0,USAGE_ERR,SOCKET_ERR,BIND_ERR,LISTEN_ERR,CONNECT_ERR,FORK_ERR,OPEN_ERR,EPOLL_CREATE_ERR,EPOLL_CTL_ERR
};class NoCopy
{public:
NoCopy(){}
~NoCopy(){}
NoCopy(const NoCopy&)=delete;
const NoCopy& operator= (const NoCopy &)=delete;
};void SetNonBlock(int fd)
{int fl = fcntl(fd,F_GETFL);if(fl < 0){ LOG(Loglevel::DEBUG)<<"设置非阻塞失败";return;}fcntl(fd, F_SETFL,fl | O_NONBLOCK);LOG(Loglevel::DEBUG)<<"设置非阻塞成功";
}int defaultport = 8080;
#define CONV(addr) ((struct sockaddr *)&addr)
Connection.hpp
#pragma once
#include <iostream>
#include <string>
#include "InetAddr.hpp"
class Reactor;
class Connection;
using handler_t = std::function<std::string(std::string&)>;// 封装fd, 保证给每一个fd一套缓冲
class Connection
{
public:Connection(): _events(0), _owner(nullptr){}void RegisterHandler(handler_t handler) {_handler = handler; }virtual void Recver() = 0;virtual void Sender() = 0;virtual void Excepter() = 0;virtual int GetSockFd() = 0;void SetEvent(uint32_t events){_events = events;}~Connection() {}uint32_t GetEvent(){return _events;}void SetOwner(Reactor *owner){_owner = owner;}Reactor *GetOwner(){return _owner;}private:// 关心事件uint32_t _events;// 回指指针Reactor *_owner;public:handler_t _handler;
};
Epoller.hpp
#pragma once#include <iostream>
#include <unistd.h>
#include <sys/epoll.h>
#include "Common.hpp"
#include "Log.hpp"using namespace LogModule;
class Epoller
{public:Epoller():_epfd(-1){_epfd = epoll_create(128);if(_epfd < 0 ){LOG(Loglevel::FATAL) << "epoll_create error!";exit(EPOLL_CREATE_ERR);}LOG(Loglevel::INIF) << "create epoll success!";}void AddEvent(int sockfd,uint32_t events){struct epoll_event ev;ev.events=events;ev.data.fd=sockfd;int n = epoll_ctl(_epfd,EPOLL_CTL_ADD,sockfd,&ev);if( n < 0 ){LOG(Loglevel::ERROR)<<"epoll_ctl error!";return;}LOG(Loglevel::INIF)<<"epoll_ctl success! epds:" << _epfd;}void DelEvent(){}void ModEvent(){}int WaitEvents(struct epoll_event revs[],int maxnum,int timeout){int n = epoll_wait(_epfd,revs,maxnum,timeout);if(n < 0){LOG(Loglevel::WARNING)<<"epoll_wait error";}else if(n ==0){}return n;}~Epoller(){if(_epfd >= 0){close(_epfd);}}private:int _epfd;};
InetAddr.hpp
#pragma once
#include "Common.hpp"class InetAddr
{public:InetAddr(){}InetAddr(struct sockaddr_in &addr){ SetAddr(addr);}InetAddr(const std::string &ip,uint16_t port):_ip(ip),_port(port){//主机转网络memset(&_addr,0,sizeof(_addr));_addr.sin_family=AF_INET;inet_pton(AF_INET,_ip.c_str(),&_addr.sin_addr);_addr.sin_port=htons(_port);}InetAddr(uint16_t port):_port(port),_ip("0"){ //端口转memset(&_addr,0,sizeof(_addr));_addr.sin_family=AF_INET;_addr.sin_addr.s_addr=INADDR_ANY;_addr.sin_port = htons(_port);}uint16_t Port() { return _port; }std::string Ip() { return _ip; }void SetAddr(struct sockaddr_in &addr) { //网络转主机_addr=addr;_port = ntohs(addr.sin_port);//_ip = inet_ntoa(addr.sin_addr);char ipbuffer[64];inet_ntop(AF_INET,&_addr.sin_addr,ipbuffer,sizeof(_addr));_ip=ipbuffer;}const struct sockaddr_in &NetAddr() { return _addr; }const struct sockaddr *NetAddrPtr() { return CONV(_addr); }socklen_t NetAddrLen() { return sizeof(_addr); }bool operator==(const InetAddr &addr){return addr._ip == _ip && addr._port == _port;}std::string StringAddr(){return _ip + ":" + std::to_string(_port);}~InetAddr() {}
private:struct sockaddr_in _addr;std::string _ip;uint16_t _port;
};
Listener.hpp
#pragma once
#include <iostream>
#include <memory>
#include "Epoller.hpp"
#include "Socket.hpp"
#include "Common.hpp"
#include "Connection.hpp"
#include "Channel.hpp"using namespace SocketModule;
// 专门用来获取新连接
class Listener : public Connection // 可以用基类指向派生类来处理{
public:Listener(int port = defaultport): _port(port),_listensock(std::make_unique<TcpSocket>()){_listensock->BUildTcpLIstenSocketMethod(_port);SetEvent(EPOLLIN | EPOLLET);SetNonBlock(_listensock->Fd());}~Listener() {}int GetSockFd() override{return _listensock->Fd();}void Recver() override{InetAddr client;// 新连接就绪,可能有多个,一次性把所有的链接全部获取上来while (true){int sockfd = _listensock->Accept(&client);if (sockfd == ACCEPT_ERR)break;else if (sockfd == ACCEPT_CONTINUE)continue;else if (sockfd == ACCEPT_DOWN)break;// 合法fdelse{std::shared_ptr<Connection> conn = std::make_shared<Channel>(sockfd,client);conn->SetEvent(EPOLLIN | EPOLLET);if(_handler!=nullptr)conn->RegisterHandler(_handler);GetOwner()->AddConnection(conn);}}}void Sender() override{ }void Excepter() override{ }private:int _port;std::unique_ptr<Socket> _listensock;
};
Log.hpp
#ifndef __LOG_HPP__
#define __LOG_HPP__#include <iostream>
#include <string>
#include "Mutex.hpp"
#include <filesystem>
#include <fstream>
#include <memory>
#include <unistd.h>
#include <sstream>
#include<ctime>namespace LogModule
{const std::string sep = "\r\n";using namespace MutexModule ;// 2.刷新策略class LogStrategy{public:~LogStrategy() = default;virtual void SyncLog(const std::string &message) = 0;};// 显示器刷新日志的策略class ConsoleLogStrategy : public LogStrategy{public:ConsoleLogStrategy() {}~ConsoleLogStrategy() {}void SyncLog(const std::string &message) override{LockGuard lockguard(_mutex);std::cout << message << sep;}private:Mutex _mutex;};// 缺省文件路径以及文件本身const std::string defaultpath = "./log";const std::string defaultfile = "my.log";// 文件刷新日志的策略class FileLogStrategy : public LogStrategy{public:FileLogStrategy(const std::string &path = defaultpath, const std::string &file = defaultfile): _path(path), _file(file){LockGuard lockguard(_mutex);if (std::filesystem::exists(_path)) // 判断路径是否存在{return;}try{std::filesystem::create_directories(_path);}catch (const std::filesystem::filesystem_error &e){std::cerr << e.what() << '\n';}}void SyncLog(const std::string &message) override{LockGuard lockguard(_mutex);std::string filename = _path + (_path.back() == '/' ? "" : "/") + _file;std::ofstream out(filename, std::ios::app); // 追加写入if (!out.is_open()){return;}out << message << sep;out.close();}~FileLogStrategy() {}private:Mutex _mutex;std::string _path; // 日志文件的路径std::string _file; // 要打印的日志文件};// 形成日志等级enum class Loglevel{DEBUG,INIF,WARNING,ERROR,FATAL};std::string Level2Str(Loglevel level){switch (level){case Loglevel::DEBUG:return "DEBUG";case Loglevel::INIF:return "INIF";case Loglevel::WARNING:return "WARNING";case Loglevel::ERROR:return "ERROR";case Loglevel::FATAL:return "FATAL";default:return "UNKNOWN";}}std::string GetTimeStamp(){time_t cuur =time(nullptr);struct tm curr_tm;localtime_r(&cuur,&curr_tm);char buffer[128];snprintf(buffer,sizeof(buffer),"%4d-%02d-%02d %02d:%02d:%02d",curr_tm.tm_year+1900,curr_tm.tm_mon+1,curr_tm.tm_mday,curr_tm.tm_hour,curr_tm.tm_min,curr_tm.tm_sec);return buffer;}class Logger{public:Logger(){EnableConsoleLogStrategy();}// 选择某种策略// 1.文件void EnableFileLogStrategy(){_ffush_strategy = std::make_unique<FileLogStrategy>();}// 显示器void EnableConsoleLogStrategy(){_ffush_strategy = std::make_unique<ConsoleLogStrategy>();}// 表示的是未来的一条日志class LogMessage{public:LogMessage(Loglevel &level, std::string &src_name, int line_number, Logger &logger): _curr_time(GetTimeStamp()), _level(level), _pid(getpid()), _src_name(src_name), _line_number(line_number), _logger(logger){// 合并左半部分std::stringstream ss;ss << "[" << _curr_time << "] "<< "[" << Level2Str(_level) << "] "<< "[" << _pid << "] "<< "[" << _src_name << "] "<< "[" << _line_number << "] "<< "- ";_loginfo = ss.str();}template <typename T>LogMessage &operator<<(const T &info){// 右半部分,可变std::stringstream ss;ss << info;_loginfo += ss.str();return *this;}~LogMessage(){if (_logger._ffush_strategy){_logger._ffush_strategy->SyncLog(_loginfo);}}private:std::string _curr_time; // 日志时间Loglevel _level; // 日志状态pid_t _pid; // 进程pidstd::string _src_name; // 文件名称int _line_number; // 对应的行号std::string _loginfo; // 合并之后的一条完整信息Logger &_logger;};LogMessage operator()(Loglevel level, std::string src_name, int line_number){return LogMessage(level, src_name, line_number, *this);}~Logger() {}private:std::unique_ptr<LogStrategy> _ffush_strategy;};//全局日志对象Logger logger;//使用宏,简化用户操作,获取文件名和行号// __FILE__ 一个宏,替换完成后目标文件的文件名// __LINE__ 一个宏,替换完成后目标文件对应的行号#define LOG(level) logger(level,__FILE__,__LINE__) #define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy()#define Enable_File_Log_Strategy() logger.EnableFileLogStrategy()}#endif
Main.cc
#include <iostream>
#include <string>
#include "Reactor.hpp"
#include "Listener.hpp"
#include"Channel.hpp"
#include "Log.hpp"
#include "Common.hpp"
#include"Protocol.hpp"
#include"NetCal.hpp"void Usage(std::string proc)
{
std::cerr<<"Usage: "<<proc<<"prot"<<std::endl;
}
// ./server port
int main(int argc,char * argv[])
{if(argc != 2){Usage(argv[0]);exit(USAGE_ERR);}LogModule::ConsoleLogStrategy();uint16_t port = std::stoi(argv[1]);//1.构建业务模块std::shared_ptr<Cal> cal = std::make_shared<Cal>();//2.构建协议对象std::shared_ptr<Protocol> protocol = std::make_shared<Protocol>([&cal](Request &req)->Response{LOG(Loglevel::DEBUG)<<"进入到了protocol";return cal->Execute(req);});//3.构建listener对象std::shared_ptr<Connection> conn = std::make_shared<Listener>(port);conn ->RegisterHandler([&protocol](std::string &inbuffer)->std::string{LOG(Loglevel::DEBUG)<<"进入到匿名函数";std::string response_str;while (true){std::string package;if (!protocol->Decode(inbuffer, &package))break;// packge一定是一个完整的请求,是字节流的response_str += protocol->Execute(package);}LOG(Loglevel::DEBUG)<<"结束匿名函数...response_str:"<<response_str;return response_str;});//4.构建事件派发模块Reactorstd::unique_ptr<Reactor> R = std::make_unique<Reactor>();R->AddConnection(conn);R->Loop();return 0;
}
Makefile
ReactorServer:Main.ccg++ -o $@ $^ -lpthread -ljsoncpp -std=c++17
.PHONY:clean
clean:rm -f ReactorServer
Mutex.hpp
#pragma once
#include <pthread.h>
#include <iostream>
namespace MutexModule
{ class Mutex{public:Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){int n = pthread_mutex_lock(&_mutex);(void)n;}void Unlock(){int n = pthread_mutex_unlock(&_mutex);(void)n;}~Mutex(){pthread_mutex_destroy(&_mutex);}pthread_mutex_t *get(){return &_mutex;}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex &mutex):_mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}
NetCal.hpp
#pragma once#include "Protocol.hpp"
#include <iostream>class Cal
{
public:Response Execute(Request &req){Response resp(0, 0); // code 0正常 1 除零错误 2 mod零 3 非法错误switch (req.Oper()){case '+':resp.SetResult(req.X() + req.Y());break;case '-':resp.SetResult(req.X() - req.Y());break;case '*':resp.SetResult(req.X() * req.Y());break;case '/':if (req.Y() == 0){resp.SetCode(1);}resp.SetResult(req.X() / req.Y());break;case '%':if (req.Y() == 0){resp.SetCode(2);}resp.SetResult(req.X() % req.Y());break;default:resp.SetCode(3);break;}return resp;}private:
};
Protocol.hpp
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <jsoncpp/json/json.h>
#include "Socket.hpp"
#include <functional>// 实现网络版本的计算器
using namespace LogModule;
using namespace SocketModule;// client ->serverclass Request
{
public:Request() {}Request(int x, int y, char oper): _x(x), _y(y), _oper(oper){}std::string Serialize(){// {// "x" : _x// "y" : _y// "oper" :_oper// }Json::Value root;root["x"] = _x;root["y"] = _y;root["oper"] = _oper;Json::FastWriter writer;std::string s = writer.write(root);return s;}bool Deserialize(std::string &in){//{"x":10,"y":20,"oper":'+'}Json::Value root;Json::Reader reader;bool ok = reader.parse(in, root);if (ok){_x = root["x"].asInt();_y = root["y"].asInt();_oper = root["oper"].asInt();}return ok;}~Request() {}int X() { return _x; }int Y() { return _y; }char Oper() { return _oper; }private:int _x;int _y;char _oper;
};// server -> client
class Response
{
public:Response() {}Response(int result, int code): _result(result), _code(code){}void SetResult(int res){_result = res;}void SetCode(int code){_code = code;}std::string Serialize(){Json::Value root;root["result"] = _result;root["code"] = _code;Json::FastWriter writer;return writer.write(root);}bool Deserialize(std::string &in){Json::Value root;Json::Reader reader;bool ok = reader.parse(in, root);if (ok){_result = root["result"].asInt();_code = root["code"].asInt();}return ok;}~Response() {}bool ShowResult(){std::cout << "结果是: " << _result << "[" << _code << "]" << std::endl;return true;}private:int _result; // 运算结果int _code; // 0 正常 1,2,3,4 不同的异常
};const std::string ssep = "\r\n";using func_t = std::function<Response(Request &req)>;class Protocol
{
public:Protocol() : _func() {}Protocol(func_t func) : _func(func) {}std::string Encode(const std::string Jsonstr){// 包装// 50\r\n{"x":10,"y":20,"oper":'+'}\r\nstd::string len = std::to_string(Jsonstr.size());return len + ssep + Jsonstr + ssep; // 应用层封装报头}bool Decode(std::string &buffer, std::string *package){ LOG(Loglevel::DEBUG)<<"进入到了protocol中Decode";// 1.判断报文完整性ssize_t pos = buffer.find(ssep);if (pos == std::string::npos){return false; // 让调用方出去继续读}// 2.提取至少一个报文请求并移除在回调提取下一个std::string packge_len_str = buffer.substr(0, pos);int package_len_int = std::stoi(packge_len_str);// buffer 有长度,但是不一定包含整个报文,得先判断int target_len = packge_len_str.size() + package_len_int + ssep.size() * 2;if (buffer.size() < target_len)return false;// 至少包含一个完整报文*package = buffer.substr(pos + ssep.size(), package_len_int);// 移除已读报文,并且传递给输出型参数,然后返回等待下次回调处理buffer.erase(0, target_len);return true;}std::string Execute(std::string &package){LOG(Loglevel::DEBUG)<<"进入到了protocol中Execute";// 读取Request req;bool ok = req.Deserialize(package);if (!ok){return std::string();}Response resp = _func(req);// 序列化std::string jion_str = resp.Serialize();// //根据自定义协议添加封装 size\r\n{...}\r\nstd::string send_str = Encode(jion_str);// 返回运算结果return send_str;}std::string BuildRequsetString(int x, int y, char oper){// 1.构建请求Request req(x, y, oper);// 2.序列化std::string json_req = req.Serialize();std::cout << json_req << std::endl;// 3.封装报头return Encode(json_req);}~Protocol() {}private:func_t _func;
};
Reactor.hpp
#pragma once#include<iostream>
#include<memory>
#include<unordered_map>
#include"Epoller.hpp"
#include"Connection.hpp"
#include"Log.hpp"
using namespace LogModule;
class Reactor
{static const int revs_num = 128;
private:bool IsConnectionExists(const std::shared_ptr<Connection> &conn){return IsConnectionExistsHelper( conn->GetSockFd());}
bool IsConnectionExistsHelper(int sockfd)
{auto iter = _connections.find(sockfd);if(iter ==_connections.end()){return false;}else{return true;}
}
bool IsConnectionExists(int sockfd)
{return IsConnectionExistsHelper(sockfd);
}bool IsConnectionEmpty(){return _connections.empty();}
public:
Reactor():_epoller_ptr(std::make_unique<Epoller>()),_isrunning(false)
{}//事件派发器
void Dispatcher(int n)
{for(int i =0;i<n;i++){int sockfd = _revs[i].data.fd; //就绪fduint32_t revents = _revs[i].events; //就绪事件//1.将所有的异常统一转化为IO错误//2.所有的IO异常处理全部转化为一个异常处理函数if(revents & EPOLLERR) revents |=(EPOLLIN|EPOLLOUT); //异常处理if(revents & EPOLLHUP) revents |=(EPOLLIN|EPOLLOUT); //处理关闭链接//走到这不用区分是否异常,因为多态也不用区分listenfd还是普通sockfd,if(revents &EPOLLIN){//读时间就绪if (IsConnectionExists(sockfd))_connections[sockfd]->Recver();}if(revents & EPOLLOUT){//写事件就绪if (IsConnectionExists(sockfd))_connections[sockfd]->Sender();}}
}int LoopOnce( int timeout)
{return _epoller_ptr->WaitEvents(_revs,revs_num,timeout);
}void Loop()
{ if(IsConnectionEmpty()){return;}_isrunning = true;while(_isrunning){int timeout = 1000;int n = LoopOnce(timeout); Dispatcher(n); }_isrunning = false;
}
void Stop()
{_isrunning = false;
}
//将新连接全部添加到_connections,并且添加到内核
void AddConnection(std::shared_ptr<Connection> & conn)
{//0.防止重复添加if(IsConnectionExists(conn)){LOG(Loglevel::WARNING)<<"conn is exists:"<<conn ->GetSockFd();return ;}//1.conn对应的fd和他关心的事件写入到内核uint32_t events = conn->GetEvent();int sockfd =conn->GetSockFd();_epoller_ptr->AddEvent(sockfd,events);//2.设置当前conn的拥有者回指指针conn->SetOwner(this);//3.将具体的connection添加到_connections;_connections[sockfd]=conn;
}~Reactor(){}private:
//1.epoll模型
std::unique_ptr<Epoller> _epoller_ptr;//2.管理所有的connection,本质是管理未来所有我获取到的fd
//fd : Commection
std::unordered_map<int ,std::shared_ptr<Connection>> _connections;//3.就绪的所有事件
struct epoll_event _revs[revs_num];//是否启动
bool _isrunning;
};
Socket.hpp
#pragma once
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstdlib>
#include <unistd.h>
#include "Log.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"namespace SocketModule
{using namespace LogModule;const static int gbacklog =16;const static int defaultfd =-1;// 基类socketclass Socket{public:virtual ~Socket() {}virtual void SocketOrDie() = 0;virtual void BindOrDie(uint16_t port) = 0;virtual void ListenOrDie(int backlog) = 0;virtual int Accept(InetAddr * client)= 0;virtual void Close()=0;virtual int Recv(std::string * out) = 0;virtual int Send(std::string &message) = 0;virtual int Connect(const std::string &server_ip ,uint16_t port) =0;virtual int Fd() = 0;public:void BuildTcpClientSocketMethod(){SocketOrDie();}void BUildTcpLIstenSocketMethod(uint16_t port,int backlog = gbacklog){SocketOrDie();BindOrDie(port);ListenOrDie(backlog);}// void BUildUdpSocketMethod()// {// SocketOrDie();// BindOrDie();// }};class TcpSocket : public Socket{public:TcpSocket():_sockfd(defaultfd){}TcpSocket(int fd): _sockfd(fd) {}~TcpSocket() {}void SocketOrDie() override{_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){LOG(Loglevel::FATAL) << "创建套接字失败!";exit(SOCKET_ERR);}LOG(Loglevel::INIF) << "创建套接字成功!"<< _sockfd;}void BindOrDie(uint16_t port) override{InetAddr localaddr(port);int n = ::bind(_sockfd, localaddr.NetAddrPtr(), localaddr.NetAddrLen());if (n < 0){LOG(Loglevel::FATAL) << "绑定失败!";exit(BIND_ERR);}LOG(Loglevel::INIF) << "绑定成功!";}void ListenOrDie(int backlog) override{int n = ::listen(_sockfd, backlog);if (n < 0){LOG(Loglevel::FATAL) << "监听失败!";exit(LISTEN_ERR);}LOG(Loglevel::INIF) << "监听成功!";}#define ACCEPT_ERR -3#define ACCEPT_DOWN -1#define ACCEPT_CONTINUE -2int Accept(InetAddr * client) override{struct sockaddr_in peer;socklen_t len = sizeof(peer);int fd =::accept(_sockfd,CONV(peer),&len);if (fd < 0){if(errno == EAGAIN ||errno ==EWOULDBLOCK){return -1;//底层没有新连接}else if(errno == EINTR){return -2;//继续读}else{LOG(Loglevel::WARNING)<<"连接失败!";return -3; //连接失败}}LOG(Loglevel::WARNING)<<"连接成功!sockfd:"<< _sockfd;client->SetAddr(peer);return fd;}void Close() override{if(_sockfd >=0){::close(_sockfd);}}int Recv(std::string * out) override{//流式读取,不关心读到的是什么char buffer[4096*2];ssize_t n =::recv(_sockfd,buffer,sizeof(buffer)-1,0);if (n >0){buffer[n]=0;*out+=buffer;return n;}return n;}int Send(std::string &message) override{return send(_sockfd,message.c_str(),message.size(),0);}int Connect(const std::string &server_ip ,uint16_t port) override{InetAddr server(server_ip,port);return ::connect(_sockfd,server.NetAddrPtr(),server.NetAddrLen()) ; }int Fd(){return _sockfd;}private:int _sockfd; //};// class UdpSocket : public Socket// {// };
}
TcpClient.cc(部分)
#include <iostream>
#include <memory>
#include "Socket.hpp"
#include <string>
#include "Protocol.hpp"using namespace SocketModule;
void Usage(std::string proc)
{std::cerr << "Usage: " << proc << "prot" << std::endl;
}void GetDataFromStdin(int *x, int *y, char *oper)
{std::cout << "Please Enter x: ";std::cin >> *x;std::cout << "Please Enter y: ";std::cin >> *y;std::cout << "Please Enter oper: ";std::cin >> *oper;
}
int main(int argc, char *argv[])
{if (argc != 3){Usage(argv[0]);exit(USAGE_ERR);}//std::string server_ip = argv[1];uint16_t server_port = std::stoi(argv[2]);std::shared_ptr<Socket> client = std::make_shared<TcpSocket>();client->BuildTcpClientSocketMethod();if (client->Connect(server_ip, server_port) != 0){// 失败std::cerr << "客户端连接失败" << std::endl;exit(CONNECT_ERR);}// 连接成功std::unique_ptr<Protocol> protocol = std::make_unique<Protocol>();std::string resp_buffer;while (true){// 从标准输入中获取数据int x, y;char oper;GetDataFromStdin(&x, &y, &oper);// 构建一个请求 ->可以发送的字符串std::string req_str = protocol->BuildRequsetString(x, y, oper);std::cout << "--------req_str --------" << std::endl;std::cout << req_str << std::endl;std::cout << "------------------------" << std::endl;// 3.发送请求client->Send(req_str);// 4.获取应答Response resp;bool res = protocol->GetResponse(client, resp_buffer, &resp);if (res == false)break;// 显示结果resp.ShowResult();}client->Close();return 0;
}
server
client