31_字节流

为了适配所有的平台,在各种平台上传输不一样的消息结构体,我们创建了消息字节流。首先定义了CELLStream类实现基础类型和数组读写

CELLStream.hpp文件

#ifndef _CELL_STREAM_HPP_
#define _CELL_STREAM_HPP_

#include"CELLLog.hpp"
#include<cstdint>
#include<string>

//字节流BYTE
class CELLStream
{
public:

    CELLStream(char* pData, int nSize, bool bDelete = false)
    {
        _nSize = nSize;
        _pBuff = pData;
        _bDelete = bDelete;
    }


    CELLStream(int nSize = 1024)
    {
        _nSize = nSize;
        _pBuff = new char[_nSize];
        _bDelete = true;
    }

    virtual ~CELLStream()
    {
        if (_bDelete && _pBuff)
        {
            delete[] _pBuff;
            _pBuff = nullptr;
        }
    }
public:

    char* data()
    {
        return _pBuff;
    }

    int length()
    {
        return _nWritePos;
    }

    //内联函数
    //还能读出n字节的数据吗?
    inline bool canRead(int n)
    {
        return _nSize - _nReadPos >= n;
    }
    //还能写入n字节的数据吗?
    inline bool canWrite(int n)
    {
        return _nSize - _nWritePos >= n;
    }
    //已写入位置,添加n字节长度
    inline void push(int n)
    {
        _nWritePos += n;
    }
    //已读取位置,添加n字节长度
    inline void pop(int n)
    {
        _nReadPos += n;
    }

    inline void setWritePos(int n)
    {
        _nWritePos = n;
    }

//////Read
    template<typename T>
    bool Read(T& n, bool bOffset = true)
    {
        //
        //计算要读取数据的字节长度
        auto nLen = sizeof(T);
        //判断能不能读
        if (canRead(nLen))
        {
            //将要读取的数据 拷贝出来
            memcpy(&n, _pBuff + _nReadPos, nLen);
            //计算已读数据位置
            if(bOffset)
                pop(nLen);
            return true;
        }
        //断言assert
        //错误日志
        CELLLog_Error("error, CELLStream::Read failed.");
        return false;
    }

    template<typename T>
    bool onlyRead(T& n)
    {
        return Read(n, false);
    }

    template<typename T>
    uint32_t ReadArray(T* pArr, uint32_t len)
    {
        uint32_t len1 = 0;
        //读取数组元素个数,但不偏移读取位置
        Read(len1,false);
        //判断缓存数组能否放得下
        if (len1 <= len)
        {
            //计算数组的字节长度
            auto nLen = len1 * sizeof(T);
            //判断能不能读出
            if (canRead(nLen + sizeof(uint32_t)))
            {
                //计算已读位置+数组长度所占有空间
                pop(sizeof(uint32_t));
                //将要读取的数据 拷贝出来
                memcpy(pArr, _pBuff + _nReadPos, nLen);
                //计算已读数据位置
                pop(nLen);
                return len1;
            }
        }
        CELLLog_Error("CELLStream::ReadArray failed.");
        return 0;
    }

    //char size_t c# char2 char 1 
    int8_t ReadInt8(int8_t def = 0)
    {
        Read(def);
        return def;
    }
    //short
    int16_t ReadInt16(int16_t n = 0)
    {
        Read(n);
        return n;
    }
    //int
    int32_t ReadInt32(int32_t n = 0)
    {
        Read(n);
        return n;
    }

    int64_t ReadInt64(int64_t n = 0)
    {
        Read(n);
        return n;
    }

    uint8_t ReadUInt8(uint8_t def = 0)
    {
        Read(def);
        return def;
    }
    //short
    uint16_t ReadUInt16(uint16_t n = 0)
    {
        Read(n);
        return n;
    }
    //int
    uint32_t ReadUInt32(uint32_t n = 0)
    {
        Read(n);
        return n;
    }

    uint64_t ReadUInt64(uint64_t n = 0)
    {
        Read(n);
        return n;
    }

    float ReadFloat(float n = 0.0f)
    {
        Read(n);
        return n;
    }
    double ReadDouble(double n = 0.0f)
    {
        Read(n);
        return n;
    }

