支持插件的消息中间件
msg broker with plugin
msg broker是实现application之间互相通讯的组件。通常为实现application之间的解耦,消息都是通过msg broker完成转发。application只需知道其他application的逻辑名称,而不需要知道对方的具体位置。Broker中维护一个查找表,记录着哪个application注册在此逻辑名称之下,所以消息总是会被正确地投递到目的地。
msg broker不限于1-1的转发,也支持1-N的模式。其主要功能有:
Msg Broker的结构:
项目地址:http://www.rabbitmq.com/
RabbitMQ是由Erlang开发的高效、健壮且具有高度伸缩性的消息服务器。其所包含的概念有Producer、Consumer、Exchange、Queue。RabbitMQ基于AMQP协议,支持的语言非常丰富,文档也非常清晰。使用RabbitMQ可以实现订阅发布模型、RPC模型、路由模型等,参见RabbitMQ的例子:http://www.rabbitmq.com/getstarted.html 。
但是它有如下局限性:
项目地址:http://www.zeromq.org/
ZeroMQ是一个Socket封装库,号称是最快的消息内核。ZeroMQ可以支持TCP、UDP、IPC等多种通讯协议。ZeroMQ可以实现的通讯模型就更多了,几乎涵盖了消息通讯的所有模式,参见官网介绍http://www.zeromq.org/intro:read-the-manual 。
其局限性为:
在网络游戏中,client和服务器是通过TCP长连接通讯的。相对于HTTP+WebServer的不同在于:
由于msg broker支持Python和Lua作为插件,那么必须确保Linux下安装了相应的头文件。示例中的插件均只实现了echo功能。
svn co https://ffown.googlecode.com/svn/trunk/
#ifndef _PLUGIN_H_
#define _PLUGIN_H_
#include "channel.h"
#include "message.h"
// Abstract interface implemented by every plugin backend of the broker.
// The implementations visible in this file (shared library, Python, Lua)
// all return 0 on success and -1 on failure from these methods.
class plugin_i
{
public:
    virtual ~plugin_i() {}

    // Prepares the plugin for use (e.g. loads the library or script).
    virtual int start() = 0;

    // Counterpart of start().
    virtual int stop() = 0;

    // Reports channel_ to the plugin as broken.
    // NOTE(review): the Python and Lua backends delete channel_ inside this
    // call; confirm the ownership contract with the caller.
    virtual int handle_broken(channel_ptr_t channel_) = 0;

    // Hands the message msg_, received on channel_, to the plugin.
    virtual int handle_msg(const message_t& msg_, channel_ptr_t channel_) = 0;
};
// Plain pointer alias used to pass plugin instances around.
typedef plugin_i* plugin_ptr_t;
// Signature of the message entry point resolved from a shared-library plugin.
typedef int (*handle_channel_msg_func_t)(const message_t& msg_, channel_ptr_t);
// Signature of the channel-broken entry point resolved from a shared-library plugin.
typedef int (*handle_channel_broken_func_t)(channel_ptr_t);
// Symbol names that plugin_dll_t::start() looks up with dlsym().
#define HANDLE_CHANNEL_MSG "handle_channel_msg"
#define HANDLE_CHANNEL_BROKEN "handle_channel_broken"
#endif
各个接口作用如下:
channel 用来表示一个连接,可以理解成socket的抽象,也可直接理解成远程client。
#ifndef _CHANNEL_H_
#define _CHANNEL_H_
#include "socket_i.h"
// Represents one connection: an abstraction over a socket, i.e. the remote
// client as seen by the broker and its plugins.
class channel_t
{
public:
    channel_t(socket_ptr_t sock_);
    ~channel_t();

    // Opaque per-channel user pointer.
    // NOTE(review): presumably stored in / read from m_data; the definitions
    // are not part of this header.
    void  set_data(void* p);
    void* get_data() const;

    // Typed convenience wrapper around the untyped get_data() above.
    // The caller is responsible for T matching what was stored.
    template<typename T>
    T* get_data() const
    {
        return static_cast<T*>(this->get_data());
    }

    // Sends buff_ to the peer; the name suggests the call does not block.
    void async_send(const string& buff_);

    void close();

private:
    socket_ptr_t m_socket;
    void*        m_data;
};

