C++流媒体服务器 ZLMediaKit框架ZLToolKit源码解读

发布时间:2024年01月07日

ZLMediaKit是国人开发的开源C++流媒体服务器,同SRS一样是主流的流媒体服务器。
ZLToolKit是基于C++11的高性能服务器框架,和ZLMediaKit是同一个作者,ZLMediaKit正是使用该框架开发的。

ZLMediaKit开源地址:https://github.com/ZLMediaKit/ZLMediaKit
ZLToolKit开源地址:https://github.com/ZLMediaKit/ZLToolKit

推荐ZLToolKit的理由
1、基于C++11,大量使用C++11新特性,如智能指针、lambda表达式等,安全性高,是高度运用C++特性的框架。
2、ZLMediaKit是应用ZLToolKit开发的,可以看到框架的使用实例,且ZLMediaKit流媒体服务器被全世界开发者使用,相当于是在测试ZLToolKit框架,因此框架的实用性和稳定性很高。
3、看过一些github上star数量很高的C++服务器框架,功能模块大同小异,但ZLT是把C++发挥到极致的框架。

ZLToolKit源码框架

主要分为Thread、Poller、Network、Util四大部分。

Thread

semaphore.h(自定义信号量,封装类,由条件变量实现)

class semaphore,接口:post、wait。

TaskExecutor.h(cpu负载计算,Task函数指针模板,任务执行器管理,管理任务执行线程池)

class ThreadLoadCounter,cpu负载计算器,基类,统计线程每一次的睡眠时长和工作时长,并记录样本,调用load计算cpu负载=工作时长/总时长。
class TaskCancelable : public noncopyable,抽象类,可取消任务基类。
class TaskCancelableImp<R(ArgTypes…)> : public TaskCancelable,函数指针模板,event poller async 任务、DelayTask 任务等均使用该类型,任务可取消,重载()运算符执行任务,根据返回值类型,返回默认返回值。
class TaskExecutorInterface,抽象类,提供任务执行接口:async、async_first、sync、sync_first。
class TaskExecutor : public ThreadLoadCounter, public TaskExecutorInterface,任务执行器,抽象类,无新增接口。
class TaskExecutorGetter,获得任务执行器,抽象类,接口:getExecutor、getExecutorSize。
class TaskExecutorGetterImp : public TaskExecutorGetter,实现抽象类接口,提供接口:getExecutorLoad(cpu负载)、for_each(遍历所有线程)、addPoller(创建 EventPoller 线程池)。

TaskQueue.h(由信号量控制的任务队列,加了线程锁,线程安全)

class TaskQueue,接口:push_task、push_exit、get_task、size。

threadgroup.h(线程组管理,创建线程,移除线程)

class thread_group,成员:_threads(umap存储线程组),接口:create_thread、remove_thread、is_thread_in、join_all、size。

ThreadPool.h(线程池任务管理,管理线程组执行任务队列)

class ThreadPool : public TaskExecutor,成员:thread_group、TaskQueueTask::Ptr,接口:start(启动线程池)、async(异步加入任务到队列)。

WorkThreadPool.h(创建一个工作线程池,可以加入线程负载均衡分配算法,类似EventPollerPool)

class WorkThreadPool : public TaskExecutorGetterImp,接口:getPoller、getFirstPoller、setPoolSize。

Poller

Pipe.h(管道对象封装)

class Pipe,成员:std::shared_ptr、EventPoller::Ptr _poller。

PipeWrap.h(管道的封装,windows下由socket模拟)

class PipeWrap,成员:int _pipe_fd[2],接口:write、read。

SelectWrap.h(select 模型的简单封装)

class FdSet

Timer.h(定时器对象)

class Timer,成员:EventPoller::Ptr(引用),构造函数传参超时时长和超时回调函数,EventPoller 选传或自动获取,超时回调在 EventPoller 线程执行。

EventPoller.h(基于epoll事件轮询模块)

class EventPoller : public TaskExecutor, public AnyStorage,基于epoll,可监听fd网络事件,async管道触发执行异步任务,doDelayTask定时器回调任务,runLoop执行事件循环体,添加/删除/修改监听事件,_event_map<网络fd和管道fd,回调>,_delay_task_map<延迟触发时间,回调>,_list_task<异步任务列表Task>。
class EventPollerPool :public TaskExecutorGetterImp,管理 EventPoller 线程池,可创建多个 EventPoller 线程,使用cpu负载均衡算法均匀分配线程,getPoller-》getExecutor 获得线程池内cpu负载最低的 EventPoller 线程。

Network

Buffer.h

class Buffer : public noncopyable,缓存抽象类,纯虚函数:data、size、toString、getCapacity,成员:ObjectStatistic对象个数统计。
class BufferOffset : public Buffer,成员:typename _data,构造函数传参offset,data获取+offset偏移的buffer。
class BufferRaw : public Buffer,成员:char *_data,接口:setCapacity分配,assign赋值,指针式缓存,根据分配内存大小自动扩减容。
class BufferLikeString : public Buffer,成员:std::string _str,接口:erase、append、push_back、insert、assign、clear、capacity、reserve、resize、empty、substr等,字符串操作缓存。

BufferSock.h

