一、整体流程概览
从GitHub下载源码后,代理的源码在src中,同时还用到了lib库中的一些函数。对项目的工作流程有个大概理解是分析mosquitto的访问控制权限的基础,网络上已有很多中文博客在介绍,如逍遥子,尽管比较老,但是主要结构体的意义没有变;首先对结构体的含义有所理解对后面进一步看源码是非常有帮助的,如struct mosquitto代表了一个客户端,mosquitto_db代表代理内的一个仓库来存储各种东西。
因为是C语言编写,首先寻找main函数,服务器从/src/mosquitto.c中的main函数开始启动。注意,看的时候会发现有很多宏定义(如WIN32),我们选择自己熟悉的一个平台开始看就好,把其他的折叠掉以免产生混乱。main函数进行了订阅树初始化和加载安全配置文件后,便进入mosquitto_main_loop主循环;该函数首先用epoll机制来监听socket读事件,之后便进入了真正的核心主循环while(run){},这里才是服务器运行逻辑真正开始的地方。
从上至下流程概括如下:
while(run){//进入主死循环
/* Start-of-iteration housekeeping. context__free_disused() presumably releases
 * contexts that are no longer in use -- confirm against its definition. */
context__free_disused(db);
#ifdef WITH_SYS_TREE
/* The sys tree is only updated when a positive sys_interval is configured. */
if(db->config->sys_interval > 0){
sys_tree__update(db, db->config->sys_interval, start_time);
}
#endif
#ifndef WITH_EPOLL
/* poll() build: the pollfd array is rebuilt from scratch on every iteration.
 * memset with -1 sets every byte to 0xFF, so unused slots carry fd == -1. */
memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max);
pollfd_index = 0;
/* The listening sockets occupy the first listensock_count slots; the accept
 * code in the poll branch later relies on pollfds[i] matching listensock[i]. */
for(i=0; i<listensock_count; i++){
pollfds[pollfd_index].fd = listensock[i];
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
pollfd_index++;
}
#endif
/* Walk every context that is indexed by socket: retry the primary address of
 * bridges, enforce keepalive, flush pending output and (re)register the socket
 * for the events it needs. */
now_time = time(NULL);
time_count = 0;
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){//iterate over the hash table
/* `now` is refreshed from mosquitto_time() on the first context and then only
 * after every 1000 further contexts, instead of once per context. */
if(time_count > 0){
time_count--;
}else{
time_count = 1000;
now = mosquitto_time();
}
context->pollfd_index = -1;
if(context->sock != INVALID_SOCKET){
#ifdef WITH_BRIDGE
if(context->bridge){
mosquitto__check_keepalive(db, context);
/* A non-round-robin bridge that is currently on a non-primary address
 * (cur_address != 0) and whose primary_retry time has passed probes
 * addresses[0] to see whether the primary is reachable again. */
if(context->bridge->round_robin == false
&& context->bridge->cur_address != 0
&& context->bridge->primary_retry
&& now > context->bridge->primary_retry){
if(context->bridge->primary_retry_sock == INVALID_SOCKET){
/* No probe in flight yet: start one. */
rc = net__try_connect(context, context->bridge->addresses[0].address,
context->bridge->addresses[0].port,
&context->bridge->primary_retry_sock, NULL, false);
if(rc == 0){
/* Probe succeeded at once: drop the probe socket, close the current
 * connection and select address 0. */
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = 0;
}
}else{
/* A probe is already in flight: read its SO_ERROR to learn the outcome. */
len = sizeof(int);
if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
if(err == 0){
/* Probe connected. cur_address is set to the last index, presumably so
 * that the increment-and-wrap done by the bridge restart code lands on
 * address 0 -- confirm. */
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = context->bridge->address_count-1;
}else{
/* Probe failed: discard it and schedule another attempt at now+5. */
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}else{
/* getsockopt itself failed: treat it like a failed probe. */
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}
}
}
#endif
/* Local bridges never time out in this fashion. */
/* The client keeps being serviced when keepalive is 0, when it is a bridge,
 * or when its last inbound message is at most keepalive*3/2 old. */
if(!(context->keepalive)
|| context->bridge
|| now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){
//the client counts as online: write its in-flight packets to it
if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
/* Write interest is wanted when an outgoing packet is pending, the context is
 * in the connect-pending state, or ws_want_write is set. */
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
if(!(context->events & EPOLLOUT)) {
ev.data.fd = context->sock;
ev.events = EPOLLIN | EPOLLOUT;
/* Try ADD first; if the fd is already registered (EEXIST) fall back to MOD.
 * A message is logged only when that sequence fails. */
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno));
}
}
/* The cached event mask is updated whether or not epoll_ctl succeeded. */
context->events = EPOLLIN | EPOLLOUT;
}
context->ws_want_write = false;
}
else{
/* Nothing to write: go back to read-only interest if EPOLLOUT was set. */
if(context->events & EPOLLOUT) {
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno));
}
}
context->events = EPOLLIN;
}
}
#else
/* poll() build: append this client to the pollfd array and remember its slot
 * in context->pollfd_index. */
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
pollfds[pollfd_index].events |= POLLOUT;
context->ws_want_write = false;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
#endif
}else{
/* Writing the queued messages failed: disconnect the client. */
do_disconnect(db, context);
}
}else{//the client has timed out
if(db->config->connection_messages == true){
if(context->id){
id = context->id;
}else{
id = "<unknown>";
}
log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", id);
}
/* Client has exceeded keepalive*1.5 */
do_disconnect(db, context);
}
}
}
#ifdef WITH_BRIDGE
/* Bridge reconnection: every configured bridge that currently has no socket
 * is either scheduled for a restart or, once due, reconnected. */
