前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何在Native层设计一个消息队列

如何在Native层设计一个消息队列

作者头像
马上就说
发布2022-05-25 14:45:24
4680
发布2022-05-25 14:45:24
举报
文章被收录于专栏:码上就说

做过Android开发的同学对Handler、Looper、MessageQueue、Message应该是非常熟悉了,Android是一个基于消息驱动的系统,我们在日常开发中用到消息队列的地方非常多。Android也给我们封装好了一个强大易用的消息处理API,音视频开发核心逻辑都会放在Native层,我们也希望在C++层实现这样的消息队列。

例如VideoEditor会创建一个GL线程,这个线程会构建EGL环境,我们可以在这个线程中构造EGLContext,然后使用OpenGL工具绘制各种效果。在此过程中,需要保持GL线程的统一,不然不同线程要通过共享EGLContext才可以实现效果了。言归正传,不需要多复杂的IPC机制,我们只需要实现一个简易的消息队列机制就行了。Android的消息队列机制实现太过复杂了,其实在音视频中很多东西不需要这么复杂,我们只需要将我们需要的那部分抠出来就可以了。

Android消息队列

我们首先分析一下Android中消息队列是如何设计的,下面是消息队列相关的类:

代码语言:javascript
复制
HandlerThread
Looper
Handler
MessageQueue
Message

正常构建一个消息分发机制的代码如下:

代码语言:javascript
复制
HandlerThread thread = new HandlerThread("Message Thread");
thread.start();
Handler handler = new Handler(thread.getLooper());
//......
handler.sendMessage(....)

大体的流程如下:

代码语言:javascript
复制
1.通过创建HandlerThread实例,HandlerThread实例中构建一个Looper实例
2.通过调用HandlerThread实例的start()方法开始执行消息队列轮转,进入Looper中的轮转
3.Handler实例中持有刚刚创建的Looper实例
4.Looper实例中构建一个消息队列MessageQueue
5.Handler每次发送消息都会通过Handler持有的Looper实例添加到消息队列中
6.Looper轮转中会消化处理消息

简单的流程示意如下图:

可以看到Looper.java中的轮转函数中有无限循环在执行,这个无限循环中会不断地处理消息队列中的消息(如果消息队列中存在消息的话),如果消息队列中不存在消息,那就一直等着。

从上面我们简单地分析中可以比较清晰地了解了Android原生的消息队列机制,不过有些地方实现的过于复杂了,在音视频SDK处理中可以不必要这么复杂,至于复杂的地方我在下面会提到的。下面我们根据对Android原生消息队列的分析来提供C++层的消息队列机制。

C++消息队列

我们照葫芦画瓢在C++中定义了几个文件:

代码语言:javascript
复制
handler_thread.cc
handler.cc
looper.cc
message_queue.cc
message.cc

每个文件提供的功能和Android基本上一致,不过我们还是先简单分析一下代码,方便后续的阐述。

handler_thread.cc

代码语言:javascript
复制
#include "handler_thread.h"
#include "log.h"

namespace thread {

HandlerThread *HandlerThread::Create(std::string name) {
  return new HandlerThread(name);
}

static void *RunTask(void *context) {
  auto handler_thread = reinterpret_cast<HandlerThread *>(context);
  handler_thread->RunInternal();
  pthread_exit(nullptr);
}

HandlerThread::HandlerThread(std::string name)
  : name_(name)
  , looper_(nullptr)
  , exiting_(false)
  , exited_(false) {
  pthread_mutex_init(&mutex_, nullptr);
  pthread_cond_init(&cond_, nullptr);
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  pthread_create(&thread_, &attr, RunTask, (void *) this);
}

void HandlerThread::RunInternal() {
  pthread_mutex_lock(&mutex_);
  exiting_ = false;
  exited_ = false;
  pthread_mutex_unlock(&mutex_);

  Looper::Prepare();
  pthread_mutex_lock(&mutex_);
  looper_ = Looper::MyLooper();
  pthread_cond_broadcast(&cond_);
  pthread_mutex_unlock(&mutex_);

  Looper::Loop();
  Looper::Exit();

  pthread_mutex_lock(&mutex_);
  exiting_ = false;
  looper_ = nullptr;
  exited_ = true;
  pthread_mutex_unlock(&mutex_);
}

HandlerThread::~HandlerThread() {
  if (looper_) {
    looper_->Quit(true);
  }
  pthread_join(thread_, nullptr);
  pthread_mutex_destroy(&mutex_);
  pthread_cond_destroy(&cond_);
  if (looper_) {
    delete looper_;
    looper_ = nullptr;
  }
}

void HandlerThread::Quit() {
  pthread_mutex_lock(&mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&mutex_);
    return;
  }
  exiting_ = true;
  pthread_mutex_unlock(&mutex_);
  Looper *looper = GetLooper();
  if (looper) {
    looper->Quit(false);
  }
}

