您现在的位置是:主页 > news > 凡科网网站建设资料/怎么在百度上做广告推广

凡科网网站建设资料/怎么在百度上做广告推广

admin2025/6/17 2:58:01news

简介凡科网网站建设资料,怎么在百度上做广告推广,江苏省住房和城乡建设部网站,做网站编辑累吗dubbo网络层功能 dubbo网络通讯层主要实现了以下功能: 多种网络通讯框架的抽象封装(netty,mina,grizzly)每个客户端主机和服务端保存单个长链接通信异步调用转同步tcp链接的心跳和自动重连基于header头通讯协议,请求的编解码器dub…

凡科网网站建设资料,怎么在百度上做广告推广,江苏省住房和城乡建设部网站,做网站编辑累吗dubbo网络层功能 dubbo网络通讯层主要实现了以下功能: 多种网络通讯框架的抽象封装(netty,mina,grizzly)每个客户端主机和服务端保存单个长链接通信异步调用转同步tcp链接的心跳和自动重连基于header头通讯协议,请求的编解码器dub…

dubbo网络层功能

dubbo网络通讯层主要实现了以下功能:

  1. 多种网络通讯框架的抽象封装(netty,mina,grizzly)
  2. 每个客户端主机和服务端之间只保持单个长连接进行通信
  3. 异步调用转同步
  4. TCP 连接的心跳和自动重连
  5. 基于 header 头的通讯协议,以及请求的编解码器

dubbo网络通讯框架抽象

dubbo 的网络通信基于 NIO 框架。一般基于事件的 NIO 网络框架都涉及 Channel、ChannelHandler、网络数据 buffer、网络数据编解码器等核心概念;dubbo 为了能够适配多种 NIO 框架,将以上概念全部又抽象了一层接口。如果有 netty 开发经验,或者了解 netty 的 helloworld demo 程序,对理解本章节会非常有帮助。

