30_客户端升级异步收发

客户端升级

共享 depends 文件夹,使用 CELLClient 类精简 EasyTcpClient 类;并进行异步收发,在发送前判断套接字是否可写

EasyTcpClient.hpp的修改

#ifndef _EasyTcpClient_hpp_
#define _EasyTcpClient_hpp_

#include"CELL.hpp"
#include"CELLNetWork.hpp"
#include"MessageHeader.hpp"
#include"CELLClient.hpp"

/// Single-connection TCP client that wraps one CELLClient.
///
/// Typical use: derive from this class, implement OnNetMsg(), call Connect()
/// once and then call OnRun() repeatedly to pump incoming and outgoing data.
/// SendData() hands a message to the CELLClient; the actual write happens in
/// OnRun() once select() reports the socket as writable.
///
/// The class owns the CELLClient through a raw pointer and deletes it in
/// Close(). NOTE(review): presumably CELLClient's destructor closes the
/// underlying socket - confirm in CELLClient.hpp.
class EasyTcpClient
{
public:
    EasyTcpClient() = default;

    // _pClient is an owning raw pointer; a copied object would delete it a
    // second time, so copying is disabled.
    EasyTcpClient(const EasyTcpClient&) = delete;
    EasyTcpClient& operator=(const EasyTcpClient&) = delete;

    virtual ~EasyTcpClient()
    {
        Close();
    }

    /// Creates a fresh TCP socket and wraps it in a new CELLClient.
    /// An already existing client is closed first. On failure an error is
    /// logged and _pClient stays null.
    void InitSocket()
    {
        CELLNetWork::Init();

        if (_pClient)
        {
            CELLLog::Info("warning, initSocket close old socket<%d>...\n", (int)_pClient->sockfd());
            Close();
        }
        SOCKET sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (INVALID_SOCKET == sock)
        {
            CELLLog::Info("error, create socket failed...\n");
        }
        else {
            _pClient = new CELLClient(sock);
        }
    }

    /// Connects to the server at ip:port, creating the socket on demand.
    /// @param ip   dotted-decimal IPv4 address (passed to inet_addr).
    /// @param port TCP port in host byte order.
    /// @return the result of connect(), or SOCKET_ERROR when ip is null or
    ///         the socket could not be created.
    int Connect(const char* ip,unsigned short port)
    {
        if (!ip)
        {
            return SOCKET_ERROR;
        }
        if (!_pClient)
        {
            InitSocket();
        }
        // InitSocket() leaves _pClient null when socket() fails; bail out
        // instead of dereferencing a null pointer below.
        if (!_pClient)
        {
            return SOCKET_ERROR;
        }

        sockaddr_in _sin = {};
        _sin.sin_family = AF_INET;
        _sin.sin_port = htons(port);
#ifdef _WIN32
        _sin.sin_addr.S_un.S_addr = inet_addr(ip);
#else
        _sin.sin_addr.s_addr = inet_addr(ip);
#endif
        int ret = connect(_pClient->sockfd(), (sockaddr*)&_sin, sizeof(sockaddr_in));
        if (SOCKET_ERROR == ret)
        {
            CELLLog::Info("<socket=%d> connect <%s:%d> failed...\n", (int)_pClient->sockfd(), ip, port);
        }
        else {
            _isConnect = true;
        }
        return ret;
    }

    /// Destroys the wrapped CELLClient (if any) and marks the client as
    /// disconnected. Safe to call repeatedly.
    void Close()
    {
        if (_pClient)
        {
            delete _pClient;
            _pClient = nullptr;
        }
        _isConnect = false;
    }

    /// Runs one select() round: receives and dispatches pending messages and,
    /// when the CELLClient reports pending output, sends it once the socket
    /// is writable.
    /// @return true while the connection is alive; false when the client is
    ///         not running or was closed because of an error.
    bool OnRun()
    {
        if (!isRun())
        {
            return false;
        }

        SOCKET _sock = _pClient->sockfd();

        fd_set fdRead;
        FD_ZERO(&fdRead);
        FD_SET(_sock, &fdRead);

        // The write set is only handed to select() when there is something
        // to send; otherwise select() would report "writable" on every call.
        // It is zeroed unconditionally so the FD_ISSET test below is valid
        // in both cases.
        fd_set fdWrite;
        FD_ZERO(&fdWrite);
        const bool wantWrite = _pClient->needWrite();
        if (wantWrite)
        {
            FD_SET(_sock, &fdWrite);
        }

        timeval t = { 0,1 };
        int ret = select(_sock + 1, &fdRead, wantWrite ? &fdWrite : nullptr, nullptr, &t);
        if (ret < 0)
        {
            CELLLog::Info("error,<socket=%d>OnRun.select exit\n", (int)_sock);
            Close();
            return false;
        }

        if (FD_ISSET(_sock, &fdRead))
        {
            if (-1 == RecvData(_sock))
            {
                CELLLog::Info("error,<socket=%d>OnRun.select RecvData exit\n", (int)_sock);
                Close();
                return false;
            }
            // OnNetMsg() is user code and may have called Close(); do not
            // touch _pClient afterwards in that case.
            if (!isRun())
            {
                return false;
            }
        }

        if (FD_ISSET(_sock, &fdWrite))
        {
            if (-1 == _pClient->SendDataReal())
            {
                CELLLog::Info("error,<socket=%d>OnRun.select SendDataReal exit\n", (int)_sock);
                Close();
                return false;
            }
        }
        return true;
    }

