前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Impala ImpalaServer QueryHander, ClientRequestState, Coordinator, Scheduler 关系

Impala ImpalaServer QueryHander, ClientRequestState, Coordinator, Scheduler 关系

原创
作者头像
jasong
修改2024-06-06 16:08:40
1400
修改2024-06-06 16:08:40
举报
文章被收录于专栏:ImpalaImpala

一 关系

代码语言:c
复制
ExevEnv{
  AdmissionControllor(Scheduler)
}
ImpalaServer{
  QueryHandle{
    ClientRequestStatePointer... Init By ClientRequest...
    QueryDriver{
      ClientRequestState{
        TExecRequet(Query Paln From Fe)Init By RunFrontendPlanner
        Coordinator..
      }
    }
  }
}

二 SQL 发起函数

代码语言:cpp
复制
void ImpalaServer::query(beeswax::QueryHandle& beeswax_handle, const Query& query)
void ImpalaServer::executeAndWait(beeswax::QueryHandle& beeswax_handle,
    const Query& query, const LogContextId& client_ctx)
void ImpalaServer::ExecuteStatementCommon(TExecuteStatementResp& return_val,
    const TExecuteStatementReq& request, const TExecRequest* external_exec_request)

三 执行过程

代码语言:cpp
复制
QueryHandle query_handle;
Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> session_state,
    QueryHandle* query_handle,
    const TExecRequest* external_exec_request) 
  ->Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx, const TExecRequest* external_exec_request, shared_ptr<SessionState> session_state, bool* registered_query, QueryHandle* query_handle)
     //driver in query_handle 
    -> 1 Satic void QueryDriver::CreateNewDriver(ImpalaServer* impala_server, QueryHandle* query_handle,
          const TQueryCtx& query_ctx, shared_ptr<ImpalaServer::SessionState> session_state) {
          query_handle->query_driver_ = std::make_shared<QueryDriver>(impala_server);
          query_handle->query_driver_->CreateClientRequestState( query_ctx, session_state, query_handle);
          	-> void QueryDriver::CreateClientRequestState(const TQueryCtx& query_ctx,
              shared_ptr<ImpalaServer::SessionState> session_state, QueryHandle* query_handle) {
              ExecEnv* exec_env = ExecEnv::GetInstance();
              lock_guard<SpinLock> l(client_request_state_lock_);
              exec_request_ = make_unique<TExecRequest>();
              //exec_request_ not update , update behind RunFrontednPlanner
              //ClientRequestState
              client_request_state_ =
                  make_unique<ClientRequestState>(query_ctx, exec_env->frontend(), parent_server_,
                      session_state, exec_request_.get(), query_handle->query_driver().get());
                -> {
                   exec_request_(exec_request),
                    frontend_(frontend),
                    parent_server_(server);
                 
                    // 创建AdmissionControlClient 调用 ExecEnv AdminssionControl
                    AdmissionControlClient::Create(query_ctx_, &admission_control_client_)
                    void AdmissionControlClient::Create(
                      const TQueryCtx& query_ctx, unique_ptr<AdmissionControlClient>* client) {
                    if (ExecEnv::GetInstance()->AdmissionServiceEnabled()) {
                      client->reset(new RemoteAdmissionControlClient(query_ctx));
                    } else {
                      client->reset(new LocalAdmissionControlClient(query_ctx.query_id));
                    }
                  }
                }
              DCHECK(query_handle != nullptr);
              //set client_request_state_ pointer to query_handle 
              (*query_handle).SetClientRequestState(client_request_state_.get());
            }
        }


   ->2 lock_guard<mutex> l(*(*query_handle)->lock());
      RETURN_IF_ERROR(RegisterQuery(query_ctx.query_id, session_state, query_handle));


   ->3 RETURN_IF_ERROR(query_handle->query_driver()->RunFrontendPlanner(query_ctx)); Init ClientRequest TExecRequest ..
      Status QueryDriver::RunFrontendPlanner(const TQueryCtx& query_ctx) {
        RETURN_IF_ERROR(client_request_state_->UpdateQueryStatus(
        ExecEnv::GetInstance()->frontend()->GetExecRequest(
            query_ctx, exec_request_.get())));
      return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result);

    
    
   ->4  RETURN_IF_ERROR((*query_handle)->Exec()); //ClientRequetState Exec 
      -> RETURN_IF_ERROR(
          ExecQueryOrDmlRequest(exec_request_->query_exec_request, true /*async*/ } {
             if (isAsync) {
              // Don't transition to PENDING inside the FinishExecQueryOrDmlRequest thread because
              // the query should be in the PENDING state before the Exec RPC returns.
              -> UpdateNonErrorExecState(ExecState::PENDING);
              RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread",
                  &ClientRequestState::FinishExecQueryOrDmlRequest, this, &async_exec_thread_, true));
                 //admission_control_client_ SubmitForAdmission
                 -> Status admit_status = admission_control_client_->SubmitForAdmission(
                      {query_id_pb, ExecEnv::GetInstance()->backend_id(),
                          exec_request_->query_exec_request, exec_request_->query_options,
                          summary_profile_, blacklisted_executor_addresses_},
                      query_events_, &schedule_); {
                   //member ship 
                    ClusterMembershipMgr::SnapshotPtr membership_snapshot = cluster_membership_mgr_->GetSnapshot();
                   //FindGroup 
                     bool must_reject =
                      !FindGroupToAdmitOrReject(membership_snapshot, queue_node->pool_cfg,
            /* admit_from_queue=*/false, stats, queue_node, unused_bool);
                   {
                       Status ret = ComputeGroupScheduleStates(membership_snapshot, queue_node);
                     {
                         vector<const ExecutorGroup*> executor_groups = GetExecutorGroupsForQuery(membership_snapshot->executor_groups,                               request);
                          //Scheduler -> Schedule 
                          const Scheduler::ExecutorConfig group_config = {*executor_group, coord_desc};
                          RETURN_IF_ERROR(scheduler_->Schedule(group_config, group_state.get())); 
                       {

                          RETURN_IF_ERROR(DebugAction(state->query_options(), "SCHEDULER_SCHEDULE"));
                          RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, state));
                          ComputeFragmentExecParams(executor_config, state);
                          ComputeBackendExecParams(executor_config, state);
                        #ifndef NDEBUG
                          state->Validate();
                        #endif
                          state->set_executor_group(executor_config.group.name());
                          return Status::OK();
                       }
                     }
                   }
                 }
                   
                  
                 ->//Create Coordinator  And Exec  
                  coord_.reset(new Coordinator(this, *exec_request_, *schedule_.get(), query_events_));
                  Status exec_status;
   
                    exec_status = coord_->Exec(); {
                    //CreateQueryState
                    query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
                        query_ctx(), exec_params_.query_schedule().coord_backend_mem_limit());
                    //MemTracker 
                    filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
                        -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), 
                      
                      
                    //Coordinator 于 Scheduler 匹配...  
                    InitFragmentStats();
                    // create BackendStates and per-instance state, including profiles, and install
                    // the latter in the FragmentStats' root profile
                    InitBackendStates();
                    exec_summary_.Init(exec_params..)
                      //StartBackend
                  	RETURN_IF_ERROR(StartBackendExec());
                      //...
                    RETURN_IF_ERROR(FinishBackendStartup());
                  }
            } else {
              // Update query_status_ as necessary.
              FinishExecQueryOrDmlRequest();
              return query_status_;
            }
          }
                         
     
     

 // Wait 
 status = query_handle->WaitAsync();
    
 // Unregister
 discard_result(UnregisterQuery(query_handle->query_id(), false, &status));

简化图

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一 关系
  • 二 SQL 发起函数
  • 三 执行过程
  • 简化图
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档