time_count = 0;
for(i=0; i<db->bridge_count; i++){
if(!db->bridges[i]) continue;
context = db->bridges[i];
if(context->sock == INVALID_SOCKET){
/* Same throttled refresh of `now` as in the client scan: on the first
 * disconnected bridge, then after every 1000 further ones. */
if(time_count > 0){
time_count--;
}else{
time_count = 1000;
now = mosquitto_time();
}
/* Want to try to restart the bridge connection */
if(!context->bridge->restart_t){
/* No restart scheduled yet: schedule one restart_timeout from now and move
 * on to the next address, wrapping back to 0 after the last one. */
context->bridge->restart_t = now+context->bridge->restart_timeout;
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}else{
/* A restart is scheduled. Connect when this is a lazy bridge with
 * lazy_reconnect set, or an automatic bridge whose restart_t has passed. */
if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
|| (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){
#if defined(__GLIBC__) && defined(WITH_ADNS)
/* glibc + ADNS build: the connect is split into step1 (start the lookup)
 * and step2 (connect once the lookup has finished). */
if(context->adns){
/* Connection attempted, waiting on DNS lookup */
rc = gai_error(context->adns);
if(rc == EAI_INPROGRESS){
/* Just keep on waiting */
}else if(rc == 0){
rc = bridge__connect_step2(db, context);
if(rc == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
/* Register the new bridge socket for reads, plus writes when a packet is
 * already pending. ADD first, MOD on EEXIST. */
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(context->current_out_packet){
ev.events |= EPOLLOUT;
}
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
}
}else{
/* The cached mask is only updated when the initial ADD succeeded. */
context->events = ev.events;
}
#else
/* poll() build: append the bridge socket to the pollfd array. */
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet){
pollfds[pollfd_index].events |= POLLOUT;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
#endif
}else if(rc == MOSQ_ERR_CONN_PENDING){
/* Connection still pending: clear restart_t. */
context->bridge->restart_t = 0;
}else{
/* Connect failed: advance to the next address (with wrap-around) and clear
 * restart_t so that a new restart gets scheduled. */
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
context->bridge->restart_t = 0;
}
}else{
/* Need to retry */
/* The lookup failed: release its result and request object, then clear
 * restart_t. */
if(context->adns->ar_result){
freeaddrinfo(context->adns->ar_result);
}
mosquitto__free(context->adns);
context->adns = NULL;
context->bridge->restart_t = 0;
}
}else{
/* No lookup in flight: start one. */
rc = bridge__connect_step1(db, context);
if(rc){
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}else{
/* Short wait for ADNS lookup */
context->bridge->restart_t = 1;
}
}
#else
{
/* Build without ADNS: a single bridge__connect() call. */
rc = bridge__connect(db, context);
context->bridge->restart_t = 0;
if(rc == MOSQ_ERR_SUCCESS){
/* Connected to a non-primary address of a non-round-robin bridge: arm the
 * primary retry for now + 5. */
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
context->bridge->primary_retry = now + 5;
}
#ifdef WITH_EPOLL
/* Register the new bridge socket for reads, plus writes when a packet is
 * already pending. ADD first, MOD on EEXIST. */
ev.data.fd = context->sock;
ev.events = EPOLLIN;
if(context->current_out_packet){
ev.events |= EPOLLOUT;
}
if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
}
}else{
/* The cached mask is only updated when the initial ADD succeeded. */
context->events = ev.events;
}
#else
/* poll() build: append the bridge socket to the pollfd array. */
pollfds[pollfd_index].fd = context->sock;
pollfds[pollfd_index].events = POLLIN;
pollfds[pollfd_index].revents = 0;
if(context->current_out_packet){
pollfds[pollfd_index].events |= POLLOUT;
}
context->pollfd_index = pollfd_index;
pollfd_index++;
#endif
}else{
/* Connect failed: advance to the next address, wrapping back to 0. */
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}
}
#endif
}
}
}
}
#endif
/* Expire persistent clients. The scan only runs when
 * persistent_client_expiration is positive and expiration_check_time has
 * passed; afterwards the next scan is scheduled 3600 seconds later. */
