17_server添加发送缓冲区

server接收和发送数据的能力并不对等,通过设置缓冲区来尝试改变,并且在多种场景下需要定时定量发送数据给客户端,需要设置缓冲区。

image-20240404191231774

消息发送多对一

image-20240404191408495

消息发送一对多

在发送数据时添加缓冲区

#ifndef _EasyTcpServer_hpp_
#define _EasyTcpServer_hpp_

#ifdef _WIN32
#define FD_SETSIZE      2506
#define WIN32_LEAN_AND_MEAN
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include<windows.h>
#include<WinSock2.h>
#pragma comment(lib,"ws2_32.lib")
#else
#include<unistd.h> //uni std
#include<arpa/inet.h>
#include<string.h>

#define SOCKET int
#define INVALID_SOCKET  (SOCKET)(~0)
#define SOCKET_ERROR            (-1)
#endif

#include<stdio.h>
#include<vector>
#include<map>
#include<thread>
#include<mutex>
#include<atomic>

#include"MessageHeader.hpp"
#include"CELLTimestamp.hpp"

//缓冲区最小单元大小
#ifndef RECV_BUFF_SZIE
#define RECV_BUFF_SZIE 10240 * 5
#define SEND_BUFF_SZIE RECV_BUFF_SZIE
#endif // !RECV_BUFF_SZIE

//客户端数据类型
class ClientSocket
{
public:
    ClientSocket(SOCKET sockfd = INVALID_SOCKET)
    {
        _sockfd = sockfd;
        memset(_szMsgBuf, 0, sizeof(_szMsgBuf));
        _lastPos = 0;

        memset(_szSendBuf, 0, sizeof(_szMsgBuf));
        _lastSendPos = 0;
    }

    SOCKET sockfd()
    {
        return _sockfd;
    }

    char* msgBuf()
    {
        return _szMsgBuf;
    }

    int getLastPos()
    {
        return _lastPos;
    }
    void setLastPos(int pos)
    {
        _lastPos = pos;
    }

    //发送数据
    int SendData(DataHeader* header)
    {
        int ret = SOCKET_ERROR;
        const char* str = (const char*)header;
        int nLen = header->dataLength;

        while (true)
        {
            if (_lastSendPos + nLen >= SEND_BUFF_SZIE)
            {

                memcpy(_szSendBuf + _lastSendPos, str, SEND_BUFF_SZIE - _lastSendPos);
                str += (SEND_BUFF_SZIE - _lastSendPos);
                nLen -= (SEND_BUFF_SZIE - _lastSendPos);
                ret = send(_sockfd, _szSendBuf, SEND_BUFF_SZIE, 0);
                _lastSendPos = 0;
                if (ret == SOCKET_ERROR) return ret;
            }
            else
            {
                memcpy(_szSendBuf + _lastSendPos, str, nLen);
                _lastSendPos += nLen;
                break;
            }
        }
        return ret;
    }

private:
    // socket fd_set  file desc set
    SOCKET _sockfd;
    //第二缓冲区 消息缓冲区
    char _szMsgBuf[RECV_BUFF_SZIE];
    //消息缓冲区的数据尾部位置
    int _lastPos;
    //第二缓冲区 发送消息缓冲区
    char _szSendBuf[RECV_BUFF_SZIE];
    //消息缓冲区的数据尾部位置
    int _lastSendPos;
};

//网络事件接口
class INetEvent
{
public:
    //纯虚函数
    //客户端加入事件
    virtual void OnNetJoin(ClientSocket* pClient) = 0;
    //客户端离开事件
    virtual void OnNetLeave(ClientSocket* pClient) = 0;
    //客户端消息事件
    virtual void OnNetMsg(ClientSocket* pClient, DataHeader* header) = 0;
    //recv事件
    virtual void OnNetRecv(ClientSocket* pClient) = 0;
private:

};

class CellServer
{
public:
    CellServer(SOCKET sock = INVALID_SOCKET)
    {
        _sock = sock;
        _pNetEvent = nullptr;
    }

    ~CellServer()
    {
        Close();
        _sock = INVALID_SOCKET;
    }

    void setEventObj(INetEvent* event)
    {
        _pNetEvent = event;
    }

