做过Android开发的同学对Handler、Looper、MessageQueue、Message应该是非常熟悉了,Android是一个基于消息驱动的系统,我们在日常开发中用到消息队列的地方非常多。Android也给我们封装好了一个强大易用的消息处理API,音视频开发核心逻辑都会放在Native层,我们也希望在C++层实现这样的消息队列。
例如VideoEditor会创建一个GL线程,这个线程会构建EGL环境,我们可以在这个线程中构造EGLContext,然后使用OpenGL工具绘制各种效果。在此过程中,需要保持GL线程的统一,不然不同线程要通过共享EGLContext才可以实现效果了。言归正传,不需要多复杂的IPC机制,我们只需要实现一个简易的消息队列机制就行了。Android的消息队列机制实现太过复杂了,其实在音视频中很多东西不需要这么复杂,我们只需要将我们需要的那部分抠出来就可以了。
Android消息队列
我们首先分析一下Android中消息队列是如何设计的,下面是消息队列相关的类:
HandlerThread
Looper
Handler
MessageQueue
Message
正常构建一个消息分发机制的代码如下:
HandlerThread thread = new HandlerThread("Message Thread");
thread.start();
Handler handler = new Handler(thread.getLooper());
//......
handler.sendMessage(....)
大体的流程如下:
1.通过创建HandlerThread实例,HandlerThread实例中构建一个Looper实例
2.通过调用HandlerThread实例的start()方法开始执行消息队列轮转,进入Looper中的轮转
3.Handler实例中持有刚刚创建的Looper实例
4.Looper实例中构建一个消息队列MessageQueue
5.Handler每次发送消息都会通过Handler持有的Looper实例添加到消息队列中
6.Looper轮转中会消化处理消息
简单的流程示意如下图:
可以看到Looper.java中的轮转函数中有无限循环在执行,这个无限循环中会不断地处理消息队列中的消息(如果消息队列中存在消息的话),如果消息队列中不存在消息,那就一直等着。
从上面我们简单地分析中可以比较清晰地了解了Android原生的消息队列机制,不过有些地方实现的过于复杂了,在音视频SDK处理中可以不必要这么复杂,至于复杂的地方我在下面会提到的。下面我们根据对Android原生消息队列的分析来提供C++层的消息队列机制。
C++消息队列
我们照葫芦画瓢在C++中定义了几个文件:
handler_thread.cc
handler.cc
looper.cc
message_queue.cc
message.cc
每个文件提供的功能和Android基本上一致,不过我们还是先简单分析一下代码,方便后续的阐述。
handler_thread.cc
#include "handler_thread.h"
#include "log.h"
namespace thread {
HandlerThread *HandlerThread::Create(std::string name) {
return new HandlerThread(name);
}
static void *RunTask(void *context) {
auto handler_thread = reinterpret_cast<HandlerThread *>(context);
handler_thread->RunInternal();
pthread_exit(nullptr);
}
HandlerThread::HandlerThread(std::string name)
: name_(name)
, looper_(nullptr)
, exiting_(false)
, exited_(false) {
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&thread_, &attr, RunTask, (void *) this);
}
void HandlerThread::RunInternal() {
pthread_mutex_lock(&mutex_);
exiting_ = false;
exited_ = false;
pthread_mutex_unlock(&mutex_);
Looper::Prepare();
pthread_mutex_lock(&mutex_);
looper_ = Looper::MyLooper();
pthread_cond_broadcast(&cond_);
pthread_mutex_unlock(&mutex_);
Looper::Loop();
Looper::Exit();
pthread_mutex_lock(&mutex_);
exiting_ = false;
looper_ = nullptr;
exited_ = true;
pthread_mutex_unlock(&mutex_);
}
HandlerThread::~HandlerThread() {
if (looper_) {
looper_->Quit(true);
}
pthread_join(thread_, nullptr);
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
if (looper_) {
delete looper_;
looper_ = nullptr;
}
}
void HandlerThread::Quit() {
pthread_mutex_lock(&mutex_);
if (exiting_ || exited_) {
pthread_mutex_unlock(&mutex_);
return;
}
exiting_ = true;
pthread_mutex_unlock(&mutex_);
Looper *looper = GetLooper();
if (looper) {
looper->Quit(false);
}
}
bool HandlerThread::QuitSafely() {
pthread_mutex_lock(&mutex_);
if (exiting_ || exited_) {
pthread_mutex_unlock(&mutex_);
return false;
}
exiting_ = true;
pthread_mutex_unlock(&mutex_);
Looper *looper = GetLooper();
if (looper) {
looper->Quit(true);
return true;
}
return false;
}
Looper *HandlerThread::GetLooper() {
pthread_mutex_lock(&mutex_);
if (exited_) {
LOGE("Thread has been exited");
pthread_mutex_unlock(&mutex_);
return nullptr;
}
if (looper_ == nullptr) {
LOGE("Thread should wait");
pthread_cond_wait(&cond_, &mutex_);
}
pthread_mutex_unlock(&mutex_);
return looper_;
}
handler.cc
#include "handler.h"
#include "log.h"
/**
*
* HandlerThread 持有 Looper
* Handler 持有 Looper
* Handler 发送消息通过Looper轮转消息
* Looper 中持有MessageQueue来管理消息
*/
namespace thread {
Handler::Handler(Looper *looper, HandlerCallback *callback)
: looper_(looper)
, callback_(callback) {
}
Handler::~Handler() {
}
void Handler::SendMessage(Message *msg) {
if (looper_) {
msg->target = this;
looper_->SendMessage(msg);
}
}
void Handler::DispatchMessage(Message *msg) {
if (callback_) {
callback_->HandleMessage(msg);
}
}
void Handler::RemoveMessage(int what) {
if (looper_) {
looper_->RemoveMessage(what);
}
}
int Handler::Size() {
if (looper_) {
return looper_->Size();
}
return 0;
}
}
looper.cc
#include "looper.h"
#include "thread.h"
#include "log.h"
#include <cassert>
#include "time_utils.h"
namespace thread {
Looper::Looper()
: exiting_(false)
, exited_(false)
, exit_safely_(false)
, looping_(false) {
message_queue_ = new MessageQueue();
pthread_mutex_init(&variable_mutex_, nullptr);
}
Looper::~Looper() {
pthread_mutex_destroy(&variable_mutex_);
}
void Looper::Prepare() {
int64_t tid = Thread::CurrentThreadId();
Looper *looper = LooperManager::GetInstance()->Create(tid);
if (looper == nullptr) {
LOGE("Current thread looper has been called");
}
}
void Looper::Loop() {
MyLooper()->LoopInternal();
}
Looper * Looper::MyLooper() {
int64_t tid = Thread::CurrentThreadId();
Looper *looper = LooperManager::GetInstance()->Get(tid);
if (looper == nullptr) {
LOGE("Please invoke Looper::Prepare first");
}
assert(looper);
return looper;
}
int64_t Looper::MyLooperId() {
return reinterpret_cast<int64_t>(MyLooper());
}
void Looper::Exit() {
int64_t tid = Thread::CurrentThreadId();
LooperManager::GetInstance()->Remove(tid);
}
void Looper::Quit(bool safely) {
pthread_mutex_lock(&variable_mutex_);
if (exiting_ || exited_) {
pthread_mutex_unlock(&variable_mutex_);
return;
}
exit_safely_ = safely;
exiting_ = true;
pthread_mutex_unlock(&variable_mutex_);
message_queue_->Notify();
}
void Looper::Dump() {
message_queue_->Dump();
}
int Looper::Size() {
return message_queue_->Size();
}
void Looper::SendMessage(Message *msg) {
pthread_mutex_lock(&variable_mutex_);
if (exiting_ || exited_) {
pthread_mutex_unlock(&variable_mutex_);
return;
}
pthread_mutex_unlock(&variable_mutex_);
EnqueueMessage(msg);
}
void Looper::RemoveMessage(int what) {
message_queue_->RemoveMessage(what);
}
void Looper::LoopInternal() {
pthread_mutex_lock(&variable_mutex_);
if (looping_ || exiting_ || exited_) {
pthread_mutex_unlock(&variable_mutex_);
return;
}
looping_ = true;
pthread_mutex_unlock(&variable_mutex_);
for (;;) {
Message *msg = Take();
if (msg) {
if (msg->target) {
msg->target->DispatchMessage(msg);
}
delete msg;
}
pthread_mutex_lock(&variable_mutex_);
if (exit_safely_) {
if (exiting_ && message_queue_->Size() == 0) {
pthread_mutex_unlock(&variable_mutex_);
break;
}
} else {
if (exiting_) {
pthread_mutex_unlock(&variable_mutex_);
break;
}
}
pthread_mutex_unlock(&variable_mutex_);
}
int64_t time = TimeUtils::GetCurrentTimeUs();
while (message_queue_->Size() > 0) {
Message *msg = message_queue_->Take();
if (msg) {
delete msg;
}
}
message_queue_->Clear();
LOGI("Clear message_queue cost time=%lld us", (TimeUtils::GetCurrentTimeUs() - time));
pthread_mutex_lock(&variable_mutex_);
exiting_ = false;
exited_ = true;
looping_ = false;
pthread_mutex_unlock(&variable_mutex_);
}
void Looper::EnqueueMessage(Message *msg) {
/// TODO msg 模式, 可以放在队头, 也可以放在队尾
message_queue_->Offer(msg);
}
Message * Looper::Take() {
return message_queue_->Take();
}
/// ------------------------------------------------------------------
LooperManager *LooperManager::instance_ = new LooperManager();
LooperManager::LooperManager() {
}
LooperManager::~LooperManager() {
}
LooperManager * LooperManager::GetInstance() {
return instance_;
}
Looper * LooperManager::Create(int64_t tid) {
std::lock_guard<std::mutex> guard(looper_mutex_);
auto it = looper_map_.find(tid);
if (it == looper_map_.end()) {
Looper *looper = new Looper();
looper_map_[tid] = looper;
return looper;
}
return nullptr;
}
Looper * LooperManager::Get(int64_t tid) {
std::lock_guard<std::mutex> guard(looper_mutex_);
auto it = looper_map_.find(tid);
if (it == looper_map_.end()) {
return nullptr;
}
return it->second;
}
void LooperManager::Remove(int64_t tid) {
std::lock_guard<std::mutex> guard(looper_mutex_);
auto it = looper_map_.find(tid);
if (it != looper_map_.end()) {
looper_map_.erase(it);
}
}
int LooperManager::Size() {
std::lock_guard<std::mutex> guard(looper_mutex_);
return looper_map_.size();
}
}
message_queue.cc
#include "message_queue.h"
#include "log.h"
#include <sstream>
namespace thread {
MessageQueue::MessageQueue()
: is_destroyed_(false) {
pthread_mutex_init(&queue_mutex_, nullptr);
pthread_cond_init(&queue_cond_, nullptr);
}
MessageQueue::~MessageQueue() {
LOGI("Enter");
pthread_mutex_lock(&queue_mutex_);
is_destroyed_ = true;
pthread_mutex_unlock(&queue_mutex_);
Clear();
pthread_mutex_destroy(&queue_mutex_);
pthread_cond_destroy(&queue_cond_);
LOGI("Leave");
}
void MessageQueue::Offer(Message *msg) {
pthread_mutex_lock(&queue_mutex_);
if (is_destroyed_) {
pthread_mutex_unlock(&queue_mutex_);
return;
}
queue_.push_back(msg);
pthread_cond_broadcast(&queue_cond_);
pthread_mutex_unlock(&queue_mutex_);
}
void MessageQueue::OfferAtFront(Message *msg) {
pthread_mutex_lock(&queue_mutex_);
if (is_destroyed_) {
pthread_mutex_unlock(&queue_mutex_);
return;
}
queue_.push_front(msg);
pthread_cond_broadcast(&queue_cond_);
pthread_mutex_unlock(&queue_mutex_);
}
Message *MessageQueue::Take() {
pthread_mutex_lock(&queue_mutex_);
if (is_destroyed_) {
pthread_mutex_unlock(&queue_mutex_);
return nullptr;
}
if (Size() <= 0) {
pthread_cond_wait(&queue_cond_, &queue_mutex_);
}
if (queue_.empty()) {
pthread_mutex_unlock(&queue_mutex_);
return nullptr;
}
Message *msg = queue_.front();
queue_.pop_front();
pthread_mutex_unlock(&queue_mutex_);
return msg;
}
void MessageQueue::Notify() {
pthread_mutex_lock(&queue_mutex_);
pthread_cond_broadcast(&queue_cond_);
pthread_mutex_unlock(&queue_mutex_);
}
int MessageQueue::Size() {
return queue_.size();
}
bool MessageQueue::IsEmpty() {
return queue_.empty();
}
void MessageQueue::Clear() {
Notify();
if (queue_.empty()) {
return;
}
pthread_mutex_lock(&queue_mutex_);
while (!queue_.empty()) {
Message *msg = queue_.front();
queue_.pop_front();
if (msg) {
delete msg;
}
}
queue_.clear();
pthread_mutex_unlock(&queue_mutex_);
}
void MessageQueue::RemoveMessage(int what) {
pthread_mutex_lock(&queue_mutex_);
if (is_destroyed_) {
pthread_mutex_unlock(&queue_mutex_);
return;
}
std::list<Message *>::iterator it = queue_.begin();
while (it != queue_.end()) {
Message *msg = *it;
if (what == msg->what) {
delete msg;
it = queue_.erase(it);
continue;
}
++it;
}
pthread_mutex_unlock(&queue_mutex_);
}
void MessageQueue::Dump() {
std::ostringstream os;
std::list<Message *>::iterator it = queue_.begin();
while (it != queue_.end()) {
Message *msg = *it;
os << msg->what<<"\n";
++it;
}
LOGI("Result=%s", os.str().c_str());
}
}
message.cc
#include "message.h"
#include "log.h"
namespace thread {
Message::Message()
: what(-1)
, arg1(-1)
, arg2(-1)
, arg3(-1)
, arg4(-1)
, arg5(-1)
, arg6(-1)
, arg7(-1)
, obj1(nullptr)
, obj2(nullptr)
, target(nullptr) {
}
Message::~Message() {
/**
* obj1
* obj2
* target
* 不应该在Message析构函数中销毁, 应该由开发者决定是否销毁
*/
}
}
初始化:
std::string name("AV Message Queue");
thread::HandlerThread *handler_thread = thread::HandlerThread::Create(name);
thread::Handler *handler = new thread::Handler(handler_thread->GetLooper(), this);
同时保证当前的类实现thread::HandlerCallback,实现函数HandleMessage(thread::Message *msg)
不要忘记在析构函数中将handler_thread和handler指针销毁。
发送消息:
thread::Message *msg = new thread::Message();
msg->what = MSG_WHAT;
msg->obj1 = XXXX;
handler->SendMessage(msg);
同步等待的消息处理
延时消息处理
延时消息处理需要使用链表的结果,目前我们使用的双端队列,不过目前音视频SDK已经够用了,但是如果需要延时处理的话,你愿意来尝试一下吗?