now_time = time(NULL);
if(db->config->persistent_client_expiration > 0 && now_time > expiration_check_time){
HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){
/* Only disconnected contexts with clean_session == 0 are candidates. */
if(context->sock == INVALID_SOCKET && context->clean_session == 0){
/* This is a persistent client, check to see if the
* last time it connected was longer than
* persistent_client_expiration seconds ago. If so,
* expire it and clean up.
*/
if(now_time > context->disconnect_t+db->config->persistent_client_expiration){
if(context->id){
id = context->id;
}else{
id = "<unknown>";
}
log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id);
G_CLIENTS_EXPIRED_INC();
/* Flag the context as clean-session and expiring before handing it to
 * do_disconnect(). */
context->clean_session = true;
context->state = mosq_cs_expiring;
do_disconnect(db, context);
}
}
}
expiration_check_time = time(NULL) + 3600;
}
/* Wait for socket activity with a timeout argument of 100 (milliseconds for
 * epoll_wait/poll/WSAPoll). On non-Windows builds the signal mask is switched
 * to sigblock for the duration of the wait and restored from origsig after. */
#ifndef WIN32
sigprocmask(SIG_SETMASK, &sigblock, &origsig);
#ifdef WITH_EPOLL
//wait for socket events
fdcount = epoll_wait(db->epollfd, events, MAX_EVENTS, 100);
#else
fdcount = poll(pollfds, pollfd_index, 100);
#endif
sigprocmask(SIG_SETMASK, &origsig, NULL);
#else
fdcount = WSAPoll(pollfds, pollfd_index, 100);
#endif
#ifdef WITH_EPOLL
/* epoll build: act on the return value of epoll_wait(). */
switch(fdcount){
case -1:
/* An interrupted wait (EINTR) is not reported as an error. */
if(errno != EINTR){
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno));
}
break;
case 0:
/* Timed out with no descriptor ready. */
break;
default:
/* Process every ready descriptor. */
for(i=0; i<fdcount; i++){
/* Check whether the descriptor is one of the listening sockets. */
for(j=0; j<listensock_count; j++){
if (events[i].data.fd == listensock[j]) {
if (events[i].events & (EPOLLIN | EPOLLPRI)){
/* Accept connections until net__socket_accept() returns -1. Per the
 * article's note, net__socket_accept() also creates the new client's
 * context. */
while((ev.data.fd = net__socket_accept(db, listensock[j])) != -1){
ev.events = EPOLLIN;
if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) {
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno));
}
context = NULL;
HASH_FIND(hh_sock, db->contexts_by_sock, &(ev.data.fd), sizeof(mosq_sock_t), context);
/* Record the registered event mask only when the lookup found a context;
 * when it did not, log the problem and do not dereference the NULL
 * pointer. */
if(context){
context->events = EPOLLIN;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context");
}
}
}
break;
}
}
/* j reaching listensock_count means no listener matched, so the descriptor
 * belongs to a client: handle its reads and writes. */
if (j == listensock_count) {
loop_handle_reads_writes(db, events[i].data.fd, events[i].events);
}
}
}
#else
/* poll() build: service the client sockets first, then accept on every
 * listening socket that reported POLLIN or POLLPRI. pollfds[i] corresponds to
 * listensock[i] because the listeners fill the first slots of the array. */
if(fdcount == -1){
log__printf(NULL, MOSQ_LOG_ERR, "Error in poll: %s.", strerror(errno));
}else{
loop_handle_reads_writes(db, pollfds);
for(i=0; i<listensock_count; i++){
if(pollfds[i].revents & (POLLIN | POLLPRI)){
/* Drain the accept queue; the loop body is intentionally empty. */
while(net__socket_accept(db, listensock[i]) != -1){
}
}
}
}
#endif
#ifdef WITH_PERSISTENCE
/* Automatic persistence backup, active only when persistence is enabled and
 * autosave_interval is non-zero. autosave_interval is interpreted in one of
 * two ways depending on autosave_on_changes. */