class BufferSock : public Buffer,成员:Buffer::Ptr _buffer、sockaddr_storage,管理_buffer指向的缓存。
class BufferList : public noncopyable,抽象类,接口:create、empty、count、send。
内部类
class BufferCallBack,成员:BufferList::SendResult回调函数,List<std::pair<Buffer::Ptr, bool> > 缓存列表,接口:sendFrontSuccess、sendCompleted,发送结果回调。
class BufferSendMsg final : public BufferList, public BufferCallBack,成员:_remain_size(剩余字节数)、_iovec(data和len组成的vector)、_iovec_off(_iovec当前发送下标),接口:send、send_l(执行系统调用sendmsg),socket发送数据时的buffer封装,用于tcp发送。
class BufferSendTo final: public BufferList, public BufferCallBack,接口:send(执行系统调用::sendto和::send)。
class BufferSendMMsg : public BufferList, public BufferCallBack,和 BufferSendMsg 类似,用于udp发送。

Server.h

class SessionMap,成员:std::unordered_map<std::string, std::weak_ptr >,管理Session,add、del、get。
class SessionHelper,成员:Session::Ptr、SessionMap::Ptr、Server,记录session至全局的map,方便后面管理。
class Server : public mINI,成员:EventPoller::Ptr,初始化设置EventPoller线程。

Session.h

class TcpSession : public Session
class UdpSession : public Session
class Session : public SocketHelper,成员:std::unique_ptr<toolkit::ObjectStatistictoolkit::TcpSession >、std::unique_ptr<toolkit::ObjectStatistictoolkit::UdpSession >,用于存储一对客户端与服务端间的关系。

Socket.h

typedef enum ErrCode,自定义socket错误枚举。
class SockException : public std::exception,成员:ErrCode,错误信息类,用于抛出系统和自定义异常,接口:what、getErrCode、getCustomCode、reset。
typedef enum SockType,socket类型,udp、tcp、tcpserver。
class SockNum,成员:int _fd、SockType _type,析构时关闭socket。
class SockFD : public noncopyable,成员:SockNum、EventPoller,文件描述符fd的封装,析构时停止事件监听,关闭socket。
class MutexWrapper,接口:lock、unlock,线程锁的封装,默认使用递归锁recursive_mutex。
class SockInfo,抽象类,接口:get_local_ip、get_local_port、get_peer_ip、get_peer_port、getIdentifier。
class Socket : public noncopyable, public SockInfo,成员:SockFD、EventPoller(网络事件触发和异步执行在此线程),异步IO Socket对象,包括tcp客户端、服务器和udp套接字,包含:错误回调、接收数据回调、tcp服务监听进入回调,connect、listen、send等接口的封装。
class SockSender,抽象类,接口:send、shutdown,重载运算符<<发送数据,定义socket发送接口。
class SocketHelper : public SockSender, public SockInfo, public TaskExecutorInterface,抽象类,成员:Socket、EventPoller,主要是对Socket类的二次封装,自定义类继承该类,实现纯虚接口即可创建一个完整的socket类,比如tcpclient。
class SockUtil,套接字工具类,封装了socket、网络的一些基本操作,提供静态全局接口,比如connect、listen等。
class TcpClient : public SocketHelper,抽象类,Tcp客户端,自定义类继承与该类,实现onConnect、onManager回调即可创建一个可运行的tcp客户端。
class TcpServer : public Server,可配置的TCP服务器。
class UdpServer : public Server,可配置的UDP服务器。

Util

NoticeCenter.h(通知中心)

class EventDispatcher,成员:std::unordered_multimap<void *, Any>(first指针,多个对象监听相同事件传的指针必须不同,second是监听该事件的回调),recursive_mutex,事件分发器,监听同一个事件的回调。
class NoticeCenter,成员:std::unordered_map<std::string, EventDispatcher::Ptr>(first事件名,second分发器),recursive_mutex,接口:emitEvent,addListener,delListener,广播中心,全局单例。

ResourcePool.h(资源池)

class shared_ptr_imp : public std::shared_ptr,对智能指针封装,增加接口:quit,放弃或回收到资源池。
class ResourcePool_l,成员:std::vector<C *> _objs(C对象指针内存数组),std::atomic_flag _busy(原子锁,线程安全),接口:obtain、obtain2,内存池功能实现。
class ResourcePool,成员:std::shared_ptr<ResourcePool_l> pool,接口:obtain、obtain2,封装内存池对外接口。

mini.h(读写配置文件)

class mINI_basic : public std::map<key, variant>,接口:parseFile(解析配置文件)、dumpFile(保存配置文件),实际上是个map,保存的是配置文件键值对。
struct variant : public std::string,把任何配置项按 std::string 字符串处理。
using mINI = mINI_basic<std::string, variant>,mINI::Instance() 是全局单例对象,管理<key,value>配置项。

CMD.h(命令行参数解析)

class Option,选项类,成员:_short_opt(短选项名)、_long_opt(长选项名)、_des(描述)、_default_value(默认值)、_cb(回调)、_type(参数类型)。
class OptionParser,选项解析类,成员:Option _helper(初始化帮助选项)、std::map<char, int> _map_char_index(短选项名映射)、std::map<int, Option> _map_options(选项映射),接口:重载 operator<< 增加选项,delOption 删除选项。
class CMD : public mINI,成员:std::shared_ptr _parser,接口:重载 operator() 解析命令行参数,hasKey(是否存在key),splitedVal(按分隔符分隔字符串)。
class CMDRegister,全局单例对象,成员:std::map<std::string, std::shared_ptr > _cmd_map,接口:registCMD,宏:GET_CMD、CMD_DO、REGIST_CMD。

ZLToolKit源码测试

在tests/文件夹中有作者写的测试程序,这里记录我对框架关键模块的测试。

