ExecEnv{
AdmissionController(Scheduler)
}
ImpalaServer{
QueryHandle{
ClientRequestStatePointer... Init By ClientRequest...
QueryDriver{
ClientRequestState{
TExecRequest (Query Plan from FE) — initialized by RunFrontendPlanner
Coordinator..
}
}
}
}
void ImpalaServer::query(beeswax::QueryHandle& beeswax_handle, const Query& query)
void ImpalaServer::executeAndWait(beeswax::QueryHandle& beeswax_handle,
const Query& query, const LogContextId& client_ctx)
void ImpalaServer::ExecuteStatementCommon(TExecuteStatementResp& return_val,
const TExecuteStatementReq& request, const TExecRequest* external_exec_request)
QueryHandle query_handle;
Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> session_state,
QueryHandle* query_handle,
const TExecRequest* external_exec_request)
->Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx, const TExecRequest* external_exec_request, shared_ptr<SessionState> session_state, bool* registered_query, QueryHandle* query_handle)
//driver in query_handle
-> 1 static void QueryDriver::CreateNewDriver(ImpalaServer* impala_server, QueryHandle* query_handle,
const TQueryCtx& query_ctx, shared_ptr<ImpalaServer::SessionState> session_state) {
query_handle->query_driver_ = std::make_shared<QueryDriver>(impala_server);
query_handle->query_driver_->CreateClientRequestState( query_ctx, session_state, query_handle);
-> void QueryDriver::CreateClientRequestState(const TQueryCtx& query_ctx,
shared_ptr<ImpalaServer::SessionState> session_state, QueryHandle* query_handle) {
ExecEnv* exec_env = ExecEnv::GetInstance();
lock_guard<SpinLock> l(client_request_state_lock_);
exec_request_ = make_unique<TExecRequest>();
//exec_request_ is not populated yet; it is filled in later by RunFrontendPlanner
//ClientRequestState
client_request_state_ =
make_unique<ClientRequestState>(query_ctx, exec_env->frontend(), parent_server_,
session_state, exec_request_.get(), query_handle->query_driver().get());
-> {
exec_request_(exec_request),
frontend_(frontend),
parent_server_(server);
// Create the AdmissionControlClient, which talks to the ExecEnv AdmissionController
AdmissionControlClient::Create(query_ctx_, &admission_control_client_)
// Factory for the per-query admission-control client.
// Chooses the client implementation based on the process-wide ExecEnv:
//   - RemoteAdmissionControlClient when a dedicated admission service is
//     enabled (admission decisions are made by a remote admissiond);
//   - LocalAdmissionControlClient otherwise (this coordinator admits locally).
// The created client is handed back through 'client' (ownership transferred
// to the caller's unique_ptr via reset()).
void AdmissionControlClient::Create(
const TQueryCtx& query_ctx, unique_ptr<AdmissionControlClient>* client) {
if (ExecEnv::GetInstance()->AdmissionServiceEnabled()) {
// Remote path: the full query context is needed to forward the request.
client->reset(new RemoteAdmissionControlClient(query_ctx));
} else {
// Local path: only the query id is needed to track the admission request.
client->reset(new LocalAdmissionControlClient(query_ctx.query_id));
}
}
}
DCHECK(query_handle != nullptr);
//set client_request_state_ pointer to query_handle
(*query_handle).SetClientRequestState(client_request_state_.get());
}
}
->2 lock_guard<mutex> l(*(*query_handle)->lock());
RETURN_IF_ERROR(RegisterQuery(query_ctx.query_id, session_state, query_handle));
->3 RETURN_IF_ERROR(query_handle->query_driver()->RunFrontendPlanner(query_ctx)); // initializes the ClientRequestState's TExecRequest
Status QueryDriver::RunFrontendPlanner(const TQueryCtx& query_ctx) {
RETURN_IF_ERROR(client_request_state_->UpdateQueryStatus(
ExecEnv::GetInstance()->frontend()->GetExecRequest(
query_ctx, exec_request_.get())));
return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result);
->4 RETURN_IF_ERROR((*query_handle)->Exec()); //ClientRequestState Exec
-> RETURN_IF_ERROR(
ExecQueryOrDmlRequest(exec_request_->query_exec_request, true /*async*/ } {
if (isAsync) {
// Don't transition to PENDING inside the FinishExecQueryOrDmlRequest thread because
// the query should be in the PENDING state before the Exec RPC returns.
-> UpdateNonErrorExecState(ExecState::PENDING);
RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread",
&ClientRequestState::FinishExecQueryOrDmlRequest, this, &async_exec_thread_, true));
//admission_control_client_ SubmitForAdmission
-> Status admit_status = admission_control_client_->SubmitForAdmission(
{query_id_pb, ExecEnv::GetInstance()->backend_id(),
exec_request_->query_exec_request, exec_request_->query_options,
summary_profile_, blacklisted_executor_addresses_},
query_events_, &schedule_); {
//membership
ClusterMembershipMgr::SnapshotPtr membership_snapshot = cluster_membership_mgr_->GetSnapshot();
//FindGroup
bool must_reject =
!FindGroupToAdmitOrReject(membership_snapshot, queue_node->pool_cfg,
/* admit_from_queue=*/false, stats, queue_node, unused_bool);
{
Status ret = ComputeGroupScheduleStates(membership_snapshot, queue_node);
{
vector<const ExecutorGroup*> executor_groups = GetExecutorGroupsForQuery(membership_snapshot->executor_groups, request);
//Scheduler -> Schedule
const Scheduler::ExecutorConfig group_config = {*executor_group, coord_desc};
RETURN_IF_ERROR(scheduler_->Schedule(group_config, group_state.get()));
{
RETURN_IF_ERROR(DebugAction(state->query_options(), "SCHEDULER_SCHEDULE"));
RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, state));
ComputeFragmentExecParams(executor_config, state);
ComputeBackendExecParams(executor_config, state);
#ifndef NDEBUG
state->Validate();
#endif
state->set_executor_group(executor_config.group.name());
return Status::OK();
}
}
}
}
->//Create Coordinator And Exec
coord_.reset(new Coordinator(this, *exec_request_, *schedule_.get(), query_events_));
Status exec_status;
exec_status = coord_->Exec(); {
//CreateQueryState
query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
query_ctx(), exec_params_.query_schedule().coord_backend_mem_limit());
//MemTracker
filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
-1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(),
//Coordinator is wired up against the Scheduler's schedule...
InitFragmentStats();
// create BackendStates and per-instance state, including profiles, and install
// the latter in the FragmentStats' root profile
InitBackendStates();
exec_summary_.Init(exec_params..)
//StartBackend
RETURN_IF_ERROR(StartBackendExec());
//...
RETURN_IF_ERROR(FinishBackendStartup());
}
} else {
// Update query_status_ as necessary.
FinishExecQueryOrDmlRequest();
return query_status_;
}
}
// Wait
status = query_handle->WaitAsync();
// Unregister
discard_result(UnregisterQuery(query_handle->query_id(), false, &status));
原创声明：本文系作者授权腾讯云开发者社区发表，未经许可，不得转载。
如有侵权，请联系 cloudcommunity@tencent.com 删除。