/*
*以下是dubbo对于Channel、ChannelHandler封装的抽象接口
*/
public interface Endpoint {/*** get url.* * @return url*/URL getUrl();/*** get channel handler.* * @return channel handler*/ChannelHandler getChannelHandler();/*** get local address.* * @return local address.*/InetSocketAddress getLocalAddress();/*** send message.* * @param message* @throws RemotingException*/void send(Object message) throws RemotingException;/*** send message.* * @param message* @param sent 是否已发送完成*/void send(Object message, boolean sent) throws RemotingException;/*** close the channel.*/void close();/*** Graceful close the channel.*/void close(int timeout);/*** is closed.* * @return closed*/boolean isClosed();}public interface Channel extends Endpoint {/*** get remote address.* * @return remote address.*/InetSocketAddress getRemoteAddress();/*** is connected.* * @return connected*/boolean isConnected();/*** has attribute.* * @param key key.* @return has or has not.*/boolean hasAttribute(String key);/*** get attribute.* * @param key key.* @return value.*/Object getAttribute(String key);/*** set attribute.* * @param key key.* @param value value.*/void setAttribute(String key,Object value);/*** remove attribute.* * @param key key.*/void removeAttribute(String key);}public interface Client extends Endpoint, Channel, Resetable {/*** reconnect.*/void reconnect() throws RemotingException;@Deprecatedvoid reset(com.alibaba.dubbo.common.Parameters parameters);}@SPI
public interface ChannelHandler {/*** on channel connected.* * @param channel channel.*/void connected(Channel channel) throws RemotingException;/*** on channel disconnected.* * @param channel channel.*/void disconnected(Channel channel) throws RemotingException;/*** on message sent.* * @param channel channel.* @param message message.*/void sent(Channel channel, Object message) throws RemotingException;/*** on message received.接收到对方消息和回调* * @param channel channel.* @param message message.*/void received(Channel channel, Object message) throws RemotingException;/*** on exception caught.* * @param channel channel.* @param exception exception.*/void caught(Channel channel, Throwable exception) throws RemotingException;}
/*
*信息交换client,对Client接口对象进行包装,实现发送请求后返回结果Future,以及长连接心跳的定时发送
*/
public class HeaderExchangeClient implements ExchangeClient {private static final Logger logger = LoggerFactory.getLogger( HeaderExchangeClient.class );private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));// 心跳定时器private ScheduledFuture<?> heatbeatTimer;// 心跳超时,毫秒。缺省0,不会执行心跳。private int heartbeat;private int heartbeatTimeout;private final Client client;private final ExchangeChannel channel;public HeaderExchangeClient(Client client){if (client == null) {throw new IllegalArgumentException("client == null");}this.client = client;this.channel = new HeaderExchangeChannel(client);String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);this.heartbeat = client.getUrl().getParameter( Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0 );this.heartbeatTimeout = client.getUrl().getParameter( Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3 );if ( heartbeatTimeout < heartbeat * 2 ) {throw new IllegalStateException( "heartbeatTimeout < heartbeatInterval * 2" );}startHeatbeatTimer();}//最核心方法,实现发送请求后返回ResponseFuture对象,通过该Future可以获取RPC请求结果public ResponseFuture request(Object request) throws RemotingException {return channel.request(request);}public URL getUrl() {return channel.getUrl();}public InetSocketAddress getRemoteAddress() {return channel.getRemoteAddress();}public ResponseFuture request(Object request, int timeout) throws RemotingException {return channel.request(request, timeout);}public ChannelHandler getChannelHandler() {return channel.getChannelHandler();}public boolean isConnected() {return channel.isConnected();}public InetSocketAddress getLocalAddress() {return channel.getLocalAddress();}public ExchangeHandler getExchangeHandler() {return channel.getExchangeHandler();}public void send(Object message) throws RemotingException {channel.send(message);}public void send(Object message, boolean sent) 
throws RemotingException {channel.send(message, sent);}public boolean isClosed() {return channel.isClosed();}public void close() {doClose();channel.close();}public void close(int timeout) {doClose();channel.close(timeout);}public void reset(URL url) {client.reset(url);}@Deprecatedpublic void reset(com.alibaba.dubbo.common.Parameters parameters){reset(getUrl().addParameters(parameters.getParameters()));}public void reconnect() throws RemotingException {client.reconnect();}public Object getAttribute(String key) {return channel.getAttribute(key);}public void setAttribute(String key, Object value) {channel.setAttribute(key, value);}public void removeAttribute(String key) {channel.removeAttribute(key);}public boolean hasAttribute(String key) {return channel.hasAttribute(key);}private void startHeatbeatTimer() {stopHeartbeatTimer();if ( heartbeat > 0 ) {heatbeatTimer = scheduled.scheduleWithFixedDelay(new HeartBeatTask( new HeartBeatTask.ChannelProvider() {public Collection<Channel> getChannels() {return Collections.<Channel>singletonList( HeaderExchangeClient.this );}}, heartbeat, heartbeatTimeout),heartbeat, heartbeat, TimeUnit.MILLISECONDS );}}private void stopHeartbeatTimer() {if (heatbeatTimer != null && ! heatbeatTimer.isCancelled()) {try {heatbeatTimer.cancel(true);scheduled.purge();} catch ( Throwable e ) {if (logger.isWarnEnabled()) {logger.warn(e.getMessage(), e);}}}heatbeatTimer =null;}private void doClose() {stopHeartbeatTimer();}@Overridepublic String toString() {return "HeaderExchangeClient [channel=" + channel + "]";}
}
/*
*基于Header Request的信息交换Channel,对真实的底层Channel实现(比如Netty Channel)进行了一层包装,实现将RPC请求参数Invocation封装为Request对象,
*Request对象描述了RPC请求的全部信息:协议头数据以及协议body数据。
*/
final class HeaderExchangeChannel implements ExchangeChannel {private static final Logger logger      = LoggerFactory.getLogger(HeaderExchangeChannel.class);private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";private final Channel       channel;private volatile boolean    closed      = false;HeaderExchangeChannel(Channel channel){if (channel == null) {throw new IllegalArgumentException("channel == null");}this.channel = channel;}static HeaderExchangeChannel getOrAddChannel(Channel ch) {if (ch == null) {return null;}HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);if (ret == null) {ret = new HeaderExchangeChannel(ch);if (ch.isConnected()) {ch.setAttribute(CHANNEL_KEY, ret);}}return ret;}static void removeChannelIfDisconnected(Channel ch) {if (ch != null && ! ch.isConnected()) {ch.removeAttribute(CHANNEL_KEY);}}public void send(Object message) throws RemotingException {send(message, getUrl().getParameter(Constants.SENT_KEY, false));}public void send(Object message, boolean sent) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");}if (message instanceof Request|| message instanceof Response|| message instanceof String) {channel.send(message, sent);} else {Request request = new Request();request.setVersion("2.0.0");request.setTwoWay(false);request.setData(message);channel.send(request, sent);}}public ResponseFuture request(Object request) throws RemotingException {return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));}//request 入参类型是Invocation invocationpublic ResponseFuture request(Object request, int timeout) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create 
request.将Invocation封装为Request对象,request对象创建的时候会生成递增的请求id,private static final AtomicLong INVOKE_ID = new AtomicLong(0);Request req = new Request();req.setVersion("2.0.0");req.setTwoWay(true);req.setData(request);//发送请求数据是异步的,返回future对象,调用方用于同步等待响应,类似于提交到线程池返回Future对象用于等待返回结果DefaultFuture future = new DefaultFuture(channel, req, timeout);try{channel.send(req);}catch (RemotingException e) {future.cancel();throw e;}return future;}public class DefaultFuture implements ResponseFuture {//存储所有requestId 和 通道映射,用于判断当前通道是否有未完成的请求private static final Map<Long, Channel>       CHANNELS   = new ConcurrentHashMap<Long, Channel>();//存储请求id和future对象映射,用于收到响应唤醒等待线程private static final Map<Long, DefaultFuture> FUTURES   = new ConcurrentHashMap<Long, DefaultFuture>();// invoke id.private final long                            id;private final Channel                         channel;private final Request                         request;private final int                             timeout;private final Lock                            lock = new ReentrantLock();private final Condition                       done = lock.newCondition();private final long                            start = System.currentTimeMillis();private volatile long                         sent;private volatile Response                     response;private volatile ResponseCallback             callback;public DefaultFuture(Channel channel, Request request, int timeout){this.channel = channel;this.request = request;this.id = request.getId();this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);// put into waiting map.FUTURES.put(id, this);CHANNELS.put(id, channel);}public Object get() throws RemotingException {return get(timeout);}public Object get(int timeout) throws RemotingException {if (timeout <= 0) {timeout = Constants.DEFAULT_TIMEOUT;}if (! isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (! 
isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}if (! isDone()) {throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));}}return returnFromResponse();}public boolean isDone() {return response != null;}private Object returnFromResponse() throws RemotingException {Response res = response;if (res == null) {throw new IllegalStateException("response cannot be null");}if (res.getStatus() == Response.OK) {return res.getResult();}if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());}throw new RemotingException(channel, res.getErrorMessage());}//channelHandle收到服务端响应后调用该方法唤醒调用方等待线程public static void received(Channel channel, Response response) {try {DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {future.doReceived(response);} else {logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress()));}} finally {CHANNELS.remove(response.getId());}}private void doReceived(Response res) {lock.lock();try {response = res;if (done != null) {//发生收到相应信号,通知等待线程done.signal();}} finally {lock.unlock();}if (callback != null) {invokeCallback(callback);}}
}public interface ChannelHandlerDelegate extends ChannelHandler {public ChannelHandler getHandler();
}
/*
*HeaderExchangeHandler实现了ChannelHandler接口,实现了NIO消息的回调处理。
*本类最关键的方法是received方法;同时各个回调方法会设置Channel的读写时间戳,作为心跳定时任务判断是否发送心跳的条件
*/
/**
 * {@link ChannelHandlerDelegate} that dispatches channel events to an
 * {@link ExchangeHandler}. Requests, responses and string messages are told
 * apart in {@link #received}, and the callbacks stamp read/write timestamps on
 * the channel.
 */
