
命运的局限尽可永在,
不屈的挑战却不可须臾或缺。
--- 史铁生 ---
项目地址在这里: https://gitee.com/penggli_2_0/TcpServer
这是一个仿muduo库One Thread One Loop式主从Reactor模型实现⾼并发服务器项目。是对基于事件驱动Reactor模型的改良版!
通过实现的高并发服务器组件,可以简洁快速的完成⼀个⾼性能的服务器搭建。并且,通过组件内提供的不同应⽤层协议⽀持,也可以快速完成⼀个高性能应用服务器的搭建(当前为了便于项目的演⽰,项目中提供HTTP协议组件的支持)。在这里,要明确的是咱们要实现的是⼀个高并发服务器组件,因此当前的项目中并不包含实际的业务内容!
实现的是主从Reactor模型服务器!分为两个部分:主Reactor 与 子Reactor:
One Thread One Loop的思想就是把所有的操作都放到⼀个线程中进行,⼀个线程对应⼀个事件处理的循环。
当前实现中,因为并不确定组件使用者的意向,因此不提供业务层工作线程池的实现,只实现主从Reactor,而Worker工作线程池,可由组件库的使用者的需要自行决定是否使用和实现。

在这项目中我们需要两大模块:
SERVER模块就是对所有的连接以及线程进⾏管理,让它们各司其职,在合适的时候做合适的事,最终完成高性能服务器组件的实现。具体的管理也分为三个方面:
基于以上的管理思想,将这个模块进⾏细致的划分⼜可以划分为以下多个子模块:
时间轮思想:时间轮的思想来源于钟表,如果我们定了⼀个3点钟的闹铃,则当时针⾛到3的时候,就代表时间到了。
同样的道理,如果我们定义了⼀个数组,并且有⼀个指针,指向数组起始位置,这个指针每秒钟向后⾛动⼀步,⾛到哪⾥,则代表哪⾥的任务该被执⾏了,那么如果我们想要定⼀个3s后的任务,则只需要将任务添加到tick+3位置,则每秒中⾛⼀步,三秒钟后tick⾛到对应位置,这时候执⾏对应位置的任务即可。
这里先对时间轮模块进行一个简单实现: 首先需要设计一个TimeTask 定时任务类
class Timer
{
private:
uint64_t _id; // 任务Id
uint32_t _timeout; // 延迟时间
bool _canceled; // false表示没有被取消了 true表示被取消了
Task_t _task_cb; // 定时任务
Release_t _release; // 释放操作
public:
Timer(uint64_t id, uint32_t timeout, Task_t task) : _id(id),
_timeout(timeout),
_canceled(false),
_task_cb(task)
{
}
void Cancel()
{
_canceled = true;
}
~Timer()
{
if (_canceled == false)
_task_cb();
_release();
}
// 设置释放操作
void SetRelease(Release_t release)
{
_release = release;
}
// 返回延迟时间
uint32_t DelayTime()
{
return _timeout;
}
};注意:这里使用会使用智能指针!shared_ptr 指针 与 weak_ptr 指针配合使用,weak_ptr不会增加shared_ptr的计数!
class TimeWheel
{
using TaskPtr = std::shared_ptr<Timer>;
using WeakPtr = std::weak_ptr<Timer>; // 辅助shared_ptr 不会增加引用计数
private:
int _capacity; // 最大容量 表盘最大数量(默认60秒)
int _tick; // 移动表针
std::vector<std::vector<TaskPtr>> _wheel; // 时间轮
std::unordered_map<uint64_t, WeakPtr> _timers; // 定时任务对象哈希表
private:
void RemoveTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it != _timers.end())
{
_timers.erase(it);
}
}
public:
TimeWheel() : _capacity(gnum), _tick(0), _wheel(_capacity, std::vector<TaskPtr>())
{
}
void TimerAdd(uint64_t id, int delay, Task_t cb)
{
TaskPtr p(new Timer(id, delay, cb));
p->SetRelease(std::bind(&TimeWheel::RemoveTimer, this, id));
_timers[id] = WeakPtr(p); // 进行映射 注意是WeakPtr
// 放入时间轮
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(p);
}
void TimerRefresh(uint64_t id)
{
// 更新任务
auto it = _timers.find(id);
if (it == _timers.end())
return;
// 通过WeakPtr构造一个shared_ptr
TaskPtr p = it->second.lock();
int delay = p->DelayTime();
// 放入时间轮
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(p);
}
void RunTimerTask()
{
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear();
}
void TimerCancel(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
return;//没有找到直接退出!
TaskPtr p = it->second.lock();
if (p)
p->Cancel();
}
~TimeWheel()
{
}
};