bool HandlerThread::QuitSafely() {
  pthread_mutex_lock(&mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&mutex_);
    return false;
  }
  exiting_ = true;
  pthread_mutex_unlock(&mutex_);
  Looper *looper = GetLooper();
  if (looper) {
    looper->Quit(true);
    return true;
  }
  return false;
}

Looper *HandlerThread::GetLooper() {
  pthread_mutex_lock(&mutex_);
  if (exited_) {
    LOGE("Thread has been exited");
    pthread_mutex_unlock(&mutex_);
    return nullptr;
  }
  if (looper_ == nullptr) {
    LOGE("Thread should wait");
    pthread_cond_wait(&cond_, &mutex_);
  }
  pthread_mutex_unlock(&mutex_);
  return looper_;
}

handler.cc

代码语言:javascript
复制
#include "handler.h"
#include "log.h"
/**
 *
 * HandlerThread 持有 Looper
 * Handler 持有 Looper
 * Handler 发送消息通过Looper轮转消息
 * Looper 中持有MessageQueue来管理消息
 */

namespace thread {

Handler::Handler(Looper *looper, HandlerCallback *callback)
  : looper_(looper)
  , callback_(callback) {
}

Handler::~Handler() {
}

void Handler::SendMessage(Message *msg) {
  if (looper_) {
    msg->target = this;
    looper_->SendMessage(msg);
  }
}

void Handler::DispatchMessage(Message *msg) {
  if (callback_) {
    callback_->HandleMessage(msg);
  }
}

void Handler::RemoveMessage(int what) {
  if (looper_) {
    looper_->RemoveMessage(what);
  }
}

int Handler::Size() {
  if (looper_) {
    return looper_->Size();
  }
  return 0;
}

}

looper.cc

代码语言:javascript
复制
#include "looper.h"
#include "thread.h"
#include "log.h"
#include <cassert>
#include "time_utils.h"

namespace thread {

Looper::Looper()
  : exiting_(false)
  , exited_(false)
  , exit_safely_(false)
  , looping_(false) {
  message_queue_ = new MessageQueue();
  pthread_mutex_init(&variable_mutex_, nullptr);
}

Looper::~Looper() {
  pthread_mutex_destroy(&variable_mutex_);
}

void Looper::Prepare() {
  int64_t tid = Thread::CurrentThreadId();
  Looper *looper = LooperManager::GetInstance()->Create(tid);
  if (looper == nullptr) {
    LOGE("Current thread looper has been called");
  }
}

void Looper::Loop() {
  MyLooper()->LoopInternal();
}

Looper * Looper::MyLooper() {
  int64_t tid = Thread::CurrentThreadId();
  Looper *looper = LooperManager::GetInstance()->Get(tid);
  if (looper == nullptr) {
    LOGE("Please invoke Looper::Prepare first");
  }
  assert(looper);
  return looper;
}

int64_t Looper::MyLooperId() {
  return reinterpret_cast<int64_t>(MyLooper());
}

void Looper::Exit() {
  int64_t tid = Thread::CurrentThreadId();
  LooperManager::GetInstance()->Remove(tid);
}

void Looper::Quit(bool safely) {
  pthread_mutex_lock(&variable_mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&variable_mutex_);
    return;
  }
  exit_safely_ = safely;
  exiting_ = true;
  pthread_mutex_unlock(&variable_mutex_);
  message_queue_->Notify();
}

