本篇文章為大家展示了dubbo中ConnectionOrderedDispatcher的作用是什么,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。
創(chuàng)新互聯(lián)公司專注于新絳企業(yè)網(wǎng)站建設(shè),成都響應(yīng)式網(wǎng)站建設(shè)公司,商城網(wǎng)站制作。新絳網(wǎng)站建設(shè)公司,為新絳等地區(qū)提供建站服務(wù)。全流程按需求定制設(shè)計(jì),專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務(wù)
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedDispatcher.java
public class ConnectionOrderedDispatcher implements Dispatcher { public static final String NAME = "connection"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new ConnectionOrderedChannelHandler(handler, url); } }
ConnectionOrderedDispatcher實(shí)現(xiàn)了Dispatcher接口,其dispatch方法返回的是ConnectionOrderedChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler { protected final ThreadPoolExecutor connectionExecutor; private final int queuewarninglimit; public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) { super(handler, url); String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true), new AbortPolicyWithReport(threadName, url) ); // FIXME There's no place to release connectionExecutor! queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE); } @Override public void connected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t); } } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout. if (message instanceof Request && t instanceof RejectedExecutionException) { Request request = (Request) message; if (request.isTwoWay()) { String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } private void checkQueueLength() { if (connectionExecutor.getQueue().size() > queuewarninglimit) { logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit)); } } }
ConnectionOrderedChannelHandler繼承了WrappedChannelHandler,其構(gòu)造器創(chuàng)建了corePoolSize及maximumPoolSize均為1,queue為LinkedBlockingQueue的connectionExecutor
其connected、disconnected方法均是使用connectionExecutor來執(zhí)行新創(chuàng)建的ChannelEventRunnable;這兩個(gè)方法均會(huì)先執(zhí)行checkQueueLength來判斷queue大小是否大于queuewarninglimit,大于的話則打印warn日志
其received、caught均是通過父類的getExecutorService獲取線程池,然后執(zhí)行創(chuàng)建的ChannelEventRunnable;received方法在捕獲到異常時(shí)RejectedExecutionException且message是Request,而且request是twoWay的時(shí)候會(huì)返回SERVER_THREADPOOL_EXHAUSTED_ERROR
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/ConnectChannelHandlerTest.java
public class ConnectChannelHandlerTest extends WrappedChannelHandlerTest { @BeforeEach public void setUp() throws Exception { handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url); } @Test public void test_Connect_Blocked() throws RemotingException { handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url); ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1); Assertions.assertEquals(1, executor.getMaximumPoolSize()); int runs = 20; int taskCount = runs * 2; for (int i = 0; i < runs; i++) { handler.connected(new MockedChannel()); handler.disconnected(new MockedChannel()); Assertions.assertTrue(executor.getActiveCount() <= 1, executor.getActiveCount() + " must <=1"); } //queue.size Assertions.assertEquals(taskCount - 1, executor.getQueue().size()); for (int i = 0; i < taskCount; i++) { if (executor.getCompletedTaskCount() < taskCount) { sleep(100); } } Assertions.assertEquals(taskCount, executor.getCompletedTaskCount()); } @Test //biz error should not throw and affect biz thread. public void test_Connect_Biz_Error() throws RemotingException { handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url); handler.connected(new MockedChannel()); } @Test //biz error should not throw and affect biz thread. public void test_Disconnect_Biz_Error() throws RemotingException { handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url); handler.disconnected(new MockedChannel()); } @Test public void test_Connect_Execute_Error() throws RemotingException { Assertions.assertThrows(ExecutionException.class, () -> { handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url); ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1); executor.shutdown(); handler.connected(new MockedChannel()); }); } @Test public void test_Disconnect_Execute_Error() throws RemotingException { Assertions.assertThrows(ExecutionException.class, () -> { handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url); ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1); executor.shutdown(); handler.disconnected(new MockedChannel()); }); } //throw ChannelEventRunnable.runtimeExeception(int logger) not in execute exception @Test//(expected = RemotingException.class) public void test_MessageReceived_Biz_Error() throws RemotingException { handler.received(new MockedChannel(), ""); } //throw ChannelEventRunnable.runtimeExeception(int logger) not in execute exception @Test public void test_Caught_Biz_Error() throws RemotingException { handler.caught(new MockedChannel(), new BizException()); } @Test public void test_Received_InvokeInExecuter() throws RemotingException { Assertions.assertThrows(ExecutionException.class, () -> { handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url); ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "SHARED_EXECUTOR", 1); executor.shutdown(); executor = (ThreadPoolExecutor) getField(handler, "executor", 1); executor.shutdown(); handler.received(new MockedChannel(), ""); }); } /** * Events do not pass through the thread pool and execute directly on the IO */ @SuppressWarnings("deprecation") @Disabled("Heartbeat is processed in HeartbeatHandler not WrappedChannelHandler.") @Test public void test_Received_Event_invoke_direct() throws RemotingException { handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url); ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "SHARED_EXECUTOR", 1); executor.shutdown(); executor = (ThreadPoolExecutor) getField(handler, "executor", 1); executor.shutdown(); Request req = new Request(); req.setHeartbeat(true); final AtomicInteger count = new AtomicInteger(0); handler.received(new MockedChannel() { @Override public void send(Object message) throws RemotingException { Assertions.assertTrue(((Response) message).isHeartbeat(), "response.heartbeat"); count.incrementAndGet(); } }, req); Assertions.assertEquals(1, count.get(), "channel.send must be invoke"); } }
ConnectChannelHandlerTest在setup時(shí)創(chuàng)建的是ConnectionOrderedChannelHandler,然后進(jìn)行了test_Connect_Blocked、test_Connect_Biz_Error、test_Disconnect_Biz_Error、test_Connect_Execute_Error、test_Disconnect_Execute_Error、test_MessageReceived_Biz_Error、test_Caught_Biz_Error、test_Received_InvokeInExecuter、test_Received_Event_invoke_direct
ConnectionOrderedDispatcher實(shí)現(xiàn)了Dispatcher接口,其dispatch方法返回的是ConnectionOrderedChannelHandler;ConnectionOrderedChannelHandler繼承了WrappedChannelHandler,其構(gòu)造器創(chuàng)建了corePoolSize及maximumPoolSize均為1,queue為LinkedBlockingQueue的connectionExecutor
ConnectionOrderedChannelHandler的connected、disconnected方法均是使用connectionExecutor來執(zhí)行新創(chuàng)建的ChannelEventRunnable;這兩個(gè)方法均會(huì)先執(zhí)行checkQueueLength來判斷queue大小是否大于queuewarninglimit,大于的話則打印warn日志
ConnectionOrderedChannelHandler的received、caught均是通過父類的getExecutorService獲取線程池,然后執(zhí)行創(chuàng)建的ChannelEventRunnable;received方法在捕獲到異常時(shí)RejectedExecutionException且message是Request,而且request是twoWay的時(shí)候會(huì)返回SERVER_THREADPOOL_EXHAUSTED_ERROR
上述內(nèi)容就是dubbo中ConnectionOrderedDispatcher的作用是什么,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
當(dāng)前題目:dubbo中ConnectionOrderedDispatcher的作用是什么
本文來源:http://www.yahangbao.cn/article4/pspoie.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、小程序開發(fā)、網(wǎng)站制作、品牌網(wǎng)站制作、全網(wǎng)營銷推廣、微信公眾號
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)