public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    protected static final Logger logger              = LoggerFactory.getLogger(HeaderExchangeHandler.class);

    public static String          KEY_READ_TIMESTAMP  = HeartbeatHandler.KEY_READ_TIMESTAMP;

    public static String          KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP;

    private final ExchangeHandler handler;

    /**
     * @param handler the handler to delegate to, must not be null
     * @throws IllegalArgumentException if {@code handler} is null
     */
    public HeaderExchangeHandler(ExchangeHandler handler){
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.handler = handler;
    }

    /** Marks the channel read-only when the event request carries {@code Request.READONLY_EVENT}. */
    void handlerEvent(Channel channel, Request req) throws RemotingException {
        if (req.getData() != null && req.getData().equals(Request.READONLY_EVENT)) {
            channel.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
        }
    }

    /**
     * Builds the response for a request: BAD_REQUEST for a broken request,
     * otherwise the result of {@code handler.reply}, or SERVICE_ERROR when the
     * reply throws.
     */
    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
            return res;
        }
        // find handler by message class.
        Object msg = req.getData();
        try {
            // handle data.
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }

    /** Hands every non-heartbeat response to {@code DefaultFuture.received}. */
    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            // Key call: the response received by the client wakes the user thread
            // that is waiting for the result.
            DefaultFuture.received(channel, response);
        }
    }

    public void connected(Channel channel) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            handler.connected(exchangeChannel);
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    public void disconnected(Channel channel) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            handler.disconnected(exchangeChannel);
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    /**
     * Stamps the write time and forwards the event. Any failure from the
     * delegate is held back until {@code DefaultFuture.sent} has been called
     * for a {@link Request}, then rethrown.
     */
    public void sent(Channel channel, Object message) throws RemotingException {
        Throwable exception = null;
        try {
            channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                handler.sent(exchangeChannel, message);
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        } catch (Throwable t) {
            exception = t;
        }
        if (message instanceof Request) {
            Request request = (Request) message;
            DefaultFuture.sent(channel, request);
        }
        if (exception != null) {
            if (exception instanceof RuntimeException) {
                throw (RuntimeException) exception;
            } else if (exception instanceof RemotingException) {
                throw (RemotingException) exception;
            } else {
                throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
                        exception.getMessage(), exception);
            }
        }
    }

    /** True when the channel's URL port and (local-host-filtered) IP equal those of its remote address. */
    private static boolean isClientSide(Channel channel) {
        InetSocketAddress address = channel.getRemoteAddress();
        URL url = channel.getUrl();
        return url.getPort() == address.getPort()
                && NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));
    }

    /**
     * Key method: handles Response messages (original note: on the client
     * side) and Request messages (original note: on the server side). String
     * messages go to {@code handler.telnet} unless the channel is on the
     * client side, and any other message is forwarded to the delegate.
     */
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    /**
     * For an {@code ExecutionException} raised by a two-way, non-heartbeat
     * request, replies with a SERVER_ERROR response and returns; every other
     * exception is forwarded to the delegate.
     */
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        if (exception instanceof ExecutionException) {
            ExecutionException e = (ExecutionException) exception;
            Object msg = e.getRequest();
            if (msg instanceof Request) {
                Request req = (Request) msg;
                if (req.isTwoWay() && ! req.isHeartbeat()) {
                    Response res = new Response(req.getId(), req.getVersion());
                    res.setStatus(Response.SERVER_ERROR);
                    res.setErrorMessage(StringUtils.toString(e));
                    channel.send(res);
                    return;
                }
            }
        }
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            handler.caught(exchangeChannel, exception);
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    /** Returns the delegate, unwrapped one level when it is itself a {@link ChannelHandlerDelegate}. */
    public ChannelHandler getHandler() {
        if (handler instanceof ChannelHandlerDelegate) {
            return ((ChannelHandlerDelegate) handler).getHandler();
        } else {
            return handler;
        }
    }
}
/*
*千呼万唤始出来,是时候展示dubbo协议的入口函数了
*/
/**
 * Consumer-side excerpt of the dubbo protocol: creates {@link DubboInvoker}s
 * and the {@link ExchangeClient}s they use.
 *
 * NOTE(review): this listing is an excerpt. {@code invokers},
 * {@code referenceClientMap}, {@code ghostClientMap}, {@code requestHandler},
 * {@code COMPATIBLE_CODEC_NAME} and {@code optimizeSerialization(URL)} are
 * used but their definitions are not shown here.
 */