    //关闭Socket
    void Close()
    {
        if (_sock != INVALID_SOCKET)
        {
#ifdef _WIN32
            for (auto iter : _clients)
            {
                closesocket(iter.second->sockfd());
                delete iter.second;
            }
            //关闭套节字closesocket
            closesocket(_sock);
#else
            for (auto iter : _clients)
            {
                close(iter.second->sockfd());
                delete iter.second;
            }
            //关闭套节字closesocket
            close(_sock);
#endif
            _clients.clear();
        }
    }

    //是否工作中
    bool isRun()
    {
        return _sock != INVALID_SOCKET;
    }

    //处理网络消息
    //备份客户socket fd_set
    fd_set _fdRead_bak;
    //客户列表是否有变化
    bool _clients_change;
    SOCKET _maxSock;
    bool OnRun()
    {
        _clients_change = true;
        while (isRun())
        {
            if (_clientsBuff.size() > 0)
            {//从缓冲队列里取出客户数据
                std::lock_guard<std::mutex> lock(_mutex);
                for (auto pClient : _clientsBuff)
                {
                    _clients[pClient->sockfd()] = pClient;
                }
                _clientsBuff.clear();
                _clients_change = true;
            }

            //如果没有需要处理的客户端,就跳过
            if (_clients.empty())
            {
                std::chrono::milliseconds t(1);
                std::this_thread::sleep_for(t);
                continue;
            }

            //伯克利套接字 BSD socket
            fd_set fdRead;//描述符(socket) 集合
                          //清理集合
            FD_ZERO(&fdRead);
            if (_clients_change)
            {
                _clients_change = false;
                //将描述符(socket)加入集合
                _maxSock = _clients.begin()->second->sockfd();
                for (auto iter : _clients)
                {
                    FD_SET(iter.second->sockfd(), &fdRead);
                    if (_maxSock < iter.second->sockfd())
                    {
                        _maxSock = iter.second->sockfd();
                    }
                }
                memcpy(&_fdRead_bak, &fdRead, sizeof(fd_set));
            }
            else {
                memcpy(&fdRead, &_fdRead_bak, sizeof(fd_set));
            }

            ///nfds 是一个整数值 是指fd_set集合中所有描述符(socket)的范围,而不是数量
            ///既是所有文件描述符最大值+1 在Windows中这个参数可以写0
            int ret = select(_maxSock + 1, &fdRead, nullptr, nullptr, nullptr);
            if (ret < 0)
            {
                printf("select任务结束。\n");
                Close();
                return false;
            }
            else if (ret == 0)
            {
                continue;
            }

#ifdef _WIN32
            for (int n = 0; n < fdRead.fd_count; n++)
            {
                auto iter = _clients.find(fdRead.fd_array[n]);
                if (iter != _clients.end())
                {
                    if (-1 == RecvData(iter->second))
                    {
                        if (_pNetEvent)
                            _pNetEvent->OnNetLeave(iter->second);
                        _clients_change = true;
                        _clients.erase(iter->first);
                    }
                }
                else {
                    printf("error. if (iter != _clients.end())\n");
                }

            }
#else
            std::vector<ClientSocket*> temp;
            for (auto iter : _clients)
            {
                if (FD_ISSET(iter.second->sockfd(), &fdRead))
                {
                    if (-1 == RecvData(iter.second))
                    {
                        if (_pNetEvent)
                            _pNetEvent->OnNetLeave(iter.second);
                        _clients_change = false;
                        temp.push_back(iter.second);
                    }
                }
            }
            for (auto pClient : temp)
            {
                _clients.erase(pClient->sockfd());
                delete pClient;
            }
#endif
        }
    }
    //缓冲区
    //char _szRecv[RECV_BUFF_SZIE] = {};
    //接收数据 处理粘包 拆分包
    int RecvData(ClientSocket* pClient)
    {

        //接收客户端数据
        char* szRecv = pClient->msgBuf() + pClient->getLastPos();
        int nLen = (int)recv(pClient->sockfd(), szRecv, (RECV_BUFF_SZIE) - pClient->getLastPos(), 0);
        _pNetEvent->OnNetRecv(pClient);
        //printf("nLen=%d\n", nLen);
        if (nLen <= 0)
        {
            //printf("客户端<Socket=%d>已退出,任务结束。\n", pClient->sockfd());
            return -1;
        }
        //将收取到的数据拷贝到消息缓冲区
        //memcpy(pClient->msgBuf() + pClient->getLastPos(), _szRecv, nLen);
        //消息缓冲区的数据尾部位置后移
        pClient->setLastPos(pClient->getLastPos() + nLen);

        //判断消息缓冲区的数据长度大于消息头DataHeader长度
        while (pClient->getLastPos() >= sizeof(DataHeader))
        {
            //这时就可以知道当前消息的长度
            DataHeader* header = (DataHeader*)pClient->msgBuf();
            //判断消息缓冲区的数据长度大于消息长度
            if (pClient->getLastPos() >= header->dataLength)
            {
                //消息缓冲区剩余未处理数据的长度
                int nSize = pClient->getLastPos() - header->dataLength;
                //处理网络消息
                OnNetMsg(pClient, header);
                //将消息缓冲区剩余未处理数据前移
                memcpy(pClient->msgBuf(), pClient->msgBuf() + header->dataLength, nSize);
                //消息缓冲区的数据尾部位置前移
                pClient->setLastPos(nSize);
            }
            else {
                //消息缓冲区剩余数据不够一条完整消息
                break;
            }
        }
        return 0;
    }