EventPollerPool事件循环线程池测试

框架的核心是 EventPoller 事件循环线程,由 EventPollerPool 管理多个 EventPoller 组成的线程池。
在这里插入图片描述

EventPollerPool,获取一个可用的线程池。
EventPollerPool 是全局单例对象,用来管理多个 EventPoller 组成的线程池,后者是基于 epoll 实现的线程。
EventPoller 对象 只能在 EventPollerPool 中构造,EventPollerPool 管理 EventPoller 线程池。
EventPollerPool 负责创建和管理 EventPoller 对象, 可获取当前EventPoller线程,最低负荷EventPoller线程,第一个EventPoller线程等,也可以自定义规则获取EventPoller线程对象。

EventPoller 线程主要处理:定时器(Timer)、异步任务(async)、网络事件(socket),epoll 监听管道fd和 socket fd 事件,加入 _event_map<fd,CB>。

1、定时器(Timer):可由任意线程调用,线程安全,异步加入延时任务队列 _delay_task_map<触发时间(ms),Task> ,距离最近触发定时器时间传入 epoll_wait 的超时时间,每次循环检测定时器队列,触发回调。
2、异步任务(async):可由任意线程调用,任务加入 _list_task 队列,有锁线程安全,并通过写管道 _pipe 唤醒epoll线程执行任务。
3、网络事件(socket):EventPoller 智能指针可以和 socket 绑定,监听处理fd接收/断开/错误等网络事件;socket 数据发送可以在单线程或任意线程进行(enable_mutex),线程锁 _mtx_sock_fd 。

#ifndef TEST_EVENTPOLLERPOOL_H
#define TEST_EVENTPOLLERPOOL_H

#include <csignal>
#include <iostream>
#include "Util/logger.h"
#include "Network/TcpClient.h"
using namespace std;
using namespace toolkit;
void test_EventPollerPool() {
    //全局单例,获取实例即执行构造,addPoller 创建线程池,线程保存在其基类成员: std::vector<TaskExecutor::Ptr> _threads;
    //默认创建线程个数=CPU个数,也可以 setPoolSize 设置线程个数
    EventPollerPool::Instance();

    //从线程池返回一个 EventPoller 线程的智能指针,增加其引用计数
    //可以选择优先返回当前线程,或返回最低负荷线程
    std::shared_ptr<EventPoller> poller1 = EventPollerPool::Instance().getPoller();
    std::shared_ptr<EventPoller> poller2 = EventPollerPool::Instance().getPoller();
    printf("use_count=%ld\n",poller1.use_count());//use_count=3

    printf("main threadid=%ld\n",pthread_self());
    //异步执行,可以在任意线程调用,lambda 表达式在 EventPoller 线程异步执行
    //通过 lambda 表达式传参,把 lambda 这个匿名函数加入 EventPoller 线程的 List<Task::Ptr> _list_task 任务队列,Task 无参无返回值。
    int num = 15;
    poller1->async([num](){
        printf("poller1 threadid=%ld,num=%d\n",pthread_self(),num);
    });

    //定时器(Timer)参考: test_timer.h
    //网络事件(socket)参考: TestClient.h

    /**
     * 打印:
     * use_count=3
     * main threadid=139907182602176
     * poller1 threadid=139907174205184,num=15
     */

    //退出程序事件处理
    static semaphore sem;
    signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
    sem.wait();
}
#endif // TEST_EVENTPOLLERPOOL_H

WorkThreadPool工作线程池

WorkThreadPool,获取一个可用的线程池。
WorkThreadPool 是全局单例对象,和 EventPollerPool 功能几乎一致,都是管理 EventPoller 所组成的线程池。
EventPollerPool 为了线程安全,支持优先返回当前线程,也可以选择返回最低负荷线程; WorkThreadPool 只返回最低负荷线程。

EventPollerPool 通常用于实时性较高的业务,比如定时器、fd网络事件等,该线程不应该被耗时业务阻塞。
ZLM 使用 WorkThreadPool 主要用于文件读写、DNS解析、mp4关闭等耗时的工作,完成后再通过 EventPollerPool::async 切回自己的线程。

Timer定时器测试

测试定时器
Ticker类:可以统计代码执行时间,一般的计时统计。
Timer类:定时器类,构造函数传参超时时长、回调函数等,根据回调函数判断是否重复下次任务,回调函数在event poller线程异步执行。
程序运行5个线程:stamp thread、QT_ZLToolKit、async log、event poller 0、event poller 1。

调用栈

1、添加定时器(任意线程):Timer::Timer-》EventPoller::doDelayTask-》async_first(异步执行,把定时器任务添加到 _delay_task_map)-》EventPoller::async_l(_list_task.emplace_front)。
2、定时器超时:EventPoller::runLoop-》EventPoller::getMinDelay-》EventPoller::flushDelayTask-》(*(it->second))(),在这里执行 Timer 构造函数第二个参数传的回调函数。

实现原理
创建定时器时把延迟触发时间和回调函数传参加入 _delay_task_map,时间会加上当前时间(now+delay_ms),event poller 线程轮询所有定时器,比较当前时间now与上述 _delay_task_map 里的时间,超时则执行回调函数。