public class DubboProtocol extends AbstractProtocol {

    /**
     * Creates an Invoker for the given service interface type and provider URL.
     *
     * @param serviceType service interface type
     * @param url         provider URL
     * @return the new invoker, which is also added to {@code invokers}
     */
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // modified by lishen
        optimizeSerialization(url);

        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

    /**
     * Returns the clients for a URL: one shared client when the connections
     * parameter is unset (0), otherwise that many newly created clients.
     */
    private ExchangeClient[] getClients(URL url){
        // Whether to use the shared connection; switched on below when
        // "connections" is not configured.
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // If connections is not configured the connection is shared; otherwise each
        // service gets its own connection(s).
        if (connections == 0){
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect){
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

    /**
     * Gets the shared connection for the URL's address. An open cached client
     * is reused and its reference count incremented; a closed one is dropped
     * from the map and replaced by a new client.
     */
    private ExchangeClient getSharedClient(URL url){
        String key = url.getAddress();
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if ( client != null ){
            if ( !client.isClosed()){
                client.incrementAndGetCount();
                return client;
            } else {
//                logger.warn(new IllegalStateException("client is closed,but stay in clientmap .client :"+ client));
                referenceClientMap.remove(key);
            }
        }
        ExchangeClient exchangeClient = initClient(url);

        client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        return client;
    }

    /**
     * Creates a new connection.
     *
     * @throws RpcException if the configured client type has no Transporter
     *                      extension, or if connecting fails
     */
    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

        String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
        boolean compatible = (version != null && version.startsWith("1.0."));
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        // Heartbeat is enabled by default.
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // BIO has serious performance problems and is not allowed for now.
        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client ;
        try {
            // The connection is configured to be lazy.
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
                client = new LazyConnectExchangeClient(url ,requestHandler);
            } else {
                // Original note: this factory method creates the HeaderExchangeClient
                // instance introduced earlier in the article.
                client = Exchangers.connect(url ,requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url
                    + "): " + e.getMessage(), e);
        }
        return client;
    }
}
/*
*通过DubboProtocol创建好的ExchangeClient对象来创建DubboInvoker
*DubboInvoker可以支持异步调用,同步调用,仅发送请求3种模式
*/
public class DubboInvoker<T> extends AbstractInvoker<T> {private final ExchangeClient[]      clients;private final AtomicPositiveInteger index = new AtomicPositiveInteger();private final String                version;private final ReentrantLock     destroyLock = new ReentrantLock();private final Set<Invoker<?>> invokers;public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients){this(serviceType, url, clients, null);}public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers){super(serviceType, url, new String[] {Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});this.clients = clients;// get version.this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");this.invokers = invokers; }@Overrideprotected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;final String methodName = RpcUtils.getMethodName(invocation);inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());inv.setAttachment(Constants.VERSION_KEY, version);ExchangeClient currentClient;if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}try {boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);RpcContext.getContext().setFuture(null);return new RpcResult();} else if (isAsync) {ResponseFuture future = currentClient.request(inv, timeout) ;RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));return new RpcResult();} else {RpcContext.getContext().setFuture(null);return (Result) currentClient.request(inv, timeout).get();}} catch (TimeoutException e) {throw new 
RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}}@Overridepublic boolean isAvailable() {if (!super.isAvailable())return false;for (ExchangeClient client : clients){if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)){//cannot write == not Available ?return true ;}}return false;}public void destroy() {//防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭if (super.isDestroyed()){return ;} else {//dubbo check ,避免多次关闭destroyLock.lock();try{if (super.isDestroyed()){return ;}super.destroy();if (invokers != null){invokers.remove(this);}for (ExchangeClient client : clients) {try {client.close();} catch (Throwable t) {logger.warn(t.getMessage(), t);}}}finally {destroyLock.unlock();}}}
}复制代码

转载于:https://juejin.im/post/5bf29697f265da6172651692