Main topics of this section:
1. The architecture of ReceiverTracker
2. The message loop system
3. The concrete implementation of ReceiverTracker
Spark Streaming is essentially an application built on top of the Spark Core infrastructure. Once a receiver ingests data, how exactly is that data handled? Start with the ReceiverSupervisorImpl class in the source:
/**
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
 * object that is used to divide the received data stream into blocks of data.
 */
private[streaming] class ReceiverSupervisorImpl(
    receiver: Receiver[_],
    env: SparkEnv,
    hadoopConf: Configuration,
    checkpointDirOption: Option[String]
  ) extends ReceiverSupervisor(receiver, env.conf) with Logging {

  private val host = SparkEnv.get.blockManager.blockManagerId.host
  private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId

  private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
            "See documentation for more details.")
      }
      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
  }
As the source shows, received data is written by the ReceivedBlockHandler object receivedBlockHandler, and there are two ways to write it, selected by the WAL configuration (a configuration sketch follows the list):
1. Fault-tolerant writes based on the WAL (write-ahead log):
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId, receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
2. Direct writes through the BlockManager (comparatively less safe):
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
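Which of the two handlers is chosen depends only on application configuration. A minimal sketch of enabling the WAL path, assuming a reachable checkpoint directory (the HDFS path below is a placeholder); without the checkpoint directory, ReceiverSupervisorImpl throws the SparkException shown in the snippet above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wal-demo")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // checked by WriteAheadLogUtils.enableReceiverLog

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://namenode:8020/checkpoints/wal-demo") // placeholder path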
After the block is stored, it is reported to the Driver, as shown below:
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
The endpoint reference used for reporting to the Driver is shown below:
/** Remote RpcEndpointRef for the ReceiverTracker */
private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)

On the Driver side, the class that handles these messages is ReceiverTracker. It receives and stores the block metadata through the following method:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
  case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })
    } else {
      context.reply(addBlock(receivedBlockInfo))
    }
  case DeregisterReceiver(streamId, message, error) =>
    deregisterReceiver(streamId, message, error)
    context.reply(true)
  // Local messages
  case AllReceiverIds =>
    context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
  case StopAllReceivers =>
    assert(isTrackerStopping || isTrackerStopped)
    stopReceivers()
    context.reply(true)
}
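For reference, the remote messages matched above are plain case classes. The shapes below are reconstructed from how they are used in this handler (in the real source they are defined alongside ReceiverTracker):

private[streaming] sealed trait ReceiverTrackerMessage
private[streaming] case class RegisterReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef
  ) extends ReceiverTrackerMessage
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
  extends ReceiverTrackerMessage
private[streaming] case class ReportError(streamId: Int, message: String, error: String)
  extends ReceiverTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
  extends ReceiverTrackerMessage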
To summarize the code above:
1. ReceiverSupervisorImpl holds a reference to ReceiverTracker's message endpoint, so it can communicate with ReceiverTracker.
2. ReceiverSupervisorImpl reports the metadata of the stored data to ReceiverTracker (see the sketch below).
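To see the executor side of this conversation end to end, here is a minimal custom receiver sketch: every store(...) call below is handed to ReceiverSupervisorImpl, which assembles records into blocks and ultimately invokes pushAndReportBlock to write the block and report its metadata to the Driver.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ConstantReceiver(value: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  override def onStart(): Unit = {
    // Receive on a separate thread; onStart() must not block
    new Thread("constant-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store(value) // handed to the supervisor, not written directly
          Thread.sleep(100)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // The receiving thread exits on its own once isStopped() returns true
  }
}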
Next, let's step into ReceiverTracker itself.
A Receiver can be in one of three states: inactive (INACTIVE), being scheduled (SCHEDULED), or active (ACTIVE):
/** Enumeration to identify current state of a Receiver */
private[streaming] object ReceiverState extends Enumeration {
  type ReceiverState = Value
  val INACTIVE, SCHEDULED, ACTIVE = Value
}
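The transitions between these states are driven by the tracker methods shown later in this section; the mapping below is a small helper sketch (not part of Spark, and assuming access to the ReceiverState enum above) that summarizes them:

import ReceiverState._

sealed trait TrackerEvent
case object Scheduled    extends TrackerEvent // updateReceiverScheduledExecutors ran
case object Registered   extends TrackerEvent // registerReceiver accepted the executor
case object Deregistered extends TrackerEvent // deregisterReceiver / fatal receiver error

def nextState(event: TrackerEvent): ReceiverState = event match {
  case Scheduled    => SCHEDULED
  case Registered   => ACTIVE
  case Deregistered => INACTIVE
}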
ReceivedBlockTracker's getReceivedBlockQueue method looks up streamIdToUnallocatedBlockQueues, a HashMap whose key is the stream id and whose value is a ReceivedBlockQueue, defined as private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo].
As the source shows, the ReceivedBlockTracker class records all received blocks and allocates them to batches on demand. If checkpointing is configured and the WAL is enabled, it also writes all of these operations to the write-ahead log, so after a Driver failure the state of the ReceiverTracker can be recovered from the checkpoint and the WAL.
An important method in ReceivedBlockTracker is allocateBlocksToBatch. The field private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks] records, for each batch time, the blocks allocated to that batch (a simplified sketch of both structures follows).
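The following is a simplified, self-contained sketch (not the real implementation) of how ReceivedBlockTracker tracks unallocated blocks per stream and pins them to a batch; Time, ReceivedBlockInfo and AllocatedBlocks here are stand-ins for the Spark classes:

import scala.collection.mutable

case class Time(milliseconds: Long)
case class ReceivedBlockInfo(streamId: Int, numRecords: Long)
case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]])

class ReceivedBlockTrackerSketch(streamIds: Seq[Int]) {
  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

  // Key: stream id; Value: queue of blocks not yet assigned to any batch
  private val streamIdToUnallocatedBlockQueues =
    new mutable.HashMap[Int, ReceivedBlockQueue]
  // Key: batch time; Value: the blocks allocated to that batch
  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]

  // Mirrors getReceivedBlockQueue: create each stream's queue lazily
  private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue =
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)

  def addBlock(info: ReceivedBlockInfo): Unit =
    getReceivedBlockQueue(info.streamId) += info

  // Drain every stream's unallocated queue and pin the result to this batch time
  def allocateBlocksToBatch(batchTime: Time): Unit = {
    val allocated = streamIds.map { id =>
      id -> getReceivedBlockQueue(id).dequeueAll(_ => true).toSeq
    }.toMap
    timeToAllocatedBlocks(batchTime) = AllocatedBlocks(allocated)
  }

  def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] =
    timeToAllocatedBlocks.get(batchTime)
      .map(_.streamIdToAllocatedBlocks).getOrElse(Map.empty)
}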
After a Receiver receives data, and the data has been merged into blocks and stored, ReceiverSupervisorImpl reports the block metadata to ReceiverTrackerEndpoint, the message endpoint inside ReceiverTracker.
Once ReceiverTracker receives the block metadata, ReceivedBlockTracker manages its allocation. For each batch, JobGenerator fetches the block metadata belonging to that batch from ReceivedBlockTracker and uses it to generate RDDs.
In design-pattern terms, ReceiverTrackerEndpoint and ReceivedBlockTracker form a Facade: ReceivedBlockTracker does the actual work internally, while ReceiverTrackerEndpoint acts as the external communication front (illustrated in miniature below).
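In miniature, the relationship looks like this (a hypothetical illustration, not Spark code):

// Plays the ReceivedBlockTracker role: owns the state and does the real work
class Bookkeeper {
  private var count = 0
  def addBlock(streamId: Int): Boolean = { count += 1; true }
}

// Plays the ReceiverTrackerEndpoint role: a thin facade that only forwards messages
class Endpoint(bookkeeper: Bookkeeper) {
  def receiveAddBlock(streamId: Int): Boolean = bookkeeper.addBlock(streamId)
}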
/**
 * This class manages the execution of the receivers of ReceiverInputDStreams. Instance of
 * this class must be created after all input streams have been added and StreamingContext.start()
 * has been called because it needs the final set of input streams at the time of instantiation.
 *
 * It runs on the driver side, managing the start, execution and restart of receivers;
 * it records all input streams and launches one receiver per input stream.
 *
 * @param skipReceiverLaunch Do not launch the receiver. This is useful for testing.
 */
private[streaming] class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )
  private val listenerBus = ssc.scheduler.listenerBus

  /** Enumeration to identify current state of the ReceiverTracker */
  object TrackerState extends Enumeration {
    type TrackerState = Value
    val Initialized, Started, Stopping, Stopped = Value
  }
  import TrackerState._

  /** State of the tracker. Protected by "trackerStateLock" */
  @volatile private var trackerState = Initialized

  // endpoint is created when generator starts.
  // This not being null means the tracker has been started and not stopped
  private var endpoint: RpcEndpointRef = null

  private val schedulingPolicy = new ReceiverSchedulingPolicy()

  // Track the active receiver job number. When a receiver job exits ultimately, countDown will
  // be called.
  private val receiverJobExitLatch = new CountDownLatch(receiverInputStreams.size)

  /**
   * Track all receivers' information. The key is the receiver id, the value is the receiver info.
   * It's only accessed in ReceiverTrackerEndpoint.
   */
  private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]

  /**
   * Store all preferred locations for all receivers. We need this information to schedule
   * receivers. It's only accessed in ReceiverTrackerEndpoint.
   */
  private val receiverPreferredLocations = new HashMap[Int, Option[String]]

  /** Start the endpoint and receiver execution thread. */
  def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }
    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

  /** Stop the receiver execution thread. */
  def stop(graceful: Boolean): Unit = synchronized {
    if (isTrackerStarted) {
      // First, stop the receivers
      trackerState = Stopping
      if (!skipReceiverLaunch) {
        // Send the stop signal to all the receivers
        endpoint.askWithRetry[Boolean](StopAllReceivers)
        // Wait for the Spark job that runs the receivers to be over
        // That is, for the receivers to quit gracefully.
        receiverJobExitLatch.await(10, TimeUnit.SECONDS)

        if (graceful) {
          logInfo("Waiting for receiver job to terminate gracefully")
          receiverJobExitLatch.await()
          logInfo("Waited for receiver job to terminate gracefully")
        }

        // Check if all the receivers have been deregistered or not
        val receivers = endpoint.askWithRetry[Seq[Int]](AllReceiverIds)
        if (receivers.nonEmpty) {
          logWarning("Not all of the receivers have deregistered, " + receivers)
        } else {
          logInfo("All of the receivers have deregistered successfully")
        }
      }

      // Finally, stop the endpoint
      ssc.env.rpcEnv.stop(endpoint)
      endpoint = null
      receivedBlockTracker.stop()
      logInfo("ReceiverTracker stopped")
      trackerState = Stopped
    }
  }

  /** Allocate all unallocated blocks to the given batch. */
  def allocateBlocksToBatch(batchTime: Time): Unit = {
    if (receiverInputStreams.nonEmpty) {
      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    }
  }

  /** Get the blocks for the given batch and all input streams. */
  def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
    receivedBlockTracker.getBlocksOfBatch(batchTime)
  }

  /** Get the blocks allocated to the given batch and stream. */
  def getBlocksOfBatchAndStream(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
    receivedBlockTracker.getBlocksOfBatchAndStream(batchTime, streamId)
  }

  /**
   * Clean up the data and metadata of blocks and batches that are strictly
   * older than the threshold time. Note that this does not
   */
  def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) {
    // Clean up old block and batch metadata
    receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)

    // Signal the receivers to delete old block data
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
      synchronized {
        if (isTrackerStarted) {
          endpoint.send(CleanupOldBlocks(cleanupThreshTime))
        }
      }
    }
  }

  /** Register a receiver */
  private def registerReceiver(
      streamId: Int,
      typ: String,
      host: String,
      executorId: String,
      receiverEndpoint: RpcEndpointRef,
      senderAddress: RpcAddress
    ): Boolean = {
    if (!receiverInputStreamIds.contains(streamId)) {
      throw new SparkException("Register received for unexpected id " + streamId)
    }

    if (isTrackerStopping || isTrackerStopped) {
      return false
    }

    val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
    val acceptableExecutors = if (scheduledLocations.nonEmpty) {
        // This receiver is registering and it's scheduled by
        // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
        scheduledLocations.get
      } else {
        // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
        // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
        scheduleReceiver(streamId)
      }

    def isAcceptable: Boolean = acceptableExecutors.exists {
      case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
      case loc: TaskLocation => loc.host == host
    }

    if (!isAcceptable) {
      // Refuse it since it's scheduled to a wrong executor
      false
    } else {
      val name = s"${typ}-${streamId}"
      val receiverTrackingInfo = ReceiverTrackingInfo(
        streamId,
        ReceiverState.ACTIVE,
        scheduledLocations = None,
        runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
        name = Some(name),
        endpoint = Some(receiverEndpoint))
      receiverTrackingInfos.put(streamId, receiverTrackingInfo)
      listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
      logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
      true
    }
  }

  /** Deregister a receiver */
  private def deregisterReceiver(streamId: Int, message: String, error: String) {
    val lastErrorTime =
      if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
    val errorInfo = ReceiverErrorInfo(
      lastErrorMessage = message, lastError = error, lastErrorTime = lastErrorTime)
    val newReceiverTrackingInfo = receiverTrackingInfos.get(streamId) match {
      case Some(oldInfo) =>
        oldInfo.copy(state = ReceiverState.INACTIVE, errorInfo = Some(errorInfo))
      case None =>
        logWarning("No prior receiver info")
        ReceiverTrackingInfo(
          streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
    }
    receiverTrackingInfos(streamId) = newReceiverTrackingInfo
    listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
    val messageWithError = if (error != null && !error.isEmpty) {
      s"$message - $error"
    } else {
      s"$message"
    }
    logError(s"Deregistered receiver for stream $streamId: $messageWithError")
  }

  /** Update a receiver's maximum ingestion rate */
  def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
    if (isTrackerStarted) {
      endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
    }
  }

  /** Add new blocks for the given stream */
  private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
  }

  /** Report error sent by a receiver */
  private def reportError(streamId: Int, message: String, error: String) {
    val newReceiverTrackingInfo = receiverTrackingInfos.get(streamId) match {
      case Some(oldInfo) =>
        val errorInfo = ReceiverErrorInfo(lastErrorMessage = message, lastError = error,
          lastErrorTime = oldInfo.errorInfo.map(_.lastErrorTime).getOrElse(-1L))
        oldInfo.copy(errorInfo = Some(errorInfo))
      case None =>
        logWarning("No prior receiver info")
        val errorInfo = ReceiverErrorInfo(lastErrorMessage = message, lastError = error,
          lastErrorTime = ssc.scheduler.clock.getTimeMillis())
        ReceiverTrackingInfo(
          streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
    }

    receiverTrackingInfos(streamId) = newReceiverTrackingInfo
    listenerBus.post(StreamingListenerReceiverError(newReceiverTrackingInfo.toReceiverInfo))
    val messageWithError = if (error != null && !error.isEmpty) {
      s"$message - $error"
    } else {
      s"$message"
    }
    logWarning(s"Error reported by receiver for stream $streamId: $messageWithError")
  }

  private def scheduleReceiver(receiverId: Int): Seq[TaskLocation] = {
    val preferredLocation = receiverPreferredLocations.getOrElse(receiverId, None)
    val scheduledLocations = schedulingPolicy.rescheduleReceiver(
      receiverId, preferredLocation, receiverTrackingInfos, getExecutors)
    updateReceiverScheduledExecutors(receiverId, scheduledLocations)
    scheduledLocations
  }

  private def updateReceiverScheduledExecutors(
      receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = {
    val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
      case Some(oldInfo) =>
        oldInfo.copy(state = ReceiverState.SCHEDULED,
          scheduledLocations = Some(scheduledLocations))
      case None =>
        ReceiverTrackingInfo(
          receiverId,
          ReceiverState.SCHEDULED,
          Some(scheduledLocations),
          runningExecutor = None)
    }
    receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
  }

  /** Check if any blocks are left to be processed */
  def hasUnallocatedBlocks: Boolean = {
    receivedBlockTracker.hasUnallocatedReceivedBlocks
  }

  /**
   * Get the list of executors excluding driver
   */
  private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
    if (ssc.sc.isLocal) {
      val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
      Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
    } else {
      ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
        blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
      }.map { case (blockManagerId, _) =>
        ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
      }.toSeq
    }
  }

  /**
   * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
   * receivers to be scheduled on the same node.
   *
   * TODO Should poll the executor number and wait for executors according to
   * "spark.scheduler.minRegisteredResourcesRatio" and
   * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
   */
  private def runDummySparkJob(): Unit = {
    if (!ssc.sparkContext.isLocal) {
      ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
    }
    assert(getExecutors.nonEmpty)
  }

  /**
   * Get the receivers from the ReceiverInputDStreams, distributes them to the
   * worker nodes as a parallel collection, and runs them.
   */
  private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map(nis => {
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    })

    runDummySparkJob()

    logInfo("Starting " + receivers.length + " receivers")
    endpoint.send(StartAllReceivers(receivers))
  }

  /** Check if tracker has been marked for starting */
  private def isTrackerStarted: Boolean = trackerState == Started

  /** Check if tracker has been marked for stopping */
  private def isTrackerStopping: Boolean = trackerState == Stopping

  /** Check if tracker has been marked for stopped */
  private def isTrackerStopped: Boolean = trackerState == Stopped

  /** RpcEndpoint to receive messages from the receivers. */
  private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

    // TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
    private val submitJobThreadPool = ExecutionContext.fromExecutorService(
      ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))

    private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
      ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))

    @volatile private var active: Boolean = true

    override def receive: PartialFunction[Any, Unit] = {
      // Local messages
      case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }
      case RestartReceiver(receiver) =>
        // Old scheduled executors minus the ones that are not active any more
        val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
        val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
            // Try global scheduling again
            oldScheduledExecutors
          } else {
            val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
            // Clear "scheduledLocations" to indicate we are going to do local scheduling
            val newReceiverInfo = oldReceiverInfo.copy(
              state = ReceiverState.INACTIVE, scheduledLocations = None)
            receiverTrackingInfos(receiver.streamId) = newReceiverInfo
            schedulingPolicy.rescheduleReceiver(
              receiver.streamId,
              receiver.preferredLocation,
              receiverTrackingInfos,
              getExecutors)
          }
        // Assume there is one receiver restarting at one time, so we don't need to update
        // receiverTrackingInfos
        startReceiver(receiver, scheduledLocations)
      case c: CleanupOldBlocks =>
        receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
      case UpdateReceiverRateLimit(streamUID, newRate) =>
        for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
          eP.send(UpdateRateLimit(newRate))
        }
      // Remote messages
      case ReportError(streamId, message, error) =>
        reportError(streamId, message, error)
    }

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      // Remote messages
      case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
        val successful =
          registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
        context.reply(successful)
      case AddBlock(receivedBlockInfo) =>
        if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
          walBatchingThreadPool.execute(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              if (active) {
                context.reply(addBlock(receivedBlockInfo))
              } else {
                throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
              }
            }
          })
        } else {
          context.reply(addBlock(receivedBlockInfo))
        }
      case DeregisterReceiver(streamId, message, error) =>
        deregisterReceiver(streamId, message, error)
        context.reply(true)
      // Local messages
      case AllReceiverIds =>
        context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
      case StopAllReceivers =>
        assert(isTrackerStopping || isTrackerStopped)
        stopReceivers()
        context.reply(true)
    }

    /**
     * Return the stored scheduled executors that are still alive.
     */
    private def getStoredScheduledExecutors(receiverId: Int): Seq[TaskLocation] = {
      if (receiverTrackingInfos.contains(receiverId)) {
        val scheduledLocations = receiverTrackingInfos(receiverId).scheduledLocations
        if (scheduledLocations.nonEmpty) {
          val executors = getExecutors.toSet
          // Only return the alive executors
          scheduledLocations.get.filter {
            case loc: ExecutorCacheTaskLocation => executors(loc)
            case loc: TaskLocation => true
          }
        } else {
          Nil
        }
      } else {
        Nil
      }
    }

    /**
     * Start a receiver along with its scheduled executors
     */
    private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
      def shouldStartReceiver: Boolean = {
        // It's okay to start when trackerState is Initialized or Started
        !(isTrackerStopping || isTrackerStopped)
      }

      val receiverId = receiver.streamId
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
        return
      }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf =
        new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

      // Function to start the receiver on the worker node
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(submitJobThreadPool)
      logInfo(s"Receiver ${receiver.streamId} started")
    }

    override def onStop(): Unit = {
      submitJobThreadPool.shutdownNow()
      active = false
      walBatchingThreadPool.shutdown()
    }

    /**
     * Call when a receiver is terminated. It means we won't restart its Spark job.
     */
    private def onReceiverJobFinish(receiverId: Int): Unit = {
      receiverJobExitLatch.countDown()
      receiverTrackingInfos.remove(receiverId).foreach { receiverTrackingInfo =>
        if (receiverTrackingInfo.state == ReceiverState.ACTIVE) {
          logWarning(s"Receiver $receiverId exited but didn't deregister")
        }
      }
    }

    /** Send stop signal to the receivers. */
    private def stopReceivers() {
      receiverTrackingInfos.values.flatMap(_.endpoint).foreach { _.send(StopReceiver) }
      logInfo("Sent stop signal to all " + receiverTrackingInfos.size + " receivers")
    }
  }
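Putting the batch-side methods above in context: on each batch interval, JobGenerator drives the tracker roughly as in this hedged sketch (the wiring is simplified, and generateJobsSketch is a hypothetical name, not the real JobGenerator method):

import org.apache.spark.streaming.Time

def generateJobsSketch(tracker: ReceiverTracker, batchTime: Time, streamIds: Seq[Int]): Unit = {
  // Pin all so-far-unallocated blocks to this batch (delegates to ReceivedBlockTracker)
  tracker.allocateBlocksToBatch(batchTime)
  streamIds.foreach { id =>
    // Each ReceiverInputDStream turns these block infos into a BlockRDD for the batch
    val blockInfos = tracker.getBlocksOfBatchAndStream(batchTime, id)
    println(s"stream $id: ${blockInfos.size} blocks for batch $batchTime")
  }
}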
Notes on the Spark Distribution, Part 11