实现技巧
1、先看 event poller 线程的唤醒,如果是网络事件(比如接收tcp数据)可以直接唤醒 epoll_wait ,新加入异步任务是通过管道唤醒;
2、Timer 定时器任务则是依赖 event poller 线程轮询检测是否超时,那多久检测一次(也就是 epoll_wait 超时时间)?这里在每次执行 getMinDelay 时会把最近定时器超时时长作为返回值,传参给 epoll_wait,下次线程唤醒时正好最近的定时器超时,执行任务;
3、这样既保证线程不会过度轮询浪费cpu资源,也可以保证定时器任务能尽快执行。
4、EventPoller::doDelayTask 加入 _delay_task_map 是在 event poller 线程异步执行,通过写管道唤醒 event poller 线程,可以刷新 minDelay=getMinDelay 时间,也就是下次唤醒 epoll_wait 的时间。

#ifndef TEST_TIMER_H
#define TEST_TIMER_H

#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"

using namespace std;
using namespace toolkit;
void test_timer() {
    //设置日志
    Logger::Instance().add(std::make_shared<ConsoleChannel>());
    Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());


    Ticker ticker0;
    Timer::Ptr timer0 = std::make_shared<Timer>(0.5f,[&](){
        TraceL << "timer0重复:" << ticker0.elapsedTime();
        ticker0.resetTime();
        return true;
    }, nullptr);

    Timer::Ptr timer1 = std::make_shared<Timer>(1.0f,[](){
        DebugL << "timer1不再重复";
        return false;
    },nullptr);

    Ticker ticker2;
    Timer::Ptr timer2 = std::make_shared<Timer>(2.0f,[&]() -> bool {
        InfoL << "timer2,测试任务中抛异常" << ticker2.elapsedTime();
        ticker2.resetTime();
        throw std::runtime_error("timer2,测试任务中抛异常");
    },nullptr);

    //退出程序事件处理
    static semaphore sem;
    signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
    sem.wait();
}
#endif // TEST_TIMER_H

TcpClient测试

自定义类,继承于TcpClient,并重写onConnect、onRecv等虚函数,根据需求实现相应功能
程序运行5个线程:stamp thread、QT_ZLToolKit、async log、event poller 0、event poller 1

调用栈:
发起连接(主线程或其他线程):test_TcpClient->TcpClient::startConnect->Socket::connect,_poller->async加入 event poller 线程异步执行连接任务。
异步执行连接(event poller 线程):EventPoller::runLoop->EventPoller::onPipeEvent->Socket::connect->Socket::connect_l->async_con_cb(SockUtil::connect)->::connect,开始连接。
这里使用的是非阻塞连接,connect返回EINPROGRESS则表示正在连接,async_con_cb->_poller->addEvent给fd添加可写事件,添加成功则说明连接成功。

连接结果回调(eventpoller线程):async_con_cb-》Socket::onConnected(Socket::attachEvent 添加epoll监听读事件,监听接收数据)-》con_cb-》con_cb_in-》TcpClient::onSockConnect-》TestClient::onConnect。

数据接收回调:TcpClient::onSockConnect-》Socket::setOnRead-》_on_read=TestClient::onRecv
数据接收(eventpoller线程):EventPoller::runLoop-》epoll_wait监听到事件-》Socket::onRead-》_on_read-》TestClient::onRecv。

数据发送:demo里 TcpClient::startConnect 会创建定时器:_timer,每隔2秒回调一次 TestClient::onManager ,执行数据发送,定时器超时回调是在 event poller 线程,socket跨线程安全,线程锁:Socket::_mtx_sock_fd。
EventPoller::runLoop->EventPoller::getMinDelay->EventPoller::flushDelayTask(_timer 定时器超时)->TestClient::onManager->SockSender::<< ->SockSender::send->SocketHelper::send->Socket::send->Socket::send_l->Socket::flushAll->Socket::flushData->BufferSendMsg::send->BufferSendMsg::send_l->sendmsg(系统调用)。

小结
发起tcp连接可以在任意线程,非阻塞连接任务是在 event poller 线程异步执行,连接成功会添加 epoll 事件监听数据接收,发送数据可以在任意线程,使用线程锁保证线程安全。

#ifndef TESTCLIENT_H
#define TESTCLIENT_H

#include <csignal>
#include <iostream>
#include "Util/logger.h"
#include "Network/TcpClient.h"
using namespace std;
using namespace toolkit;
class TestClient: public TcpClient {
public:
    using Ptr = std::shared_ptr<TestClient>;
    TestClient():TcpClient() {
        DebugL;
    }
    ~TestClient(){
        DebugL;
    }
protected:
    virtual void onConnect(const SockException &ex) override{
        //连接结果事件
        InfoL << (ex ?  ex.what() : "success");
    }
    virtual void onRecv(const Buffer::Ptr &pBuf) override{
        //接收数据事件
        DebugL << pBuf->data() << " from port:" << get_peer_port();
    }
    virtual void onFlush() override{
        //发送阻塞后,缓存清空事件
        DebugL;
    }
    virtual void onError(const SockException &ex) override{
        //断开连接事件,一般是EOF
        WarnL << ex.what();
    }
    virtual void onManager() override{
        //定时发送数据到服务器
        auto buf = BufferRaw::create();
        if(buf){
            buf->assign("[BufferRaw]\0");
            (*this) << _nTick++ << " "
                    << 3.14 << " "
                    << string("string") << " "
                    <<(Buffer::Ptr &)buf;
        }
    }
private:
    int _nTick = 0;
};

int test_TcpClient() {
    // 设置日志系统
    Logger::Instance().add(std::make_shared<ConsoleChannel>());
    Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());

    TestClient::Ptr client(new TestClient());//必须使用智能指针
    client->startConnect("192.168.3.64",9090);//连接服务器

