为了适配所有的平台,在各种平台上传输不一样的消息结构体,我们创建了消息字节流。首先定义了CELLStream类实现基础类型和数组读写
CELLStream.hpp文件
#ifndef _CELL_STREAM_HPP_
#define _CELL_STREAM_HPP_
#include"CELLLog.hpp"
#include<cstdint>
#include<string>
//字节流BYTE
class CELLStream
{
public:
CELLStream(char* pData, int nSize, bool bDelete = false)
{
_nSize = nSize;
_pBuff = pData;
_bDelete = bDelete;
}
CELLStream(int nSize = 1024)
{
_nSize = nSize;
_pBuff = new char[_nSize];
_bDelete = true;
}
virtual ~CELLStream()
{
if (_bDelete && _pBuff)
{
delete[] _pBuff;
_pBuff = nullptr;
}
}
public:
char* data()
{
return _pBuff;
}
int length()
{
return _nWritePos;
}
//内联函数
//还能读出n字节的数据吗?
inline bool canRead(int n)
{
return _nSize - _nReadPos >= n;
}
//还能写入n字节的数据吗?
inline bool canWrite(int n)
{
return _nSize - _nWritePos >= n;
}
//已写入位置,添加n字节长度
inline void push(int n)
{
_nWritePos += n;
}
//已读取位置,添加n字节长度
inline void pop(int n)
{
_nReadPos += n;
}
inline void setWritePos(int n)
{
_nWritePos = n;
}
//////Read
template<typename T>
bool Read(T& n, bool bOffset = true)
{
//
//计算要读取数据的字节长度
auto nLen = sizeof(T);
//判断能不能读
if (canRead(nLen))
{
//将要读取的数据 拷贝出来
memcpy(&n, _pBuff + _nReadPos, nLen);
//计算已读数据位置
if(bOffset)
pop(nLen);
return true;
}
//断言assert
//错误日志
CELLLog_Error("error, CELLStream::Read failed.");
return false;
}
template<typename T>
bool onlyRead(T& n)
{
return Read(n, false);
}
template<typename T>
uint32_t ReadArray(T* pArr, uint32_t len)
{
uint32_t len1 = 0;
//读取数组元素个数,但不偏移读取位置
Read(len1,false);
//判断缓存数组能否放得下
if (len1 <= len)
{
//计算数组的字节长度
auto nLen = len1 * sizeof(T);
//判断能不能读出
if (canRead(nLen + sizeof(uint32_t)))
{
//计算已读位置+数组长度所占有空间
pop(sizeof(uint32_t));
//将要读取的数据 拷贝出来
memcpy(pArr, _pBuff + _nReadPos, nLen);
//计算已读数据位置
pop(nLen);
return len1;
}
}
CELLLog_Error("CELLStream::ReadArray failed.");
return 0;
}
//char size_t c# char2 char 1
int8_t ReadInt8(int8_t def = 0)
{
Read(def);
return def;
}
//short
int16_t ReadInt16(int16_t n = 0)
{
Read(n);
return n;
}
//int
int32_t ReadInt32(int32_t n = 0)
{
Read(n);
return n;
}
int64_t ReadInt64(int64_t n = 0)
{
Read(n);
return n;
}
uint8_t ReadUInt8(uint8_t def = 0)
{
Read(def);
return def;
}
//short
uint16_t ReadUInt16(uint16_t n = 0)
{
Read(n);
return n;
}
//int
uint32_t ReadUInt32(uint32_t n = 0)
{
Read(n);
return n;
}
uint64_t ReadUInt64(uint64_t n = 0)
{
Read(n);
return n;
}
float ReadFloat(float n = 0.0f)
{
Read(n);
return n;
}
double ReadDouble(double n = 0.0f)
{
Read(n);
return n;
}
bool ReadString(std::string& str)
{
uint32_t nLen = 0;
Read(nLen, false);
if (nLen > 0)
{
//判断能不能读出
if (canRead(nLen + sizeof(uint32_t)))
{
//计算已读位置+数组长度所占有空间
pop(sizeof(uint32_t));
//将要读取的数据 拷贝出来
str.insert(0, _pBuff + _nReadPos, nLen);
//计算已读数据位置
pop(nLen);
return true;
}
}
return false;
}
//////Write
template<typename T>
bool Write(T n)
{
//计算要写入数据的字节长度
auto nLen = sizeof(T);
//判断能不能写入
if (canWrite(nLen))
{
//将要写入的数据 拷贝到缓冲区尾部
memcpy(_pBuff + _nWritePos, &n, nLen);
//计算已写入数据尾部位置
push(nLen);
return true;
}
CELLLog_Error("CELLStream::Write failed.");
return false;
}
template<typename T>
bool WriteArray(T* pData, uint32_t len)
{
//计算要写入数组的字节长度
auto nLen = sizeof(T)*len;
//判断能不能写入
if (canWrite(nLen + sizeof(uint32_t)))
{
//先写入数组的元素数量
Write(len);
//将要写入的数据 拷贝到缓冲区尾部
memcpy(_pBuff + _nWritePos, pData, nLen);
//计算数据尾部位置
push(nLen);
return true;
}
CELLLog_Error("CELLStream::WriteArray failed.");
return false;
}
//char
bool WriteInt8(int8_t n)
{
return Write(n);
}
//short
bool WriteInt16(int16_t n)
{
return Write(n);
}
//int
bool WriteInt32(int32_t n)
{
return Write(n);
}
bool WriteFloat(float n)
{
return Write(n);
}
bool WriteDouble(double n)
{
return Write(n);
}
private:
//数据缓冲区
char* _pBuff = nullptr;
//缓冲区总的空间大小,字节长度
int _nSize = 0;
//已写入数据的尾部位置,已写入数据长度
int _nWritePos = 0;
//已读取数据的尾部位置
int _nReadPos = 0;
//_pBuff是外部传入的数据块时是否应该被释放
bool _bDelete = true;
};
#endif // !_CELL_STREAM_HPP_
CELLMsgStream.hpp文件
#ifndef _CELL_MSG_STREAM_HPP_
#define _CELL_MSG_STREAM_HPP_
#include"MessageHeader.hpp"
#include"CELLStream.hpp"
//消息数据字节流
class CELLReadStream :public CELLStream
{
public:
CELLReadStream(netmsg_DataHeader* header)
:CELLReadStream((char*)header, header->dataLength)
{
}
CELLReadStream(char* pData, int nSize, bool bDelete = false)
:CELLStream(pData, nSize, bDelete)
{
push(nSize);
////预先读取消息长度
//ReadInt16();
////预先读取消息命令
//getNetCmd();
}
uint16_t getNetCmd()
{
uint16_t cmd = CMD_ERROR;
Read<uint16_t>(cmd);
return cmd;
}
};
//消息数据字节流
class CELLWriteStream :public CELLStream
{
public:
CELLWriteStream(char* pData, int nSize, bool bDelete = false)
:CELLStream(pData, nSize, bDelete)
{
//预先占领消息长度所需空间
Write<uint16_t>(0);
}
CELLWriteStream(int nSize = 1024)
:CELLStream(nSize)
{
//预先占领消息长度所需空间
Write<uint16_t>(0);
}
void setNetCmd(uint16_t cmd)
{
Write<uint16_t>(cmd);
}
bool WriteString(const char* str, int len)
{
return WriteArray(str, len);
}
bool WriteString(const char* str)
{
return WriteArray(str, strlen(str));
}
bool WriteString(std::string& str)
{
return WriteArray(str.c_str(), str.length());
}
void finsh()
{
int pos = length();
setWritePos(0);
Write<uint16_t>(pos);
setWritePos(pos);
}
};
#endif // !_CELL_MSG_STREAM_HPP_
定义了CELLReadStream读数据类和CELLWriteStream写数据类继承CELLStream类负责网络消息的读写
在server.cpp中进行了读写据流的验证,但是并没有在所有发送和接受数据的地方都进行了替换,其他地方依然使用的cellbuffer的读写数据
class MyServer : public EasyTcpServer
{
public:
MyServer()
{
_bSendBack = CELLConfig::Instance().hasKey("-sendback");
_bSendFull = CELLConfig::Instance().hasKey("-sendfull");
_bCheckMsgID = CELLConfig::Instance().hasKey("-checkMsgID");
}
//cellServer 4 多个线程触发 不安全
//如果只开启1个cellServer就是安全的
virtual void OnNetJoin(CELLClient* pClient)
{
EasyTcpServer::OnNetJoin(pClient);
}
//cellServer 4 多个线程触发 不安全
//如果只开启1个cellServer就是安全的
virtual void OnNetLeave(CELLClient* pClient)
{
EasyTcpServer::OnNetLeave(pClient);
}
//cellServer 4 多个线程触发 不安全
//如果只开启1个cellServer就是安全的
virtual void OnNetMsg(CELLServer* pServer, CELLClient* pClient, netmsg_DataHeader* header)
{
EasyTcpServer::OnNetMsg(pServer, pClient, header);
switch (header->cmd)
{
case CMD_LOGIN:
{
pClient->resetDTHeart();
netmsg_Login* login = (netmsg_Login*)header;
//检查消息ID
if (_bCheckMsgID)
{
if (login->msgID != pClient->nRecvMsgID)
{//当前消息ID和本地收消息次数不匹配
CELLLog_Error("OnNetMsg socket<%d> msgID<%d> _nRecvMsgID<%d> %d", pClient->sockfd(), login->msgID, pClient->nRecvMsgID, login->msgID - pClient->nRecvMsgID);
}
++pClient->nRecvMsgID;
}
//登录逻辑
//......
//回应消息
if (_bSendBack)
{
netmsg_LoginR ret;
ret.msgID = pClient->nSendMsgID;
if (SOCKET_ERROR == pClient->SendData(&ret))
{
//发送缓冲区满了,消息没发出去,目前直接抛弃了
//客户端消息太多,需要考虑应对策略
//正常连接,业务客户端不会有这么多消息
//模拟并发测试时是否发送频率过高
if (_bSendFull)
{
CELLLog_Warring("<Socket=%d> Send Full", pClient->sockfd());
}
}
else {
++pClient->nSendMsgID;
}
}
//CELLLog_Info("recv <Socket=%d> msgType:CMD_LOGIN, dataLen:%d,userName=%s PassWord=%s", cSock, login->dataLength, login->userName, login->PassWord);
}//接收 消息---处理 发送 生产者 数据缓冲区 消费者
break;
case CMD_LOGOUT:
{
pClient->resetDTHeart();
CELLReadStream r(header);
//读取消息长度
r.ReadInt16();
//读取消息命令
r.getNetCmd();
auto n1 = r.ReadInt8();
auto n2 = r.ReadInt16();
auto n3 = r.ReadInt32();
auto n4 = r.ReadFloat();
auto n5 = r.ReadDouble();
uint32_t n = 0;
r.onlyRead(n);
char name[32] = {};
auto n6 = r.ReadArray(name, 32);
char pw[32] = {};
auto n7 = r.ReadArray(pw, 32);
int ata[10] = {};
auto n8 = r.ReadArray(ata, 10);
///
CELLWriteStream s(128);
s.setNetCmd(CMD_LOGOUT_RESULT);
s.WriteInt8(n1);
s.WriteInt16(n2);
s.WriteInt32(n3);
s.WriteFloat(n4);
s.WriteDouble(n5);
s.WriteArray(name, n6);
s.WriteArray(pw, n7);
s.WriteArray(ata, n8);
s.finsh();
pClient->SendData(s.data(), s.length());
}
break;
case CMD_C2S_HEART:
{
pClient->resetDTHeart();
netmsg_s2c_Heart ret;
pClient->SendData(&ret);
}
default:
{
CELLLog_Info("recv <socket=%d> undefine msgType,dataLen:%d", pClient->sockfd(), header->dataLength);
}
break;
}
}
private:
//自定义标志 收到消息后将返回应答消息
bool _bSendBack;
//自定义标志 是否提示:发送缓冲区已写满
bool _bSendFull;
//是否检查接收到的消息ID是否连续
bool _bCheckMsgID;
};