select异步发送
socket依然是阻塞的,但是我们之前并没有在select中判断客户端是否可写,如果客户端故意不接收数据或者其他故障,那么单个客户端的故障会导致所有客户端和服务器端的交互发生故障。所以,我们添加了在select中添加了客户端的可写判断,同时,放弃了定量发送数据,定时发送数据的意义也不大。
cellclient.hpp中的变化
#ifndef _CELLClient_HPP_
#define _CELLClient_HPP_
#include"CELL.hpp"
//客户端心跳检测死亡计时时间
#define CLIENT_HREAT_DEAD_TIME 60000
//在间隔指定时间后
//把发送缓冲区内缓存的消息数据发送给客户端
#define CLIENT_SEND_BUFF_TIME 200
//客户端数据类型
class CELLClient
{
public:
int id = -1;
//所属serverid
int serverId = -1;
public:
//立即将发送缓冲区的数据发送给客户端
int SendDataReal()
{
int ret = 0;
//缓冲区有数据
if (_lastSendPos > 0 && INVALID_SOCKET != _sockfd)
{
//发送数据
ret = send(_sockfd, _szSendBuf, _lastSendPos, 0);
//数据尾部位置清零
_lastSendPos = 0;
//
_sendBuffFullCount = 0;
//
resetDTSend();
}
return ret;
}
//缓冲区的控制根据业务需求的差异而调整
//发送数据
int SendData(netmsg_DataHeader* header)
{
int ret = SOCKET_ERROR;
//要发送的数据长度
int nSendLen = header->dataLength;
//要发送的数据
const char* pSendData = (const char*)header;
if (_lastSendPos + nSendLen <= SEND_BUFF_SZIE)
{
//将要发送的数据 拷贝到发送缓冲区尾部
memcpy(_szSendBuf + _lastSendPos, pSendData, nSendLen);
//计算数据尾部位置
_lastSendPos += nSendLen;
if (_lastSendPos == SEND_BUFF_SZIE)
{
_sendBuffFullCount++;
}
return nSendLen;
}else{
_sendBuffFullCount++;
}
return ret;
}
private:
//发送缓冲区遇到写满情况计数
int _sendBuffFullCount = 0;
};
#endif // !_CELLClient_HPP_
cellserver.hpp的修改,舍弃了CELLTaskServer的使用
#ifndef _CELL_SERVER_HPP_
#define _CELL_SERVER_HPP_
#include"CELL.hpp"
#include"INetEvent.hpp"
#include"CELLClient.hpp"
#include"CELLSemaphore.hpp"
#include<vector>
#include<map>
//网络消息接收处理服务类
class CELLServer
{
public:
//处理网络消息
void OnRun(CELLThread* pThread)
{
while (pThread->isRun())
{
if (!_clientsBuff.empty())
{//从缓冲队列里取出客户数据
std::lock_guard<std::mutex> lock(_mutex);
for (auto pClient : _clientsBuff)
{
_clients[pClient->sockfd()] = pClient;
pClient->serverId = _id;
if (_pNetEvent)
_pNetEvent->OnNetJoin(pClient);
}
_clientsBuff.clear();
_clients_change = true;
}
//如果没有需要处理的客户端,就跳过
if (_clients.empty())
{
std::chrono::milliseconds t(1);
std::this_thread::sleep_for(t);
//旧的时间戳
_oldTime = CELLTime::getNowInMilliSec();
continue;
}
//伯克利套接字 BSD socket
//描述符(socket) 集合
fd_set fdRead;
fd_set fdWrite;
//fd_set fdExc;
if (_clients_change)
{
_clients_change = false;
//清理集合
FD_ZERO(&fdRead);
//将描述符(socket)加入集合
_maxSock = _clients.begin()->second->sockfd();
for (auto iter : _clients)
{
FD_SET(iter.second->sockfd(), &fdRead);
if (_maxSock < iter.second->sockfd())
{
_maxSock = iter.second->sockfd();
}
}
memcpy(&_fdRead_bak, &fdRead, sizeof(fd_set));
}
else {
memcpy(&fdRead, &_fdRead_bak, sizeof(fd_set));
}
memcpy(&fdWrite, &_fdRead_bak, sizeof(fd_set));
//memcpy(&fdExc, &_fdRead_bak, sizeof(fd_set));
///nfds 是一个整数值 是指fd_set集合中所有描述符(socket)的范围,而不是数量
///既是所有文件描述符最大值+1 在Windows中这个参数可以写0
timeval t{ 0,1 };
int ret = select(_maxSock + 1, &fdRead, &fdWrite, nullptr, &t);
if (ret < 0)
{
printf("CELLServer%d.OnRun.select Error exit\n", _id);
pThread->Exit();
break;
}
//else if (ret == 0)
//{
// continue;
//}
ReadData(fdRead);
WriteData(fdWrite);
//WriteData(fdExc);
//printf("CELLServer%d.OnRun.select: fdRead=%d,fdWrite=%d\n", _id, fdRead.fd_count, fdWrite.fd_count);
//if (fdExc.fd_count > 0)
//{
// printf("###fdExc=%d\n", fdExc.fd_count);
//}
CheckTime();
}
printf("CELLServer%d.OnRun exit\n", _id);
}
void WriteData(fd_set& fdWrite)
{
#ifdef _WIN32
for (int n = 0; n < fdWrite.fd_count; n++)
{
auto iter = _clients.find(fdWrite.fd_array[n]);
if (iter != _clients.end())
{
if (-1 == iter->second->SendDataReal())
{
OnClientLeave(iter->second);
_clients.erase(iter);
}
}
}
#else
for (auto iter = _clients.begin(); iter != _clients.end(); )
{
if (FD_ISSET(iter->second->sockfd(), &fdWrite))
{
if (-1 == iter->second->SendDataReal())
{
OnClientLeave(iter->second);
auto iterOld = iter;
iter++;
_clients.erase(iterOld);
continue;
}
}
iter++;
}
#endif
}
};
#endif // !_CELL_SERVER_HPP_