    //响应网络消息
    virtual void OnNetMsg(ClientSocket* pClient, DataHeader* header)
    {
        _pNetEvent->OnNetMsg(pClient, header);
    }

    void addClient(ClientSocket* pClient)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        //_mutex.lock();
        _clientsBuff.push_back(pClient);
        //_mutex.unlock();
    }

    void Start()
    {
        _thread = std::thread(std::mem_fn(&CellServer::OnRun), this);
    }

    size_t getClientCount()
    {
        return _clients.size() + _clientsBuff.size();
    }
private:
    SOCKET _sock;
    //正式客户队列
    std::map<SOCKET, ClientSocket*> _clients;
    //缓冲客户队列
    std::vector<ClientSocket*> _clientsBuff;
    //缓冲队列的锁
    std::mutex _mutex;
    std::thread _thread;
    //网络事件对象
    INetEvent* _pNetEvent;
};

class EasyTcpServer : public INetEvent
{
private:
    SOCKET _sock;
    //消息处理对象,内部会创建线程
    std::vector<CellServer*> _cellServers;
    //每秒消息计时
    CELLTimestamp _tTime;
protected:
    //SOCKET recv计数
    std::atomic_int _recvCount;
    //收到消息计数
    std::atomic_int _msgCount;
    //客户端计数
    std::atomic_int _clientCount;
public:
    EasyTcpServer()
    {
        _sock = INVALID_SOCKET;
        _recvCount = 0;
        _msgCount = 0;
        _clientCount = 0;
    }
    virtual ~EasyTcpServer()
    {
        Close();
    }
    //初始化Socket
    SOCKET InitSocket()
    {
#ifdef _WIN32
        //启动Windows socket 2.x环境
        WORD ver = MAKEWORD(2, 2);
        WSADATA dat;
        WSAStartup(ver, &dat);
#endif
        if (INVALID_SOCKET != _sock)
        {
            printf("<socket=%d>关闭旧连接...\n", (int)_sock);
            Close();
        }
        _sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (INVALID_SOCKET == _sock)
        {
            printf("错误,建立socket失败...\n");
        }
        else {
            printf("建立socket=<%d>成功...\n", (int)_sock);
        }
        return _sock;
    }

    //绑定IP和端口号
    int Bind(const char* ip, unsigned short port)
    {
        //if (INVALID_SOCKET == _sock)
        //{
        //  InitSocket();
        //}
        // 2 bind 绑定用于接受客户端连接的网络端口
        sockaddr_in _sin = {};
        _sin.sin_family = AF_INET;
        _sin.sin_port = htons(port);//host to net unsigned short

#ifdef _WIN32
        if (ip) {
            _sin.sin_addr.S_un.S_addr = inet_addr(ip);
        }
        else {
            _sin.sin_addr.S_un.S_addr = INADDR_ANY;
        }
#else
        if (ip) {
            _sin.sin_addr.s_addr = inet_addr(ip);
        }
        else {
            _sin.sin_addr.s_addr = INADDR_ANY;
        }
#endif
        int ret = bind(_sock, (sockaddr*)&_sin, sizeof(_sin));
        if (SOCKET_ERROR == ret)
        {
            printf("错误,绑定网络端口<%d>失败...\n", port);
        }
        else {
            printf("绑定网络端口<%d>成功...\n", port);
        }
        return ret;
    }