//    TcpClientWithSSL<TestClient>::Ptr clientSSL(new TcpClientWithSSL<TestClient>());//必须使用智能指针
//    clientSSL->startConnect("192.168.3.64",9090);//连接服务器

    //退出程序事件处理
    static semaphore sem;
    ///SIGINT:Ctrl+C发送信号,结束程序
    signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
    sem.wait();
    return 0;
}
#endif // TESTCLIENT_H

ThreadPool任务线程池测试

线程池,可以输入functional任务至后台线程执行。

构造函数中创建线程组 thread_group _thread_group,从任务队列 TaskQueueTask::Ptr _queue 获取任务在线程池执行。

线程池中所有线程共用一个任务队列 _queue,ThreadPool 无锁,但 _queue 中有锁,目前ZLM中没有 ThreadPool 应用。

#ifndef TEST_THREADPOOL_H
#define TEST_THREADPOOL_H

#include <chrono>
#include "Util/logger.h"
#include "Util/onceToken.h"
#include "Util/TimeTicker.h"
#include "Thread/ThreadPool.h"

using namespace std;
using namespace toolkit;

/**
 * @brief thread_group :线程组,移植自boost。
 * create_thread 快速创建一组线程,并指定参数和线程处理函数。
 */
int test_ThreadPool() {
    //初始化日志系统
    Logger::Instance().add(std::make_shared<ConsoleChannel>());
    Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());

    ThreadPool pool(thread::hardware_concurrency(), ThreadPool::PRIORITY_HIGHEST, true);

    //每个任务耗时3秒
    auto task_second = 3;
    //每个线程平均执行4次任务,总耗时应该为12秒
    auto task_count = thread::hardware_concurrency() * 4;

    semaphore sem;
    vector<int> vec;
    vec.resize(task_count);
    Ticker ticker;
    {
        //放在作用域中确保token引用次数减1
        auto token = std::make_shared<onceToken>(nullptr, [&]() {
            sem.post();
        });

        for (auto i = 0; i < task_count; ++i) {
            pool.async([token, i, task_second, &vec]() {
                setThreadName(("thread pool " + to_string(i)).data());
                std::this_thread::sleep_for(std::chrono::seconds(task_second)); //休眠三秒
                InfoL << "task " << i << " done!";
                vec[i] = i;
            });
        }
    }

    sem.wait();
    InfoL << "all task done, used milliseconds:" << ticker.elapsedTime();

    //打印执行结果
    for (auto i = 0; i < task_count; ++i) {
        InfoL << vec[i];
    }
    return 0;
}
#endif // TEST_THREADPOOL_H

ResourcePool内存池测试

ResourcePool :基于智能指针实现的一个循环池,不需要手动回收对象。

ResourcePool 是个类模板,可传入自定义数据类型C,内存池中存放的是该数据类型C的指针数组 std::vector<C > _objs。
setSize(_pool_size) 可设置内存池最大容量,当超出最大容量,recycle 不再回收该资源,而是直接释放。
obtain 获取内存,返回的是指向内存对象C的自定义智能指针 shared_ptr_imp,特点是提供接口quit,当内存对象使用完后,可以选择不再回收到内存池,此时可以自定义回收或直接释放。
obtain2 获取内存,返回指向内存对象C的智能指针 std::shared_ptr,当智能指针离开作用域时自动回收内存对象C到内存池。
私有接口:
getPtr 获取C原始指针,当 _objs 为空时分配一个新的C
返回,当 _objs 不为空则从 _objs 尾部取已一个C*,并从 _objs 中删除。
recycle 回收C*,如果 _objs 已满(>=_pool_size),直接释放C*,否则回收到 _objs 尾部。

总结
1、内存池中存放的是任意类型数据指针C*,C大小固定或可动态扩容,刚开始内存池是空的,使用时分配内存,用完后回收到内存池,下次再使用时就不用重新分配了,直接用上次分配并回收的C*;
2、不用担心高并发内存池不够用,因为当内存池为空时总会立即分配内存,如果分配的太多,回收时超出内存池大小后会直接释放,合理的内存池大小在高并发时会减少分配和释放的次数。

#ifndef TEST_RESOURCEPOOL_H
#define TEST_RESOURCEPOOL_H

#include <csignal>
#include <iostream>
#include <random>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/ResourcePool.h"
#include "Thread/threadgroup.h"
#include <list>

using namespace std;
using namespace toolkit;

//程序退出标志
bool g_bExitFlag = false;
class string_imp : public string{
public:
    template<typename ...ArgTypes>
    string_imp(ArgTypes &&...args) : string(std::forward<ArgTypes>(args)...){
        DebugL << "创建string对象:" << this << " " << *this;
    };
    ~string_imp(){
        WarnL << "销毁string对象:" << this << " " << *this;
    }
};


//后台线程任务
void onRun(ResourcePool<string_imp> &pool,int threadNum){
    std::random_device rd;
    while(!g_bExitFlag){
        //从循环池获取一个可用的对象
        auto obj_ptr = pool.obtain();
        if(obj_ptr->empty()){
            //这个对象是全新未使用的
            InfoL << "后台线程 " << threadNum << ":" << "obtain a emptry object!";
        }else{
            //这个对象是循环使用的
            InfoL << "后台线程 " << threadNum << ":" << *obj_ptr;
        }
        //标记该对象被本线程使用
        obj_ptr->assign(StrPrinter << "keeped by thread:" << threadNum );

        //随机休眠,打乱循环使用顺序
        usleep( 1000 * (rd()% 10));
        obj_ptr.reset();//手动释放,也可以注释这句代码。根据RAII的原理,该对象会被自动释放并重新进入循环列队
        usleep( 1000 * (rd()% 1000));
    }
}

