首页 Dubbo中Filter过滤器拦截器的实现原理实现自定义的Filter过滤器

Dubbo中Filter过滤器拦截器的实现原理实现自定义的Filter过滤器

举报
开通vip

Dubbo中Filter过滤器拦截器的实现原理实现自定义的Filter过滤器     Dubbo中Filter过滤器,拦截器的实现原理,实现自定义的Filter过滤器                  我们知道Dubbo中大部分的实现类加载都是通过SPI实现,同样Dubbo也提供了Filter机制,这个部分研究下怎么实现了,是怎样的一个调用逻辑。首先我们看下Dubbo中Filter的定义:*FilterChainin3.x**->Filter->Invoker**Proxy->ClusterFilter->ClusterInvoker->Filter->Invoker**->Filter-...

Dubbo中Filter过滤器拦截器的实现原理实现自定义的Filter过滤器
     Dubbo中Filter过滤器,拦截器的实现原理,实现自定义的Filter过滤器                  我们知道Dubbo中大部分的实现类加载都是通过SPI实现,同样Dubbo也提供了Filter机制,这个部分研究下怎么实现了,是怎样的一个调用逻辑。首先我们看下Dubbo中Filter的定义:*FilterChainin3.x**->Filter->Invoker**Proxy->ClusterFilter->ClusterInvoker->Filter->Invoker**->Filter->Invoker@SPIpublicinterfaceFilterextendsBaseFilter{}Dubbo中的filter可以在provider和consumer工作,而其注入的时机,我们需要回忆下之前说的Dubbo中的SPI机制实现,自定义对接SPI我们知道,当我们加载Protocol的时候,如果有WrapperClass的话,先加载Wrapper,多个wrapper会嵌套,然后里面在嵌套一个真实的需要加载的类同样对于Filter过滤器是基于Wrapper实现的服务端在Dubbo中默认的Protocol的Wrapper有下面几个:ProtocolFilterWrapperProtocolListenerWrapperQosProtocolWrapper这里我们说下ProtocolFilterWrapper这里就是Dubbo中Filter过滤器实现的地方:@Activate(order=100)publicclassProtocolFilterWrapperimplementsProtocol{privatefinalProtocolprotocol;privatestaticfinalFilterChainBuilderbuilder=ExtensionLoader.getExtensionLoader(FilterChainBuilder.class).getDefaultExtension();publicProtocolFilterWrapper(Protocolprotocol){if(protocol==null){thrownewIllegalArgumentException("protocol==null");}this.protocol=protocol;}@OverridepublicintgetDefaultPort(){returnprotocol.getDefaultPort();}@OverridepublicExporterexport(Invokerinvoker)throwsRpcException{if(UrlUtils.isRegistry(invoker.getUrl())){returnprotocol.export(invoker);}returnprotocol.export(builder.buildInvokerChain(invoker,SERVICE_FILTER_KEY,CommonConstants.PROVIDER));}.....}而这里在export的时候,如果不是注册中心方式,会通过FilterChainBuilder来构造一个FilterChainNode(注册中心方式的话,后面会修改url中的protocol,最后还是走这个逻辑),FilterChainBuilder的Dubbo默认实现为DefaultFilterChainBuilder:publicInvokerbuildInvokerChain(finalInvokeroriginalInvoker,Stringkey,Stringgroup){Invokerlast=originalInvoker;Listfilters=ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(),key,group);if(!filters.isEmpty()){for(inti=filters.size()-1;i>=0;i--){finalFilterfilter=filters.get(i);finalInvokernext=last;last=newFilterChainNode<>(originalInvoker,next,filter);}}returnlast;}这里通过ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(),key,group);会获取到一个Filter的集合列表,这里的key和group分别为:SERVICE_FILTER_KEY,CommonConstants.PROVIDER表示的是服务端的过滤和分组。获取到Filter列表之后,构造了一个FilterChainNode链表,并返回链表的头部,注意这个FilterChainNode也是一个Invoker这样在服务端我们进行export的暴露服务的时候,生成的是一个FilterChainNode的Invoker链表,而这里的Filter加载的顺序,还是之前基于SPI的 分析 定性数据统计分析pdf销售业绩分析模板建筑结构震害分析销售进度分析表京东商城竞争战略分析 ,是根据Activate注解中order的顺序,从大到小顺序排序。我们看下Dubbo中默认的Filter有哪些:echo=org.apache.dubbo.rpc.filter.EchoFiltergeneric=org.apache.dubbo.rpc.filter.GenericFiltergenericimpl=org.apache.dubbo.rpc.filter.GenericImplFiltertoken=org.apache.dubbo.rpc.filter.TokenFilteraccesslog=org.apache.dubbo.rpc.filter.AccessLogFilterclassloader=org.apache.dubbo.rpc.filter.ClassLoaderFiltercontext=org.apache.dubbo.rpc.filter.ContextFilterexception=org.apache.dubbo.rpc.filter.ExceptionFilterexecutelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilterdeprecated=org.apache.dubbo.rpc.filter.DeprecatedFiltercompatible=org.apache.dubbo.rpc.filter.CompatibleFiltertimeout=org.apache.dubbo.rpc.filter.TimeoutFiltertps=org.apache.dubbo.rpc.filter.TpsLimitFilter返回的Invoker是一个FilterChainNode,我们看下其invoke方法时怎么执行的:publicResultinvoke(Invocationinvocation)throwsRpcException{ResultasyncResult;try{asyncResult=filter.invoke(nextNode,invocation);}catch(Exceptione){if(filterinstanceofListenableFilter){ListenableFilterlistenableFilter=((ListenableFilter)filter);try{Filter.Listenerlistener=listenableFilter.listener(invocation);if(listener!=null){listener.onError(e,originalInvoker,invocation);}}finally{listenableFilter.removeListener(invocation);}}elseif(filterinstanceofFILTER.Listener){FILTER.Listenerlistener=(FILTER.Listener)filter;listener.onError(e,originalInvoker,invocation);}throwe;}finally{}returnasyncResult.whenCompleteWithContext((r,t)->{if(filterinstanceofListenableFilter){ListenableFilterlistenableFilter=((ListenableFilter)filter);Filter.Listenerlistener=listenableFilter.listener(invocation);try{if(listener!=null){if(t==null){listener.onResponse(r,originalInvoker,invocation);}else{listener.onError(t,originalInvoker,invocation);}}}finally{listenableFilter.removeListener(invocation);}}elseif(filterinstanceofFILTER.Listener){FILTER.Listenerlistener=(FILTER.Listener)filter;if(t==null){listener.onResponse(r,originalInvoker,invocation);}else{listener.onError(t,originalInvoker,invocation);}}});}可以看到,调用的是filter.invoke(nextNode,invocation);并且会把下一个节点传递过去,而开始创建的那个filer中,nextNode就是原始的Invoker,这样就会形成一个调用链,即:最先开始调用的都是Filter的逻辑,Filter都执行完之后没有问 快递公司问题件快递公司问题件货款处理关于圆的周长面积重点题型关于解方程组的题及答案关于南海问题 ,执行原始的Invoker我们举个ExecuteLimitFilter为例:publicResultinvoke(Invokerinvoker,Invocationinvocation)throwsRpcException{URLurl=invoker.getUrl();StringmethodName=invocation.getMethodName();intmax=url.getMethodParameter(methodName,EXECUTES_KEY,0);if(!RpcStatus.beginCount(url,methodName,max)){thrownewRpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,"Failedtoinvokemethod"+invocation.getMethodName()+"inprovider"+url+",cause:Theserviceusingthreadsgreaterthanlimited.");}invocation.put(EXECUTE_LIMIT_FILTER_START_TIME,System.currentTimeMillis());try{returninvoker.invoke(invocation);}catch(Throwablet){if(tinstanceofRuntimeException){throw(RuntimeException)t;}else{thrownewRpcException("unexpectedexceptionwhenExecuteLimitFilter",t);}}}可以看到这里在最终执行之前,会判断当前执行的方法正在执行的数量,如果大于给定的值,这里就会抛出异常,否则继续向后执行。消费端对于消费端,如果不是注册中心方式,和服务提供端完全一致,如果是基于注册中心的,这时候消费端和服务端则不一样了,消费端调用的是Protocol.refer,而这个方法返回的是一个Cluster,我们实现的几个Cluster都是基于AbstractCluster,而基于前面的分析Dubbo消费端启动流程、处理逻辑,方法调用实现,Cluster会和服务目录进行join之后返回一个Invoker,这里AbstractCluster.join实现如下:publicInvokerjoin(Directorydirectory)throwsRpcException{if(directoryinstanceofStaticDirectory){returndoJoin(directory);}returnbuildClusterInterceptors(doJoin(directory),directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));}注册中心的Directory都不是StaticDirectory,会走第二个逻辑,而第二个逻辑则是构建Filter链的关键:privateInvokerbuildClusterInterceptors(AbstractClusterInvokerclusterInvoker,Stringkey){AbstractClusterInvokerlast=buildInterceptorInvoker(newClusterFilterInvoker<>(clusterInvoker));if(Boolean.parseBoolean(ConfigurationUtils.getProperty(CLUSTER_INTERCEPTOR_COMPATIBLE_KEY,"false"))){returnbuild27xCompatibleClusterInterceptors(clusterInvoker,last);}returnlast;}publicClusterFilterInvoker(AbstractClusterInvokerinvoker){Listbuilders=ExtensionLoader.getExtensionLoader(FilterChainBuilder.class).getActivateExtensions();if(CollectionUtils.isEmpty(builders)){filterInvoker=invoker;}else{ClusterInvokertmpInvoker=invoker;for(FilterChainBuilderbuilder:builders){tmpInvoker=builder.buildClusterInvokerChain(tmpInvoker,REFERENCE_FILTER_KEY,CommonConstants.CONSUMER);}filterInvoker=tmpInvoker;}}@OverridepublicResultinvoke(Invocationinvocation)throwsRpcException{returnfilterInvoker.invoke(invocation);}可以看到,这里生成了一个ClusterFilterInvoker,而在ClusterFilterInvoker中通过FilterChainBuilder.buildClusterInvokerChain.生成了ClusterFilterChainNode链,而Dubbo中默认FilterChainBuilder就是DefaultFilterChainBuilder:publicClusterInvokerbuildClusterInvokerChain(finalClusterInvokeroriginalInvoker,Stringkey,Stringgroup){ClusterInvokerlast=originalInvoker;Listfilters=ExtensionLoader.getExtensionLoader(ClusterFilter.class).getActivateExtension(originalInvoker.getUrl(),key,group);if(!filters.isEmpty()){for(inti=filters.size()-1;i>=0;i--){finalClusterFilterfilter=filters.get(i);finalInvokernext=last;last=newClusterFilterChainNode<>(originalInvoker,next,filter);}}returnlast;}这里除了生成Filter链还会判断是否需要支持ClusterInterceptor,开启条件是dubbo.application.cluster.interceptor.compatible=true但是请注意这里的Filter类型是ClusterFilter,返回的是一个ClusterFilterChainNode。然后根据我们之前的研究Dubbo中服务注册与发现实现原理,消费端基于服务注册中心回去注册中心中获取对应的注册服务的的信息,然后进行初始化实际的Invoker,在ServiceDiscoveryRegistryDirectory中会进行实际的Invoker的初始化,这时候获取的protocol就是实际的protocl(实际上就是和服务端的一样,是一个Wrapper),而获取的这个protocol也是一个wrapper,还是和服务端一样的逻辑。从这里能够看出,可能的处理链路如下:Proxy->Filter->InvokerProxy->ClusterFilter->ClusterInvoker->Filter->Invoker第一种一般在没有配置中心的时候,获取到的Protocol就是一个Wrapper的Protocol,这其中就有Filter的包装类第二种一般是配有配置中心的情况,这个时候首先生成的是ClusterFilter然后从配置中心获取配置之后再次调用实际的Protocol进行处理,这时候获取的和第一种一样也是一个Wrapper的Protocol。实现自定义Filter如果我们要想实现自定义的Filter,那么我们可进行如下处理:实现Filter接口,且接口需要加上Activate注解,不加上该注解Filter不会被激活在当前的classpath下建立META-INF/dubbo/internal/、META-INF/dubbo/、META-INF/services/其中一个文件夹就行,然后在该目录下建立一个文件,名称为:org.apache.dubbo.rpc.Filter,内容为:xxx=my.custom.XxxFilter完成。 -全文完-
本文档为【Dubbo中Filter过滤器拦截器的实现原理实现自定义的Filter过滤器】,请使用软件OFFICE或WPS软件打开。作品中的文字与图均可以修改和编辑, 图片更改请在作品中右键图片并更换,文字修改请直接点击文字进行修改,也可以新增和删除文档中的内容。
该文档来自用户分享,如有侵权行为请发邮件ishare@vip.sina.com联系网站客服,我们会及时删除。
[版权声明] 本站所有资料为用户分享产生,若发现您的权利被侵害,请联系客服邮件isharekefu@iask.cn,我们尽快处理。
本作品所展示的图片、画像、字体、音乐的版权可能需版权方额外授权,请谨慎使用。
网站提供的党政主题相关内容(国旗、国徽、党徽..)目的在于配合国家政策宣传,仅限个人学习分享使用,禁止用于任何广告和商用目的。
下载需要: 免费 已有0 人下载
最新资料
资料动态
专题动态
个人认证用户
资教之佳
暂无简介~
格式:doc
大小:41KB
软件:Word
页数:16
分类:互联网
上传时间:2023-06-20
浏览量:15