    //监听端口号
    int Listen(int n)
    {
        // 3 listen 监听网络端口
        int ret = listen(_sock, n);
        if (SOCKET_ERROR == ret)
        {
            printf("socket=<%d>错误,监听网络端口失败...\n", _sock);
        }
        else {
            printf("socket=<%d>监听网络端口成功...\n", _sock);
        }
        return ret;
    }

    //接受客户端连接
    SOCKET Accept()
    {
        // 4 accept 等待接受客户端连接
        sockaddr_in clientAddr = {};
        int nAddrLen = sizeof(sockaddr_in);
        SOCKET cSock = INVALID_SOCKET;
#ifdef _WIN32
        cSock = accept(_sock, (sockaddr*)&clientAddr, &nAddrLen);
#else
        cSock = accept(_sock, (sockaddr*)&clientAddr, (socklen_t *)&nAddrLen);
#endif
        if (INVALID_SOCKET == cSock)
        {
            printf("socket=<%d>错误,接受到无效客户端SOCKET...\n", (int)_sock);
        }
        else
        {
            //将新客户端分配给客户数量最少的cellServer
            addClientToCellServer(new ClientSocket(cSock));
            //获取IP地址 inet_ntoa(clientAddr.sin_addr)
        }
        return cSock;
    }

    void addClientToCellServer(ClientSocket* pClient)
    {
        //查找客户数量最少的CellServer消息处理对象
        auto pMinServer = _cellServers[0];
        for (auto pCellServer : _cellServers)
        {
            if (pMinServer->getClientCount() > pCellServer->getClientCount())
            {
                pMinServer = pCellServer;
            }
        }
        pMinServer->addClient(pClient);
        OnNetJoin(pClient);
    }

    void Start(int nCellServer)
    {
        for (int n = 0; n < nCellServer; n++)
        {
            auto ser = new CellServer(_sock);
            _cellServers.push_back(ser);
            //注册网络事件接受对象
            ser->setEventObj(this);
            //启动消息处理线程
            ser->Start();
        }
    }
    //关闭Socket
    void Close()
    {
        if (_sock != INVALID_SOCKET)
        {
#ifdef _WIN32
            //关闭套节字closesocket
            closesocket(_sock);
            //------------
            //清除Windows socket环境
            WSACleanup();
#else
            //关闭套节字closesocket
            close(_sock);
#endif
        }
    }
    //处理网络消息
    bool OnRun()
    {
        if (isRun())
        {
            time4msg();
            //伯克利套接字 BSD socket
            fd_set fdRead;//描述符(socket) 集合
                          //清理集合
            FD_ZERO(&fdRead);
            //将描述符(socket)加入集合
            FD_SET(_sock, &fdRead);
            ///nfds 是一个整数值 是指fd_set集合中所有描述符(socket)的范围,而不是数量
            ///既是所有文件描述符最大值+1 在Windows中这个参数可以写0
            timeval t = { 0,10 };
            int ret = select(_sock + 1, &fdRead, 0, 0, &t); //
            if (ret < 0)
            {
                printf("Accept Select任务结束。\n");
                Close();
                return false;
            }
            //判断描述符(socket)是否在集合中
            if (FD_ISSET(_sock, &fdRead))
            {
                FD_CLR(_sock, &fdRead);
                Accept();
                return true;
            }
            return true;
        }
        return false;
    }
    //是否工作中
    bool isRun()
    {
        return _sock != INVALID_SOCKET;
    }

    //计算并输出每秒收到的网络消息
    void time4msg()
    {
        auto t1 = _tTime.getElapsedSecond();
        if (t1 >= 1.0)
        {
            printf("thread<%d>,time<%lf>,socket<%d>,clients<%d>,recv<%d>,msg<%d>\n", _cellServers.size(), t1, _sock, (int)_clientCount, (int)(_recvCount / t1), (int)(_msgCount / t1));
            _recvCount = 0;
            _msgCount = 0;
            _tTime.update();
        }
    }
    //只会被一个线程触发 安全
    virtual void OnNetJoin(ClientSocket* pClient)
    {
        _clientCount++;
    }
    //cellServer 4 多个线程触发 不安全
    //如果只开启1个cellServer就是安全的
    virtual void OnNetLeave(ClientSocket* pClient)
    {
        _clientCount--;
    }
    //cellServer 4 多个线程触发 不安全
    //如果只开启1个cellServer就是安全的
    virtual void OnNetMsg(ClientSocket* pClient, DataHeader* header)
    {
        _recvCount++;
    }
};

#endif // !_EasyTcpServer_hpp_

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top