int test_ResourcePool() {
    //初始化日志
    Logger::Instance().add(std::make_shared<ConsoleChannel>());
    Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());

    //大小为50的循环池
    ResourcePool<string_imp> pool;
    pool.setSize(50);

    //获取一个对象,该对象将被主线程持有,并且不会被后台线程获取并赋值
    auto reservedObj = pool.obtain();
    //在主线程赋值该对象
    reservedObj->assign("This is a reserved object , and will never be used!");

    thread_group group;
    //创建4个后台线程,该4个线程模拟循环池的使用场景,
    //理论上4个线程在同一时间最多同时总共占用4个对象


    WarnL << "主线程打印:" << "开始测试,主线程已经获取到的对象应该不会被后台线程获取到:" << *reservedObj;

    for(int i = 0 ;i < 4 ; ++i){
        group.create_thread([i,&pool](){
            onRun(pool,i);
        });
    }

    //等待3秒钟,此时循环池里面可用的对象基本上最少都被使用过一遍了
    sleep(3);

    //但是由于reservedObj早已被主线程持有,后台线程是获取不到该对象的
    //所以其值应该尚未被覆盖
    WarnL << "主线程打印: 该对象还在被主线程持有,其值应该保持不变:" << *reservedObj;

    //获取该对象的引用
    auto &objref = *reservedObj;

    //显式释放对象,让对象重新进入循环列队,这时该对象应该会被后台线程持有并赋值
    reservedObj.reset();

    WarnL << "主线程打印: 已经释放该对象,它应该会被后台线程获取到并被覆盖值";

    //再休眠3秒,让reservedObj被后台线程循环使用
    sleep(3);

    //这时,reservedObj还在循环池内,引用应该还是有效的,但是值应该被覆盖了
    WarnL << "主线程打印:对象已被后台线程赋值为:" << objref << endl;

    {
        WarnL << "主线程打印:开始测试主动放弃循环使用功能";

        List<decltype(pool)::ValuePtr> objlist;
        for (int i = 0; i < 8; ++i) {
            reservedObj = pool.obtain();
            string str = StrPrinter << i << " " << (i % 2 == 0 ? "此对象将脱离循环池管理" : "此对象将回到循环池");
            reservedObj->assign(str);
            reservedObj.quit(i % 2 == 0);
            objlist.emplace_back(reservedObj);
        }
    }
    sleep(3);

    //通知后台线程退出
    g_bExitFlag = true;
    //等待后台线程退出
    group.join_all();
    return 0;
}
#endif // TEST_RESOURCEPOOL_H

stampthread时间戳线程测试

测试时间戳线程
只要执行了静态方法initMillisecondThread,就会创建时间戳线程,最多创建1个,线程名称:stamp thread。
提供getCurrentMillisecond和getCurrentMicrosecond接口,获取程序启动到当前时间的毫秒数和微秒数,或从1970年开始到当前时间的毫秒数和微秒数。
程序启动后有两个线程:QT_ZLToolKit(主线程)和stamp thread。

#ifndef TEST_STAMPTHREAD_H
#define TEST_STAMPTHREAD_H

#include "Util/util.h"
#include <sys/time.h>
using namespace std;
using namespace toolkit;

void test_stampthread() {

    uint64_t cur_ms = getCurrentMillisecond(true);
    printf("cur_ms = %lu\n",cur_ms);

    usleep(100*1000);

    cur_ms = getCurrentMillisecond(true);
    printf("cur_ms = %lu\n",cur_ms);
}

#endif // TEST_STAMPTHREAD_H

NoticeCenter广播中心测试

广播中心,可以在程序的任意线程添加监听事件并定义回调;可以在任意线程发出一个事件,通知所有监听了该事件的地方执行回调。

每个事件创建一个分发器 EventDispatcher ,分发器存放监听该事件的key和回调,加线程锁,多线程安全。

NoticeCenter::Instance() 定义对外接口,是全局单例对象,加线程锁,添加/删除事件、发出事件均是多线程安全。

addListener(指针key,事件名,回调) 可以在任意线程添加监听,emitEvent(事件名,参数列表) 可以在任意线程发出一个事件,注意:监听回调是在 emitEvent 所在线程执行的

class EventDispatcher,成员:std::unordered_multimap<void *, Any>(first指针,多个对象监听相同事件传的指针必须不同,second是监听该事件的回调),recursive_mutex,事件分发器,监听同一个事件的回调。
class NoticeCenter,成员:std::unordered_map<std::string, EventDispatcher::Ptr>(first事件名,second分发器),recursive_mutex,接口:emitEvent,addListener,delListener,全局单例。

下面实验:多线程监听相同事件,线程安全。

#ifndef TEST_NOTICECENTER_H
#define TEST_NOTICECENTER_H

#include <csignal>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/NoticeCenter.h"
using namespace std;
using namespace toolkit;

//定义两个事件,事件是字符串类型
//广播名称1
#define NOTICE_NAME1 "NOTICE_NAME1"
//广播名称2
#define NOTICE_NAME2 "NOTICE_NAME2"

//程序退出标记
bool g_bExitFlag = false;

static void *tag1;
static void *tag2;