// Plain pointer alias used throughout the plugin interfaces.
typedef channel_t* channel_ptr_t;
#endif
int plugin_dll_t::start()
{
m_dll_handler = ::dlopen(m_dll_name.c_str(), RTLD_NOW|RTLD_GLOBAL);
if (NULL == m_dll_handler)
{
logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s>", dlerror()));
return -1;
}
m_msg_cb = (handle_channel_msg_func_t)::dlsym(m_dll_handler, HANDLE_CHANNEL_MSG);
m_broken_cb = (handle_channel_broken_func_t)::dlsym(m_dll_handler, HANDLE_CHANNEL_BROKEN);
if (NULL == m_msg_cb)
{
logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s> not exist", HANDLE_CHANNEL_MSG));
return -1;
}
if (NULL == m_broken_cb)
{
logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s> not exist", HANDLE_CHANNEL_BROKEN));
return -1;
}
return 0;
}
int plugin_dll_t::stop()
{
::dlclose(m_dll_handler);
return 0;
}
// Forwards a broken-channel notification to the library's
// HANDLE_CHANNEL_BROKEN entry point.
//
// Returns the callback's result, or -1 when the entry point is not resolved
// (dlsym() in start() yields NULL for a missing symbol).
int plugin_dll_t::handle_broken(channel_ptr_t channel_)
{
    if (NULL == m_broken_cb)
    {
        logerror((PLUGIN_IMPL, "plugin_dll_t::handle_broken <%s> not loaded", HANDLE_CHANNEL_BROKEN));
        return -1;
    }
    return m_broken_cb(channel_);
}
int plugin_dll_t::handle_msg(const message_t& msg_, channel_ptr_t channel_)
{
return m_msg_cb(msg_, channel_);
}
#include "plugin_impl/plugin_python.h"
#include "plugin_impl/pyext.h"
#include "log_module.h"
// Derives the Python module name from name_ and boots the embedded interpreter.
//
// name_ is a script path such as "./plugin/plugin_echo_py/echo.py":
//   - the part after the last '/' with everything from the first '.' removed
//     becomes the module name (m_py_name);
//   - the part up to and including the last '/' (or "./" when there is no
//     '/') is appended to sys.path so the module can be imported.
plugin_python_t::plugin_python_t(const string& name_):
m_py_mod(NULL)
{
    string pythonpath = "./";
    const string::size_type slash = name_.find_last_of('/');
    if (string::npos == slash)
    {
        m_py_name = name_;
    }
    else
    {
        m_py_name  = name_.substr(slash + 1);
        pythonpath = name_.substr(0, slash + 1);
    }
    // Drop the extension; when there is no '.', find_first_of() yields npos
    // and substr(0, npos) keeps the whole name.
    m_py_name = m_py_name.substr(0, m_py_name.find_first_of('.'));

    // Py_SetPythonHome() is intentionally not called: it only has an effect
    // before initialization, it names the location of the standard library
    // (not a script directory), and it requires storage that outlives the
    // interpreter, which a local string cannot provide.
    Py_InitializeEx(0);
    initpyext(this);
    PyRun_SimpleString("import channel;import sys;sys.path.append('./plugin/plugin_echo_py/')");

    // Make the directory of the requested script importable as well.
    PyObject* sys_path = PySys_GetObject((char*)"path"); // borrowed reference
    PyObject* plugin_dir = PyString_FromString(pythonpath.c_str());
    if (NULL == sys_path || NULL == plugin_dir || 0 != PyList_Append(sys_path, plugin_dir))
    {
        // Best effort: the import in load_py_mod() reports a missing module.
        PyErr_Clear();
    }
    Py_XDECREF(plugin_dir);
}
// Shuts down the embedded Python interpreter that the constructor initialized.
plugin_python_t::~plugin_python_t()
{
Py_Finalize();
}
// Imports the plugin's Python module.
// Returns 0 on success and -1 when load_py_mod() reports any failure.
int plugin_python_t::start()
{
    const int rc = load_py_mod();
    return (0 == rc) ? 0 : -1;
}
// Nothing to do on stop; the interpreter is finalized in the destructor.
// Always returns 0.
int plugin_python_t::stop()
{
return 0;
}
int plugin_python_t::load_py_mod()
{
PyObject *pName, *pModule;
pName = PyString_FromString(m_py_name.c_str());
pModule = PyImport_Import(pName);
if (!pModule )
{
Py_DECREF(pName);
logerror((PLUGIN_IMPL, "can't find %s.py\n", m_py_name.c_str()));
if (PyErr_Occurred())
{
PyErr_Print();
PyErr_Clear();
return -1;
}
return -1;
}
m_py_mod = PyModule_GetDict(pModule);
Py_DECREF(pName);
Py_DECREF(pModule);
return 0;
}
// Handles a broken channel: forgets it, destroys it, then notifies the script.
//
// The channel is removed from m_channel_mgr and deleted before the Python
// handle_broken function runs, so the script only receives the numeric id and
// get_channel() no longer resolves it.
// Returns the result of call_py_handle_broken().
int plugin_python_t::handle_broken(channel_ptr_t channel_)
{
    // Take the id before the delete: reading a pointer value after the object
    // it pointed to has been freed is not portable.
    const long channel_id = long(channel_);
    m_channel_mgr.erase(channel_id);
    delete channel_;
    return call_py_handle_broken(channel_id);
}
int plugin_python_t::handle_msg(const message_t& msg_, channel_ptr_t channel_)
{
m_channel_mgr.insert(make_pair((long)channel_, channel_));
return call_py_handle_msg((long)channel_, msg_.get_body().c_str());
}
int plugin_python_t::call_py_handle_msg(long val, const char* msg)
{
PyObject *pDict = m_py_mod;
const char* func_name = "handle_msg";
PyObject *pFunc, *arglist, *pRetVal;
pFunc = PyDict_GetItemString(pDict, func_name);
if (!pFunc || !PyCallable_Check(pFunc))
{
logerror((PLUGIN_IMPL, "can't find function [%s]\n", func_name));
return -1;
}
arglist = Py_BuildValue("ls", val, msg);
pRetVal = PyObject_CallObject(pFunc, arglist);
Py_DECREF(arglist);
if (pRetVal)
{
Py_DECREF(pRetVal);
}
if (PyErr_Occurred())
{
PyErr_Print();
PyErr_Clear();
return -1;
}
return 0;
}
int plugin_python_t::call_py_handle_broken(long val)
{
PyObject *pDict = m_py_mod;
const char* func_name = "handle_broken";
PyObject *pFunc, *arglist, *pRetVal;
pFunc = PyDict_GetItemString(pDict, func_name);
if (!pFunc || !PyCallable_Check(pFunc))
{
logerror((PLUGIN_IMPL, "can't find function [%s]\n", func_name));
return -1;
}
arglist = Py_BuildValue("l", val);
pRetVal = PyObject_CallObject(pFunc, arglist);
Py_DECREF(arglist);
if (pRetVal)
{
Py_DECREF(pRetVal);
}
if (PyErr_Occurred())
{
PyErr_Print();
PyErr_Clear();
return -1;
}
return 0;
}
// Looks up the channel registered under id p.
// Returns NULL when no such channel is known.
channel_ptr_t plugin_python_t::get_channel(long p)
{
    map<long, channel_ptr_t>::iterator found = m_channel_mgr.find(p);
    if (m_channel_mgr.end() == found)
    {
        return NULL;
    }
    return found->second;
}
// The plugin_lua_t instance that Lua-side channel.send() calls are routed to.
// Set by the plugin_lua_t constructor.
static plugin_lua_t* g_plugin_lua_obj = NULL;

