Spark 中有很多异步处理的例子,每一个地方都值得好好去审视一番,对辅助理解 spark 的机理以及为自己写出优雅的代码都会有很大的帮助。
NettyRpcEnv
是RpcEnv
的在 spark 中的唯一一个实现。RpcEnv
是什么呢,可以先看一下它的 class 头信息
/** * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote * nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by * [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the * sender, or logging them if no such sender or `NotSerializableException`. * * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. */
复制代码
就是一句话,RPC 的环境。在这里,最重要的 2 个操作莫过于
RpcEndpoint
RpcEndpointRef
而RpcEndpoint
和RpcEndpointRef
是什么呢,在这里不做详细赘述,其他的文章中会详细说明,简单来讲一下
RpcEndpoint
和RpcEndpointRef
RpcEndpoint
executor
,driver
等角色,他们之间的通信都采用利用 Netty,在 executor 或者 driver 上并不是只启动 1 个 Netty 的服务,针对不同的功能会有多个 Netty 的 RPC 服务开启,利用不同的端口号进行区分。服务间通信后,通的“信”被很多种逻辑单元来处理,如Inbox
,如EventLoop
等,这些都是工具级别的单元,而被抽象出来作为可插拔可扩展的大的逻辑功能模块在 Spark 中就叫做RpcEndpoint
,它是用来处理从其他client
端发送或者server
端返回过来的message
的模块。RpcEndpoint
本身是一个 trait,它可以有多种的实现RpcEndpointRef
RpcEndpoint
的 Ref,即RpcEndpointRef
。这个概念乍一看有点懵,试想,从 A 发送消息到 B,能发送的前提是 A 先拥有了一个 B 的”引用“,这在普通的 Http 服务中貌似很不能被理解,我想访问某一台机器按说只需要知道对方的 IP 和 Port 不就 OK 了,现在还需要对方的一个“替身”?这是什么鬼?带着问题我们可以持续往下看即可,这里你只需要这样意识即可:RpcEndpointRef
你理解成就是 B machine 的 IP 和 Port 的一个被包装后的实例即可RpcEndpoint
和RpcEndpointRef
RpcEndpointRef
,通过它发送消息到 B machinek,顾名思义——问。可能是打个招呼,看看在不在,询问一下,等等。这个就是 NettyRpcEnv.ask 的作用所在。为了讲 NettyRpcEnv.ask 的作用,还需要简单的串一下一下概念和流程
首先,需要明确两个事情,在 yarn 环境下
Driver
是在ApplicatioMaster
进程中执行的一个线程Driver
其实是在用户的 class 的时候,在形成sparkContext
上下文环境的一个产物,本身执行的其实是用户 class 线程,在这个线程中建立了SparkEnv
以及RpcEnv
等等,并且建立了Driver
的 Netty 的 Service 等等,与Executor
相互通信Executor
则是一个个的进程,通过 java 命令在每一个节点上启动的Yarn 系列以及 ApplicatioMaster
是什么这里不做赘述,其他文章中会细讲。
其次,在这里只需要了解到,Driver
本身是一个协调调度节点,它可以去分配任务给Executor
,并且掌握着Executor
的情况,分配就是把 Task 发送给Executor
,掌握则指的是需要知道Executor
的运行情况等等。
Driver
和 2 个Executor
进行交互通信,Driver
手握 2 个Executor
(一个叫做 E1,一个叫做 E2)的RpcEndpointRef
,姑且简称为 E1Ref 和 E2Ref,通过这 2 个 Ref 发送 msg 到 E1 节点和 E2 节点,这 2 个节点本身通过自身的RpcEndpoint
来处理 msg。而 E1 和 E2 本身还要定期起的向 Driver 汇报自身的情况,这里叫做 heartbeat 心跳,那么反过来则是利用各自内部掌握的DriverEpcEndpointRef
来发送 heartbeat 到Driver
,而Driver
利用其自己的DriverRpcEndpoint
来处理 heartbeat 的 msg。所有节点的上面的组建则都在自身的NettyRpcEnv
中,也就是RpcEnv
的实现。终于要说到本篇的内容了,NettyRpcEnv.ask 的解读,需要有一个场景调用 NettyRpcEnv.ask 的方法才可以,那可以在题中所述的RpcEnv
中建立一个DriverRpcEndpointRef
这个场景中描述
上面的【图 4】介绍了一个 Driver 和和 Executor 之间通信的过程。其实,在ApplicationMaster
中构建Driver
线程的时候,有一部分的通信是需要通过DriverRpcEndpointRef
进行的,即利用DriverRpcEndpointRef
发送 msg 给DriverRpcEndpoint
,DriverRpcEndpoint
做出处理并响应
ApplicationMaster
中启动【Run】Driver 的线程后,从 Driver 线程中拿到了NettyRpcEnv
,NettyRpcEnv
的setupEndpointRef
方法【Get】到两个DriverEndpointRef
DriverEndpointRef
去访问 Driver 的DriverEndpoint
ApplicationMaster
的节点本身也是Driver
的节点,其实访问Driver
的DriverEndpoint
按说是可以直接访问的(Spark 源代码中没有这样实现,还是为了隔离和封装的更好,减少耦合,今后Driver
如果作为进程执行,不再ApplicationMaster
上运行也会修改的较为简单),但是这里还是采用了 Netty 的 Rpc 访问方式这部分代码在ApplicationMaster.scala
中,关注方法runDriver
即可
ApplicationMaster
ApplicationMaster
节点上继续调用 RpcEnv.setupEndpointRef,目的是 setup 一个Driver
的DriverEndpointRef
到RpcEnv
中,这个 setup 的过程就是去 10.1.2.5:13200 访问一下,如果服务通了,则构建出DriverEndpointRef
,这个“访问一下”即本文所述要用到的 NettyRpcEnv.ask 的方法。 private def runDriver(): Unit = { addAmIpFilter(None)
/* 这里,调用startUserApplication方法来执行用户的class,也就是我们的jar包, invoke我们的main方法,从而启动了sparkContext,内部启动一系列的scheduler以及 backend,以及taskscheduler等等等等core的内容,其他篇章会详细讲解 */ userClassThread = startUserApplication()
// This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. logInfo("Waiting for spark context initialization...") val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME) try {
/* 这里,阻塞的等待SparkContext从Driver线程中返回回来 */ val sc = ThreadUtils.awaitResult(sparkContextPromise.future, Duration(totalWaitTime, TimeUnit.MILLISECONDS)) if (sc != null) { rpcEnv = sc.env.rpcEnv
val userConf = sc.getConf val host = userConf.get("spark.driver.host") val port = userConf.get("spark.driver.port").toInt registerAM(host, port, userConf, sc.ui.map(_.webUrl))
/* **这里,上演了好戏,通过NettyRpcEnv的setupEndpointRef方法来获取到driverRef 这个里面其实是去ask一下Driver你在吗?是否存在这个Driver的服务,如果存在,则 返回OK,构建出Driver的Ref** */ val driverRef = rpcEnv.setupEndpointRef( RpcAddress(host, port), YarnSchedulerBackend.ENDPOINT_NAME) createAllocator(driverRef, userConf) } else { // Sanity check; should never happen in normal operation, since sc should only be null // if the user app did not create a SparkContext. throw new IllegalStateException("User did not initialize spark context!") } resumeDriver() userClassThread.join() } catch { case e: SparkException if e.getCause().isInstanceOf[TimeoutException] => logError( s"SparkContext did not initialize after waiting for $totalWaitTime ms. " + "Please check earlier log output for errors. Failing the application.") finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_SC_NOT_INITED, "Timed out waiting for SparkContext.") } finally { resumeDriver() } }
复制代码
如何理解 Future 呢,从字面意思可以很好的理解,Future 即未来,也是期货的意思。
说到期货,就充满了不确定性,因为毕竟没有发生,谁也不知道未来会怎样。所以,定义一个 Future 就是定义了一个不在现在这个时空(线程)发生的(未来)的另一个(另一个线程的)事件,相比 java 的鸡肋的 Future,scala 的 Future 可谓是非常优雅且完美,搜索我的博客可以看到针对 scala 的 Future 的详细介绍。
import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future
/** * 解读Future的基础 */ object DocFutureTest {
def apply(): Unit = { println("I am DocFutureTest") }
def main(args: Array[String]): Unit = {
val sleeping = 3000; val main_thread = Thread.currentThread().getName; /* 定义另一个线程发生的事件 这个事件相当于java中的如下的代码块: 从整体的间接性上看,scala的更为优雅一些,直接一个Future可以包裹住左右需要处理的内容 后续如果需要进行异常处理的话还可以根据Success和Failture进行模式匹配 public class JavaThreading { public static void main(String[] args) throws InterruptedException { new Thread( () -> System.out.println("这是一条发生在另一个叫做叫做" + Thread.currentThread().getName() + " 线程的故事") ).start(); System.out.println(Thread.currentThread().getName()); Thread.sleep(3000); } } */ var future_run = Future { Thread.sleep(1000) println("这是一条发生在另一个叫做叫做" + Thread.currentThread().getName +" 线程的故事") }
// 主线程休息3000ms // 如果不休息的话,main线程会先停止,导致上面的Future定义的thread还没有被执行到就结束了 Thread.sleep(sleeping) println(s"$main_thread 线程休息 $sleeping 毫秒")
}
}
复制代码
case class ExceptionError(error: String) extends Exception(error)
def main(args: Array[String]): Unit = {
val sleeping = 3000; val main_thread = Thread.currentThread().getName;
// 定义另一个线程发生的事件 var future_run = Future { Thread.sleep(1000) prntln("这是一条发生在另一个叫做叫做" + Thread.currentThread().getName + " 线程的故事") // 如果需要onFailure的话 则释放此句 // throw ExceptionError("error")
future_run onFailure { case t => println("exception " + t.getMessage) }
future_run onSuccess { case _ => println("success") }
复制代码
start()
一样onFailure
的处理,那么可能看不到异常被抛出来,这点和 java 有较大区别从皮毛简单的说完了 Future,那 Promise 又是什么呢?其实在 Future 的实现中包含了 Promise 的实现,也就是说没有 Promise,Future 是无法被运行的。从字面的理解,Promise 是承诺,有了 Future 的未来的定义,那么需要给出一个确切的承诺才可以进行,否则都是空口无凭天马行空无法兑现的大话。
说到现在,包括看完以上的 Future 的介绍,很多人肯定还是懵 b 状态,因为我刚开始接触的时候也是这样,但我喜欢的就是用最直观的图和想象来描述一个抽象的问题,二话不说,继续上图
map
操作打印出一句话 future:.... import scala.concurrent.Promise import scala.util.{Failure, Success}
object PromiseTest { def main(args: Array[String]): Unit = { import scala.concurrent.ExecutionContext.Implicits.global val promise = Promise[String] promise.future.onComplete(v => println("onComplete " + v)) promise.future.map(str => println("future: " + str + " ==> " + Thread.currentThread().getName)) promise.future.failed.foreach(e => println(e + " ==> " + Thread.currentThread().getName)) Thread.sleep(3000) }
}
复制代码
promise.trySuccess
的处理promise.future.onComplete
onComplete
处理promise.future.map
promise.future.onComplete
之后对 Future 进行的继续的 map 处理promise.trySuccess
import scala.concurrent.Promise import scala.util.{Failure, Success}
object PromiseTest { def main(args: Array[String]): Unit = { import scala.concurrent.ExecutionContext.Implicits.global val promise = Promise[String] promise.future.onComplete(v => println("onComplete " + v)) promise.future.map(str => println("future: " + str + " ==> " + Thread.currentThread().getName)) promise.future.failed.foreach(e => println(e + " ==> " + Thread.currentThread().getName)) **promise.trySuccess("try success " + " --> " + Thread.currentThread().getName)** Thread.sleep(3000) }
}
复制代码
其实,讲完了上面的所有的内容后,ask 的代码感觉几句话就可以讲解完毕了。
ask 本身返回的是 Future,本身是异步处理
TransportResponseHandler
中进行判断处理,并且调用 listener 的 onSuccess 方法,这个 onSuccess 方法则是下面的 ask 代码中定义的方法。在这个方法中本身又去执行了 promise 的 tryComplete,从而触发了 promise 的 future 之路执行private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = { // 定义了一个Any的promise val promise = Promise[Any]() val remoteAddr = message.receiver.address
def onFailure(e: Throwable): Unit = { if (!promise.tryFailure(e)) { e match { case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e") case _ => logWarning(s"Ignored failure: $e") } } }
/* 这里声明的onSuccess会被填充到RpcResponseCallback的onSuccess中,这个 RpcResponseCallback就是上面【图9】中的listener,当我们从Server端获取到response后 注意,获取的不是RpcFailure类型的response,则都会进入到【图9】的 else if (message instanceof RpcResponse) { 分支中
*/ def onSuccess(reply: Any): Unit = reply match { case RpcFailure(e) => onFailure(e) case rpcReply => /* 当返回的response是OK的没有问题后,onSuccess被callback,这里promise的trySuccess也 进行call操作,这里就是上面所说的,为了一个promise铺设了一条future,从而可以执行 这个Future的线程了 */ if (!promise.trySuccess(rpcReply)) { logWarning(s"Ignored message: $reply") } }
try { if (remoteAddr == address) { val p = Promise[Any]() p.future.onComplete { case Success(response) => onSuccess(response) case Failure(e) => onFailure(e) }(ThreadUtils.sameThread) dispatcher.postLocalMessage(message, p) } else { val rpcMessage = RpcOutboxMessage(message.serialize(this), onFailure, (client, response) => **onSuccess**(deserialize[Any](client, response))) postToOutbox(message.receiver, rpcMessage) /* 如果是callback了Failure,则这里会被执行 */ promise.future.failed.foreach { case _: TimeoutException => rpcMessage.onTimeout() case _ => }(ThreadUtils.sameThread) }
val timeoutCancelable = timeoutScheduler.schedule(new Runnable { override def run(): Unit = { onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " + s"in ${timeout.duration}")) } }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
/* 当promise的future执行后,会调用这里的onComplete方法 */ promise.future.onComplete { v => timeoutCancelable.cancel(true) }(ThreadUtils.sameThread) } catch { case NonFatal(e) => onFailure(e) }
/* 利用RpcTimeout中的addMessageIfTimeout的偏函数再去模式匹配一下产生的Throwable内容 如果是RpcTimeoutException 则 直接throw这个ex 如果是TimeoutException 则包装成RpcTimeoutException后再throw出去 */ promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread) }
复制代码
本篇用小篇幅讲解了一下*o.a.s.rpc.netty.NettyRpcEnv.ask()*的方法,简单描述了一个 spark 的异步处理的小 case,这个小 case 需要不少的先验知识点,可能突然间看到这里有点懵,学习需要融会贯通一点点的来积累才可以,如果不明白可以慢慢积累其他模块的知识再来这里看流水账会更有收获。
领取专属 10元无门槛券
私享最新 技术干货