void Looper::Dump() {
  message_queue_->Dump();
}

int Looper::Size() {
  return message_queue_->Size();
}

void Looper::SendMessage(Message *msg) {
  pthread_mutex_lock(&variable_mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&variable_mutex_);
    return;
  }
  pthread_mutex_unlock(&variable_mutex_);
  EnqueueMessage(msg);
}

void Looper::RemoveMessage(int what) {
  message_queue_->RemoveMessage(what);
}

void Looper::LoopInternal() {
  pthread_mutex_lock(&variable_mutex_);
  if (looping_ || exiting_ || exited_) {
    pthread_mutex_unlock(&variable_mutex_);
    return;
  }
  looping_ = true;
  pthread_mutex_unlock(&variable_mutex_);

  for (;;) {
    Message *msg = Take();
    if (msg) {
      if (msg->target) {
        msg->target->DispatchMessage(msg);
      }
      delete msg;
    }

    pthread_mutex_lock(&variable_mutex_);
    if (exit_safely_) {
      if (exiting_ && message_queue_->Size() == 0) {
        pthread_mutex_unlock(&variable_mutex_);
        break;
      }
    } else {
      if (exiting_) {
        pthread_mutex_unlock(&variable_mutex_);
        break;
      }
    }
    pthread_mutex_unlock(&variable_mutex_);
  }

  int64_t time = TimeUtils::GetCurrentTimeUs();
  while (message_queue_->Size() > 0) {
    Message *msg = message_queue_->Take();
    if (msg) {
      delete msg;
    }
  }
  message_queue_->Clear();
  LOGI("Clear message_queue cost time=%lld us", (TimeUtils::GetCurrentTimeUs() - time));

  pthread_mutex_lock(&variable_mutex_);
  exiting_ = false;
  exited_ = true;
  looping_ = false;
  pthread_mutex_unlock(&variable_mutex_);
}

void Looper::EnqueueMessage(Message *msg) {
  /// TODO msg 模式, 可以放在队头, 也可以放在队尾
  message_queue_->Offer(msg);
}

Message * Looper::Take() {
  return message_queue_->Take();
}

/// ------------------------------------------------------------------

LooperManager *LooperManager::instance_ = new LooperManager();

LooperManager::LooperManager() {

}

LooperManager::~LooperManager() {

}

LooperManager * LooperManager::GetInstance() {
  return instance_;
}

Looper * LooperManager::Create(int64_t tid) {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  auto it = looper_map_.find(tid);
  if (it == looper_map_.end()) {
    Looper *looper = new Looper();
    looper_map_[tid] = looper;
    return looper;
  }
  return nullptr;
}

Looper * LooperManager::Get(int64_t tid) {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  auto it = looper_map_.find(tid);
  if (it == looper_map_.end()) {
    return nullptr;
  }
  return it->second;

}

void LooperManager::Remove(int64_t tid) {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  auto it = looper_map_.find(tid);
  if (it != looper_map_.end()) {
    looper_map_.erase(it);
  }
}

int LooperManager::Size() {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  return looper_map_.size();
}

}

message_queue.cc

代码语言:javascript
复制
#include "message_queue.h"
#include "log.h"
#include <sstream>

namespace thread {

MessageQueue::MessageQueue()
  : is_destroyed_(false) {
  pthread_mutex_init(&queue_mutex_, nullptr);
  pthread_cond_init(&queue_cond_, nullptr);
}

MessageQueue::~MessageQueue() {
  LOGI("Enter");
  pthread_mutex_lock(&queue_mutex_);
  is_destroyed_ = true;
  pthread_mutex_unlock(&queue_mutex_);

  Clear();

  pthread_mutex_destroy(&queue_mutex_);
  pthread_cond_destroy(&queue_cond_);
  LOGI("Leave");
}

void MessageQueue::Offer(Message *msg) {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return;
  }
  queue_.push_back(msg);
  pthread_cond_broadcast(&queue_cond_);
  pthread_mutex_unlock(&queue_mutex_);
}