// Lua C function behind channel.send(id, msg).
//
// Argument 1 is the numeric channel id, argument 2 the payload string. The
// payload is sent on the channel if the id is still registered; unknown ids
// are ignored. Pushes no results (returns 0 to Lua).
static int channel_send_msg(lua_State* ls_)
{
    long ptr = (long)luaL_checknumber(ls_, 1);
    size_t len = 0;
    const char* msg = luaL_checklstring(ls_, 2, &len);

    if (NULL == g_plugin_lua_obj)
    {
        return 0;
    }

    channel_ptr_t c = g_plugin_lua_obj->get_channel(ptr);
    if (c)
    {
        // Lua strings may contain embedded zeros, so pass the explicit length
        // instead of letting the payload be cut at the first NUL.
        c->async_send(string(msg, len));
    }
    return 0;
}
// Creates the Lua state for the script name_ and exposes channel.send to it.
//
// name_ is the script path (e.g. "./plugin/plugin_echo_lua/echo.lua"). It is
// stored unchanged in m_lua_name because load_lua_mod() runs it with
// luaL_dofile(). The directory part of name_ ("./" when there is no '/') is
// appended to package.path so the script can require() its neighbours.
plugin_lua_t::plugin_lua_t(const string& name_):
m_ls(NULL)
{
    g_plugin_lua_obj = this;

    string luapath = "./";
    const string::size_type slash = name_.find_last_of('/');
    if (string::npos != slash)
    {
        luapath = name_.substr(0, slash + 1);
    }

    m_ls = lua_open();
    lua_checkstack(m_ls, 20);

    // Publish the C function as channel.send without leaving a stray global.
    lua_pushcfunction(m_ls, channel_send_msg);
    lua_setglobal(m_ls, "_tmp_func_");
    if (luaL_dostring(m_ls, "channel = {} channel.send = _tmp_func_ _tmp_func_ = nil"))
    {
        lua_pop(m_ls, 1); // drop the error message
    }

    luaL_openlibs(m_ls);

    // package.path templates are separated by ';'. Without the separator the
    // new entry would be glued onto the last existing template.
    string lua_str = "package.path = package.path .. \";" + luapath + "?.lua\"";
    if (luaL_dostring(m_ls, lua_str.c_str()))
    {
        lua_pop(m_ls, 1); // drop the error message
    }

    m_lua_name = name_;
}
// Releases the Lua state created by the constructor and stops routing
// channel.send() calls to this instance.
plugin_lua_t::~plugin_lua_t()
{
    if (m_ls)
    {
        lua_close(m_ls); // the state was previously never freed
        m_ls = NULL;
    }
    // Do not leave the global pointing at a destroyed object.
    if (this == g_plugin_lua_obj)
    {
        g_plugin_lua_obj = NULL;
    }
}
// Runs the plugin script.
// Returns 0 on success; logs and returns -1 when load_lua_mod() fails.
int plugin_lua_t::start()
{
    if (0 == load_lua_mod())
    {
        return 0;
    }
    logerror((PLUGIN_IMPL, "can't find %s.lua\n", m_lua_name.c_str()));
    return -1;
}
// Nothing to do on stop. Always returns 0.
int plugin_lua_t::stop()
{
return 0;
}
// Handles a broken channel: forgets it, destroys it, then notifies the script.
//
// The channel is removed from m_channel_mgr and deleted before the Lua
// handle_broken function runs, so the script only receives the numeric id and
// get_channel() no longer resolves it.
// Returns the result of call_lua_handle_broken().
int plugin_lua_t::handle_broken(channel_ptr_t channel_)
{
    // Take the id before the delete: reading a pointer value after the object
    // it pointed to has been freed is not portable.
    const long channel_id = long(channel_);
    m_channel_mgr.erase(channel_id);
    delete channel_;
    return call_lua_handle_broken(channel_id);
}
int plugin_lua_t::handle_msg(const message_t& msg_, channel_ptr_t channel_)
{
m_channel_mgr.insert(make_pair((long)channel_, channel_));
return call_lua_handle_msg((long)channel_, msg_.get_body());
}
int plugin_lua_t::load_lua_mod()
{
if (luaL_dofile(m_ls, m_lua_name.c_str()))
{
lua_pop(m_ls, 1);
return -1;
}
return 0;
}
int plugin_lua_t::call_lua_handle_msg(long val, const string& msg)
{
lua_checkstack(m_ls, 20);
lua_getglobal(m_ls, "handle_msg");
lua_pushnumber(m_ls, val);
lua_pushlstring(m_ls, msg.c_str(), msg.size());
if (lua_pcall(m_ls, 2, 0, 0) != 0)
{
lua_pop(m_ls, 1);
return -1;
}
return 0;
}
int plugin_lua_t::call_lua_handle_broken(long val)
{
lua_checkstack(m_ls, 20);
lua_getglobal(m_ls, "handle_broken");
lua_pushnumber(m_ls, val);
if (lua_pcall(m_ls, 1, 0, 0) != 0)
{
lua_pop(m_ls, 1);
return -1;
}
return 0;
}
// Looks up the channel registered under id p.
// Returns NULL when no such channel is known.
channel_ptr_t plugin_lua_t::get_channel(long p)
{
    map<long, channel_ptr_t>::iterator found = m_channel_mgr.find(p);
    if (m_channel_mgr.end() == found)
    {
        return NULL;
    }
    return found->second;
}