    /// @return true when a CELLClient exists and Connect() has succeeded.
    bool isRun()
    {
        return _pClient && _isConnect;
    }

    /// Reads from the socket into the CELLClient and dispatches every
    /// complete message it reports to OnNetMsg().
    /// @param cSock unused; kept for interface compatibility.
    /// @return the value returned by CELLClient::RecvData(), or -1 when no
    ///         client exists.
    int RecvData(SOCKET cSock)
    {
        (void)cSock;
        if (!_pClient)
        {
            return -1;
        }
        int nLen = _pClient->RecvData();
        if (nLen > 0)
        {
            // Also stop when a handler closed the connection mid-loop.
            while (_pClient && _pClient->hasMsg())
            {
                OnNetMsg(_pClient->front_msg());
                // A handler may have called Close(); only pop when the
                // client still exists.
                if (_pClient)
                {
                    _pClient->pop_front_msg();
                }
            }
        }
        return nLen;
    }

    /// Called once for every complete message received from the server.
    virtual void OnNetMsg(netmsg_DataHeader* header) = 0;

    /// Hands a message to the CELLClient for sending.
    /// @return the value returned by CELLClient::SendData(), or SOCKET_ERROR
    ///         when header is null or no client exists (before InitSocket /
    ///         after Close).
    int SendData(netmsg_DataHeader* header)
    {
        if (!_pClient || !header)
        {
            return SOCKET_ERROR;
        }
        return _pClient->SendData(header);
    }
protected:
    CELLClient* _pClient = nullptr;  // owned; deleted in Close()
    bool _isConnect = false;         // set once connect() succeeded
};

#endif 

client.cpp

#include"EasyTcpClient.hpp"
#include"CELLTimestamp.hpp"
#include<thread>
#include<atomic>


class MyClient : public EasyTcpClient
{
public:
    //响应网络消息
    virtual void OnNetMsg(netmsg_DataHeader* header)
    {
        switch (header->cmd)
        {
        case CMD_LOGIN_RESULT:
        {
            netmsg_LoginR* login = (netmsg_LoginR*)header;
            //CELLLog::Info("<socket=%d> recv msgType:CMD_LOGIN_RESULT\n", (int)_pClient->sockfd());
        }
        break;
        case CMD_LOGOUT_RESULT:
        {
            netmsg_LogoutR* logout = (netmsg_LogoutR*)header;
            //CELLLog::Info("<socket=%d> recv msgType:CMD_LOGOUT_RESULT\n", (int)_pClient->sockfd());
        }
        break;
        case CMD_NEW_USER_JOIN:
        {
            netmsg_NewUserJoin* userJoin = (netmsg_NewUserJoin*)header;
            //CELLLog::Info("<socket=%d> recv msgType:CMD_NEW_USER_JOIN\n", (int)_pClient->sockfd());
        }
        break;
        case CMD_ERROR:
        {
            CELLLog::Info("<socket=%d> recv msgType:CMD_ERROR\n", (int)_pClient->sockfd());
        }
        break;
        default:
        {
            CELLLog::Info("error, <socket=%d> recv undefine msgType\n", (int)_pClient->sockfd());
        }
        }
    }
private:

};


