//MDSRank.cc
/***************************init: initialization***************************/
// Bring up the services this rank depends on. The calls run strictly in the
// order written below.
void MDSRankDispatcher::init()
{
// Initialize the Objecter, register it at the head of the messenger's
// dispatcher list, then start it.
objecter->init();
messenger->add_dispatcher_head(objecter);
objecter->start();
// Refresh the log configuration.
update_log_config();
create_logger();
handle_osd_map();
// Create the progress thread (thread name "mds_rank_progr").
progress_thread.create("mds_rank_progr");
purge_queue.init();
// Start the finisher last.
finisher->start();
}
/***************************************************************/
/***************************tick: periodic housekeeping***************************/
// Periodic housekeeping for this rank. Which steps run depends on the rank's
// current state (active / stopping / clientreplay / reconnect). If the beacon
// reports us as laggy, only the heartbeat reset is performed.
void MDSRankDispatcher::tick()
{
// Reset the heartbeat timeout before anything else (the original note says
// this keeps the daemon from being killed for appearing unresponsive).
heartbeat_reset();
if (beacon.is_laggy()) {
dout(5) << "tick bailing out since we seem laggy" << dendl;
return;
}
// Check the operations that are currently in flight (the original note says
// these are read from op_tracker).
check_ops_in_flight();
// Wake up the progress thread.
progress_thread.signal();
// make sure mds log flushes, trims periodically
mdlog->flush();
// Only when active or stopping: trim the cache and client leases, check
// memory usage, and trim the log.
if (is_active() || is_stopping()) {
mdcache->trim();
mdcache->trim_client_leases();
mdcache->check_memory_usage();
mdlog->trim(); // NOT during recovery!
}
// Publish statistics through the logger, if one has been created.
if (logger) {
logger->set(l_mds_subtrees, mdcache->num_subtrees());
mdcache->log_stat();
}
// ...
if (is_clientreplay() || is_active() || is_stopping()) {
server->find_idle_sessions();
locker->tick();
}
// While in the reconnect state, drive the server's reconnect tick.
if (is_reconnect())
server->reconnect_tick();
// Work that only applies to an active rank.
if (is_active()) {
balancer->tick();
mdcache->find_stale_fragment_freeze();
mdcache->migrator->find_stale_export_freeze();
if (snapserver)
snapserver->check_osd_map(false);
}
if (is_active() || is_stopping()) {
update_targets(ceph_clock_now());
}
// shut down?
if (is_stopping()) {
// Note: when stopping, mdlog->trim() was already called once above in this
// same tick.
mdlog->trim();
if (mdcache->shutdown_pass()) {
// Progress counters filled in by purge_queue.drain().
uint64_t pq_progress = 0 ;
uint64_t pq_total = 0;
size_t pq_in_flight = 0;
if (!purge_queue.drain(&pq_progress, &pq_total, &pq_in_flight)) {
dout(7) << "shutdown_pass=true, but still waiting for purge queue"
<< dendl;
// This takes unbounded time, so we must indicate progress
// to the administrator: we do it in a slightly imperfect way
// by sending periodic (tick frequency) clog messages while
// in this state.
clog->info() << "MDS rank " << whoami << " waiting for purge queue ("
<< std::dec << pq_progress << "/" << pq_total << " " << pq_in_flight
<< " files purging" << ")";
} else {
// Cache shutdown pass succeeded and the purge queue is drained.
dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to "
"down:stopped" << dendl;
stopping_done();
}
}
else {
dout(7) << "shutdown_pass=false" << dendl;
}
}
// Expose ourselves to Beacon to update health indicators
beacon.notify_health(this);
}
/***********************************************************/
/***************************shutdown***************************/
// Shut this rank down, stopping its components in the order written below.
// NOTE(review): the function unlocks mds_lock before re-locking it, so the
// caller presumably holds mds_lock on entry and still holds it on return --
// confirm against the caller.
void MDSRankDispatcher::shutdown()
{
// It should never be possible for shutdown to get called twice, because
// anyone picking up mds_lock checks if stopping is true and drops
// out if it is.
assert(stopping == false);
stopping = true;
dout(1) << __func__ << ": shutting down rank " << whoami << dendl;
// Shut down the timer.
timer.shutdown();
// Shut down the MDS log.
mdlog->shutdown();
// Shut down the metadata cache.
mdcache->shutdown();
purge_queue.shutdown();
// mds_lock is dropped while the finisher is stopped and re-taken afterwards
// (presumably so that finisher work needing mds_lock cannot block the stop).
mds_lock.Unlock();
finisher->stop(); // no flushing
mds_lock.Lock();
// Shut down the objecter, but only if it was initialized.
if (objecter->initialized)
objecter->shutdown();
// Shut down the monitor client.
monc->shutdown();
// Notify the op tracker of the shutdown.
op_tracker.on_shutdown();
// Shut down the progress thread.
progress_thread.shutdown();
// release mds_lock for finisher/messenger threads (e.g.
// MDSDaemon::ms_handle_reset called from Messenger).
mds_lock.Unlock();
// Shut down the messenger.
messenger->shutdown();
mds_lock.Lock();
// Remove our heartbeat handle from the heartbeat map and clear the pointer.
if (hb) {
g_ceph_context->get_heartbeat_map()->remove_worker(hb);
hb = NULL;
}
}
/***********************************************************/
/*****************************admin socket asok******************************/
// Admin-socket related entry points. These are declarations only; the bodies
// are not reproduced in these notes.
bool MDSRankDispatcher::handle_asok_command();
// Evict clients.
void MDSRankDispatcher::evict_clients(const SessionFilter &filter, MCommand *m);
bool MDSRank::evict_client(int64_t session_id, bool wait, bool blacklist, std::stringstream& err_ss,Context *on_killed);
// Dump client sessions.
void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f);
void MDSRankDispatcher::update_log_config();
Session *MDSRank::get_session(Message *m);
// command_* handlers (one per command, judging by their names).
void MDSRank::command_scrub_path(Formatter *f, boost::string_view path, vector<string>& scrubop_vec);
void MDSRank::command_tag_path(Formatter *f, boost::string_view path, boost::string_view tag);
void MDSRank::command_flush_path(Formatter *f, boost::string_view path);
void MDSRank::command_flush_journal(Formatter *f);
void MDSRank::command_get_subtrees(Formatter *f);
void MDSRank::command_export_dir(Formatter *f, boost::string_view path, mds_rank_t target);
bool MDSRank::command_dirfrag_split(cmdmap_t cmdmap, std::ostream &ss);
bool MDSRank::command_dirfrag_merge(cmdmap_t cmdmap, std::ostream &ss);
bool MDSRank::command_dirfrag_ls(cmdmap_t cmdmap, std::ostream &ss, Formatter *f);
// Status dumps and logger creation.
void MDSRank::dump_status(Formatter *f);
void MDSRank::dump_clientreplay_status(Formatter *f);
void MDSRank::create_logger();
/***************************************************************************/
/*****************************Message dispatch and scheduling******************************/
// Declaration only; the body is not reproduced in these notes.
bool MDSRankDispatcher::ms_dispatch(Message *m);
// Dispatch one message: drop it if stale, defer it if we are laggy or other
// messages are already deferred, otherwise handle it now, then do
// post-dispatch housekeeping.
//
// m       - the message to dispatch.
// new_msg - true for a newly arrived message; such a message is queued
//           behind any messages that are already deferred.
//
// Returns false only when the message type is not recognized; true in every
// other path shown here (dropped, deferred or handled).
//
// The bare "..." lines mark code that was elided by the author of these notes.
bool MDSRank::_dispatch(Message *m, bool new_msg)
{
// Stale message: call m->put() and report it as handled.
if (is_stale_message(m)) {
m->put();
return true;
}
// While the beacon reports us as laggy, park the message on
// waiting_for_nolaggy instead of handling it.
if (beacon.is_laggy()) {
dout(10) << " laggy, deferring " << *m << dendl;
waiting_for_nolaggy.push_back(m);
}
// A new message arriving while deferred messages are still queued is appended
// to waiting_for_nolaggy as well (presumably to preserve ordering).
else if (new_msg && !waiting_for_nolaggy.empty()) {
dout(10) << " there are deferred messages, deferring " << *m << dendl;
waiting_for_nolaggy.push_back(m);
} else {
// Handle the message now; an unrecognized type is logged and reported to
// the caller with a false return.
if (!handle_deferrable_message(m)) {
dout(0) << "unrecognized message " << *m << dendl;
return false;
}
heartbeat_reset();
}
// (code elided in these notes)
...
// If we are laggy at this point, skip the remaining housekeeping.
if (beacon.is_laggy()) {
// We've gone laggy during dispatch, don't do any
// more housekeeping
return true;
}
// done with all client replayed requests?
if (is_clientreplay() &&
mdcache->is_open() &&
replay_queue.empty() &&
beacon.get_want_state() == MDSMap::STATE_CLIENTREPLAY) {
int num_requests = mdcache->get_num_client_requests();
dout(10) << " still have " << num_requests << " active replay requests" << dendl;
// No client requests remain: client replay is finished.
if (num_requests == 0)
clientreplay_done();
}
// (code elided in these notes)
...
update_mlogger();
return true;
}
// Handling of deferrable messages
// Route message m to the component responsible for its type.
//
// The "port" (message type masked with 0xff00) is checked first; any type
// that does not match a known port is routed by its exact type.
//
// Returns false when the type is not recognized, true otherwise.
//
// ALLOW_MESSAGES_FROM is a macro defined outside this excerpt; presumably it
// filters out messages whose sender is not of the given entity type -- confirm
// against its definition (it may contain its own return).
bool MDSRank::handle_deferrable_message(Message *m)
{
int port = m->get_type() & 0xff00;
switch (port) {
// Cache messages are handled by mdcache.
case MDS_PORT_CACHE:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
mdcache->dispatch(m);
break;
// Migrator messages are handled by the migrator.
case MDS_PORT_MIGRATOR:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
mdcache->migrator->dispatch(m);
break;
default:
// No port matched: route by the exact message type.
// Client session/reconnect/request and slave requests go to the server.
switch (m->get_type()) {
// SERVER
case CEPH_MSG_CLIENT_SESSION:
case CEPH_MSG_CLIENT_RECONNECT:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
// fall-thru
case CEPH_MSG_CLIENT_REQUEST:
server->dispatch(m);
break;
case MSG_MDS_SLAVE_REQUEST:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
server->dispatch(m);
break;
// Heartbeat messages are handled by the balancer.
case MSG_MDS_HEARTBEAT:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
balancer->proc_message(m);
break;
// Table requests: a negative op goes to the table client, anything else to
// the table server, both looked up by req->table.
case MSG_MDS_TABLE_REQUEST:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
{
MMDSTableRequest *req = static_cast<MMDSTableRequest*>(m);
// NOTE(review): the lookup results below are dereferenced without a null
// check -- confirm get_table_client/get_table_server cannot return null.
if (req->op < 0) {
MDSTableClient *client = get_table_client(req->table);
client->handle_request(req);
} else {
// This local `server` shadows the `server` used by the cases above, but
// only within this inner scope.
MDSTableServer *server = get_table_server(req->table);
server->handle_request(req);
}
}
break;
// Lock messages are handled by the locker.
case MSG_MDS_LOCK:
case MSG_MDS_INODEFILECAPS:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
locker->dispatch(m);
break;
// Client caps / cap release / lease messages are handled by the locker.
case CEPH_MSG_CLIENT_CAPS:
case CEPH_MSG_CLIENT_CAPRELEASE:
case CEPH_MSG_CLIENT_LEASE:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
locker->dispatch(m);
break;
default:
// Unknown message type.
return false;
}
}
return true;
}
// Further dispatch-section helpers. Declarations only; the bodies are not
// reproduced in these notes.
void MDSRank::_advance_queues();
void MDSRank::heartbeat_reset();
/******************************************************************/
/*****************************Message sending******************************/
// Declarations only; the bodies are not reproduced in these notes.
// The overloads differ in how the destination is given: a connection, an MDS
// rank, a client id or a session.
void MDSRank::send_message(Message *m, Connection *c);
void MDSRank::send_message_mds(Message *m, mds_rank_t mds);
void MDSRank::forward_message_mds(Message *m, mds_rank_t mds);
void MDSRank::send_message_client_counted(Message *m, client_t client);
void MDSRank::send_message_client_counted(Message *m, Connection *connection);
void MDSRank::send_message_client_counted(Message *m, Session *session);
void MDSRank::send_message_client(Message *m, Session *session);
/******************************************************************/
/*****************************Class-member related******************************/
// Declarations only; the bodies are not reproduced in these notes.
int64_t MDSRank::get_metadata_pool();
MDSTableClient *MDSRank::get_table_client(int t);
MDSTableServer *MDSRank::get_table_server(int t);
utime_t MDSRank::get_laggy_until();
void MDSRank::request_state(MDSMap::DaemonState s);
/*******************************************************************/
/*****************************MDSRank state related******************************/
// Declarations only; the bodies are not reproduced in these notes.
// Suicide.
void MDSRank::suicide();
// Respawn.
void MDSRank::respawn();
// Mark as damaged.
void MDSRank::damaged();
void MDSRank::damaged_unlocked();
// React to a failed write.
//
// err - the error code of the failed write.
//
// -EBLACKLISTED always leads to a respawn. For any other error the configured
// g_conf->mds_action_on_write_error selects the reaction:
//   >= 2      : respawn
//   == 1      : force the cache read-only
//   otherwise : only log the error
void MDSRank::handle_write_error(int err)
{
  // Fenced by the cluster: nothing else to do but respawn.
  if (err == -EBLACKLISTED) {
    derr << "we have been blacklisted (fenced), respawning..." << dendl;
    respawn();
    return;
  }

  const auto configured_action = g_conf->mds_action_on_write_error;

  if (configured_action >= 2) {
    // Note: the log text says "suicide", but the action taken is respawn().
    derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
    respawn();
    return;
  }

  if (configured_action == 1) {
    derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
    mdcache->force_readonly();
    return;
  }

  // Any other setting: just record the error.
  derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
}
// Reports whether message m is stale. Declaration only; the body is not
// reproduced in these notes. (The original note describes this as checking
// whether the message came from an MDS -- confirm against the real body.)
bool MDSRank::is_stale_message(Message *m);
/********************************************************************/
/*****************************ProgressThread related******************************/
// Declarations only; the bodies are not reproduced in these notes.
void *MDSRank::ProgressThread::entry();
void MDSRank::ProgressThread::shutdown();
/***************************************************************************/
/*****************************boot related******************************/
// Boot-sequence entry points. Declarations only; the bodies are not
// reproduced in these notes.
void MDSRank::boot_start(BootStep step, int r);
void MDSRank::validate_sessions();
void MDSRank::starting_done();
void MDSRank::boot_create();
// Also listed in the "creating" section further down.
void MDSRank::creating_done();
/*****************************end of boot related******************************/
/*****************************replay related******************************/
// Everything from here to the end of the "active" section is a declaration
// only; the bodies are not reproduced in these notes.
void MDSRank::calc_recovery_set();
void MDSRank::replay_start();
void MDSRank::_standby_replay_restart_finish(int r, uint64_t old_read_pos);
void MDSRank::standby_replay_restart();
void MDSRank::replay_done();
/*******************************************************************/
/*****************************resolve related******************************/
void MDSRank::reopen_log();
void MDSRank::resolve_start();
void MDSRank::resolve_done();
/*********************************************************************/
/*****************************reconnect related******************************/
void MDSRank::reconnect_start();
void MDSRank::reconnect_done();
/***********************************************************************/
/*****************************rejoin related******************************/
void MDSRank::rejoin_joint_start();
void MDSRank::rejoin_start();
void MDSRank::rejoin_done();
/********************************************************************/
/*****************************clientreplay related******************************/
void MDSRank::clientreplay_start();
bool MDSRank::queue_one_replay();
void MDSRank::clientreplay_done();
/*************************************************************************/
/*****************************active related******************************/
void MDSRank::active_start();
/********************************************************************/
/*****************************recovery相关******************************/
oid MDSRank::recovery_done(int oldstate);
/**********************************************************************/
/*****************************creating related******************************/
// Everything from here to the end of the handle_mds_map section is a
// declaration only; the bodies are not reproduced in these notes.
// creating_done is also listed in the boot section above.
void MDSRank::creating_done();
/***********************************************************************/
/*****************************stopping related******************************/
void MDSRank::stopping_start();
void MDSRank::stopping_done();
/**********************************************************************/
/*****************************handle_mds_map related******************************/
void MDSRankDispatcher::handle_mds_map(MMDSMap *m, MDSMap *oldmap);
void MDSRank::handle_mds_recovery(mds_rank_t who);
void MDSRank::handle_mds_failure(mds_rank_t who);
/***************************************************************************/