if(db->config->persistence && db->config->autosave_interval){
if(db->config->autosave_on_changes){
/* Change-count mode: back up once persistence_changes reaches the interval,
 * then reset the counter. */
if(db->persistence_changes >= db->config->autosave_interval){
persist__backup(db, false);
db->persistence_changes = 0;
}
}else{
/* Time mode: back up once more than autosave_interval has elapsed since
 * last_backup, measured with mosquitto_time(). */
if(last_backup + db->config->autosave_interval < mosquitto_time()){
persist__backup(db, false);
last_backup = mosquitto_time();
}
}
}
#endif
#ifdef WITH_PERSISTENCE
/* Explicitly requested backup: flag_db_backup is set elsewhere (not shown
 * here) and cleared once the backup has been written. */
if(flag_db_backup){
persist__backup(db, false);
flag_db_backup = false;
}
#endif
/* Configuration reload, triggered by flag_reload (set elsewhere, not shown
 * here): re-read the config, tear down and re-initialise security, apply it,
 * and reopen logging with the new configuration. */
if(flag_reload){
log__printf(NULL, MOSQ_LOG_INFO, "Reloading config.");
config__read(db, db->config, true);
mosquitto_security_cleanup(db, true);
mosquitto_security_init(db, true);
mosquitto_security_apply(db);
log__close(db->config);
log__init(db->config);
flag_reload = false;
}
/* Print the subscription tree once when flag_tree_print has been set. */
if(flag_tree_print){
sub__tree_print(db->subs, 0);
flag_tree_print = false;
}
#ifdef WITH_WEBSOCKETS
/* Service every listener that has a websockets context, passing 0 as the
 * second argument of libwebsocket_service(). */
for(i=0; i<db->config->listener_count; i++){
/* Extremely hacky, should be using the lws provided external poll
* interface, but their interface has changed recently and ours
* will soon, so for now websockets clients are second class
* citizens. */
if(db->config->listeners[i].ws_context){
libwebsocket_service(db->config->listeners[i].ws_context, 0);
}
}
/* When at least one websockets listener exists, run the websockets client
 * expiry helper. */
if(db->config->have_websockets_listener){
temp__expire_websockets_clients(db);
}
#endif
}//end while(run)
二、mosquitto原生权限功能
在mosquitto_plugin.h中唯一一次出现了对这几个权限宏定义的说明:
/*
* Function: mosquitto_auth_acl_check
*
* Called by the broker when topic access must be checked. access will be one
* of:
* MOSQ_ACL_SUBSCRIBE when a client is asking to subscribe to a topic string.
* This differs from MOSQ_ACL_READ in that it allows you to
* deny access to topic strings rather than by pattern. For
* example, you may use MOSQ_ACL_SUBSCRIBE to deny
* subscriptions to '#', but allow all topics in
* MOSQ_ACL_READ. This allows clients to subscribe to any
* topic they want, but not discover what topics are in use
* on the server.
* MOSQ_ACL_READ when a message is about to be sent to a client (i.e. whether
* it can read that topic or not).
* MOSQ_ACL_WRITE when a message has been received from a client (i.e. whether
* it can write to that topic or not).
*
后面的解释说明了实现时要在哪些位置检查这个权限。执行检查的函数是
int mosquitto_acl_check(struct mosquitto_db *db, struct mosquitto *context, const char *topic, long payloadlen, void* payload, int qos, bool retain, int access)
其中context就是被检查的客户端信息,topic、payload、retain等是当前消息的属性,access是要检查的具体权限。通过这个函数参数的接口设计可以猜测其是根据客户端的context来进行检查,也就是根据客户端的事件(ps.不然怎么知道要传入哪个context?一般都是哪个context有行为用哪个吧)。那么是不是所有消息都能找到对应的客户端context呢?请继续看下文分解。
想要查看作者具体是在哪里检查什么权限的可以全局搜索这个函数在哪里调用过。
三、对于mosquitto原生权限的改进
上节提到了,由于权限检查函数需要context的特点,以及retain消息是保存在订阅树叶子节点上的特点,导致retain消息WRITE权限检查丢失。本节讨论如何加入检查retained消息权限的功能。先来看代理是如何处理retained消息的。
所以修改思路就是在存入消息的时候,即db__message_store中,保存retained消息发送源的context(为了复用mosquitto_acl_check);在要发送给订阅客户端的时候,即retain__process中,检查发送源的权限。虽然看似简单但还是要考虑很多其他因素,尤其C语言要自己控制内存的初始化与释放,一不小心就会段错误。具体修改细节: