91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Dubbo線程池有哪些優點

發布時間:2021-10-13 10:49:43 來源:億速云 閱讀:161 作者:iii 欄目:編程語言

本篇內容介紹了“Dubbo線程池有哪些優點”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

1 基礎知識

1.1 DUBBO線程模型

1.1.1 基本概念

DUBBO底層網絡通信采用Netty框架,我們編寫一個Netty服務端進行觀察:

public class NettyServer {     public static void main(String[] args) throws Exception {         EventLoopGroup bossGroup = new NioEventLoopGroup(1);         EventLoopGroup workerGroup = new NioEventLoopGroup(8);         try {             ServerBootstrap bootstrap = new ServerBootstrap();             bootstrap.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class)             .option(ChannelOption.SO_BACKLOG, 128)             .childOption(ChannelOption.SO_KEEPALIVE, true)             .childHandler(new ChannelInitializer<SocketChannel>() {                 @Override                 protected void initChannel(SocketChannel ch) throws Exception {                     ch.pipeline().addLast(new NettyServerHandler());                 }             });             ChannelFuture channelFuture = bootstrap.bind(7777).sync();             System.out.println("服務端準備就緒");             channelFuture.channel().closeFuture().sync();         } catch (Exception ex) {             System.out.println(ex.getMessage());         } finally {             bossGroup.shutdownGracefully();             workerGroup.shutdownGracefully();         }     } }

BossGroup線程組只有一個線程處理客戶端連接請求,連接完成后將完成三次握手的SocketChannel連接分發給WorkerGroup處理讀寫請求,這兩個線程組被稱為「IO線程」。

我們再引出「業務線程」這個概念。服務生產者接收到請求后,如果處理邏輯可以快速處理完成,那么可以直接放在IO線程處理,從而減少線程池調度與上下文切換。但是如果處理邏輯非常耗時,或者會發起新IO請求例如查詢數據庫,那么必須派發到業務線程池處理。

DUBBO提供了多種線程模型,選擇線程模型需要在配置文件指定dispatcher屬性:

<dubbo:protocol name="dubbo" dispatcher="all" /> <dubbo:protocol name="dubbo" dispatcher="direct" /> <dubbo:protocol name="dubbo" dispatcher="message" /> <dubbo:protocol name="dubbo" dispatcher="execution" /> <dubbo:protocol name="dubbo" dispatcher="connection" />

不同線程模型在選擇是使用IO線程還是業務線程,DUBBO官網文檔說明:

all 所有消息都派發到業務線程池,包括請求,響應,連接事件,斷開事件,心跳  direct 所有消息都不派發到業務線程池,全部在IO線程直接執行  message 只有請求響應消息派發到業務線程池,其它連接斷開事件,心跳等消息直接在IO線程執行  execution 只有請求消息派發到業務線程池,響應和其它連接斷開事件,心跳等消息直接在IO線程執行  connection 在IO線程上將連接斷開事件放入隊列,有序逐個執行,其它消息派發到業務線程池

1.1.2 確定時機

生產者和消費者在初始化時確定線程模型:

// 生產者 public class NettyServer extends AbstractServer implements Server {     public NettyServer(URL url, ChannelHandler handler) throws RemotingException {         super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));     } }  // 消費者 public class NettyClient extends AbstractClient {     public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {      super(url, wrapChannelHandler(url, handler));     } }

生產者和消費者默認線程模型都會使用AllDispatcher,ChannelHandlers.wrap方法可以獲取Dispatch自適應擴展點。如果我們在配置文件中指定dispatcher,擴展點加載器會從URL獲取屬性值加載對應線程模型。本文以生產者為例進行分析:

public class NettyServer extends AbstractServer implements Server {     public NettyServer(URL url, ChannelHandler handler) throws RemotingException {         // ChannelHandlers.wrap確定線程策略         super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));     } }  public class ChannelHandlers {     protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {         return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));     } }  @SPI(AllDispatcher.NAME) public interface Dispatcher {     @Adaptive({Constants.DISPATCHER_KEY, "channel.handler"})     ChannelHandler dispatch(ChannelHandler handler, URL url); }

1.1.3 源碼分析

我們分析其中兩個線程模型源碼,其它線程模型請閱讀DUBBO源碼。AllDispatcher模型所有消息都派發到業務線程池,包括請求,響應,連接事件,斷開事件,心跳:

public class AllDispatcher implements Dispatcher {      // 線程模型名稱     public static final String NAME = "all";      // 具體實現策略     @Override     public ChannelHandler dispatch(ChannelHandler handler, URL url) {         return new AllChannelHandler(handler, url);     } }   public class AllChannelHandler extends WrappedChannelHandler {      @Override     public void connected(Channel channel) throws RemotingException {         // 連接完成事件交給業務線程池         ExecutorService cexecutor = getExecutorService();         try {             cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));         } catch (Throwable t) {             throw new ExecutionException("connect event", channel, getClass() + " error when process connected event", t);         }     }      @Override     public void disconnected(Channel channel) throws RemotingException {         // 斷開連接事件交給業務線程池         ExecutorService cexecutor = getExecutorService();         try {             cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));         } catch (Throwable t) {             throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event", t);         }     }      @Override     public void received(Channel channel, Object message) throws RemotingException {         // 請求響應事件交給業務線程池         ExecutorService cexecutor = getExecutorService();         try {             cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));         } catch (Throwable t) {             if(message instanceof Request && t instanceof RejectedExecutionException) {                 Request request = (Request)message;                 if(request.isTwoWay()) {                     String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();                     Response response = new Response(request.getId(), request.getVersion());                     response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);                     response.setErrorMessage(msg);                     channel.send(response);                     return;                 }             }             throw new ExecutionException(message, channel, getClass() + " error when process received event", t);         }     }      @Override     public void caught(Channel channel, Throwable exception) throws RemotingException {         // 異常事件交給業務線程池         ExecutorService cexecutor = getExecutorService();         try {             cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));         } catch (Throwable t) {             throw new ExecutionException("caught event", channel, getClass() + " error when process caught event", t);         }     } }

DirectDispatcher策略所有消息都不派發到業務線程池,全部在IO線程直接執行:

public class DirectDispatcher implements Dispatcher {      // 線程模型名稱     public static final String NAME = "direct";      // 具體實現策略     @Override     public ChannelHandler dispatch(ChannelHandler handler, URL url) {         // 直接返回handler表示所有事件都交給IO線程處理         return handler;     } }

1.2 DUBBO線程池策略

1.2.1 基本概念

上個章節分析了線程模型,我們知道不同的線程模型會選擇使用還是IO線程還是業務線程。如果使用業務線程池,那么使用什么線程池策略是本章節需要回答的問題。DUBBO官網線程派發模型圖展示了線程模型和線程池策略的關系:

Dubbo線程池有哪些優點

DUBBO提供了多種線程池策略,選擇線程池策略需要在配置文件指定threadpool屬性:

<dubbo:protocol name="dubbo" threadpool="fixed" threads="100" /> <dubbo:protocol name="dubbo" threadpool="cached" threads="100" /> <dubbo:protocol name="dubbo" threadpool="limited" threads="100" /> <dubbo:protocol name="dubbo" threadpool="eager" threads="100" />

不同線程池策略會創建不同特性的線程池:

fixed 包含固定個數線程  cached 線程空閑一分鐘會被回收,當新請求到來時會創建新線程  limited 線程個數隨著任務增加而增加,但不會超過最大閾值。空閑線程不會被回收  eager 當所有核心線程數都處于忙碌狀態時,優先創建新線程執行任務,而不是立即放入隊列

1.2.2 確定時機

本文我們以AllDispatcher為例分析線程池策略在什么時候確定:

public class AllDispatcher implements Dispatcher {     public static final String NAME = "all";      @Override     public ChannelHandler dispatch(ChannelHandler handler, URL url) {         return new AllChannelHandler(handler, url);     } }  public class AllChannelHandler extends WrappedChannelHandler {     public AllChannelHandler(ChannelHandler handler, URL url) {         super(handler, url);     } }

在WrappedChannelHandler構造函數中如果配置指定了threadpool屬性,擴展點加載器會從URL獲取屬性值加載對應線程池策略,默認策略為fixed:

public class WrappedChannelHandler implements ChannelHandlerDelegate {      public WrappedChannelHandler(ChannelHandler handler, URL url) {         this.handler = handler;         this.url = url;         // 獲取線程池自適應擴展點         executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);         String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;         if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {             componentKey = Constants.CONSUMER_SIDE;         }         DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();         dataStore.put(componentKey, Integer.toString(url.getPort()), executor);     } }  @SPI("fixed") public interface ThreadPool {     @Adaptive({Constants.THREADPOOL_KEY})     Executor getExecutor(URL url); }

1.2.3 源碼分析

(1) FixedThreadPool

public class FixedThreadPool implements ThreadPool {      @Override     public Executor getExecutor(URL url) {          // 線程名稱         String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);          // 線程個數默認200         int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);          // 隊列容量默認0         int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);          // 隊列容量等于0使用阻塞隊列SynchronousQueue         // 隊列容量小于0使用無界阻塞隊列LinkedBlockingQueue         // 隊列容量大于0使用有界阻塞隊列LinkedBlockingQueue         return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,                                       queues == 0 ? new SynchronousQueue<Runnable>()                                       : (queues < 0 ? new LinkedBlockingQueue<Runnable>()                                          : new LinkedBlockingQueue<Runnable>(queues)),                                       new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));     } }

(2) CachedThreadPool

public class CachedThreadPool implements ThreadPool {      @Override     public Executor getExecutor(URL url) {          // 獲取線程名稱         String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);          // 核心線程數默認0         int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);          // 最大線程數默認Int最大值         int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);          // 隊列容量默認0         int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);          // 線程空閑多少時間被回收默認1分鐘         int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);          // 隊列容量等于0使用阻塞隊列SynchronousQueue         // 隊列容量小于0使用無界阻塞隊列LinkedBlockingQueue         // 隊列容量大于0使用有界阻塞隊列LinkedBlockingQueue         return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,                                       queues == 0 ? new SynchronousQueue<Runnable>()                                       : (queues < 0 ? new LinkedBlockingQueue<Runnable>()                                          : new LinkedBlockingQueue<Runnable>(queues)),                                       new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));     } }

(3) LimitedThreadPool

public class LimitedThreadPool implements ThreadPool {      @Override     public Executor getExecutor(URL url) {          // 獲取線程名稱         String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);          // 核心線程數默認0         int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);          // 最大線程數默認200         int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);          // 隊列容量默認0         int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);          // 隊列容量等于0使用阻塞隊列SynchronousQueue         // 隊列容量小于0使用無界阻塞隊列LinkedBlockingQueue         // 隊列容量大于0使用有界阻塞隊列LinkedBlockingQueue         // keepalive時間設置Long.MAX_VALUE表示不回收空閑線程         return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,                                       queues == 0 ? new SynchronousQueue<Runnable>()                                       : (queues < 0 ? new LinkedBlockingQueue<Runnable>()                                          : new LinkedBlockingQueue<Runnable>(queues)),                                       new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));     } }

(4) EagerThreadPool

我們知道ThreadPoolExecutor是普通線程執行器。當線程池核心線程達到閾值時新任務放入隊列,當隊列已滿開啟新線程處理,當前線程數達到最大線程數時執行拒絕策略。

但是EagerThreadPool自定義線程執行策略,當線程池核心線程達到閾值時,新任務不會放入隊列而是開啟新線程進行處理(要求當前線程數沒有超過最大線程數)。當前線程數達到最大線程數時任務放入隊列。

public class EagerThreadPool implements ThreadPool {      @Override     public Executor getExecutor(URL url) {          // 線程名         String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);          // 核心線程數默認0         int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);          // 最大線程數默認Int最大值         int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);          // 隊列容量默認0         int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);          // 線程空閑多少時間被回收默認1分鐘         int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);          // 初始化自定義線程池和隊列重寫相關方法         TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);         EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,                 threads,                 alive,                 TimeUnit.MILLISECONDS,                 taskQueue,                 new NamedInternalThreadFactory(name, true),                 new AbortPolicyWithReport(name, url));         taskQueue.setExecutor(executor);         return executor;     } }

1.3 一個公式

現在我們知道DUBBO會選擇線程池策略進行業務處理,那么應該如何估算可能產生的線程數呢?我們首先分析一個問題:一個公司有7200名員工,每天上班打卡時間是早上8點到8點30分,每次打卡時間系統執行時長為5秒。請問RT、QPS、并發量分別是多少?

RT表示響應時間,問題已經告訴了我們答案:

RT = 5

QPS表示每秒查詢量,假設簽到行為平均分布:

QPS = 7200 / (30 * 60) = 4

并發量表示系統同時處理的請求數量:

并發量 = QPS x RT = 4 x 5 = 20

根據上述實例引出如下公式:

并發量 = QPS x RT

如果系統為每一個請求分配一個處理線程,那么并發量可以近似等于線程數。基于上述公式不難看出并發量受QPS和RT影響,這兩個指標任意一個上升就會導致并發量上升。

但是這只是理想情況,因為并發量受限于系統能力而不可能持續上升,例如DUBBO線程池就對線程數做了限制,超出最大線程數限制則會執行拒絕策略,而拒絕策略會提示線程池已滿,這就是DUBBO線程池打滿問題的根源。下面我們分析RT上升和QPS上升這兩個原因。

2 RT上升

2.1 生產者發生慢服務

2.1.1 原因分析

(1) 生產者配置

<beans>     <dubbo:registry address="zookeeper://127.0.0.1:2181" />     <dubbo:protocol name="dubbo" port="9999" />     <dubbo:service interface="com.java.front.dubbo.demo.provider.HelloService" ref="helloService" /> </beans>

(2) 生產者業務

package com.java.front.dubbo.demo.provider; public interface HelloService {     public String sayHello(String name) throws Exception; }  public class HelloServiceImpl implements HelloService {     public String sayHello(String name) throws Exception {         String result = "hello[" + name + "]";         // 模擬慢服務        Thread.sleep(10000L);         System.out.println("生產者執行結果" + result);        return result;     } }

(3) 消費者配置

<beans>     <dubbo:registry address="zookeeper://127.0.0.1:2181" />     <dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" /> </beans>

(4) 消費者業務

public class Consumer {      @Test     public void testThread() {         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:METAINF/spring/dubbo-consumer.xml" });         context.start();         for (int i = 0; i < 500; i++) {             new Thread(new Runnable() {                 @Override                 public void run() {                     HelloService helloService = (HelloService) context.getBean("helloService");                     String result;                     try {                         result = helloService.sayHello("微信公眾號「JAVA前線」");                         System.out.println("客戶端收到結果" + result);                     } catch (Exception e) {                         System.out.println(e.getMessage());                     }                 }             }).start();         }     } }

依次運行生產者和消費者代碼,會發現日志中出現報錯信息。生產者日志會打印線程池已滿:

Caused by: java.util.concurrent.RejectedExecutionException: Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-x.x.x.x:9999, Pool Size: 200 (active: 200, core: 200, max: 200, largest: 200), Task: 201 (completed: 1), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://x.x.x.x:9999! at org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:67) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:88)

消費者日志不僅會打印線程池已滿,還會打印服務提供者信息和調用方法,我們可以根據日志找到哪一個方法有問題:

Failed to invoke the method sayHello in the service com.java.front.dubbo.demo.provider.HelloService.  Tried 3 times of the providers [x.x.x.x:9999] (1/1) from the registry 127.0.0.1:2181 on the consumer x.x.x.x  using the dubbo version 2.7.0-SNAPSHOT. Last error is: Failed to invoke remote method: sayHello,  provider: dubbo://x.x.x.x:9999/com.java.front.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-consumer1&check=false&dubbo=2.0.2&generic=false&group=&interface=com.java.front.dubbo.demo.provider.HelloService&logger=log4j&methods=sayHello&pid=33432&register.ip=x.x.x.x&release=2.7.0-SNAPSHOT&remote.application=xpz-provider&remote.timestamp=1618632597509&side=consumer&timeout=100000000&timestamp=1618632617392,  cause: Server side(x.x.x.x,9999) threadpool is exhausted ,detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-x.x.x.x:9999, Pool Size: 200 (active: 200, core: 200, max: 200, largest: 200), Task: 401 (completed: 201), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://x.x.x.x:9999!

2.1.2 解決方案

(1) 找出慢服務

DUBBO線程池打滿時會執行拒絕策略:

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {     protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);     private final String threadName;     private final URL url;     private static volatile long lastPrintTime = 0;     private static Semaphore guard = new Semaphore(1);      public AbortPolicyWithReport(String threadName, URL url) {         this.threadName = threadName;         this.url = url;     }      @Override     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {         String msg = String.format("Thread pool is EXHAUSTED!" +                                    " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +                                    " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",                                    threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),                                    e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),                                    url.getProtocol(), url.getIp(), url.getPort());         logger.warn(msg);         // 打印線程快照         dumpJStack();         throw new RejectedExecutionException(msg);     }      private void dumpJStack() {         long now = System.currentTimeMillis();          // 每10分鐘輸出線程快照         if (now - lastPrintTime < 10 * 60 * 1000) {             return;         }         if (!guard.tryAcquire()) {             return;         }          ExecutorService pool = Executors.newSingleThreadExecutor();         pool.execute(() -> {             String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));             System.out.println("AbortPolicyWithReport dumpJStack directory=" + dumpPath);             SimpleDateFormat sdf;             String os = System.getProperty("os.name").toLowerCase();              // linux文件位置/home/xxx/Dubbo_JStack.log.2021-01-01_20:50:15             // windows文件位置/user/xxx/Dubbo_JStack.log.2020-01-01_20-50-15             if (os.contains("win")) {                 sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");             } else {                 sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");             }             String dateStr = sdf.format(new Date());             try (FileOutputStream jStackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {                 JVMUtil.jstack(jStackStream);             } catch (Throwable t) {                 logger.error("dump jStack error", t);             } finally {                 guard.release();             }             lastPrintTime = System.currentTimeMillis();         });         pool.shutdown();     } }

拒絕策略會輸出線程快照文件,在分析線程快照文件時BLOCKED和TIMED_WAITING線程狀態需要我們重點關注。如果發現大量線程阻塞或者等待狀態則可以定位到具體代碼行:

DubboServerHandler-x.x.x.x:9999-thread-200 Id=230 TIMED_WAITING at java.lang.Thread.sleep(Native Method) at com.java.front.dubbo.demo.provider.HelloServiceImpl.sayHello(HelloServiceImpl.java:13) at org.apache.dubbo.common.bytecode.Wrapper1.invokeMethod(Wrapper1.java) at org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory$1.doInvoke(JavassistProxyFactory.java:56) at org.apache.dubbo.rpc.proxy.AbstractProxyInvoker.invoke(AbstractProxyInvoker.java:85) at org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker.invoke(DelegateProviderMetaDataInvoker.java:56) at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)

(2) 優化慢服務

現在已經找到了慢服務,此時我們就可以優化慢服務了。優化慢服務就需要具體問題具體分析了,這不是本文的重點在此不進行展開。

2.2 生產者預熱不充分

2.2.1 原因分析

還有一種RT上升的情況是我們不能忽視的,這種情況就是提供者重啟后預熱不充分即被調用。因為當生產者剛啟動時需要預熱,需要和其它資源例如數據庫、緩存等建立連接,建立連接是需要時間的。如果此時大量消費者請求到未預熱的生產者,鏈路時間增加了連接時間,RT時間必然會增加,從而也會導致DUBBO線程池打滿問題。

2.2.2 解決方案

(1) 等待生產者充分預熱

因為生產者預熱不充分導致線程池打滿問題,最容易發生在系統發布時。例如發布了一臺機器后發現線上出現線程池打滿問題,千萬不要著急重啟機器,而是給機器一段時間預熱,等連接建立后問題大概率消失。同時我們在發布時也要分多批次發布,不要一次發布太多機器導致服務因為預熱問題造成大面積影響。

(2) DUBBO升級版本大于等于2.7.4

DUBBO消費者在調用選擇生產者時本身就會執行預熱邏輯,為什么還會出現預熱不充分問題?這是因為2.5.5之前版本以及2.7.2版本預熱機制是有問題的,簡而言之就是獲取啟動時間不正確,2.7.4版本徹底解決了這個問題,所以我們要避免使用問題版本。下面我們閱讀2.7.0版本預熱機制源碼,看看預熱機制如何生效:

public class RandomLoadBalance extends AbstractLoadBalance {      public static final String NAME = "random";      @Override     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {          // invokers數量         int length = invokers.size();          // 權重是否相同         boolean sameWeight = true;          // invokers權重數組         int[] weights = new int[length];          // 第一個invoker權重         int firstWeight = getWeight(invokers.get(0), invocation);         weights[0] = firstWeight;          // 權重值之和         int totalWeight = firstWeight;         for (int i = 1; i < length; i++) {             // 計算權重值             int weight = getWeight(invokers.get(i), invocation);             weights[i] = weight;             totalWeight += weight;              // 任意一個invoker權重值不等于第一個invoker權重值則sameWeight設置為FALSE             if (sameWeight && weight != firstWeight) {                 sameWeight = false;             }         }         // 權重值不等則根據總權重值計算         if (totalWeight > 0 && !sameWeight) {             int offset = ThreadLocalRandom.current().nextInt(totalWeight);             // 不斷減去權重值當小于0時直接返回             for (int i = 0; i < length; i++) {                 offset -= weights[i];                 if (offset < 0) {                     return invokers.get(i);                 }             }         }         // 所有服務權重值一致則隨機返回         return invokers.get(ThreadLocalRandom.current().nextInt(length));     } }  public abstract class AbstractLoadBalance implements LoadBalance {      static int calculateWarmupWeight(int uptime, int warmup, int weight) {         // uptime/(warmup*weight)         // 如果當前服務提供者沒過預熱期,用戶設置的權重將通過uptime/warmup減小         // 如果服務提供者設置權重很大但是還沒過預熱時間,重新計算權重會很小         int ww = (int) ((float) uptime / ((float) warmup / (float) weight));         return ww < 1 ? 1 : (ww > weight ? weight : ww);     }      protected int getWeight(Invoker<?> invoker, Invocation invocation) {          // 獲取invoker設置權重值默認權重=100         int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);          // 如果權重大于0         if (weight > 0) {              // 服務提供者發布服務時間戳             long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);             if (timestamp > 0L) {                  // 服務已經發布多少時間                 int uptime = (int) (System.currentTimeMillis() - timestamp);                  // 預熱時間默認10分鐘                 int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);                  // 生產者發布時間大于0但是小于預熱時間                 if (uptime > 0 && uptime < warmup) {                      // 重新計算權重值                     weight = calculateWarmupWeight(uptime, warmup, weight);                 }             }         }         // 服務發布時間大于預熱時間直接返回設置權重值         return weight >= 0 ? weight : 0;     } }

3 QPS上升

上面章節大篇幅討論了由于RT上升造成的線程池打滿問題,現在我們討論另一個參數QPS。當上游流量激增會導致創建大量線程池,也會造成線程池打滿問題。這時如果發現QPS超出了系統承受能力,我們不得不采用降級方案保護系統

“Dubbo線程池有哪些優點”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

仁怀市| 抚顺市| 桃园市| 保康县| 邯郸县| 新竹市| 建水县| 荔浦县| 阿鲁科尔沁旗| 横峰县| 保康县| 汉中市| 尉氏县| 南投市| 贵溪市| 石嘴山市| 淮南市| 广南县| 外汇| 安福县| 汉川市| 绥德县| 精河县| 三门县| 尤溪县| 理塘县| 织金县| 长沙县| 祁阳县| 拜泉县| 大石桥市| 盱眙县| 尉氏县| 徐水县| 绥德县| 同仁县| 儋州市| 上犹县| 金昌市| 镇康县| 仪征市|