// Global run flag. Written by cmdThread and read by main, sendThread and
// recvThread, so it must be atomic to avoid a data race.
std::atomic<bool> g_bRun{true};
// Console command loop: reads whitespace-separated commands from stdin.
// "exit" clears g_bRun and ends the thread; anything else is reported as
// unsupported.
void cmdThread()
{
    while (true)
    {
        char cmdBuf[256] = {};
        // The field width keeps scanf from writing past cmdBuf (255 chars +
        // terminator). A return value other than 1 means EOF or a read
        // error; without this check the loop would spin forever on a closed
        // stdin.
        if (1 != scanf("%255s", cmdBuf))
        {
            CELLLog::Info("cmdThread: stdin closed, stop reading commands\n");
            break;
        }
        if (0 == strcmp(cmdBuf, "exit"))
        {
            g_bRun = false;
            CELLLog::Info("退出cmdThread线程\n");
            break;
        }
        else {
            CELLLog::Info("不支持的命令。\n");
        }
    }
}

// Total number of clients
const int cCount = 1000;
// Number of sender threads
const int tCount = 4;
// Client array; each sendThread creates and deletes its own slice of it
EasyTcpClient* client[cCount];
// Incremented by sendThread for every SendData() call that did not return
// SOCKET_ERROR; read and reset to 0 by main when it prints its statistics
std::atomic_int sendCount(0);
// Incremented once by each sendThread after it issued its Connect() calls;
// sender threads wait until it reaches tCount before sending
std::atomic_int readyCount(0);

// Receive loop for the clients in the index range [begin, end): keeps calling
// OnRun() on each of them, one pass after another, until g_bRun is cleared.
void recvThread(int begin, int end)
{
    for (; g_bRun; )
    {
        int idx = begin;
        while (idx < end)
        {
            client[idx]->OnRun();
            ++idx;
        }
    }
}

// Sender thread body.
// Creates and connects this thread's slice of the global client array, waits
// until every sender thread has done the same, starts a receiver thread for
// the slice and then sends one login message per client roughly every 99 ms
// until g_bRun is cleared. On exit the receiver is joined and the slice is
// destroyed.
// @param id 1-based thread id in [1, tCount]; selects the slice
//           [(id-1)*c, id*c) with c = cCount / tCount.
void sendThread(int id)
{
    CELLLog::Info("thread<%d>,start\n", id);
    const int c = cCount / tCount;
    const int begin = (id - 1)*c;
    const int end = id*c;

    for (int n = begin; n < end; n++)
    {
        client[n] = new MyClient();
    }
    for (int n = begin; n < end; n++)
    {
        // Test server address; other hosts used during development:
        //   win7 192.168.1.114, 127.0.0.1, 39.108.13.69,
        //   ubuntu vm 192.168.74.141, macOS vm 192.168.74.134
        client[n]->Connect("192.168.1.102", 4567);
    }
    CELLLog::Info("thread<%d>,Connect<begin=%d, end=%d>\n", id, begin, end);

    readyCount++;
    while (readyCount < tCount)
    {
        // Wait for the other sender threads to finish connecting.
        std::chrono::milliseconds t(10);
        std::this_thread::sleep_for(t);
    }

    // The receiver polls the very clients this function deletes below, so it
    // is kept joinable (not detached) and joined before the cleanup.
    std::thread receiver(recvThread, begin, end);

    netmsg_Login login;
    strcpy(login.userName, "lyd");
    strcpy(login.PassWord, "lydmm");

    while (g_bRun)
    {
        for (int n = begin; n < end; n++)
        {
            if (SOCKET_ERROR != client[n]->SendData(&login))
            {
                sendCount++;
            }
        }
        std::chrono::milliseconds t(99);
        std::this_thread::sleep_for(t);
    }

    // recvThread leaves its loop once g_bRun is false; after the join nobody
    // else touches this slice of the client array.
    receiver.join();

    for (int n = begin; n < end; n++)
    {
        client[n]->Close();
        delete client[n];
        client[n] = nullptr;
    }

    CELLLog::Info("thread<%d>,exit\n", id);
}

int main()
{
    CELLLog::Instance().setLogPath("clientLog.txt", "w");
    //启动UI线程
    std::thread t1(cmdThread);
    t1.detach();

    //启动发送线程
    for (int n = 0; n < tCount; n++)
    {
        std::thread t1(sendThread,n+1);
        t1.detach();
    }

    CELLTimestamp tTime;

    while (g_bRun)
    {
        auto t = tTime.getElapsedSecond();
        if (t >= 1.0)
        {
            CELLLog::Info("thread<%d>,clients<%d>,time<%lf>,send<%d>\n",tCount, cCount,t,(int)(sendCount/ t));
            sendCount = 0;
            tTime.update();
        }
        std::chrono::milliseconds ts(1);
        std::this_thread::sleep_for(ts);
    }

    CELLLog::Info("已退出。\n");
    return 0;
}

Leave a Comment

Your email address will not be published. Required fields are marked *

Scroll to Top