    bool ReadString(std::string& str)
    {
        uint32_t nLen = 0;
        Read(nLen, false);
        if (nLen > 0)
        {
            //判断能不能读出
            if (canRead(nLen + sizeof(uint32_t)))
            {
                //计算已读位置+数组长度所占有空间
                pop(sizeof(uint32_t));
                //将要读取的数据 拷贝出来
                str.insert(0, _pBuff + _nReadPos, nLen);
                //计算已读数据位置
                pop(nLen);
                return true;
            }
        }
        return false;
    }
//////Write
    template<typename T>
    bool Write(T n)
    {
        //计算要写入数据的字节长度
        auto nLen = sizeof(T);
        //判断能不能写入
        if (canWrite(nLen))
        {
            //将要写入的数据 拷贝到缓冲区尾部
            memcpy(_pBuff + _nWritePos, &n, nLen);
            //计算已写入数据尾部位置
            push(nLen);
            return true;
        }
        CELLLog_Error("CELLStream::Write failed.");
        return false;
    }
    template<typename T>
    bool WriteArray(T* pData, uint32_t len)
    {
        //计算要写入数组的字节长度
        auto nLen = sizeof(T)*len;
        //判断能不能写入
        if (canWrite(nLen + sizeof(uint32_t)))
        {
            //先写入数组的元素数量
            Write(len);
            //将要写入的数据 拷贝到缓冲区尾部
            memcpy(_pBuff + _nWritePos, pData, nLen);
            //计算数据尾部位置
            push(nLen);
            return true;
        }
        CELLLog_Error("CELLStream::WriteArray failed.");
        return false;
    }

    //char
    bool WriteInt8(int8_t n)
    {
        return Write(n);
    }
    //short
    bool WriteInt16(int16_t n)
    {
        return Write(n);
    }

    //int
    bool WriteInt32(int32_t n)
    {
        return Write(n);
    }

    bool WriteFloat(float n)
    {
        return Write(n);
    }

    bool WriteDouble(double n)
    {
        return Write(n);
    }
private:
    //数据缓冲区
    char* _pBuff = nullptr;
    //缓冲区总的空间大小,字节长度
    int _nSize = 0;
    //已写入数据的尾部位置,已写入数据长度
    int _nWritePos = 0;
    //已读取数据的尾部位置
    int _nReadPos = 0;
    //_pBuff是外部传入的数据块时是否应该被释放
    bool _bDelete = true;
};


#endif // !_CELL_STREAM_HPP_

CELLMsgStream.hpp文件

#ifndef _CELL_MSG_STREAM_HPP_
#define _CELL_MSG_STREAM_HPP_

#include"MessageHeader.hpp"
#include"CELLStream.hpp"

//消息数据字节流
class CELLReadStream :public CELLStream
{
public:
    CELLReadStream(netmsg_DataHeader* header)
        :CELLReadStream((char*)header, header->dataLength)
    {

    }

    CELLReadStream(char* pData, int nSize, bool bDelete = false)
        :CELLStream(pData, nSize, bDelete)
    {
        push(nSize);
        ////预先读取消息长度
        //ReadInt16();
        ////预先读取消息命令
        //getNetCmd();
    }

    uint16_t getNetCmd()
    {
        uint16_t cmd = CMD_ERROR;
        Read<uint16_t>(cmd);
        return cmd;
    }
};

//消息数据字节流
class CELLWriteStream :public CELLStream
{
public:
    CELLWriteStream(char* pData, int nSize, bool bDelete = false)
        :CELLStream(pData, nSize, bDelete)
    {
        //预先占领消息长度所需空间
        Write<uint16_t>(0);
    }

    CELLWriteStream(int nSize = 1024)
        :CELLStream(nSize)
    {
        //预先占领消息长度所需空间
        Write<uint16_t>(0);
    }

    void setNetCmd(uint16_t cmd)
    {
        Write<uint16_t>(cmd);
    }

    bool WriteString(const char* str, int len)
    {
        return WriteArray(str, len);
    }

    bool WriteString(const char* str)
    {
        return WriteArray(str, strlen(str));
    }

    bool WriteString(std::string& str)
    {
        return WriteArray(str.c_str(), str.length());
    }

    void finsh()
    {
        int pos = length();
        setWritePos(0);
        Write<uint16_t>(pos);
        setWritePos(pos);
    }
};


#endif // !_CELL_MSG_STREAM_HPP_

定义了CELLReadStream读数据类和CELLWriteStream写数据类继承CELLStream类负责网络消息的读写

在server.cpp中进行了读写据流的验证,但是并没有在所有发送和接受数据的地方都进行了替换,其他地方依然使用的cellbuffer的读写数据