void MessageQueue::OfferAtFront(Message *msg) {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return;
  }
  queue_.push_front(msg);
  pthread_cond_broadcast(&queue_cond_);
  pthread_mutex_unlock(&queue_mutex_);
}

Message *MessageQueue::Take() {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return nullptr;
  }
  if (Size() <= 0) {
    pthread_cond_wait(&queue_cond_, &queue_mutex_);
  }
  if (queue_.empty()) {
    pthread_mutex_unlock(&queue_mutex_);
    return nullptr;
  }
  Message *msg = queue_.front();
  queue_.pop_front();
  pthread_mutex_unlock(&queue_mutex_);
  return msg;
}

void MessageQueue::Notify() {
  pthread_mutex_lock(&queue_mutex_);
  pthread_cond_broadcast(&queue_cond_);
  pthread_mutex_unlock(&queue_mutex_);
}

int MessageQueue::Size() {
  return queue_.size();
}

bool MessageQueue::IsEmpty() {
  return queue_.empty();
}

void MessageQueue::Clear() {
  Notify();
  if (queue_.empty()) {
    return;
  }
  pthread_mutex_lock(&queue_mutex_);
  while (!queue_.empty()) {
    Message *msg = queue_.front();
    queue_.pop_front();
    if (msg) {
      delete msg;
    }
  }
  queue_.clear();
  pthread_mutex_unlock(&queue_mutex_);
}

void MessageQueue::RemoveMessage(int what) {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return;
  }
  std::list<Message *>::iterator it = queue_.begin();
  while (it != queue_.end()) {
    Message *msg = *it;
    if (what == msg->what) {
      delete msg;
      it = queue_.erase(it);
      continue;
    }
    ++it;
  }
  pthread_mutex_unlock(&queue_mutex_);
}

void MessageQueue::Dump() {
  std::ostringstream os;
  std::list<Message *>::iterator it = queue_.begin();
  while (it != queue_.end()) {
    Message *msg = *it;
    os << msg->what<<"\n";
    ++it;
  }
  LOGI("Result=%s", os.str().c_str());
}

}

message.cc

代码语言:javascript
复制
#include "message.h"

#include "log.h"

namespace thread {

Message::Message()
  : what(-1)
  , arg1(-1)
  , arg2(-1)
  , arg3(-1)
  , arg4(-1)
  , arg5(-1)
  , arg6(-1)
  , arg7(-1)
  , obj1(nullptr)
  , obj2(nullptr)
  , target(nullptr) {

}

Message::~Message() {
  /**
   * obj1
   * obj2
   * target
   * 不应该在Message析构函数中销毁, 应该由开发者决定是否销毁
   */
}

}

C++消息队列怎么使用

初始化:

代码语言:javascript
复制
std::string name("AV Message Queue");
thread::HandlerThread *handler_thread = thread::HandlerThread::Create(name);
thread::Handler *handler = new thread::Handler(handler_thread->GetLooper(), this);

同时保证当前的类实现thread::HandlerCallback,实现函数HandleMessage(thread::Message *msg)

不要忘记在析构函数中将handler_thread和handler指针销毁。

发送消息:

代码语言:javascript
复制
thread::Message *msg = new thread::Message();
msg->what = MSG_WHAT;
msg->obj1 = XXXX;
handler->SendMessage(msg);

遗留问题

代码语言:javascript
复制
同步等待的消息处理
延时消息处理

延时消息处理需要使用链表的结果,目前我们使用的双端队列,不过目前音视频SDK已经够用了,但是如果需要延时处理的话,你愿意来尝试一下吗?

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-05-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 音视频平凡之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • C++消息队列怎么使用
    • 遗留问题
    相关产品与服务
    消息队列 CMQ 版
    消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档