void* func0(void*) {
    //addListener方法第一个参数是标签,用来删除监听时使用
    //需要注意的是监听回调的参数列表个数类型需要与emitEvent广播时的完全一致,否则会有无法预知的错误
    NoticeCenter::Instance().addListener(tag1,NOTICE_NAME1,
            [](int &a,const char * &b,double &c,string &d){
        printf("func0=%d\n",a);
    });

    return nullptr;
}

void* func1(void*) {
    NoticeCenter::Instance().addListener(tag2,NOTICE_NAME1,
            [](int &a,const char * &b,double &c,string &d){
        printf("func1=%d\n",a);
    });
    return nullptr;
}

void* func2(void*) {
    //监听NOTICE_NAME2事件
    NoticeCenter::Instance().addListener(0,NOTICE_NAME2,
            [](string &d,double &c,const char *&b,int &a){
        printf("func2=%d\n",a);
    });

    return nullptr;
}

int test_NoticeCenter() {
    //设置程序退出信号处理函数
    signal(SIGINT, [](int){g_bExitFlag = true;});
    //设置日志
    Logger::Instance().add(std::make_shared<ConsoleChannel>());

    pthread_t tid[5];
    pthread_create(&tid[0],nullptr,func0,nullptr);
    pthread_create(&tid[1],nullptr,func1,nullptr);
    pthread_create(&tid[2],nullptr,func2,nullptr);

    int a = 0;
    while(!g_bExitFlag){
        const char *b = "b";
        double c = 3.14;
        string d("d");
        //每隔1秒广播一次事件,如果无法确定参数类型,可加强制转换
        NoticeCenter::Instance().emitEvent(NOTICE_NAME1,++a,(const char *)"b",c,d);
        NoticeCenter::Instance().emitEvent(NOTICE_NAME2,d,c,b,a);
        sleep(1); // sleep 1 second
    }
    return 0;
}
#endif // TEST_NOTICECENTER_H

onceToken测试

RAII [1] (Resource Acquisition Is Initialization)
也称为“资源获取就是初始化”,是C++语言的一种管理资源、避免泄漏的惯用法。
RAII的思想:构造时获取资源,在对象生命周期内保持资源有效,最后对象析构时释放资源。

onceToken
使用RAII模式实现,可以在对象构造和析构时执行一段代码。
也就是在构造时执行一段代码(传nullptr则什么都不执行),在离开作用域时执行一段代码。

在ZLM中,onceToken 主要用于防止在程序抛出异常时提前返回,没有执行接下来的代码。
把一定要执行的代码放在 onceToken 析构时执行,防止程序抛出异常提前返回。
如果要等待异步执行后再析构,在执行 async 时把 onceToken 智能指针作为行参传递给Lambda表达式。

#ifndef TEST_ONCETOKEN_H
#define TEST_ONCETOKEN_H

#include <csignal>
#include "Util/onceToken.h"
#include "Poller/EventPoller.h"
using namespace std;
using namespace toolkit;
void token_start() {
    printf("token start\n");
}

void test_onceToken() {
    EventPoller::Ptr poller = EventPollerPool::Instance().getPoller();

    //异步执行时传递 onceToken 智能指针行参,引用计数加1,等所有的异步执行结束后引用计数变为0,执行析构
    /**
      * 打印:
      * token start
      * async=0
      * async=1
      * async=2
      * token destruct
      */
    {
        auto token = std::make_shared<onceToken>(token_start, []() {
            printf("token destruct\n");
        });

        for (auto i = 0; i < 3; ++i) {
            poller->async([token, i]() {//EventPoller 线程异步执行
                printf("async=%d\n",i);
                std::this_thread::sleep_for(std::chrono::seconds(1)); //休眠1秒
            });
        }
    }

    //退出程序事件处理
    static semaphore sem;
    signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
    sem.wait();
}
#endif // TEST_ONCETOKEN_H

Any数据结构测试

Any可以保存任意类型的数据。

#ifndef TEST_ANY_H
#define TEST_ANY_H

#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"

using namespace std;
using namespace toolkit;

class Student{
public:
    Student(int age):age(age) {
        printf("Student\n");
    }

    ~Student() {
        printf("~Student\n");
    }

    Student(Student &s) {
        printf("Student copy\n");
        age = s.age;
    }

    int age;
};

template <typename FUNC>
void test_func(FUNC &&func) {

}

/**
 * @brief Any :可以保存任意的对象
 */
void test_Any() {

    //1.Any保存类的对象
    {
        Any aa;
        //创建 Student 的智能指针,(17)是构造函数的参数
        aa.set<Student>(17);

        //拷贝构造,get<Student>(bool可选参数)返回智能指针所管理的原始指针的对象引用
        Student S1 = aa.get<Student>();
//        aa.reset();//如果天提前释放aa,则捕获异常,打印: ex=Any is empty
        try{
            printf("aa age=%d\n",aa.get<Student>().age);
        }catch(std::exception& e) {
            printf("ex=%s\n",e.what());
        }

        printf("s1 age=%d\n",S1.age);

        //离开作用域打印:
        // Student
        // Student copy
        // aa age=17
        // s1 age=17
        // ~Student
        // ~Student
    }

    //2.Any保存 function 函数指针模板
    Any bb;
    //set<function函数指针模板的数据类型>(function 的实例化,lambda表达式)
    bb.set<std::function<void(int)>>([](int a){
        printf("a=%d\n",a);
    });

    //获取bb所管理的对象并调用方法,(bool可选参数)(10:调用对象所传的参数)
    bb.get<std::function<void(int)>>()(10);//调用lambda表达式,打印:a=10


    //退出程序事件处理
    static semaphore sem;
    signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
    sem.wait();
}
#endif // TEST_ANY_H

