91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink的rpc組件有哪些

發布時間:2021-12-31 14:30:27 來源:億速云 閱讀:165 作者:iii 欄目:大數據

本篇內容介紹了“Flink的rpc組件有哪些”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

Flink采用akka來實現rpc服務。其中有這幾個重要組件:RpcServer、RpcService、AkkaRpcActor、RpcEndpoint。

Flink的rpc組件有哪些

這幾個組件作用如下:

(1)RpcEndpoint

提供具體rpc服務。主要實現有 ResourceManager 和 TaskExecutor,

①YarnResourceManager為AM容器中啟動的服務,持有ResourceManager和NodeManager的客戶端

②TaskExecutor為NM容器中啟動taskmanager的類

(2)AkkaRpcService

提供rpc的服務類。該類內部持有ActorSystem實例和Supervisor實例。Supervisor中含有SupervisorActor實例,SupervisorActor用于創建其他Actor,可以理解為根Actor。RpcEndpoint在構造時,通過AkkaRpcService的startServer()方法,獲取RpcServer實例。

	public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
		checkNotNull(rpcEndpoint, "rpc endpoint");

		final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint);
		final ActorRef actorRef = actorRegistration.getActorRef();
		final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture();

		LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

		final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
		final String hostname;
		Option<String> host = actorRef.path().address().host();
		if (host.isEmpty()) {
			hostname = "localhost";
		} else {
			hostname = host.get();
		}

		Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

		implementedRpcGateways.add(RpcServer.class);
		implementedRpcGateways.add(AkkaBasedEndpoint.class);

		final InvocationHandler akkaInvocationHandler;

		if (rpcEndpoint instanceof FencedRpcEndpoint) {
			// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
			akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
				akkaAddress,
				hostname,
				actorRef,
				configuration.getTimeout(),
				configuration.getMaximumFramesize(),
				actorTerminationFuture,
				((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
				captureAskCallstacks);

			implementedRpcGateways.add(FencedMainThreadExecutable.class);
		} else {
			akkaInvocationHandler = new AkkaInvocationHandler(
				akkaAddress,
				hostname,
				actorRef,
				configuration.getTimeout(),
				configuration.getMaximumFramesize(),
				actorTerminationFuture,
				captureAskCallstacks);
		}

		// Rather than using the System ClassLoader directly, we derive the ClassLoader
		// from this class . That works better in cases where Flink runs embedded and all Flink
		// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
		ClassLoader classLoader = getClass().getClassLoader();

		@SuppressWarnings("unchecked")
		RpcServer server = (RpcServer) Proxy.newProxyInstance(
			classLoader,
			implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
			akkaInvocationHandler);

		return server;
	}

先創建RpcEndpoint對應的ActorRef,然后創建RpcServer的代理類AkkaInvocationHandler或FencedAkkaInvocationHandler,并將ActorRef實例賦給其成員屬性 rpcEndpoint:ActorRef。這里的ActorRef即為AkkaRpcActor或FencedAkkaRpcActor實例

(3)RpcServer

用來啟動rpc服務,通常不直接調用,而是調用其動態代理類AkkaInvocationHandler或FencedAkkaInvocationHandler的start()方法

(4)AkkaInvocationHandler或FencedAkkaInvocationHandler

RpcServer的動態代理類。start()方法用來啟動服務:

	public void start() {
		rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
	}

這里向rpcEndpoint,即AkkaRpcActor或FencedAkkaRpcActor實例發送一條ControlMessages.START消息

(5)AkkaRpcActor

響應rpc消息的actor。其createReceive():

	public Receive createReceive() {
		return ReceiveBuilder.create()
			.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
			.match(ControlMessages.class, this::handleControlMessage)
			.matchAny(this::handleMessage)
			.build();
	}

當消息為ControlMessages.START,調用StoppedState 的start()方法

		public State start(AkkaRpcActor<?> akkaRpcActor) {
			akkaRpcActor.mainThreadValidator.enterMainThread();

			try {
				akkaRpcActor.rpcEndpoint.internalCallOnStart();
			} catch (Throwable throwable) {
				akkaRpcActor.stop(
					RpcEndpointTerminationResult.failure(
						new AkkaRpcException(
							String.format("Could not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
							throwable)));
			} finally {
				akkaRpcActor.mainThreadValidator.exitMainThread();
			}

			return StartedState.STARTED;
		}

在start()方法中調用具體提供服務的RpcEndpoint實現類internalCallOnStart()方法來啟動服務。internalCallOnStart()方法中會調用onStart()方法。

“Flink的rpc組件有哪些”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

宁波市| 桃园市| 巴塘县| 繁昌县| 青岛市| 沅江市| 南木林县| 安国市| 汉阴县| 桂东县| 深州市| 勐海县| 云梦县| 昭苏县| 河南省| 屏东市| 霍山县| 新竹市| 桦甸市| 资源县| 民勤县| 前郭尔| 固阳县| 佳木斯市| 富民县| 榆社县| 明水县| 平潭县| 钟祥市| 临颍县| 泰安市| 福鼎市| 渭源县| 工布江达县| 个旧市| 阿坝县| 上虞市| 紫阳县| 灵璧县| 恩平市| 彩票|