class MyServer : public EasyTcpServer
{
public:
    MyServer()
    {
        _bSendBack = CELLConfig::Instance().hasKey("-sendback");
        _bSendFull = CELLConfig::Instance().hasKey("-sendfull");
        _bCheckMsgID = CELLConfig::Instance().hasKey("-checkMsgID");
    }
    //cellServer 4 多个线程触发 不安全
    //如果只开启1个cellServer就是安全的
    virtual void OnNetJoin(CELLClient* pClient)
    {
        EasyTcpServer::OnNetJoin(pClient);
    }
    //cellServer 4 多个线程触发 不安全
    //如果只开启1个cellServer就是安全的
    virtual void OnNetLeave(CELLClient* pClient)
    {
        EasyTcpServer::OnNetLeave(pClient);
    }
    //cellServer 4 多个线程触发 不安全
    //如果只开启1个cellServer就是安全的
    virtual void OnNetMsg(CELLServer* pServer, CELLClient* pClient, netmsg_DataHeader* header)
    {
        EasyTcpServer::OnNetMsg(pServer, pClient, header);
        switch (header->cmd)
        {
        case CMD_LOGIN:
        {
            pClient->resetDTHeart();
            netmsg_Login* login = (netmsg_Login*)header;
            //检查消息ID
            if (_bCheckMsgID)
            {
                if (login->msgID != pClient->nRecvMsgID)
                {//当前消息ID和本地收消息次数不匹配
                    CELLLog_Error("OnNetMsg socket<%d> msgID<%d> _nRecvMsgID<%d> %d", pClient->sockfd(), login->msgID, pClient->nRecvMsgID, login->msgID - pClient->nRecvMsgID);
                }
                ++pClient->nRecvMsgID;
            }
            //登录逻辑
            //......
            //回应消息
            if (_bSendBack)
            {
                netmsg_LoginR ret;
                ret.msgID = pClient->nSendMsgID;
                if (SOCKET_ERROR == pClient->SendData(&ret))
                {
                    //发送缓冲区满了,消息没发出去,目前直接抛弃了
                    //客户端消息太多,需要考虑应对策略
                    //正常连接,业务客户端不会有这么多消息
                    //模拟并发测试时是否发送频率过高
                    if (_bSendFull)
                    {
                        CELLLog_Warring("<Socket=%d> Send Full", pClient->sockfd());
                    }
                }
                else {
                    ++pClient->nSendMsgID;
                }
            }

            //CELLLog_Info("recv <Socket=%d> msgType:CMD_LOGIN, dataLen:%d,userName=%s PassWord=%s", cSock, login->dataLength, login->userName, login->PassWord);
        }//接收 消息---处理 发送   生产者 数据缓冲区  消费者 
        break;
        case CMD_LOGOUT:
        {
            pClient->resetDTHeart();
            CELLReadStream r(header);
            //读取消息长度
            r.ReadInt16();
            //读取消息命令
            r.getNetCmd();
            auto n1 = r.ReadInt8();
            auto n2 = r.ReadInt16();
            auto n3 = r.ReadInt32();
            auto n4 = r.ReadFloat();
            auto n5 = r.ReadDouble();
            uint32_t n = 0;
            r.onlyRead(n);
            char name[32] = {};
            auto n6 = r.ReadArray(name, 32);
            char pw[32] = {};
            auto n7 = r.ReadArray(pw, 32);
            int ata[10] = {};
            auto n8 = r.ReadArray(ata, 10);
            ///
            CELLWriteStream s(128);
            s.setNetCmd(CMD_LOGOUT_RESULT);
            s.WriteInt8(n1);
            s.WriteInt16(n2);
            s.WriteInt32(n3);
            s.WriteFloat(n4);
            s.WriteDouble(n5);
            s.WriteArray(name, n6);
            s.WriteArray(pw, n7);
            s.WriteArray(ata, n8);
            s.finsh();
            pClient->SendData(s.data(), s.length());
        }
        break;
        case CMD_C2S_HEART:
        {
            pClient->resetDTHeart();
            netmsg_s2c_Heart ret;
            pClient->SendData(&ret);
        }
        default:
        {
            CELLLog_Info("recv <socket=%d> undefine msgType,dataLen:%d", pClient->sockfd(), header->dataLength);
        }
        break;
        }
    }
private:
    //自定义标志 收到消息后将返回应答消息
    bool _bSendBack;
    //自定义标志 是否提示:发送缓冲区已写满
    bool _bSendFull;
    //是否检查接收到的消息ID是否连续
    bool _bCheckMsgID;
};

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top