function_traits测试

源码类似于: 《深入应用C++11 代码优化与工程级应用》3.3.6 function_traits

function_traits 用来获取所有函数语义类型的信息(函数类型、返回类型、参数个数和参数的具体类型),通过 stl_function_type 把任意函数转换成 std::function。
函数类型包括:普通函数、函数指针、function/lambda、成员函数、函数对象。

实现function_traits的关键技术:
要通过模板特化和可变参数模板来获取函数类型和返回类型。
先定义一个基本的function_traits的模板类:
template
struct function_traits;
再通过特化,将返回类型和可变参数模板作为模板参数,就可以获取函数类型、函数返回值和参数的个数了。

如:
int func(int a, string b);
1## 获取函数类型
function_traits<decltype(func)>::function_type; // int __cdecl(int, string)
2# 获取函数返回值
function_traits<decltype(func)>::return_type; // int
3# 获取函数的参数个数
function_traits<decltype(func)>::arity; // 2
4# 获取函数第一个入参类型
function_traits<decltype(func)>::args<0>::type; // int
5# 获取函数第二个入参类型
function_traits<decltype(func)>::args<1>::type; // string
6# 将函数转换为 std::function
stl_function_type

#ifndef TEST_FUNCTION_TRAITS_H
#define TEST_FUNCTION_TRAITS_H

#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"
#include "Util/function_traits.h"

using namespace std;
using namespace toolkit;
//打印数据类型
template<typename T>
void printType()
{
    printf("%s\n",demangle(typeid(T).name()).c_str());
}

//自定义类
class Student2{

};

//函数指针
float(*cast_func)(int, int, int, int);

//普通函数
int func(int a, Student2 b)
{
    printf("a=%d\n",a);
    return 0;
}

struct AA
{
    int f(int a, int b)volatile { return a + b; }//成员函数
    int operator()(int)const { return 0; }//函数对象
};

//function 函数包装模板,指向 lambda 表达式
std::function<int(int)> func_lam = [](int a) {return a; };

template <typename FUNC>
void to_function(FUNC &&func) {
    using stl_func = typename function_traits<typename std::remove_reference<FUNC>::type>::stl_function_type;
    stl_func f = func;

    f(10,Student2());//调用func,打印: a=10
}

void test_function_traits() {

    //1.获取函数信息
    printf("func:%s\n",demangle(typeid(function_traits<decltype(func)>::function_type).name()).c_str());//打印函数类型
    printf("func ret:%s\n",demangle(typeid(function_traits<decltype(func)>::return_type).name()).c_str());//打印返回值类型
    printf("fucn arg num:%d\n",function_traits<decltype(func)>::arity);//打印参数个数
    printf("fucn arg[0]:%s\n",demangle(typeid(function_traits<decltype(func)>::args<0>::type).name()).c_str());//打印第一个参数的类型

    printType<function_traits<std::function<int(int)>>::function_type>();
    printType<function_traits<std::function<int(int)>>::args<0>::type>();
    printType<function_traits<decltype(func_lam)>::function_type>();

    printType<function_traits<decltype(cast_func)>::function_type>();
    printType<function_traits<AA>::function_type>();
    using T = decltype(&AA::f);
    printType<T>();
    printType<function_traits<decltype(&AA::f)>::function_type>();
    static_assert(std::is_same<function_traits<decltype(func_lam)>::return_type, int>::value, "");

    //2.使用 stl_function_type 把任意函数转换为 std::function
    to_function(func);

    /**
     * 打印:
     * func:int (int, Student2)
     * func ret:int
     * fucn arg num:2
     * fucn arg[0]:int
     * int (int)
     * int
     * int (int)
     * float (int, int, int, int)
     * int (int)
     * int (AA::*)(int, int) volatile
     * int (int, int)
     * a=10
     */

    //退出程序事件处理
    static semaphore sem;
    signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
    sem.wait();
}
#endif // TEST_FUNCTION_TRAITS_H

ObjectStatistic类统计测试

ObjectStatistic :统计类所实例化对象的个数。

创建私有成员: ObjectStatistic<Test_Obj> _statistic;并使用宏声明: StatisticImp(Test_Obj)。

在任意实例化的对象里均可调用接口 _statistic.count 查询实例化对象的总数。

#ifndef TEST_OBJECTSTATISTIC_H
#define TEST_OBJECTSTATISTIC_H

#include <csignal>
#include <iostream>
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"

using namespace std;
using namespace toolkit;

class Test_CLS {

public:
    int get_count() {
        return _statistic.count();
    }
private:
    ObjectStatistic<Test_CLS> _statistic;
};

StatisticImp(Test_CLS)

void test_ObjectStatistic() {
    Test_CLS tTest_CLS1;
    Test_CLS tTest_CLS2;

    printf("count=%d\n",tTest_CLS1.get_count());//count=2

    {
        Test_CLS tTest_CLS3;
        printf("count=%d\n",tTest_CLS1.get_count());//count=3
    }

    printf("count=%d\n",tTest_CLS1.get_count());//count=2

    //退出程序事件处理
    static semaphore sem;
    signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
    sem.wait();
}
#endif // TEST_OBJECTSTATISTIC_H
文章来源:https://blog.csdn.net/weixin_40355471/article/details/135422631
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。