跳至主要內容

01-Dubbo核心基础

holic-x...大约 150 分钟架构RPC

01-Dubbo核心基础

学习核心

  • ① dubbo 源码环境搭建
  • ② dubbo 的配置总线
  • ③ dubbo SPI 剖析
  • ④ 海量定时任务:时间轮
  • ⑤ Zookeeper & Curator
  • ⑥ 代理模式设计与实现
  • ⑦ Netty 入门核心
  • ⑧ 简易版RPC框架实现

学习资料

环境搭建

① dubbo 源码环境搭建

学习目标

  • 整理梳理Dubbo架构,帮助了解Dubbo基本功能和核心角色
  • 搭建Dubbo源码环境,构建一个Demo实例可运行的最简环境
  • 深入学习Dubbo源码中各个核心模块的功能
  • 基于Dubbo源码自带的3个demo实例,回顾dubbo的基础用法(debug剖析源码的入口)
    • xml
    • annotation
    • api

1.Dubbo 架构

​ Dubbo 架构核心:Registry 注册中心、Provider 服务提供者、Consumer 服务消费者、Monitor 监控中心

image-20241225152016533
  • Registry:注册中心。 负责服务地址的注册与查找,服务的 Provider 和 Consumer 只在启动时与注册中心交互。注册中心通过长连接感知 Provider 的存在,在 Provider 出现宕机的时候,注册中心会立即推送相关事件通知 Consumer。
  • Provider:服务提供者。 在它启动的时候,会向 Registry 进行注册操作,将自己服务的地址和相关配置信息封装成 URL 添加到 ZooKeeper 中。
  • Consumer:服务消费者。 在它启动的时候,会向 Registry 进行订阅操作。订阅操作会从 ZooKeeper 中获取 Provider 注册的 URL,并在 ZooKeeper 中添加相应的监听器。获取到 Provider URL 之后,Consumer 会根据负载均衡算法从多个 Provider 中选择一个 Provider 并与其建立连接,最后发起对 Provider 的 RPC 调用。 如果 Provider URL 发生变更,Consumer 将会通过之前订阅过程中在注册中心添加的监听器,获取到最新的 Provider URL 信息,进行相应的调整,比如断开与宕机 Provider 的连接,并与新的 Provider 建立连接。Consumer 与 Provider 建立的是长连接,且 Consumer 会缓存 Provider 信息,所以一旦连接建立,即使注册中心宕机,也不会影响已运行的 Provider 和 Consumer。
  • Monitor:监控中心。 用于统计服务的调用次数和调用时间。Provider 和 Consumer 在运行过程中,会在内存中统计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。监控中心在上面的架构图中并不是必要角色,监控中心宕机不会影响 Provider、Consumer 以及 Registry 的功能,只会丢失监控数据而已。

2.Dubbo 源码环境搭建

⚽ Dubbo 源码环境搭建

​ 可以直接从dubbo官方源码仓库open in new window直接fork到自己的仓库(参考dubbo-2.7.7版本)

① 下载源码:git clone git@github.com:xxxxxxxx/dubbo.git

② 切换分支:git checkout -b dubbo-2.7.7 dubbo-2.7.7 (切换到指定的版本)

mvn编译:mvn clean install -Dmaven.test.skip=true

mvn转化为IDEA项目:mvn idea:idea (要是执行报错,就执行这个 mvn idea:workspace )

⑤ 在idea中导入源码

⚽ Dubbo 源码核心模块

(1)dubbo-common 模块

​ Dubbo 的一个公共模块,其中有很多工具类以及公共逻辑,包括后续要学习的 Dubbo SPI 实现、时间轮实现、动态编译器等

image-20241225153723292
(2)dubbo-remoting 模块

​ Dubbo 的远程通信模块,其中的子模块依赖各种开源组件实现远程通信。在 dubbo-remoting-api 子模块中定义该模块的抽象概念,在其他子模块中依赖其他开源组件进行实现,例如,dubbo-remoting-netty4 子模块依赖 Netty 4 实现远程通信,dubbo-remoting-zookeeper 通过 Apache Curator 实现与 ZooKeeper 集群的交互。

image-20241225153917884

(3)dubbo-rpc 模块

​ Dubbo 中对远程调用协议进行抽象的模块,其中抽象了各种协议,依赖于 dubbo-remoting 模块的远程调用功能。dubbo-rpc-api 子模块是核心抽象,其他子模块是针对具体协议的实现,例如,dubbo-rpc-dubbo 子模块是对 Dubbo 协议的实现,依赖了 dubbo-remoting-netty4 等 dubbo-remoting 子模块。 dubbo-rpc 模块的实现中只包含一对一的调用,不关心集群的相关内容

image-20241225154018188

(4)dubbo-cluster 模块

​ Dubbo 中负责管理集群的模块,提供了负载均衡、容错、路由等一系列集群相关的功能,最终的目的是将多个 Provider 伪装为一个 Provider,这样 Consumer 就可以像调用一个 Provider 那样调用 Provider 集群了

(5)dubbo-registry 模块

​ Dubbo 中负责与多种开源注册中心进行交互的模块,提供注册中心的能力。其中, dubbo-registry-api 子模块是顶层抽象,其他子模块是针对具体开源注册中心组件的具体实现,例如,dubbo-registry-zookeeper 子模块是 Dubbo 接入 ZooKeeper 的具体实现

image-20241225154214986

(6)dubbo-monitor 模块

​ Dubbo 的监控模块,主要用于统计服务调用次数、调用时间以及实现调用链跟踪的服务。

(7)dubbo-config 模块

​ Dubbo 对外暴露的配置都是由该模块进行解析的。例如,dubbo-config-api 子模块负责处理 API 方式使用时的相关配置,dubbo-config-spring 子模块负责处理与 Spring 集成使用时的相关配置方式。有了 dubbo-config 模块,用户只需要了解 Dubbo 配置的规则即可,无须了解 Dubbo 内部的细节。

image-20241225154320841

(8)dubbo-metadata 模块

​ Dubbo 的元数据模块。dubbo-metadata 模块的实现套路也是有一个 api 子模块进行抽象,然后其他子模块进行具体实现

image-20241225154411619

(9)dubbo-configcenter 模块

​ Dubbo 的动态配置模块,主要负责外部化配置以及服务治理规则的存储与通知,提供了多个子模块用来接入多种开源的服务发现组件

image-20241225155031958

(10)dubbo-demo

​ Dubbo 源码提供的demo示例,共包括三个非常基础 的 Dubbo 示例项目,分别是: 使用 XML 配置的 Demo 示例、使用注解配置的 Demo 示例 以及 直接使用 API 的 Demo 示例 。从这三个示例的角度,简单介绍 Dubbo 的基本使用。同时,这三个项目也将作为后续 Debug Dubbo 源码的入口,在后续的学习中会根据需要在其之上进行修改 。不过在这儿之前,需要先启动 ZooKeeper 作为注册中心,然后编写一个业务接口作为 Provider 和 Consumer 的公约。

image-20241225155141159

⚽ Dubbo-demo 测试

环境配置说明

  • Zookeeper(3.4.14版本)
  • JDK 8(如果demo启动错误,考虑版本兼容性问题,此处用JDK8进行版本测试)
  • dubbo - 2.7.x
(1)本地Zookeeper环境搭建

​ 结合Dubbo架构图可知, Provider 的地址以及配置信息是通过注册中心传递给 Consumer 的。 Dubbo 支持的注册中心尽管有很多, 但在生产环境中, 基本都是用 ZooKeeper 作为注册中心 。因此,在调试 Dubbo 源码时,自然需要在本地启动 ZooKeeper。

本地Zookeeper环境搭建(windows)

  • ① 下载并解压 zookeeper-3.4.14.tar.gzopen in new window 包(tar -zxf zookeeper-3.4.14.tar.gz

  • ② 进入zk目录,将conf/zoo_sample.cfgcopy一份并重命名conf/zoo.cfg,随后启动Zookeeper./bin/zkServer.sh start

    • window 下可以直接执行zkServer.cmd(注意windows下不是zkServer.cmd start,而是zkServer.cmd(否则启动提示报错))

      image-20241225160704500

​ 如果在demo启动的时候提示zookeeper not connected则尝试关闭窗口重启zookeeper后尝试(有可能是zookeeper没有启动成功,也有可能是连接zookeeper超时导致异常触发)。正常来说,如果demo(例如provider启动连接zookeeper)启动尝试连接zookeeper时,相应cmd窗口会有信息提示,如果没有(可能程序一直卡在那里),则需要关闭窗口尝试重新启动zookeeper

image-20241225171221774

(2)业务接口

​ 在使用 Dubbo 之前,还需要一个业务接口,这个业务接口可以认为是 Dubbo Provider 和 Dubbo Consumer 的公约,反映出很多信息:

  • Provider ,如何提供服务、提供的服务名称是什么、需要接收什么参数、需要返回什么响应;
  • Consumer ,如何使用服务、使用的服务名称是什么、需要传入什么参数、会得到什么响应;

dubbo-demo-interface 模块就是定义业务接口的地方,如下图所示:

image-20241225160944587

​ 且需要将这些公约接口导入到仓库中,执行指令mvn clean install -Dmaven.test.skip=true进行编译

(3)不同形式的 Provider & Consumer 实现
① 基于XML配置dubbo-demo-xml

​ 在 dubbo-demo 模块下的 dubbo-demo-xml 模块,提供了基于 Spring XML 的 Provider 和 Consumer

  • Provider 配置核心
    • pom.xml配置:依赖dubbo-demo-interface公共接口(接口公约)
    • ② 实现服务接口:DemoServiceImpl接口实现(以构建服务接口DemoService为例)
    • dubbo-provider.xml配置:spring 配置文件,基于xml形式配置接口服务相关内容:注册中心配置、要暴露的接口定义
    • ④ 启动类:Application配置,加载指定配置文件并启动接口服务
  • Consumer 配置核心
    • pom.xml配置:依赖dubbo-demo-interface公共接口(接口公约)
    • dubbo-consumer.xml配置:spring 配置文件,基于xml形式配置服务发现相关内容:配置要引用的接口服务
    • ③ 启动类:Application配置,加载指定配置文件并启动消费服务

Provider 实现:dubbo-demo-xml-provider

  • pom.xml配置:除了一堆dubbo的依赖之外,还依赖dubbo-demo-interface这个公共接口

    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-demo-interface</artifactId>
        <version>${project.parent.version}</version>
    </dependency>
    
  • DemoServiceImpl接口实现:自定义DemoServiceImpl实现DemoService接口的sayHello同步方法和sayHelloAsync异步方法

    public class DemoServiceImpl implements DemoService {
        private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
    
        @Override
        public String sayHello(String name) {
            logger.info("Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
            return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
        }
    
        @Override
        public CompletableFuture<String> sayHelloAsync(String name) {
            CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
                return "async result";
            });
            return cf;
        }
    }
    
  • dubbo-provider.xml配置:

    • bean 配置:在dubbo-provider.xmlDemoServiceImpl配置为一个Spring bean 并将其作为DemoService服务暴露出去

      <!-- 配置Spring Bean -->
      <bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
      <!-- 将demoService作为服务发布出去-->
      <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"/>
      
    • 注册中心配置:配置Zookeeper注册中心地址,将要暴露的服务注册到ZK中

      <!-- zookeeper 注册中心地址配置 -->
      <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
      
  • Application配置:在启动类main方法中指定Spring配置文件并启动ClassPathXmlApplicationContext

    public class Application {
        public static void main(String[] args) throws Exception {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-provider.xml");
            context.start();
            System.in.read();
        }
    }
    

Consumer 实现:dubbo-demo-xml-consumer

  • pom.xml配置:除了一堆dubbo的依赖之外,还依赖dubbo-demo-interface这个公共接口(服务提供方和服务调用方的接口公约)

    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-demo-interface</artifactId>
        <version>${project.parent.version}</version>
    </dependency>
    
  • dubbo-consumer.xml配置:

    • dubbo-reference引入服务

      <!--引入DemoService服务,并配置成Spring Bean--> 
      <dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/> 
      
    • 配置zookeeper注册中心地址

      <!-- Zookeeper地址 --> 
      <dubbo:registry address="zookeeper://127.0.0.1:2181"/> 
      
  • Application配置:在启动类main方法中指定Spring配置文件并启动ClassPathXmlApplicationContext

    public class Application {
        /**
         * In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
         * launch the application
         */
        public static void main(String[] args) throws Exception {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
            context.start();
            DemoService demoService = context.getBean("demoService", DemoService.class);
            String hello = demoService.sayHello("world");
            System.out.println("result: " + hello);
        }
    }
    

启动测试:先Provider、后Consumer

① Provider 启动成功:信息提示

[25/12/24 17:21:13:660 CST] main  INFO metadata.DynamicConfigurationServiceNameMapping:  [DUBBO] Dubbo service[null] mapped to interface name[org.apache.dubbo.demo.DemoService]., dubbo version: , current host: 172.19.30.91
[25/12/24 17:21:13:670 CST] main  INFO bootstrap.DubboBootstrap:  [DUBBO] DubboBootstrap is ready., dubbo version: , current host: 172.19.30.91
[25/12/24 17:21:13:670 CST] main  INFO bootstrap.DubboBootstrap:  [DUBBO] DubboBootstrap has started., dubbo version: , current host: 172.19.30.91

​ 可以启动ZkCli.cmd通过ls /查看节点是否注册成功

image-20241225173438315

DubboAdmin

② Consumer 启动成功:信息提示(可以正常调用服务接口并响应结果)

[26/12/24 16:21:57:504 CST] NettyClientWorker-1-1  INFO netty4.NettyClientHandler:  [DUBBO] The connection of /172.19.30.91:50188 -> /172.19.30.91:20880 is established., dubbo version: , current host: 172.19.30.91
[26/12/24 16:21:57:634 CST] main  INFO config.ReferenceConfig:  [DUBBO] Refer dubbo service org.apache.dubbo.demo.DemoService from url zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?anyhost=true&application=demo-consumer&check=false&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&init=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=21624&qos.port=33333&register.ip=172.19.30.91&release=&remote.application=demo-provider&side=consumer&sticky=false&timestamp=1735201316676, dubbo version: , current host: 172.19.30.91
result: Hello world, response from provider: 172.19.30.91:20880
[26/12/24 16:21:58:196 CST] DubboShutdownHook  INFO config.DubboShutdownHook:  [DUBBO] Run shutdown hook now., dubbo version: , current host: 172.19.30.91

Provier 启动常见问题:

(1)Can't create adaptive extension interface org.apache.dubbo.rpc.Protocol

​ 解决方案:JDK 版本兼容问题(JDK17运行报错,调整为JDK1.8版本)

Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.dubbo.demo.DemoService': Instantiation of bean failed; nested exception is java.lang.ExceptionInInitializerError
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateBean(AbstractAutowireCapableBeanFactory.java:1159)

.........

Caused by: java.lang.IllegalStateException: Failed to create adaptive instance: java.lang.IllegalStateException: Can't create adaptive extension interface org.apache.dubbo.rpc.Protocol, cause: Unable to make protected final java.lang.Class java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) throws java.lang.ClassFormatError accessible: module java.base does not "opens java.lang" to unnamed module @61322f9d
	at org.apache.dubbo.common.extension.ExtensionLoader.getAdaptiveExtension(ExtensionLoader.java:605)
	at org.apache.dubbo.config.ServiceConfig.<clinit>(ServiceConfig.java:118)
	... 21 more

(2)zookeeper not connected

​ 解决方案:确认Zookeeper是否正常启动,是否因连接超时导致的错误

Exception in thread "main" java.lang.IllegalStateException: zookeeper not connected
	at org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperClient.<init>(CuratorZookeeperClient.java:91)
	at org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter.createZookeeperClient(CuratorZookeeperTransporter.java:27)
	at org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperTransporter.connect(AbstractZookeeperTransporter.java:70)
	at org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter$Adaptive.connect(ZookeeperTransporter$Adaptive.java)

(3)No application config found

​ 解决方案:在sring配置文件(dubbo-provider.xml)中配置<dubbo:application name="demo-provider"/>

Exception in thread "main" java.lang.IllegalStateException: No application config found or it's not a valid config! Please add <dubbo:application name="..." /> to your spring config.
	at org.apache.dubbo.config.utils.ConfigValidationUtils.validateApplicationConfig(ConfigValidationUtils.java:371)
	at org.apache.dubbo.config.bootstrap.DubboBootstrap.checkGlobalConfigs(DubboBootstrap.java:528)
	at org.apache.dubbo.config.bootstrap.DubboBootstrap.initialize(DubboBootstrap.java:515)
	at org.apache.dubbo.config.bootstrap.DubboBootstrap.start(DubboBootstrap.java:744)

Consumer 启动常见问题

(1)Service invoke 失败

Exception in thread "main" org.apache.dubbo.rpc.RpcException: Failed to invoke service */org.apache.dubbo.demo.DemoService: org.apache.dubbo.remoting.RemotingException: org.apache.dubbo.remoting.RemotingException: Not found exported service: */org.apache.dubbo.demo.DemoService:20880 in [org.apache.dubbo.demo.DemoService:20880], may be version or group mismatch , channel: consumer: /172.19.30.91:49387 --> provider: /172.19.30.91:20880, message:RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[], attachments={input=281, path=org.apache.dubbo.demo.DemoService, async=true, remote.application=demo-consumer, application=demo-consumer, dubbo=2.0.2, id=0, interface=org.apache.dubbo.demo.DemoService, version=0.0.0, group=*}]
org.apache.dubbo.remoting.RemotingException: Not found exported service: */org.apache.dubbo.demo.DemoService:20880 in [org.apache.dubbo.demo.DemoService:20880], may be version or group mismatch , channel: consumer: /172.19.30.91:49387 --> provider: /172.19.30.91:20880, message:RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[], attachments={input=281, path=org.apache.dubbo.demo.DemoService, async=true, remote.application=demo-consumer, application=demo-consumer, dubbo=2.0.2, id=0, interface=org.apache.dubbo.demo.DemoService, version=0.0.0, group=*}]
	at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol.getInvoker(DubboProtocol.java:271)

​ 确认对应接口是否存在于注册中心,检查消费方配置<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/>

​ 如果接口提供方没有指定group,对于消费方则不需要指定group定位接口服务,否则可能导致分组检索时找不到已经发布的服务接口

② 基于注解配置dubbo-demo-annotation
  • Provider:基于注解配置概念,将自定义的服务发布发布出去(@EnableDubbo
  • Consumer:基于注解配置概念,引入服务接口(@Reference

dubbo-demo-provider

  • ① 通过注解配置ProviderConfiguration, Application启动基于该配置初始化加载
public class Application {
    public static void main(String[] args) throws Exception {
        // 使用AnnotationConfigApplicationContext初始化Spring容器,从ProviderConfiguration这个类的注解上拿相关配置
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class);
        context.start();
        System.in.read();
    }

    @Configuration // 配置类
    @EnableDubbo(scanBasePackages = "org.apache.dubbo.demo.provider") // @EnableDubbo注解指定包下的Bean都会被扫描,并作为Dubbo服务暴露出去
    @PropertySource("classpath:/spring/dubbo-provider.properties") // @PropertySource指定配置信息
    static class ProviderConfiguration {
        @Bean
        public RegistryConfig registryConfig() {
            RegistryConfig registryConfig = new RegistryConfig();
            registryConfig.setAddress("zookeeper://127.0.0.1:2181");
            return registryConfig;
        }
    }
}
  • spring/dubbo-provider.proterites 配置dubbo基础配置
dubbo.application.name=dubbo-demo-annotation-provider
dubbo.protocol.name=dubbo
dubbo.protocol.port=20880

dubbo-demo-consumer

  • ① 配置服务发现组件,填充业务逻辑(调用服务接口)(DemoServiceComponent
@Component("demoServiceComponent")
public class DemoServiceComponent implements DemoService {
    @Reference // 注入dubbo服务
    private DemoService demoService;

    @Override
    public String sayHello(String name) {
        return demoService.sayHello(name);
    }

    @Override
    public CompletableFuture<String> sayHelloAsync(String name) {
        return null;
    }
}
  • Application启动类配置,启动消费者
public class Application {
    /**
     * In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
     * launch the application
     */
    public static void main(String[] args) {
        // 加载ConsumerConfiguration配置,启动服务
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
        context.start();
        // 测试服务接口调用逻辑
        DemoService service = context.getBean("demoServiceComponent", DemoServiceComponent.class);
        String hello = service.sayHello("world");
        System.out.println("result :" + hello);
    }

    @Configuration
    @EnableDubbo(scanBasePackages = "org.apache.dubbo.demo.consumer.comp")
    @PropertySource("classpath:/spring/dubbo-consumer.properties")
    @ComponentScan(value = {"org.apache.dubbo.demo.consumer.comp"})
    static class ConsumerConfiguration {

    }
}
  • spring/dubbo-consumer.proterites 配置dubbo基础配置
dubbo.application.name=dubbo-demo-annotation-consumer
dubbo.registry.address=zookeeper://127.0.0.1:2181

启动测试(先Provider后Consumer)

  • ① Provider 启动测试
[26/12/24 16:52:25:517 CST] main  INFO metadata.DynamicConfigurationServiceNameMapping:  [DUBBO] Dubbo service[null] mapped to interface name[org.apache.dubbo.demo.DemoService]., dubbo version: , current host: 172.19.30.91
[26/12/24 16:52:25:522 CST] main  INFO bootstrap.DubboBootstrap:  [DUBBO] DubboBootstrap is ready., dubbo version: , current host: 172.19.30.91
[26/12/24 16:52:25:522 CST] main  INFO bootstrap.DubboBootstrap:  [DUBBO] DubboBootstrap has started., dubbo version: , current host: 172.19.30.91
  • ② Consumer 启动测试
[26/12/24 16:53:42:932 CST] main  INFO bootstrap.DubboBootstrap:  [DUBBO] DubboBootstrap is ready., dubbo version: , current host: 172.19.30.91
[26/12/24 16:53:42:932 CST] main  INFO bootstrap.DubboBootstrap:  [DUBBO] DubboBootstrap has started., dubbo version: , current host: 172.19.30.91
result :Hello world, response from provider: 172.19.30.91:20880
[26/12/24 16:53:43:236 CST] DubboShutdownHook  INFO config.DubboShutdownHook:  [DUBBO] Run shutdown hook now., dubbo version: , current host: 172.19.30.91
[26/12/24 16:53:43:237 CST] Thread-0  INFO annotation.AnnotationConfigApplicationContext: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@610694f1: startup date [Thu Dec 26 16:53:41 CST 2024]; root of context hierarchy
③ 基于API配置dubbo-demo-api

​ 在一些场景中并没有依赖Spring模块,因此没有办法通过上述xmlannotation方式接入dubbo(构建Dubbo ProviderDubbo Consumer),比较典型的一种场景就是构建SDK的时候

dubbo-demo-api-provider

ServiceConfig配置

public class Application {
    public static void main(String[] args) throws Exception {
        if (isClassic(args)) {
            startWithExport();
        } else {
            startWithBootstrap();
        }
    }

    private static boolean isClassic(String[] args) {
        return args.length > 0 && "classic".equalsIgnoreCase(args[0]);
    }

    private static void startWithBootstrap() {
        // ServiceConfig 配置 服务接口
        ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
        service.setInterface(DemoService.class);
        service.setRef(new DemoServiceImpl());

        // 基于DubboBootstrap启动:生成ApplicationConfig实例、指定ZK地址以及ServiceConfig实例
        DubboBootstrap bootstrap = DubboBootstrap.getInstance();
        bootstrap.application(new ApplicationConfig("dubbo-demo-api-provider"))
                .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
                .service(service)
                .start()
                .await();
    }

    private static void startWithExport() throws InterruptedException {
        // ServiceConfig 配置 服务接口、注册中心配置、暴露服务
        ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
        service.setInterface(DemoService.class);
        service.setRef(new DemoServiceImpl());
        service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));
        service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        service.export();

        System.out.println("dubbo service started");
        new CountDownLatch(1).await();
    }
}

dubbo-demo-api-consumer

public class Application {
    public static void main(String[] args) {
        if (isClassic(args)) {
            runWithRefer();
        } else {
            runWithBootstrap();
        }
    }

    private static boolean isClassic(String[] args) {
        return args.length > 0 && "classic".equalsIgnoreCase(args[0]);
    }

    private static void runWithBootstrap() {
        ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
        reference.setInterface(DemoService.class);
        reference.setGeneric("true");

        DubboBootstrap bootstrap = DubboBootstrap.getInstance();
        bootstrap.application(new ApplicationConfig("dubbo-demo-api-consumer"))
                .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
                .reference(reference)
                .start();

        DemoService demoService = ReferenceConfigCache.getCache().get(reference);
        String message = demoService.sayHello("dubbo");
        System.out.println(message);

        // generic invoke
        GenericService genericService = (GenericService) demoService;
        Object genericInvokeResult = genericService.$invoke("sayHello", new String[] { String.class.getName() },
                new Object[] { "dubbo generic invoke" });
        System.out.println(genericInvokeResult);
    }

    private static void runWithRefer() {
        ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
        reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
        reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        reference.setInterface(DemoService.class);
        DemoService service = reference.get();
        String message = service.sayHello("dubbo");
        System.out.println(message);
    }
}

启动测试(先Provider后Consumer)

  • Provider
[26/12/24 17:22:53:054 CST] main  INFO migration.MigrationRuleListener:  [DUBBO] INIT, dubbo version: , current host: 172.19.30.91
[26/12/24 17:22:53:152 CST] main  INFO bootstrap.DubboBootstrap:  [DUBBO] DubboBootstrap is ready., dubbo version: , current host: 172.19.30.91
[26/12/24 17:22:53:154 CST] main  INFO bootstrap.DubboBootstrap:  [DUBBO] DubboBootstrap has started., dubbo version: , current host: 172.19.30.91
[26/12/24 17:22:53:154 CST] main  INFO bootstrap.DubboBootstrap:  [DUBBO] DubboBootstrap awaiting ..., dubbo version: , current host: 172.19.30.91
  • Consumer
[26/12/24 17:23:15:610 CST] main  INFO config.ReferenceConfig:  [DUBBO] Refer dubbo service org.apache.dubbo.rpc.service.GenericService from url dubbo://172.19.30.91/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-consumer&check=false&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=true&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=9624&register.ip=172.19.30.91&release=release&remote.application=dubbo-demo-api-provider&service.name=ServiceBean:/org.apache.dubbo.demo.DemoService&side=consumer&sticky=false&timestamp=1735204995024, dubbo version: , current host: 172.19.30.91
[26/12/24 17:23:15:623 CST] main  INFO bootstrap.DubboBootstrap:  [DUBBO] DubboBootstrap is ready., dubbo version: , current host: 172.19.30.91
[26/12/24 17:23:15:625 CST] main  INFO bootstrap.DubboBootstrap:  [DUBBO] DubboBootstrap has started., dubbo version: , current host: 172.19.30.91
Hello dubbo, response from provider: 172.19.30.91:20880
Hello dubbo generic invoke, response from provider: 172.19.30.91:20880
[26/12/24 17:23:15:864 CST] DubboShutdownHook  INFO config.DubboShutdownHook:  [DUBBO] Run shutdown hook now., dubbo version: , current host: 172.19.30.91

② dubbo 的配置总线

学习目标

  • dubbo url 核心解读 及 应用
    • URL:dubbo的配置总线,在dubbo的设计场景中起到核心作用,其结构简单清晰易处理,在上下文传递和参数处理过程中非常适用,简化开发沟通成本
    • URL在 dubbo 的场景设计应用:SPI扩展类接口设计应用、服务发现设计应用(服务暴露、服务订阅等通过URL给注册中心传递关键信息)

1.URL(dubbo 的配置总线)

​ 在互联网领域,每个信息资源都有统一的且在网上唯一的地址,该地址就叫 URL(Uniform Resource Locator,统一资源定位符),它是互联网的统一资源定位标志,也就是指网络地址。URL 本质上就是一个特殊格式的字符串。一个标准的 URL 格式可以包含如下的几个部分:

protocol://username:password@host:port/path?key=value&key=value
  • protocol:URL 的协议。常见的就是 HTTP 协议和 HTTPS 协议。还有其他协议,如 FTP 协议、SMTP 协议等
  • username/password:用户名/密码。 HTTP Basic Authentication 中多会使用在 URL 的协议之后直接携带用户名和密码的方式
  • host/port:主机/端口。在实践中一般会使用域名,而不是使用具体的 host 和 port
  • path:请求的路径
  • parameters:参数键值对。一般在 GET 请求中会将参数放到 URL 中,POST 请求会将参数放到请求体中

​ URL 是整个 Dubbo 中非常基础,也是非常核心的一个组件,阅读源码的过程中会发现很多方法都是以 URL 作为参数的,在方法内部解析传入的 URL 得到有用的参数,所以有人将 URL 称为Dubbo 的配置总线

​ 例如在 Dubbo SPI 核心实现中, URL 参与了扩展实现的确定;

​ 例如在注册中心实现中,Provider 将自身的信息封装成 URL 注册到 ZooKeeper 中,从而暴露自己的服务, Consumer 也是通过 URL 来确定自己订阅了哪些 Provider 的

​ 由此可见,URL 之于 Dubbo 是非常重要的,所以说“抓住 URL,就理解了半个 Dubbo”

2.Dubbo 中 的 URL

​ Dubbo 中任意的一个实现都可以抽象为一个 URL,Dubbo 使用 URL 来统一描述了所有对象和配置信息,并贯穿在整个 Dubbo 框架之中。可以来看 Dubbo 中一个典型 URL 的示例,如下(可以从服务的日志中得到URL信息):

dubbo://172.17.32.91:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=32508&release=&side=provider&timestamp=1593253404714dubbo://172.17.32.91:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=32508&release=&side=provider&timestamp=1593253404714

这个 Demo Provider 注册到 ZooKeeper 上的 URL 信息,简单解析一下这个 URL 的各个部分:

  • protocol:dubbo 协议。
  • username/password:没有用户名和密码。
  • host/port:172.17.32.91:20880。
  • path:org.apache.dubbo.demo.DemoService。
  • parameters:参数键值对,这里是问号后面的参数。

下面是 URL 的构造方法,可以看到其核心字段与前文分析的 URL 基本一致:

public URL(String protocol, 

           String username, 

           String password, 

           String host, 

           int port, 

           String path, 

           Map<String, String> parameters, 

           Map<String, Map<String, String>> methodParameters) { 

    if (StringUtils.isEmpty(username) 

        && StringUtils.isNotEmpty(password)) { 

        throw new IllegalArgumentException("Invalid url"); 

    } 

    this.protocol = protocol; 

    this.username = username; 

    this.password = password; 

    this.host = host; 

    this.port = Math.max(port, 0); 

    this.address = getAddress(this.host, this.port); 

    while (path != null && path.startsWith("/")) { 

        path = path.substring(1); 

    } 

    this.path = path; 

    if (parameters == null) { 

        parameters = new HashMap<>(); 

    } else { 

        parameters = new HashMap<>(parameters); 

    } 

    this.parameters = Collections.unmodifiableMap(parameters); 

    this.methodParameters = Collections.unmodifiableMap(methodParameters); 

}

另外,在 dubbo-common 包中还提供了 URL 的辅助类:

  • URLBuilder, 辅助构造 URL;
  • URLStrParser, 将字符串解析成 URL 对象

契约的力量

​ 对于 Dubbo 中的 URL,很多人称之为“配置总线”,也有人称之为“统一配置模型”。虽然说法不同,但都是在表达一个意思,URL 在 Dubbo 中被当作是“公共的契约”。一个 URL 可以包含非常多的扩展点参数,URL 作为上下文信息贯穿整个扩展点设计体系。

​ 其实,一个优秀的开源产品都有一套灵活清晰的扩展契约,不仅是第三方可以按照这个契约进行扩展,其自身的内核也可以按照这个契约进行搭建。如果没有一个公共的契约,只是针对每个接口或方法进行约定,就会导致不同的接口甚至同一接口中的不同方法,以不同的参数类型进行传参,一会儿传递 Map,一会儿传递字符串,而且字符串的格式也不确定,需要自己进行解析,这就多了一层没有明确表现出来的隐含的约定。

所以说,在 Dubbo 中使用 URL 的好处多多,增加了便捷性:

  • ① 上下文信息传递:代码更加易读、易懂,不用花大量时间去揣测传递数据的格式和含义,进而形成一个统一的规范,使得代码易写、易读
  • ② 方法入参:使用 URL 作为方法的入参(相当于一个Map<String,String> ),它所表达的含义比单个参数更丰富,当代码需要扩展的时候,可以将新的参数以 Key/Value 的形式追加到 URL 之中,而不需要改变入参或是返回值的结构
  • ③ 简化沟通:使用 URL 这种“公共的契约”可以简化沟通,人与人之间的沟通消耗是非常大的,信息传递的效率非常低,使用统一的契约、术语、词汇范围,可以省去很多沟通成本,尽可能地提高沟通效率

3.Dubbo中的URL示例

① URL 在 SPI 中的应用

​ Dubbo SPI 中有一个依赖 URL 的重要场景——适配器方法,是被 @Adaptive 注解标注的, URL 一个很重要的作用就是与 @Adaptive 注解一起选择合适的扩展实现类。

​ 例如,在 dubbo-registry-api 模块中的 RegistryFactory这个接口,其中的 getRegistry() 方法上有@Adaptive({"protocol"})注解,说明这是一个适配器方法,Dubbo 在运行时会为其动态生成相应的 $Adaptive 类型,如下所示:

/**
 * RegistryFactory. (SPI, Singleton, ThreadSafe)
 *
 * @see org.apache.dubbo.registry.support.AbstractRegistryFactory
 */
@SPI("dubbo")
public interface RegistryFactory {

    /**
     * Connect to the registry
     * <p>
     * Connecting the registry needs to support the contract: <br>
     * 1. When the check=false is set, the connection is not checked, otherwise the exception is thrown when disconnection <br>
     * 2. Support username:password authority authentication on URL.<br>
     * 3. Support the backup=10.20.153.10 candidate registry cluster address.<br>
     * 4. Support file=registry.cache local disk file cache.<br>
     * 5. Support the timeout=1000 request timeout setting.<br>
     * 6. Support session=60000 session timeout or expiration settings.<br>
     *
     * @param url Registry address, is not allowed to be empty
     * @return Registry reference, never return empty value
     */
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

}

​ 启动dubbo-demo中的任意一个demo的Provider服务,然后在getRegistry方法中打断点查看断点信息(不同版本源码对注册中心配置的实现有所不同,此处结合源码提示分析不同的实现:关注AbstractRegistryFactory#getRegistry的方法实现,设置断点,启动Provider服务随后查看断点配置信息)

image-20241227134526592

image-20241227134530580

​ 根据上述断点调试的URL可以跟踪到设置的注册中心参数配置信息

zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=19584&timestamp=1735278180154

② URL 在服务暴露中的应用

​ 在介绍 Dubbo 的简化架构时提到,Provider 在启动时,会将自身暴露的服务注册到 ZooKeeper 上,此处也可通过断掉调试看具体是注册哪些信息到 ZooKeeper上

​ 此处使用的是zookeeper注册中心,因此可以通过跟踪dubbo-registry-zookeeper中的相关实现方法,定位ZookeeperRegistry#doRegistry(服务暴露方法核心)

image-20241227135546432

​ 根据注册的URL,查看基于服务暴露传递给注册中心的参数信息

  • 服务方接口地址:172.19.30.91:20880
  • 暴露服务接口等信息:org.apache.dubbo.demo.DemoService
dubbo://172.19.30.91:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=9476&release=&side=provider&timestamp=1735278886583

③ URL 在服务订阅中的应用

​ 同理,服务提供方启动后会将指定的服务接口注册到注册中心,而服务消费方启动后也会向注册中心发起订阅操作,监听并关注自己的Provider,那Consumer是如何告诉注册中心自己关注哪些 Provider 呢?

​ 此处使用的注册中心是zookeeper,因此继续关注ZookeeperRegistry#doSubscribe方法(服务订阅方法核心),启动Consumer端并通过断点调试。通过断掉调试可以看到服务订阅的相关URL

image-20241227140803326

consumer://172.19.30.91/org.apache.dubbo.demo.DemoService?application=dubbo-demo-annotation-consumer&category=providers,configurators,routers&dubbo=2.0.2&init=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=17536&side=consumer&sticky=false&timestamp=1735279582654

​ Consumer 订阅信息解读:

  • ① Protocol 为 consumer:表示是 Consumer 的订阅协议
  • ② category 参数表示要订阅的分类(这里要订阅 providers、configurators 以及 routers 三个分类)
  • ③ interface 参数表示订阅哪个服务接口,这里要订阅的是暴露 org.apache.dubbo.demo.DemoService 实现的 Provider

通过 URL 中的上述参数,ZookeeperRegistry 会在 toCategoriesPath() 方法中将其整理成一个 ZooKeeper 路径,然后调用 zkClient 在其上添加监听。

③ dubbo SPI 剖析

学习目标

  • SPI 机制概念核心
  • Dubbo 中的 SPI 机制剖析
    • ① SPI 配置文件扩展:目录扩展(引入多目录接口存储不同用途的SPI配置文件)、格式扩展(KV格式存储)
    • ② @SPI :扩展接口标记,关注ExtensionLoader(对标ServiceLoader)装配逻辑
    • ③ @Adaptive 注解与适配器
    • ④ 自动包装特性:Wrapper 概念(装饰类模式构建,封装基础代码)
    • ⑤ 自动装配特性:自动装配属性就是在加载一个扩展点的时候,将其依赖的扩展点一并加载,并进行装配
    • ⑥ @Activate 注解与自动激活特性:自动激活配置,用于协同多个分组下的类应用(例如用于指定一个场景中多个Filter的情况下哪些Filter是可用的)

1.什么是SPI?

⚽ JDK SPI

(1)SPI 基本概念

​ SPI(Service Provider Interface)服务提供接口主要是被框架开发人员使用的一种技术。

​ 例如,使用 Java 语言访问数据库时我们会使用到 java.sql.Driver 接口,不同数据库产品底层的协议不同,提供的 java.sql.Driver 实现也不同,在开发 java.sql.Driver 接口时,开发人员并不清楚用户最终会使用哪个数据库,在这种情况下就可以使用 Java SPI 机制在实际运行过程中,为 java.sql.Driver 接口寻找具体的实现。

​ SPI全称Service Provider Interface(提供服务的接口,动态加载服务的机制),Java提供的一套用来被第三方实现或扩展的接口,实现了接口的动态扩展,让第三方的实现类能像插件一样嵌入到系统中。

image-20240522183247167

​ 其主要思想:将装配的控制权限移到程序之外(解耦)

​ 其本质构建在于将接口的实现类的全限定名配置在文件中(文件名是接口的全限定名),由服务加载器读取配置文件,加载实现类。实现了运行时动态为接口替换实现类

SPI核心要素

  • SPI 接口:为服务提供者实现类约定的的接口或抽象类
  • SPI 实现类:实际提供服务的实现类
  • SPI 配置:Java SPI 机制约定的配置文件,提供查找服务实现类的逻辑。配置文件必须置于 META-INF/services 目录中,并且,文件名应与服务提供者接口的完全限定名保持一致。文件中的每一行都有一个实现服务类的详细信息,同样是服务提供者类的完全限定名称
  • ServiceLoader:Java SPI 的核心类,用于加载 SPI 实现类。 ServiceLoader 中有各种实用方法来获取特定实现、迭代它们或重新加载服务

SPI 场景应用

​ 除却一些业务开发场景扩展分析,还有一些框架场景也会用到SPI机制,例如最常见的就是JDBC数据库加载、LOG日志,此外SpringBoot、Dubbo等框架中也对SPI机制有着不同的实现和扩展

​ 传统的基于JDK的SPI机制虽然简单,但是也具备局限性。例如其无法支持按需加载(需全量扫描迭代所有接口服务),且存在并发安全问题(在并发场景下如果多个并发多线程同时使用ServiceLoader类是不安全的)

(2)SPI 简单demo示例(自定义MyLog)

① 基础准备(接口和接口服务实现定义)

  • 定义MyLog接口
  • 定义ConsoleLogWebLog分别实现MyLog接口
/**
 * 自定义日志接口
 */
public interface MyLog {
    public void info(String str);
}

// ConsoleLog
public class ConsoleLog implements MyLog {
    @Override
    public void info(String str) {
        System.out.println("console log out:" + str);
    }
}

// WebLog
public class WebLog implements MyLog {
    @Override
    public void info(String str) {
        System.out.println("web log out:" + str);
    }
}

② 传统接口定义和使用

  • 定义接口实现,直接通过引用类的方式实现服务
// ① 传统方式引用服务
public static void testCase1() {
    ConsoleLog consoleLog = new ConsoleLog();
    consoleLog.info("hhh");
    WebLog webLog = new WebLog();
    webLog.info("xxx");
}

③ SPI 方式接入

  • resources/services/下创建文件(文件名称为接口的全限定类名),内容为接口服务实现类(实现类的全限定类名)
# com.noob.rpc.spi.MyLog 文件
com.noob.rpc.spi.impl.ConsoleLog
com.noob.rpc.spi.impl.WebLog
  • 通过ServiceLoaderiterator迭代器遍历接口实现,调用相应的方法
// ② SPI方式引用服务
public static void testCase2() {
    // 使用ServiceLoader动态加载指定接口的实现类
    ServiceLoader<MyLog> serviceLoader = ServiceLoader.load(MyLog.class);
    // 借助迭代器获取实现类信息
    Iterator<MyLog> iterator = serviceLoader.iterator();
    while (iterator.hasNext()) {
        MyLog myLog = iterator.next();
        myLog.info("keep...");
    }
}

2.Dubbo中的SPI?

⚽Dubbo 中的SPI

​ Dubbo 为了更好地达到 OCP 原则(即“对扩展开放,对修改封闭”的原则),采用了“微内核+插件”的架构。那什么是微内核架构呢?微内核架构也被称为插件化架构(Plug-in Architecture),这是一种面向功能进行拆分的可扩展性架构。内核功能是比较稳定的,只负责管理插件的生命周期,不会因为系统功能的扩展而不断进行修改。功能上的扩展全部封装到插件之中,插件模块是独立存在的模块,包含特定的功能,能拓展内核系统的功能。

​ 微内核架构中,内核通常采用 Factory、IOC、OSGi 等方式管理插件生命周期,Dubbo 最终决定采用 SPI 机制来加载插件,Dubbo SPI 参考 JDK 原生的 SPI 机制,进行了性能优化以及功能增强。

扩展点 & 扩展实现

  • 扩展点:扩展接口定义,通过 SPI 机制查找并加载实现的接口(又称“扩展接口”)。例如场景案例中的 Log 接口、com.mysql.cj.jdbc.Driver 接口,都是扩展点
  • 扩展实现:实现了扩展接口的实现类

Dubbo 中的SPI机制

​ 通过前面的分析可以知道JDK SPI机制无法支持按需加载,也就是说JDK SPI 在查找扩展实现类的过程中,需要遍历 SPI 配置文件中定义的所有实现类,该过程中会将这些实现类全部实例化。如果 SPI 配置文件中定义了多个实现类,而实际只需要使用其中一个实现类时,就会生成不必要的对象。

​ 例如,org.apache.dubbo.rpc.Protocol 接口有 InjvmProtocol、DubboProtocol、RmiProtocol、HttpProtocol、HessianProtocol、ThriftProtocol 等多个实现,如果使用 JDK SPI,就会加载全部实现类,导致资源的浪费。

Dubbo SPI 不仅解决了上述资源浪费的问题,还对 SPI 配置文件扩展和修改。

⚽Dubbo 中的SPI核心实现

① SPI配置文件扩展

​ Dubbo 将 SPI 配置文件按照不同的用途划分为三类目录:

  • (1)META-INF/services/目录:该目录下的 SPI 配置文件用来兼容 JDK SPI
  • (2)META-INF/dubbo/目录:该目录用于存放用户自定义 SPI 配置文件
  • (3)META-INF/dubbo/internal/目录:该目录用于存放 Dubbo 内部使用的 SPI 配置文件

​ 并将配置文件的格式改成了KV格式,例如dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol。此处的key被称为扩展名(ExtensionName),当要为一个接口查找具体实现类时可以通过指定扩展名来选择相应的扩展实现。

​ 例如以上述的MyLog案例中,如果基于JDK SPI机制,由于无法实现按需加载,因此当要使用某个具体的扩展类实现时,就需要遍历全量的实现类列表。如果基于这种KV形式,则可以通过配置consoleLog=com.noob.rpc.spi.impl.ConsoleLogwebLog=com.noob.rpc.spi.impl.WebLog,那么在应用中就可以通过指定扩展名指定要加载的扩展实现类,避免全量加载,解决按需加载场景问题

​ 此外,使用KV格式SPI配置文件的另一个好处在于更加容易定位问题。假设使用的一个扩展实现类所在的 jar 包没有引入到项目中,那么 Dubbo SPI 在抛出异常的时候,会携带该扩展名信息,而不是简单地提示扩展实现类无法加载。这些更加准确的异常信息降低了排查问题的难度,提高了排查问题的效率

② @SPI 注解

​ Dubbo 中某个接口被 @SPI注解修饰时,就表示该接口是扩展接口,例如 org.apache.dubbo.rpc.Protocolorg.apache.dubbo.registry.RegistryFactory 接口就是一个扩展接口

扩展接口示例

@SPI("dubbo")
public interface Protocol {
	...
}
@SPI("dubbo")
public interface RegistryFactory {
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);
}

@SPI注解

@SPI 注解的 value 值指定了默认的扩展名

​ 例如,在通过 Dubbo SPI 加载 Protocol 接口实现时,如果没有明确指定扩展名,则默认会将 @SPI 注解的 value 值作为扩展名,即加载 dubbo 这个扩展名对应的 org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol 这个扩展实现类,相关的 SPI 配置文件在 dubbo-rpc-dubbo 模块中,如下图所示:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {

    /**
     * default extension name
     */
    String value() default "";

}

image-20241227151409878

ExtensionLoader

​ 对标JDK SPI机制中的ServiceLoader,Dubbo提供了ExtensionLoader用于加载扩展类。参考dubbo-common模块下的org.apache.dubbo.common.extension.ExtensionLoader实现,进一步剖析ExtensionLoader是如何处理@SPI注解的

​ ExtensionLoader 位于 dubbo-common 模块中的 extension 包中,功能类似于 JDK SPI 中的 java.util.ServiceLoader。Dubbo SPI 的核心逻辑几乎都封装在 ExtensionLoader 之中(其中就包括 @SPI 注解的处理逻辑)其使用方式如下所示:

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");
  • ExtensionLoader的核心静态字段:

    • strategiesLoadingStrategy[]类型):有多个实现,分别用于加载上述dubbo提供的约定目录下的SPI配置文件

      image-20241227152723043

    • EXTENSION_LOADERSConcurrentMap<Class<?>, ExtensionLoader<?>>类型):Dubbo 中一个扩展接口对应一个 ExtensionLoader 实例,该集合缓存了全部 ExtensionLoader 实例,其中的 Key 为扩展接口,Value 为加载其扩展实现的 ExtensionLoader 实例

    • EXTENSION_INSTANCESConcurrentMap<Class<?>, Object>类型):该集合缓存了扩展实现类与其实例对象的映射关系。以上述分析为例,Key 为 Class,Value 为 DubboProtocol 对象

  • ExtensionLoader的实例字段:

    • type(Class<?>类型):当前 ExtensionLoader 实例负责加载扩展接口
    • cachedDefaultName(String类型):记录了 type 这个扩展接口上 @SPI 注解的 value 值,也就是默认扩展名
    • cachedNames(ConcurrentMap, String>类型):缓存了该 ExtensionLoader 加载的扩展实现类与扩展名之间的映射关系
    • cachedClasses(Holder>>类型):缓存了该 ExtensionLoader 加载的扩展名与扩展实现类之间的映射关系。cachedNames 集合的反向关系缓存
    • cachedInstances(ConcurrentMap>类型):缓存了该 ExtensionLoader 加载的扩展名与扩展实现对象之间的映射关系
  • ExtensionLoader核心方法getExtensionLoader

@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null) {
        throw new IllegalArgumentException("Extension type == null");
    }
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
    }
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type (" + type +
                                           ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
    }

    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}

​ 得到接口对应的 ExtensionLoader 对象之后会调用其 getExtension() 方法,根据传入的扩展名称从 cachedInstances 缓存中查找扩展实现的实例,最终将其实例化后返回

@SuppressWarnings("unchecked")
public T getExtension(String name) {
    if (StringUtils.isEmpty(name)) {
        throw new IllegalArgumentException("Extension name == null");
    }
    if ("true".equals(name)) {
        return getDefaultExtension();
    }
    // getOrCreateHolder()方法中封装了查找cachedInstances缓存的逻辑
    final Holder<Object> holder = getOrCreateHolder(name);
    Object instance = holder.get();
    if (instance == null) { // double-check防止并发问题
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                // 根据扩展名从SPI配置文件中查找对应的扩展实现类
                instance = createExtension(name);
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}

​ 在 createExtension() 方法中完成了 SPI 配置文件的查找以及相应扩展实现类的实例化,同时还实现了自动装配以及自动 Wrapper 包装等功能。其核心流程是这样的:

(1)获取 cachedClasses 缓存,根据扩展名从 cachedClasses 缓存中获取扩展实现类。如果 cachedClasses 未初始化,则会扫描前面介绍的三个 SPI 目录获取查找相应的 SPI 配置文件,然后加载其中的扩展实现类,最后将扩展名和扩展实现类的映射关系记录到 cachedClasses 缓存中。这部分逻辑在 loadExtensionClasses() 和 loadDirectory() 方法中

(2)根据扩展实现类从 EXTENSION_INSTANCES 缓存中查找相应的实例。如果查找失败,会通过反射创建扩展实现对象。

(3)自动装配扩展实现对象中的属性(即调用其 setter)。这里涉及 ExtensionFactory 以及自动装配的相关内容

(4)自动包装扩展实现对象。这里涉及 Wrapper 类以及自动包装特性的相关内容

(5)如果扩展实现类实现了 Lifecycle 接口,在 initExtension() 方法中会调用 initialize() 方法进行初始化

@SuppressWarnings("unchecked")
private T createExtension(String name) {
    // 获取 cachedClasses 缓存,根据扩展名从 cachedClasses 缓存中获取扩展实现类。
    // 如果 cachedClasses 未初始化,则会扫描前面介绍的三个 SPI 目录获取查找相应的 SPI 配置文件,
    // 然后加载其中的扩展实现类,最后将扩展名和扩展实现类的映射关系记录到 cachedClasses 缓存中。
    // 这部分逻辑在 loadExtensionClasses() 和 loadDirectory() 方法中。
    Class<?> clazz = getExtensionClasses().get(name); // --- 1
    if (clazz == null) {
        throw findException(name);
    }
    try {
        // 根据扩展实现类从 EXTENSION_INSTANCES 缓存中查找相应的实例。
        // 如果查找失败,会通过反射创建扩展实现对象。
        T instance = (T) EXTENSION_INSTANCES.get(clazz); // --- 2
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }

        // 自动装配扩展实现对象中的属性(即调用其 setter)。
        // 这里涉及到 ExtensionFactory 以及自动装配的相关内容,本课时后面会进行详细介绍。
        injectExtension(instance); // --- 3

        // 自动包装扩展实现对象。这里涉及到 Wrapper 类以及自动包装特性的相关内容,本课时后面会进行详细介绍。
        Set<Class<?>> wrapperClasses = cachedWrapperClasses; // --- 4
        if (CollectionUtils.isNotEmpty(wrapperClasses)) {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        // 如果扩展实现类实现了 Lifecycle 接口,在 initExtension() 方法中会调用 initialize() 方法进行初始化。
        initExtension(instance); // --5
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                                        type + ") couldn't be instantiated: " + t.getMessage(), t);
    }
}
@Adaptive注解与适配器

@Adaptive 注解用来实现 Dubbo 的适配器功能,

​ Dubbo 中的 ExtensionFactory 接口有三个实现类,如下图所示,ExtensionFactory 接口上有 @SPI 注解,AdaptiveExtensionFactory 实现类上有 @Adaptive 注解。

​ AdaptiveExtensionFactory 不实现任何具体的功能,而是用来适配 ExtensionFactory 的 SpiExtensionFactory 和 SpringExtensionFactory 这两种实现。AdaptiveExtensionFactory 会根据运行时的一些状态来选择具体调用 ExtensionFactory 的哪个实现。

​ @Adaptive 注解还可以加到接口方法之上,Dubbo 会动态生成适配器类。例如,Transporter接口有两个被 @Adaptive 注解修饰的方法:

@SPI("netty")
public interface Transporter {

    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

​ Dubbo 会生成一个 Transporter$Adaptive 适配器类,该类继承了 Transporter 接口

public class Transporter$Adaptive implements Transporter { 
    public org.apache.dubbo.remoting.Client connect(URL arg0, ChannelHandler arg1) throws RemotingException { 
        if (arg0 == null) throw new IllegalArgumentException("url == null"); 
        URL url = arg0; 
        // 确定扩展名,优先从URL中的client参数获取,其次是transporter参数 
        // 这两个参数名称由@Adaptive注解指定,最后是@SPI注解中的默认值 
        String extName = url.getParameter("client",
            url.getParameter("transporter", "netty")); 
        if (extName == null) 
            throw new IllegalStateException("..."); 
        // 通过ExtensionLoader加载Transporter接口的指定扩展实现 
        Transporter extension = (Transporter) ExtensionLoader 
              .getExtensionLoader(Transporter.class) 
                    .getExtension(extName); 
        return extension.connect(arg0, arg1); 
    } 
    ... // 省略bind()方法 
}

​ Dubbo 动态生成适配类的逻辑在org.apache.dubbo.common.extension.ExtensionLoader#createAdaptiveExtensionClass。其中涉及的 javassist 等方面的知识

​ 适配器类及其实例保存在 ExtensionLoader 类的 cachedAdaptiveClass 和 cachedAdaptiveInstance 属性里

private Class<?> createAdaptiveExtensionClass() {
    String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
    ClassLoader classLoader = findClassLoader();
    org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    return compiler.compile(code, classLoader);
}

​ 明确了 @Adaptive 注解的作用之后,回到 ExtensionLoader.createExtension() 方法,其中在扫描 SPI 配置文件的时候,会调用 loadClass() 方法加载 SPI 配置文件中指定的类,如下图所示:

image-20241227160208839

loadClass() 方法中会识别加载扩展实现类上的 @Adaptive 注解,将该扩展实现的类型缓存到 cachedAdaptiveClass 这个实例字段上(volatile修饰):

private void loadClass(){ 
    if (clazz.isAnnotationPresent(Adaptive.class)) { 

        // 缓存到cachedAdaptiveClass字段 

        cacheAdaptiveClass(clazz, overridden);

    } else ... // 省略其他分支 
}

可以通过 ExtensionLoader.getAdaptiveExtension() 方法获取适配器实例,并将该实例缓存到 cachedAdaptiveInstance 字段(Holder类型)中,核心流程如下:

  • ① 检查 cachedAdaptiveInstance 字段中是否已缓存了适配器实例,如果已缓存,则直接返回该实例即可
  • ② 调用 getExtensionClasses() 方法,其中就会触发前文介绍的 loadClass() 方法,完成 cachedAdaptiveClass 字段的填充。
  • ③ 如果存在 @Adaptive 注解修饰的扩展实现类,该类就是适配器类,通过 newInstance() 将其实例化即可。如果不存在 @Adaptive 注解修饰的扩展实现类,就需要通过 createAdaptiveExtensionClass() 方法扫描扩展接口中方法上的 @Adaptive 注解,动态生成适配器类,然后实例化
  • ④ 调用 injectExtension() 方法进行自动装配,就能得到一个完整的适配器实例
  • ⑤将适配器实例缓存到 cachedAdaptiveInstance 字段,然后返回适配器实例。

​ 此外,还可以通过 API 方式(addExtension() 方法)设置 cachedAdaptiveClass 这个字段,指定适配器类型。总之,适配器什么实际工作都不用做,就是根据参数和状态选择其他实现来完成工作

④ 自动包装特性

​ Dubbo 中的一个扩展接口可能有多个扩展实现类,这些扩展实现类可能会包含一些相同的逻辑,如果在每个实现类中都写一遍,那么这些重复代码就会变得很难维护。Dubbo 提供的自动包装特性,就可以解决这个问题。

​ Dubbo 将多个扩展实现类的公共逻辑,抽象到 Wrapper 类中,Wrapper 类与普通的扩展实现类一样,也实现了扩展接口,在获取真正的扩展实现对象时,在其外面包装一层 Wrapper 对象,可以理解成一层装饰器。

​ 了解了 Wrapper 类的基本功能,回到 ExtensionLoader.loadClass() 方法中,可以看到:

private void loadClass(){ 

    ... // 省略前面对@Adaptive注解的处理 

    } else if (isWrapperClass(clazz)) { // ---1 

        cacheWrapperClass(clazz); // ---2 

    } else ... // 省略其他分支

}

​ ① 在 isWrapperClass() 方法中,会判断该扩展实现类是否包含拷贝构造函数(即构造函数只有一个参数且为扩展接口类型),如果包含,则为 Wrapper 类,这就是判断 Wrapper 类的标准。

​ ② 将 Wrapper 类记录到 cachedWrapperClasses(Set>类型)这个实例字段中进行缓存

​ 前面在介绍 createExtension() 方法时的 4 处,有下面这段代码,其中会遍历全部 Wrapper 类并一层层包装到真正的扩展实例对象外层:

Set<Class<?>> wrapperClasses = cachedWrapperClasses;

if (CollectionUtils.isNotEmpty(wrapperClasses)) { 

    for (Class<?> wrapperClass : wrapperClasses) { 

        instance = injectExtension((T) wrapperClass 

            .getConstructor(type).newInstance(instance)); 

    } 

}
⑤ 自动装配特性

​ 在 createExtension() 方法中可以看到,Dubbo SPI 在拿到扩展实现类的对象(以及 Wrapper 类的对象)之后,还会调用 injectExtension() 方法扫描其全部 setter 方法,并根据 setter 方法的名称以及参数的类型,加载相应的扩展实现,然后调用相应的 setter 方法填充属性,这就实现了 Dubbo SPI 的自动装配特性。简单来说,自动装配属性就是在加载一个扩展点的时候,将其依赖的扩展点一并加载,并进行装配

​ 下面简单看一下 injectExtension() 方法的具体实现:

private T injectExtension(T instance) { 

    if (objectFactory == null) { // 检测objectFactory字段 

        return instance; 

    } 

    for (Method method : instance.getClass().getMethods()) { 

        ... // 如果不是setter方法,忽略该方法(略) 

        if (method.getAnnotation(DisableInject.class) != null) { 

            continue; // 如果方法上明确标注了@DisableInject注解,忽略该方法 

        } 

        // 根据setter方法的参数,确定扩展接口 

        Class<?> pt = method.getParameterTypes()[0]; 

        ... // 如果参数为简单类型,忽略该setter方法(略) 

        // 根据setter方法的名称确定属性名称 

        String property = getSetterProperty(method); 

        // 加载并实例化扩展实现类 

        Object object = objectFactory.getExtension(pt, property); 

        if (object != null) { 

            method.invoke(instance, object); // 调用setter方法进行装配 

        } 

    } 

    return instance; 

}

​ injectExtension() 方法实现的自动装配依赖了 ExtensionFactory(即 objectFactory 字段),前面我们提到过 ExtensionFactory 有 SpringExtensionFactory 和 SpiExtensionFactory 两个真正的实现(还有一个实现是 AdaptiveExtensionFactory 是适配器)。下面我们分别介绍下这两个真正的实现。

第一个,SpiExtensionFactory。 根据扩展接口获取相应的适配器,没有到属性名称:

@Override 

public <T> T getExtension(Class<T> type, String name) { 

    if (type.isInterface() && type.isAnnotationPresent(SPI.class)) { 

        // 查找type对应的ExtensionLoader实例 

        ExtensionLoader<T> loader = ExtensionLoader 

          .getExtensionLoader(type); 

        if (!loader.getSupportedExtensions().isEmpty()) { 

            return loader.getAdaptiveExtension(); // 获取适配器实现 

        } 

    } 

    return null; 

}

第二个,SpringExtensionFactory。 将属性名称作为 Spring Bean 的名称,从 Spring 容器中获取 Bean:

public <T> T getExtension(Class<T> type, String name) { 

    ... // 检查:type必须为接口且必须包含@SPI注解(略) 

    for (ApplicationContext context : CONTEXTS) { 

        // 从Spring容器中查找Bean 

        T bean = BeanFactoryUtils.getOptionalBean(context,name,type); 

        if (bean != null) { 

            return bean; 

        } 

    } 

    return null; 

}
@Activate注解与自动激活特性

​ 这里以 Dubbo 中的 Filter 为例说明自动激活特性的含义,org.apache.dubbo.rpc.Filter 接口有非常多的扩展实现类,在一个场景中可能需要某几个 Filter 扩展实现类协同工作,而另一个场景中可能需要另外几个实现类一起工作。这样,就需要一套配置来指定当前场景中哪些 Filter 实现是可用的,这就是 @Activate 注解要做的事情。

@Activate 注解标注在扩展实现类上,有 group、value 以及 order 三个属性

  • group 属性:修饰的实现类是在 Provider 端被激活还是在 Consumer 端被激活
  • value 属性:修饰的实现类只在 URL 参数中出现指定的 key 时才会被激活
  • order 属性:用来确定扩展实现类的排序

​ loadClass() 方法对 @Activate 的扫描,其中会将包含 @Activate 注解的实现类缓存到 cachedActivates 这个实例字段(Map类型,Key为扩展名,Value为 @Activate 注解):

private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
                       boolean overridden) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
        throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                                        type + ", class line: " + clazz.getName() + "), class "
                                        + clazz.getName() + " is not subtype of interface.");
    }

    if (clazz.isAnnotationPresent(Adaptive.class)) {
        // 缓存到cachedAdaptiveClass字段
        cacheAdaptiveClass(clazz, overridden);
    } else if (isWrapperClass(clazz)) { // --1
        // 1.在 isWrapperClass() 方法中,会判断该扩展实现类是否包含拷贝构造函数
        // (即构造函数只有一个参数且为扩展接口类型),如果包含,则为 Wrapper 类,这就是判断 Wrapper 类的标准。

        // 2.将 Wrapper 类记录到 cachedWrapperClasses(Set<Class<?>>类型)这个实例字段中进行缓存。
        cacheWrapperClass(clazz); // ---2
    } else {
        clazz.getConstructor(); // 扩展实现类必须有无参构造函数
        if (StringUtils.isEmpty(name)) {
            name = findAnnotationName(clazz);
            if (name.length() == 0) {
                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
            }
        }

        String[] names = NAME_SEPARATOR.split(name);
        if (ArrayUtils.isNotEmpty(names)) {
            // 将包含@Activate注解的实现类缓存到cachedActivates集合中
            cacheActivateClass(clazz, names[0]);
            for (String n : names) {
                cacheName(clazz, n);
                saveInExtensionClass(extensionClasses, clazz, n, overridden);
            }
        }
    }
}

​ 使用 cachedActivates 这个集合的地方是 getActivateExtension() 方法。首先来关注 getActivateExtension() 方法的参数:url 中包含了配置信息,values 是配置中指定的扩展名,group 为 Provider 或 Consumer。下面是 getActivateExtension() 方法的核心逻辑:

​ (1)获取默认激活的扩展集合。默认激活的扩展实现类有几个条件:①在 cachedActivates 集合中存在;②@Activate 注解指定的 group 属性与当前 group 匹配;③扩展名没有出现在 values 中(即未在配置中明确指定,也未在配置中明确指定删除);④URL 中出现了 @Activate 注解中指定的 Key

​ (2)按照 @Activate 注解中的 order 属性对默认激活的扩展集合进行排序

​ (3)按序添加自定义扩展实现类的对象

public List<T> getActivateExtension(URL url, String[] values, String group) {
    List<T> activateExtensions = new ArrayList<>();
    // values就是扩展名
    List<String> names = values == null ? new ArrayList<>(0) : asList(values);
    if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) { // ---1
        getExtensionClasses(); // 触发cachedActivates等缓存字段的加载
        for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
            String name = entry.getKey(); // 扩展名
            Object activate = entry.getValue(); // @Activate注解

            String[] activateGroup, activateValue;

            if (activate instanceof Activate) { // @Activate注解中的配置
                activateGroup = ((Activate) activate).group();
                activateValue = ((Activate) activate).value();
            } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
                activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
                activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
            } else {
                continue;
            }
            if (isMatchGroup(group, activateGroup) // 匹配group
                && !names.contains(name) // 匹配扩展名
                && !names.contains(REMOVE_VALUE_PREFIX + name) // 如果包含"-"表示不激活该扩展实现
                && isActive(activateValue, url)) // 检测URL中是否出现了指定的Key
            {
                activateExtensions.add(getExtension(name)); // 加载扩展实现
            }
        }
        // 排序 --- 2
        activateExtensions.sort(ActivateComparator.COMPARATOR);
    }
    List<T> loadedExtensions = new ArrayList<>();
    for (int i = 0; i < names.size(); i++) { // ---3
        String name = names.get(i);
        // 通过"-"开头的配置明确指定不激活的扩展实现,直接就忽略了
        if (!name.startsWith(REMOVE_VALUE_PREFIX)
            && !names.contains(REMOVE_VALUE_PREFIX + name)) {
            if (DEFAULT_KEY.equals(name)) {
                if (!loadedExtensions.isEmpty()) {
                    // 按照顺序,将自定义的扩展添加到默认扩展集合前面
                    activateExtensions.addAll(0, loadedExtensions);
                    loadedExtensions.clear();
                }
            } else {
                loadedExtensions.add(getExtension(name));
            }
        }
    }
    if (!loadedExtensions.isEmpty()) {
        // 按照顺序,将自定义的扩展添加到默认扩展集合后面
        activateExtensions.addAll(loadedExtensions);
    }
    return activateExtensions;
}

举例分析

假设 cachedActivates 集合缓存的扩展实现如下表所示:

扩展名@Activate中的group@Activate中的order
demoFilter1Provider6
demoFilter2Provider5
demoFilter3Provider4
demoFilter4Provider3
demoFilter5Consumer2
demoFilter6Provider1

在 Provider 端调用 getActivateExtension() 方法时传入的 values 配置为 “demoFilter3、demoFilter2、default、demoFilter1”,那么根据上面的逻辑:

① 得到默认激活的扩展实实现集合中有 [ demoFilter4, demoFilter6 ];

② 排序后为 [ demoFilter6, demoFilter4 ];

③ 按序添加自定义扩展实例之后得到 [ demoFilter3, demoFilter6, demoFilter4, demoFilter1 ]

④ 海量定时任务:时间轮

学习目标

  • todo 海量定时任务处理

1.时间轮

​ 在很多开源框架中,都需要定时任务的管理功能,例如 ZooKeeper、Netty、Quartz、Kafka 以及 Linux 操作系统。

​ JDK 提供的 java.util.TimerDelayedQueue 等工具类,可以实现简单的定时任务管理,其底层实现使用的是这种数据结构,存取操作的复杂度都是 O(nlog(n)),无法支持大量的定时任务。在定时任务量比较大、性能要求比较高的场景中,为了将定时任务的存取操作以及取消操作的时间复杂度降为 O(1),一般会使用时间轮的方式。

时间轮是一种高效的、批量管理定时任务的调度模型。时间轮一般会实现成一个环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务;指针周期性地跳动,跳动到一个槽位,就执行该槽位的定时任务。

​ 需要注意的是,单层时间轮的容量和精度都是有限的,对于精度要求特别高、时间跨度特别大或是海量定时任务需要调度的场景,通常会使用多级时间轮以及持久化存储与时间轮结合的方案。

2.核心设计

① 接口定义

​ Dubbo 的时间轮实现位于 dubbo-common 模块的 org.apache.dubbo.common.timer 包中,下面就来分析时间轮涉及的核心接口和实现。

​ 在 Dubbo 中,所有的定时任务都要继承 TimerTask 接口。TimerTask 接口非常简单,只定义了一个 run() 方法,该方法的入参是一个 Timeout 接口的对象。

public interface TimerTask {
    void run(Timeout timeout) throws Exception;
}

​ Timeout 对象与 TimerTask 对象一一对应,两者的关系类似于线程池返回的 Future 对象与提交到线程池中的任务对象之间的关系。通过 Timeout 对象,不仅可以查看定时任务的状态,还可以操作定时任务(例如取消关联的定时任务)。Timeout 接口中的方法如下图所示

public interface Timeout {

    Timer timer();

    TimerTask task();

    boolean isExpired();

    boolean isCancelled();

    boolean cancel();
}

​ Timer 接口定义了定时器的基本行为,如下图所示,其核心是 newTimeout() 方法:提交一个定时任务(TimerTask)并返回关联的 Timeout 对象,这有点类似于向线程池提交任务的感觉

public interface Timer {

    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

    Set<Timeout> stop();

    boolean isStop();
}

② 接口实现

HashedWheelTimeout

HashedWheelTimeout :HashedWheelTimeout 是 Timeout 接口的唯一实现,是 HashedWheelTimer 的内部类。HashedWheelTimeout 扮演了两个角色:

  • 第一个,时间轮中双向链表的节点,即定时任务 TimerTask 在 HashedWheelTimer 中的容器。
  • 第二个,定时任务 TimerTask 提交到 HashedWheelTimer 之后返回的句柄(Handle),用于在时间轮外部查看和控制定时任务。

HashedWheelTimeout 中的核心字段如下:

  • prev、next(HashedWheelTimeout类型),分别对应当前定时任务在链表中的前驱节点和后继节点
  • task(TimerTask类型),指实际被调度的任务
  • deadline(long类型),指定时任务执行的时间。这个时间是在创建 HashedWheelTimeout 时指定的,计算公式是:currentTime(创建 HashedWheelTimeout 的时间) + delay(任务延迟时间) - startTime(HashedWheelTimer 的启动时间),时间单位为纳秒
  • state(volatile int类型),指定时任务当前所处状态,可选的有三个,分别是 INIT(0)、CANCELLED(1)和 EXPIRED(2)。另外,还有一个 STATE_UPDATER 字段(AtomicIntegerFieldUpdater类型)实现 state 状态变更的原子性
  • remainingRounds(long类型),指当前任务剩余的时钟周期数。时间轮所能表示的时间长度是有限的,在任务到期时间与当前时刻的时间差,超过时间轮单圈能表示的时长,就出现了套圈的情况,需要该字段值表示剩余的时钟周期

HashedWheelTimeout 中的核心方法有:

  • isCancelled()、isExpired() 、state() 方法, 主要用于检查当前 HashedWheelTimeout 状态
  • cancel() 方法, 将当前 HashedWheelTimeout 的状态设置为 CANCELLED,并将当前 HashedWheelTimeout 添加到 cancelledTimeouts 队列中等待销毁
  • expire() 方法, 当任务到期时,会调用该方法将当前 HashedWheelTimeout 设置为 EXPIRED 状态,然后调用其中的 TimerTask 的 run() 方法执行定时任务
  • remove() 方法, 将当前 HashedWheelTimeout 从时间轮中删除
HashedWheelBucket

​ HashedWheelBucket 是时间轮中的一个槽,时间轮中的槽实际上就是一个用于缓存和管理双向链表的容器,双向链表中的每一个节点就是一个 HashedWheelTimeout 对象,也就关联了一个 TimerTask 定时任务。

​ HashedWheelBucket 持有双向链表的首尾两个节点,分别是 head 和 tail 两个字段,再加上每个 HashedWheelTimeout 节点均持有前驱和后继的引用,这样就可以正向或是逆向遍历整个双向链表了。

​ HashedWheelBucket 中的核心方法:

  • addTimeout() 方法:新增 HashedWheelTimeout 到双向链表的尾部
  • pollTimeout() 方法:移除双向链表中的头结点,并将其返回
  • remove() 方法:从双向链表中移除指定的 HashedWheelTimeout 节点
  • clearTimeouts() 方法:循环调用 pollTimeout() 方法处理整个双向链表,并返回所有未超时或者未被取消的任务
  • expireTimeouts() 方法:遍历双向链表中的全部 HashedWheelTimeout 节点。 在处理到期的定时任务时,会通过 remove() 方法取出,并调用其 expire() 方法执行;对于已取消的任务,通过 remove() 方法取出后直接丢弃;对于未到期的任务,会将 remainingRounds 字段(剩余时钟周期数)减一
HashedWheelTimer

​ HashedWheelTimer 是 Timer 接口的实现,它通过时间轮算法实现了一个定时器。HashedWheelTimer 会根据当前时间轮指针选定对应的槽(HashedWheelBucket),从双向链表的头部开始迭代,对每个定时任务(HashedWheelTimeout)进行计算,属于当前时钟周期则取出运行,不属于则将其剩余的时钟周期数减一操作。

HashedWheelTimer 的核心属性:

  • workerState(volatile int类型):时间轮当前所处状态,可选值有 init、started、shutdown。同时,有相应的 AtomicIntegerFieldUpdater 实现 workerState 的原子修改
  • startTime(long类型):当前时间轮的启动时间,提交到该时间轮的定时任务的 deadline 字段值均以该时间戳为起点进行计算
  • wheel(HashedWheelBucket[]类型):该数组就是时间轮的环形队列,每一个元素都是一个槽。当指定时间轮槽数为 n 时,实际上会取大于且最靠近 n 的 2 的幂次方值
  • timeouts、cancelledTimeouts(LinkedBlockingQueue类型):timeouts 队列用于缓冲外部提交时间轮中的定时任务,cancelledTimeouts 队列用于暂存取消的定时任务HashedWheelTimer 会在处理 HashedWheelBucket 的双向链表之前,先处理这两个队列中的数据
  • tick(long类型):该字段在 HashedWheelTimer$Worker 中,是时间轮的指针,是一个步长为 1 的单调递增计数器
  • mask(int类型):掩码, mask = wheel.length - 1,执行 ticks & mask 便能定位到对应的时钟槽
  • ticksDuration(long类型):时间指针每次加 1 所代表的实际时间,单位为纳秒
  • pendingTimeouts(AtomicLong类型):当前时间轮剩余的定时任务总数
  • workerThread(Thread类型):时间轮内部真正执行定时任务的线程
  • worker(Worker类型):真正执行定时任务的逻辑封装这个 Runnable 对象中

时间轮对外提供了一个 newTimeout() 接口用于提交定时任务,在定时任务进入到 timeouts 队列之前会先调用 start() 方法启动时间轮,其中会完成下面两个关键步骤:

  • (1)确定时间轮的 startTime 字段;
  • (2)启动 workerThread 线程,开始执行 worker 任务;

​ 之后根据 startTime 计算该定时任务的 deadline 字段,最后才能将定时任务封装成 HashedWheelTimeout 并添加到 timeouts 队列

时间轮指针一次转动的全流程分析

  • (1)时间轮指针转动,时间轮周期开始
  • (2)清理用户主动取消的定时任务,这些定时任务在用户取消时,会记录到 cancelledTimeouts 队列中。在每次指针转动的时候,时间轮都会清理该队列
  • (3)将缓存在 timeouts 队列中的定时任务转移到时间轮中对应的槽中
  • (4)根据当前指针定位对应槽,处理该槽位的双向链表中的定时任务
  • (5)检测时间轮的状态。如果时间轮处于运行状态,则循环执行上述步骤,不断执行定时任务。如果时间轮处于停止状态,则执行下面的步骤获取到未被执行的定时任务并加入 unprocessedTimeouts 队列:遍历时间轮中每个槽位,并调用 clearTimeouts() 方法;对 timeouts 队列中未被加入槽中循环调用 poll()
  • (5)最后再次清理 cancelledTimeouts 队列中用户主动取消的定时任务

上述核心逻辑在 HashedWheelTimer$Worker.run() 方法中

3.Dubbo 中如何使用定时任务

​ 在 Dubbo 中,时间轮并不直接用于周期性操作,而是只向时间轮提交执行单次的定时任务,在上一次任务执行完成的时候,调用 newTimeout() 方法再次提交当前任务,这样就会在下个周期执行该任务。即使在任务执行过程中出现了 GC、I/O 阻塞等情况,导致任务延迟或卡住,也不会有同样的任务源源不断地提交进来,导致任务堆积。

​ Dubbo 中对时间轮的应用主要体现在如下两个方面:

  • 失败重试, 例如,Provider 向注册中心进行注册失败时的重试操作,或是 Consumer 向注册中心订阅时的失败重试等
  • 周期性定时任务, 例如,定期发送心跳请求,请求超时的处理,或是网络连接断开后的重连机制

⑤ Zookeeper & Curator

学习目标

  • Dubbo 的 注册中心
  • Zookeeper 核心概念 及 工作原理
    • Zookeeper 集群中各个节点的角色以及职能
    • Zookeeper 中存储数据的逻辑结构、ZNode 节点的相关特性
    • Zookeeper 集群读写机制的核心流程
    • Zookeeper 集群的崩溃恢复流程
  • Zookeeper 客户端(ZkClient VS Curator)
    • 对比其他zk客户端(存在API功能不足、文档可读性差、重试机制不足等问题),Curator的易用性占据很大的优势
    • Curator 的基本操作API应用
    • curator-x-discovery 在 RPC 场景中的应用:实现一个简易版本的RPC注册中心,完成服务注册、服务发现功能
    • curator-recipies 的功能介绍

​ Dubbo Provider 在启动时会将自身的服务信息整理成 URL 注册到注册中心,Dubbo Consumer 在启动时会向注册中心订阅感兴趣的 Provider 信息,之后 Provider 和 Consumer 才能建立连接,进行后续的交互。可见,一个稳定、高效的注册中心对基于 Dubbo 的微服务来说是至关重要的

​ Dubbo 目前支持 ConsuletcdNacosZooKeeperRedis 等多种开源组件作为注册中心,并且在 Dubbo 源码也有相应的接入模块。

image-20241230095014208

​ Dubbo 官方推荐使用 ZooKeeper 作为注册中心,它是在实际生产中最常用的注册中心实现。要与 ZooKeeper 集群进行交互,可以使用 ZooKeeper 原生客户端或是 ZkClient、Apache Curator 等第三方开源客户端。在 dubbo-registry-zookeeper 模块学习是可以了解到Dubbo 底层使用的是 Apache Curator。Apache Curator 是实践中最常用的 ZooKeeper 客户端

1.Zookeeper 核心概念及工作原理

⚽ Zookeeper 核心概念

Apache ZooKeeper 是一个针对分布式系统的、可靠的、可扩展的协调服务,它通常作为统一命名服务、统一配置管理、注册中心(分布式集群管理)、分布式锁服务、Leader 选举服务等角色出现。很多分布式系统都依赖与 ZooKeeper 集群实现分布式系统间的协调调度,例如:Dubbo、HDFS 2.x、HBase、Kafka 等。ZooKeeper 已经成为现代分布式系统的标配

ZooKeeper 集群的核心架构(整体架构)

​ ZooKeeper 本身也是一个分布式应用程序,下图展示了 ZooKeeper 集群的核心架构

image-20241230095617786

  • Client 节点:从业务角度来看,这是分布式应用中的一个节点,通过 ZkClient 或是其他 ZooKeeper 客户端与 ZooKeeper 集群中的一个 Server 实例维持长连接,并定时发送心跳。从 ZooKeeper 集群的角度来看,它是 ZooKeeper 集群的一个客户端,可以主动查询或操作 ZooKeeper 集群中的数据,也可以在某些 ZooKeeper 节点(ZNode)上添加监听。当被监听的 ZNode 节点发生变化时,例如,该 ZNode 节点被删除、新增子节点或是其中数据被修改等,ZooKeeper 集群都会立即通过长连接通知 Client
  • Leader 节点:ZooKeeper 集群的主节点,负责整个 ZooKeeper 集群的写操作,保证集群内事务处理的顺序性。同时,还要负责整个集群中所有 Follower 节点与 Observer 节点的数据同步
  • Follower 节点:ZooKeeper 集群中的从节点,可以接收 Client 读请求并向 Client 返回结果,并不处理写请求,而是转发到 Leader 节点完成写入操作。另外,Follower 节点还会参与 Leader 节点的选举
  • Observer 节点:ZooKeeper 集群中特殊的从节点,不会参与 Leader 节点的选举,其他功能与 Follower 节点相同。引入 Observer 角色的目的是增加 ZooKeeper 集群读操作的吞吐量,如果单纯依靠增加 Follower 节点来提高 ZooKeeper 的读吞吐量,那么有一个很严重的副作用,就是 ZooKeeper 集群的写能力会大大降低,因为 ZooKeeper 写数据时需要 Leader 将写操作同步给半数以上的 Follower 节点。引入 Observer 节点使得 ZooKeeper 集群在写能力不降低的情况下,大大提升了读操作的吞吐量

ZooKeeper 集群存储数据的逻辑结构

​ ZooKeeper 逻辑上是按照树型结构进行数据存储的(如下图),其中的节点称为 ZNode。每个 ZNode 有一个名称标识,即树根到该节点的路径(用 “/” 分隔),ZooKeeper 树中的每个节点都可以拥有子节点,这与文件系统的目录树类似。ZooKeeper 树型存储结构图示如下所示:

ZNode 节点类型有如下四种:

  • 持久节点:持久节点创建后,会一直存在,不会因创建该节点的 Client 会话失效而删除
  • 持久顺序节点:持久顺序节点的基本特性与持久节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名
  • 临时节点:创建临时节点的 ZooKeeper Client 会话失效之后,其创建的临时节点会被 ZooKeeper 集群自动删除。与持久节点的另一点区别是,临时节点下面不能再创建子节点
  • 临时顺序节点:基本特性与临时节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名

​ 在每个 ZNode 中都维护着一个 stat 结构,记录了该 ZNode 的元数据,其中包括版本号、操作控制列表(ACL)、时间戳和数据长度等信息,如下表所示:

image-20241230100459681

​ 除了可以通过 ZooKeeper Client 对 ZNode 进行增删改查等基本操作,还可以注册 Watcher 监听 ZNode 节点、其中的数据以及子节点的变化。一旦监听到变化,则相应的 Watcher 即被触发,相应的 ZooKeeper Client 会立即得到通知。Watcher 有如下特点:

  • 主动推送:Watcher 被触发时,由 ZooKeeper 集群主动将更新推送给客户端,而不需要客户端轮询
  • 一次性:数据变化时,Watcher 只会被触发一次。如果客户端想得到后续更新的通知,必须要在 Watcher 被触发后重新注册一个 Watcher
  • 可见性:如果一个客户端在读请求中附带 Watcher,Watcher 被触发的同时再次读取数据,客户端在得到 Watcher 消息之前肯定不可能看到更新后的数据。换句话说,更新通知先于更新结果
  • 顺序性:如果多个更新触发了多个 Watcher ,那 Watcher 被触发的顺序与更新顺序一致

⚽ 消息广播流程

​ ZooKeeper 集群中三种角色的节点(Leader、Follower 和 Observer)都可以处理 Client 的读请求,因为每个节点都保存了相同的数据副本,直接进行读取即可返回给 Client。

​ 对于写请求,如果 Client 连接的是 Follower 节点(或 Observer 节点),则在 Follower 节点(或 Observer 节点)收到写请求将会被转发到 Leader 节点。下面是 Leader 处理写请求的核心流程:

① Leader 节点接收写请求后,会为写请求赋予一个全局唯一的 zxid(64 位自增 id),通过 zxid 的大小比较就可以实现写操作的顺序一致性

② Leader 通过先进先出队列(会给每个 Follower 节点都创建一个队列,保证发送的顺序性),将带有 zxid 的消息作为一个 proposal(提案)分发给所有 Follower 节点

③ 当 Follower 节点接收到 proposal 之后,会先将 proposal 写到本地事务日志,写事务成功后再向 Leader 节点回一个 ACK 响应

④ 当 Leader 节点接收到过半 Follower 的 ACK 响应之后,Leader 节点就向所有 Follower 节点发送 COMMIT 命令,并在本地执行提交

⑤ 当 Follower 收到消息的 COMMIT 命令之后也会提交操作,写操作到此完成

⑥ 最后,Follower 节点会返回 Client 写请求相应的响应

​ 写操作核心流程图示分析如下:

image-20241230100731478

⚽ 崩溃恢复

​ 上面写请求处理流程中,如果发生 Leader 节点宕机,整个 ZooKeeper 集群可能处于如下两种状态:

​ (1)当 Leader 节点收到半数以上 Follower 节点的 ACK 响应之后,会向各个 Follower 节点广播 COMMIT 命令,同时也会在本地执行 COMMIT 并向连接的客户端进行响应。如果在各个 Follower 收到 COMMIT 命令前 Leader 就宕机了,就会导致剩下的服务器没法执行这条消息

​ (2)当 Leader 节点生成 proposal 之后就宕机了,而其他 Follower 并没有收到此 proposal(或者只有一小部分 Follower 节点收到了这条 proposal),那么此次写操作就是执行失败的

​ 在 Leader 宕机后,ZooKeeper 会进入崩溃恢复模式,重新进行 Leader 节点的选举。ZooKeeper 对新 Leader 有如下两个要求:

​ (1)对于原 Leader 已经提交了的 proposal,新 Leader 必须能够广播并提交,这样就需要选择拥有最大 zxid 值的节点作为 Leader;

​ (2)对于原 Leader 还未广播或只部分广播成功的 proposal,新 Leader 能够通知原 Leader 和已经同步了的 Follower 删除,从而保证集群数据的一致性;

​ ZooKeeper 选主使用的是 ZAB 协议,此处通过一个示例简单介绍 ZooKeeper 选主的大致流程。

​ 比如,当前集群中有 5 个 ZooKeeper 节点构成,sid 分别为 1、2、3、4 和 5,zxid 分别为 10、10、9、9 和 8,此时,sid 为 1 的节点是 Leader 节点。实际上,zxid 包含了 epoch(高 32 位)和自增计数器(低 32 位) 两部分。其中,epoch 是“纪元”的意思,标识当前 Leader 周期,每次选举时 epoch 部分都会递增,这就防止了网络隔离之后,上一周期的旧 Leader 重新连入集群造成不必要的重新选举。该示例中假设各个节点的 epoch 都相同。

​ 某一时刻,节点 1 的服务器宕机了,ZooKeeper 集群开始进行选主。由于无法检测到集群中其他节点的状态信息(处于 Looking 状态),因此每个节点都将自己作为被选举的对象来进行投票。于是 sid 为 2、3、4、5 的节点,投票情况分别为(2,10)、(3,9)、(4,9)、(5,8),同时各个节点也会接收到来自其他节点的投票(这里以(sid, zxid)的形式来标识一次投票信息)

  • 对于节点 2 来说,接收到(3,9)、(4,9)、(5,8)的投票,对比后发现自己的 zxid 最大,因此不需要做任何投票变更
  • 对于节点 3 来说,接收到(2,10)、(4,9)、(5,8)的投票,对比后由于 2 的 zxid 比自己的 zxid 要大,因此需要更改投票,改投(2,10),并将改投后的票发给其他节点
  • 对于节点 4 来说,接收到(2,10)、(3,9)、(5,8)的投票,对比后由于 2 的 zxid 比自己的 zxid 要大,因此需要更改投票,改投(2,10),并将改投后的票发给其他节点
  • 对于节点 5 来说,也是一样,最终改投(2,10)

​ 经过第二轮投票后,集群中的每个节点都会再次收到其他机器的投票,然后开始统计投票,如果有过半的节点投了同一个节点,则该节点成为新的 Leader,这里显然节点 2 成了新 Leader节点

​ Leader 节点此时会将 epoch 值加 1,并将新生成的 epoch 分发给各个 Follower 节点。各个 Follower 节点收到全新的 epoch 后,返回 ACK 给 Leader 节点,并带上各自最大的 zxid 和历史事务日志信息。Leader 选出最大的 zxid,并更新自身历史事务日志,示例中的节点 2 无须更新。Leader 节点紧接着会将最新的事务日志同步给集群中所有的 Follower 节点,只有当半数 Follower 同步成功,这个准 Leader 节点才能成为正式的 Leader 节点并开始工作

2.Zookeeper 客户端的选择

⚽ Zookeeper 客户端介绍

​ 在实际工作中,直接使用客户端与 ZooKeeper 进行交互的次数比深入 ZooKeeper 底层进行扩展和二次开发的次数要多得多。从 ZooKeeper 架构的角度看,使用 Dubbo 的业务节点也只是一个 ZooKeeper 客户端罢了。

​ ZooKeeper 官方提供的客户端支持了一些基本操作,例如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等,但在实际开发中只有这些简单功能是根本不够的。而且,ZooKeeper 本身的一些 API 也存在不足,例如:

  • ZooKeeper 的 Watcher 是一次性的,每次触发之后都需要重新进行注册
  • 会话超时之后,没有实现自动重连的机制
  • ZooKeeper 提供了非常详细的异常,异常处理显得非常烦琐,对开发新手来说,非常不友好
  • 只提供了简单的 byte[] 数组的接口,没有提供基本类型以及对象级别的序列化
  • 创建节点时,如果节点存在抛出异常,需要自行检查节点是否存在
  • 删除节点就无法实现级联删除

常见的第三方开源 ZooKeeper 客户端有 ZkClient 和 Apache Curator

​ ZkClient 是在 ZooKeeper 原生 API 接口的基础上进行了包装,虽然 ZkClient 解决了 ZooKeeper 原生 API 接口的很多问题,提供了非常简洁的 API 接口,实现了会话超时自动重连的机制,解决了 Watcher 反复注册等问题,但其缺陷也非常明显。例如,文档不全、重试机制难用、异常全部转换成了 RuntimeException、没有足够的参考示例等。可见,一个简单易用、高效可靠的 ZooKeeper 客户端是多么重要。

⚽ Apache Curator

​ **Apache Curator 是 Apache 基金会提供的一款 ZooKeeper 客户端,它提供了一套易用性和可读性非常强的 Fluent 风格的客户端 API ,可以帮助我们快速搭建稳定可靠的 ZooKeeper 客户端程序。**Curator 功能梳理如下:

名称描述
curator-frameworkZookeeper-API 的高层封装,简化Zookeeper客户端编程,添加了Zookeeper连接管理、重试机制、重复注册Watcher等功能
curator-recipesZookeeper典型应用场景的实现(基于Curator Framework),例如Leader选举、分布式锁、Barrier分布式队列等
curator-clientZookeeper Client的封装,取代原生Zookeeper客户端,提供一些非常有用的客户端特性
curator-x-discovery基于curator-framework上构建的服务发现实现
curator-x-discoveryserver可以和curator-x-discovery一起使用的RESTful服务器
curator-examples各种使用curator特性的案例
(1)基本操作

版本兼容性处理

​ 一开始使用zookeeper3.4.14版本和curator4.0.1进行搭配使用的时候报错,考虑版本兼容问题需注意zookeeper与curator的版本选择

11:09:34.615 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x10000c08f260001, packet:: clientPath:/zookeeper/config serverPath:/zookeeper/config finished:false header:: 1,4  replyHeader:: 1,177,-101  request:: '/zookeeper/config,T  response::  
11:09:34.616 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x10000c08f260001, packet:: clientPath:/zookeeper/config serverPath:/zookeeper/config finished:false header:: 2,4  replyHeader:: 2,177,-101  request:: '/zookeeper/config,T  response::  
11:09:34.629 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x10000c08f260001, packet:: clientPath:null serverPath:null finished:false header:: 3,15  replyHeader:: 3,-1,-6  request:: '/user,#74657374,v{s{31,s{'world,'anyone}}},0  response::  
11:09:34.631 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Unable to read additional data from server sessionid 0x10000c08f260001, likely server has closed socket, closing socket connection and attempting reconnect
Exception in thread "main" org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /user
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:103)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:1525)
	at org.apache.curator.framework.imps.CreateBuilderImpl$17.call(CreateBuilderImpl.java:1181)

​ 版本兼容参考

<dependencyManagement>
    <dependencies>
        <!-- zookeeper -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.14</version>
        </dependency>
    </dependencies>
</dependencyManagement>

CRUD 基本操作示例

/**
 * 基本操作
 */
public class ZookeeperCuratorDemo1 {
    /**
     * Curator中常用的API示例
     */
    public static void main(String[] args) throws Exception {

        // Zookeeper集群地址,多个节点地址可以用逗号分隔
        String zkAddress = "127.0.0.1:2181";

        // 重试策略:如果连接不上ZooKeeper集群,会重试三次,重试间隔会递增
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // 创建Curator Client并启动,启动成功之后,就可以与Zookeeper进行交互
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy);
        client.start();

        // ① 创建一个名为"user"的持久节点,其中会存储一个test字符串(create()方法创建ZNode,可以调用额外方法来设置节点类型、添加Watcher)
        String path = client.create().withMode(CreateMode.PERSISTENT).forPath("/user", "test".getBytes());
        System.out.println(path); // 输出:/user

        // ② checkExists()方法可以检查一个节点是否存在
        Stat stat = client.checkExists().forPath("/user");
        System.out.println(stat != null); // 输出:true,返回的Stat不为null,即表示节点存在

        // ③ getData()方法可以获取一个节点中的数据
        byte[] data = client.getData().forPath("/user");
        System.out.println(new String(data));  // 输出:test

        // ④ setData()方法可以设置一个节点中的数据
        stat = client.setData().forPath("/user", "data".getBytes());
        data = client.getData().forPath("/user");
        System.out.println(new String(data)); // 输出:data

        // ⑤ 在/user节点下,创建多个临时顺序节点
        for (int i = 0; i < 3; i++) {
            client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/user/child-");
        }

        // ⑥ 获取所有子节点
        List<String> children = client.getChildren().forPath("/user");
        System.out.println(children); // 输出:[child-0000000002, child-0000000001, child-0000000000]

        // ⑦ delete()方法可以删除指定节点,deletingChildrenIfNeeded()方法 会级联删除子节点
        client.delete().deletingChildrenIfNeeded().forPath("/user");
    }
}
(2)Background (异步处理):BackgroundCallback CuratorListener

​ 上述的创建、删除、更新、读取等方法都是同步的,Curator 提供异步接口,引入了BackgroundCallback 这个回调接口以及 CuratorListener 这个监听器,用于处理 Background 调用之后服务端返回的结果信息。BackgroundCallback 接口和 CuratorListener 监听器中接收一个 CuratorEvent 的参数,里面包含事件类型、响应码、节点路径等详细信息。通过一个示例说明 BackgroundCallback 接口以及 CuratorListener 监听器的基本使用

/**
 * 基本操作
 */
public class ZookeeperCuratorDemo2 {
    /**
     * Background
     */
    public static void main(String[] args) throws Exception {

        // Zookeeper集群地址,多个节点地址可以用逗号分隔
        String zkAddress = "127.0.0.1:2181";

        // 重试策略,如果连接不上ZooKeeper集群,会重试三次,重试间隔会递增
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // 创建Curator Client并启动,启动成功之后,就可以与Zookeeper进行交互
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy);
        client.start();

        // 添加CuratorListener监听器,针对不同的事件进行处理
        client.getCuratorListenable().addListener(new CuratorListener() {
            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                switch (event.getType()) {
                    case CREATE:
                        System.out.println("CREATE:" + event.getPath());
                        break;
                    case DELETE:
                        System.out.println("DELETE:" + event.getPath());
                        break;
                    case EXISTS:
                        System.out.println("EXISTS:" + event.getPath());
                        break;
                    case GET_DATA:
                        System.out.println("GET_DATA:" + event.getPath() + "," + new String(event.getData()));
                        break;
                    case SET_DATA:
                        System.out.println("SET_DATA:" + new String(event.getData()));
                        break;
                    case CHILDREN:
                        System.out.println("CHILDREN:" + event.getPath());
                        break;
                    default:
                }
            }
        });

        /**
         * Background 操作,转化为后台操作:下面示例操作都添加了inBackground()方法,转换为后台操作
         */
        client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath("/user", "test".getBytes());
        client.checkExists().inBackground().forPath("/user");
        client.setData().inBackground().forPath("/user", "setData-Test".getBytes());
        client.getData().inBackground().forPath("/user");
        for (int i = 0; i < 3; i++) {
            client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground().forPath("/user/child-");
        }
        client.getChildren().inBackground().forPath("/user");

        // 添加BackgroundCallback
        client.getChildren().inBackground(new BackgroundCallback() {
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("in background:" + event.getType() + "," + event.getPath());
            }
        }).forPath("/user");
        client.delete().deletingChildrenIfNeeded().inBackground().forPath("/user");
        System.in.read();
    }
}

/**
 * output:
 * CREATE:/user
 * EXISTS:/user
 * GET_DATA:/user,setData-Test
 * CREATE:/user/child-
 * CREATE:/user/child-
 * CREATE:/user/child-
 * CHILDREN:/user
 * in background:CHILDREN,/user
 * DELETE:/user
 */
(3)连接状态监听ConnectionStateListener

​ 除了基础的数据操作,Curator 还提供了监听连接状态的监听器——ConnectionStateListener,它主要是处理 Curator 客户端和 ZooKeeper 服务器间连接的异常情况,例如, 短暂或者长时间断开连接。

​ 短暂断开连接时,ZooKeeper 客户端会检测到与服务端的连接已经断开,但是服务端维护的客户端 Session 尚未过期,之后客户端和服务端重新建立了连接;当客户端重新连接后,由于 Session 没有过期,ZooKeeper 能够保证连接恢复后保持正常服务

​ 而长时间断开连接时,Session 已过期,与先前 Session 相关的 Watcher 和临时节点都会丢失。当 Curator 重新创建了与 ZooKeeper 的连接时,会获取到 Session 过期的相关异常,Curator 会销毁老 Session,并且创建一个新的 Session。由于老 Session 关联的数据不存在了,在 ConnectionStateListener 监听到 LOST 事件时,就可以依靠本地存储的数据恢复 Session

此处 Session 指的是 ZooKeeper 服务器与客户端的会话。客户端启动的时候会与服务器建立一个 TCP 连接,从第一次连接建立开始,客户端会话的生命周期也开始了。客户端能够通过心跳检测与服务器保持有效的会话,也能够向 ZooKeeper 服务器发送请求并接受响应,同时还能够通过该连接接收来自服务器的 Watch 事件通知。

​ 可以设置客户端会话的超时时间(sessionTimeout),当服务器压力太大、网络故障或是客户端主动断开连接等原因导致连接断开时,只要客户端在 sessionTimeout 规定的时间内能够重新连接到 ZooKeeper 集群中任意一个实例,那么之前创建的会话仍然有效。ZooKeeper 通过 sessionID 唯一标识 Session,所以在 ZooKeeper 集群中,sessionID 需要保证全局唯一。 由于 ZooKeeper 会将 Session 信息存放到硬盘中,即使节点重启,之前未过期的 Session 仍然会存在。

/**
 * 基本操作
 */
public class ZookeeperCuratorDemo3 {
    /**
     * ConnectionStateListener 连接状态监听
     */
    public static void main(String[] args) throws Exception {

        // Zookeeper集群地址,多个节点地址可以用逗号分隔
        String zkAddress = "127.0.0.1:2181";

        // 重试策略,如果连接不上ZooKeeper集群,会重试三次,重试间隔会递增
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // 创建Curator Client并启动,启动成功之后,就可以与Zookeeper进行交互
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy);
        client.start();

        // 添加ConnectionStateListener监听器
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                // 此处可以针对不同的连接状态进行特殊的处理
                switch (newState) {
                    case CONNECTED:
                        // 第一次成功连接到ZooKeeper之后会进入该状态,对于每个CuratorFramework对象,此状态仅出现一次
                        break;
                    case SUSPENDED:
                        //   ZooKeeper的连接丢失
                        break;
                    case RECONNECTED:
                        // 丢失的连接被重新建立
                        break;
                    case LOST:
                        // 当Curator认为会话已经过期时,则进入此状态
                        break;
                    case READ_ONLY:
                        // 连接进入只读模式
                        break;
                }
            }
        });
    }
}
(4)Wathcer(监听机制)

​ Watcher 监听机制是 ZooKeeper 中非常重要的特性,可以监听某个节点上发生的特定事件,例如,监听节点数据变更、节点删除、子节点状态变更等事件。当相应事件发生时,ZooKeeper 会产生一个 Watcher 事件,并且发送到客户端。通过 Watcher 机制,就可以使用 ZooKeeper 实现分布式锁、集群管理等功能。

​ 在 Curator 客户端中,可以使用 usingWatcher() 方法添加 Watcher,前面示例中能够添加 Watcher 的有 checkExists()、getData()以及 getChildren() 三个方法


​ 启动上述main示例,随后通过zookeeper命令行客户端,在user下先后添加两个子节点。可以看到控制台输出NodeChildrenChanged,/user,是因为通过 usingWatcher() 方法添加的 CuratorWatcher 只会触发一次,触发完毕后就会销毁。checkExists() 方法、getData() 方法通过 usingWatcher() 方法添加的 Watcher 也是一样的原理,只不过监听的事件不同

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] delete /user
[zk: localhost:2181(CONNECTED) 1] ls /
[dubbo, zookeeper]
[zk: localhost:2181(CONNECTED) 2] create /user/test1 hello-test1
Created /user/test1
[zk: localhost:2181(CONNECTED) 3] create /user/test2 hello-test2
Created /user/test2
[zk: localhost:2181(CONNECTED) 4]

image-20241230134715611

​ 直接通过注册 Watcher 进行事件监听不是特别方便,需要自己反复注册 Watcher。Apache Curator 引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。Cache 是 Curator 中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程 ZooKeeper 视图的对比过程。同时,Curator 能够自动为开发人员处理反复注册监听,从而大大简化了代码的复杂程度。实践中常用的 Cache 有三大类:

  • NodeCache:对一个节点进行监听,监听事件包括指定节点的增删改操作。注意,NodeCache 不仅可以监听数据节点的内容变更,也能监听指定节点是否存在,如果原本节点不存在,那么 Cache 就会在节点被创建后触发 NodeCacheListener,删除操作亦然
  • PathChildrenCache:对指定节点的一级子节点进行监听,监听事件包括子节点的增删改操作,但是不对该节点的操作监听
  • TreeCache:综合 NodeCache 和 PathChildrenCache 的功能,是对指定节点以及其子节点进行监听,同时还可以设置监听的深度
/**
 * 基本操作
 */
public class ZookeeperCuratorDemo5 {
    /**
     * Watcher 监听机制
     */
    public static void main(String[] args) throws Exception {

        // Zookeeper集群地址,多个节点地址可以用逗号分隔
        String zkAddress = "127.0.0.1:2181";

        // 重试策略,如果连接不上ZooKeeper集群,会重试三次,重试间隔会递增
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // 创建Curator Client并启动,启动成功之后,就可以与Zookeeper进行交互
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy);
        client.start();

        // 创建NodeCache,监听的是"/user"这个节点
        NodeCache nodeCache = new NodeCache(client, "/user");

        // start()方法有个boolean类型的参数(默认是false)。如果设置为true,那么NodeCache在第一次启动的时候就会立刻从ZooKeeper上读取对应节点的数据内容,并保存在Cache中。
        nodeCache.start(true);
        if (nodeCache.getCurrentData() != null) {
            System.out.println("NodeCache节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
        } else {
            System.out.println("NodeCache节点数据为空");
        }

        // 添加监听器
        nodeCache.getListenable().addListener(() -> {
            String data = new String(nodeCache.getCurrentData().getData());
            System.out.println("NodeCache节点路径:" + nodeCache.getCurrentData().getPath() + ",节点数据为:" + data);
        });

        // 创建PathChildrenCache实例,监听的是"user"这个节点
        PathChildrenCache childrenCache = new PathChildrenCache(client, "/user", true);
        /**
         * StartMode指定的初始化的模式:
         * ① NORMAL:普通异步初始化
         * ② BUILD_INITIAL_CACHE:同步初始化
         * ③ POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
         */
        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        // childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        // childrenCache.start(PathChildrenCache.StartMode.NORMAL);
        List<ChildData> children = childrenCache.getCurrentData();
        System.out.println("获取子节点列表:");
        // 如果是BUILD_INITIAL_CACHE可以获取这个数据,如果不是就不行
        children.forEach(childData -> {
            System.out.println(new String(childData.getData()));
        });

        childrenCache.getListenable().addListener(((client1, event) -> {
            System.out.println(LocalDateTime.now() + "  " + event.getType());
            if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
                System.out.println("PathChildrenCache:子节点初始化成功...");
            } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                String path = event.getData().getPath();
                System.out.println("PathChildrenCache添加子节点:" + event.getData().getPath());
                System.out.println("PathChildrenCache子节点数据:" + new String(event.getData().getData()));
            } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                System.out.println("PathChildrenCache删除子节点:" + event.getData().getPath());
            } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                System.out.println("PathChildrenCache修改子节点路径:" + event.getData().getPath());
                System.out.println("PathChildrenCache修改子节点数据:" + new String(event.getData().getData()));
            }
        }));

        // 创建TreeCache实例监听"user"节点
        TreeCache cache = TreeCache.newBuilder(client, "/user").setCacheData(false).build();
        cache.getListenable().addListener((c, event) -> {
            if (event.getData() != null) {
                System.out.println("TreeCache,type=" + event.getType() + " path=" + event.getData().getPath());
            } else {
                System.out.println("TreeCache,type=" + event.getType());
            }
        });
        cache.start();

        System.in.read();
    }
}

① 初始化测试

​ 集群测试初始化设定已存在/user/test1/user/test2两个节点(如果没有可通过客户端命令行创建create /user/test1 hello-test1create /user/test2 hello-test2),启动测试代码控制台会输出下述信息(简化提取版本)

# NodeCache 相关输出
NodeCache节点初始化数据为:test

# PathChildrenCache 相关输出
hello-test1
hello-test2

# TreeCache 相关输出
TreeCache,type=NODE_ADDED path=/user
TreeCache,type=NODE_ADDED path=/user/test1
TreeCache,type=NODE_ADDED path=/user/test2
TreeCache,type=INITIALIZED

② 添加节点数据

​ 随后在Zookeeper命令行客户端中更新/user节点中的数据set /user userData

[zk: localhost:2181(CONNECTED) 27] set /user userData
cZxid = 0xeb
ctime = Mon Dec 30 13:46:13 CST 2024
mZxid = 0xfc
mtime = Mon Dec 30 14:06:07 CST 2024
pZxid = 0xfa
cversion = 10
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 2

​ 此时控制台同步输出信息,体现监听过程

TreeCache,type=NODE_UPDATED path=/user
NodeCache节点路径:/user,节点数据为:userData

③ 子节点修改操作

​ 创建/user/test3子节点:create /user/test3 hello-test3,控制台同步监听输出信息提示

TreeCache,type=NODE_ADDED path=/user/test3

2024-12-30T14:09:54.910  CHILD_ADDED
PathChildrenCache添加子节点:/user/test3
PathChildrenCache子节点数据:hello-test3

​ 删除上述创建的test3子节点:delete /user/test3,控制台同步监听输出信息提示

TreeCache,type=NODE_REMOVED path=/user/test3

2024-12-30T14:12:56.990  CHILD_REMOVED
PathChildrenCache删除子节点:/user/test3
(5)curator-x-discover 扩展库

​ 为了避免 curator-framework 包过于膨胀,Curator 将很多其他解决方案都拆出来并作为单独的一个包,例如:curator-recipes、curator-x-discovery、curator-x-rpc 等。

​ 在后面会使用到 curator-x-discovery 来完成一个简易 RPC 框架的注册中心模块。curator-x-discovery 扩展包是一个服务发现的解决方案。在 ZooKeeper 中,可以使用临时节点实现一个服务注册机制。当服务启动后在 ZooKeeper 的指定 Path 下创建临时节点,服务断掉与 ZooKeeper 的会话之后,其相应的临时节点就会被删除。这个 curator-x-discovery 扩展包抽象了这种功能,并提供了一套简单的 API 来实现服务发现机制。curator-x-discovery 扩展包的核心概念如下:

  • ServiceInstance:curator-x-discovery 扩展包对服务实例的抽象,由 name、id、address、port 以及一个可选的 payload 属性构成。其存储在 ZooKeeper 中的方式如下图展示的这样
base path
	| __________ service A name
					| __________instance 1 id --> (serialized ServiceInstance)
					| __________instance 2 id --> (serialized ServiceInstance)
					| __________ ...
	| __________ service B name
					| __________instance 1 id --> (serialized ServiceInstance)
					| __________instance 2 id --> (serialized ServiceInstance)
					| __________ ...
	| __________ ......
  • ServiceProvider: curator-x-discovery 扩展包的核心组件之一,提供了多种不同策略的服务发现方式,具体策略有轮询调度、随机和黏性(总是选择相同的一个)。得到 ServiceProvider 对象之后,可以调用其 getInstance() 方法,按照指定策略获取 ServiceInstance 对象(即发现可用服务实例);还可以调用 getAllInstances() 方法,获取所有 ServiceInstance 对象(即获取全部可用服务实例)
  • ServiceDiscovery: curator-x-discovery 扩展包的入口类。开始必须调用 start() 方法,当使用完成应该调用 close() 方法进行销毁
  • ServiceCache:如果程序中会频繁地查询 ServiceInstance 对象,我们可以添加 ServiceCache 缓存,ServiceCache 会在内存中缓存 ServiceInstance 实例的列表,并且添加相应的 Watcher 来同步更新缓存。查询 ServiceCache 的方式也是 getInstances() 方法。另外,ServiceCache 上还可以添加 Listener 来监听缓存变化

Springboot 整合 curator 测试:实现简易版本的RPC注册中心(服务注册、服务发现)

① pom.xml 配置:在curator-recipes基础上加入curator-x-discovery-server

<!-- 引入zookeeper客户端zookeeper-curator 相关依赖 -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.13.0</version> <!-- 版本兼容性:与zookeeper3.4.14兼容 -->
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery-server</artifactId>
    <version>2.13.0</version>
</dependency>

② 基于springboot项目整合配置:application.properties,自定义zk配置类ZooKeeperConfig

/**
 * 连接 zk server的配置信息,在application.properties进行配置
 */
//@Component  不需要配置:@Component,在@Configuration已经包括了,无需多此一举
@Data
// 标志这是一个配置文件
@Configuration
// 配置文件信息前缀
@ConfigurationProperties(prefix = "zookeeper")
public class ZooKeeperConfig {
    private String hostPort; // zookeeper 集群地址
    private String path; // 管理的zk路径
    private String namespace; // 命名空间
    private int sessionTimeoutMs = 60000;
    private int connectionTimeoutMs = 15000;
    private int baseSleepTimeMs = 1000; // 重试间隔
    private int maxRetries = 3; // 最大重试次数
}
#服务器地址
zookeeper.hostPort=127.0.0.1:2181
#管理的zk路径(注册根节点)
zookeeper.path=rpc-user
#命名空间,被称为ZNode
zookeeper.namespace=curator
#会话超时时间,默认值为:60000
zookeeper.sessionTimeoutMs=60000
#连接超时时间,默认值为:15000
zookeeper.connectionTimeoutMs=15000
#重试之间等待的初始时间
zookeeper.baseSleepTimeMs=1000
#重试的最大次数
zookeeper.maxRetries=3

③ 注册中心定义(ZookeeperCuratorDiscovery:服务注册、服务发现,此处自定义服务信息ServiceInfo用于定义要上传到ZK注册中心的节点信息)

/**
 * 自定义服务信息
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerInfo {
    public String host; // host
    public int port; // 端口号
    public String descr; // 描述
}
/**
 * zookeeper-curator-x-discover 示例
 */
public class ZookeeperCuratorDiscovery {

    private ServiceDiscovery<ServerInfo> serviceDiscovery;

    private ServiceCache<ServerInfo> serviceCache;

    private CuratorFramework client;

    private String root;

    // 此处JsonInstanceSerializer是将ServerInfo序列化成Json
    private InstanceSerializer serializer = new JsonInstanceSerializer<>(ServerInfo.class);

    // 构造器
    public ZookeeperCuratorDiscovery(ZooKeeperConfig config) throws Exception {
        // 初始化root
        this.root = config.getPath();

        // ① 创建并启动 Curator客户端
        client = CuratorFrameworkFactory.newClient(config.getHostPort(), new ExponentialBackoffRetry(config.getBaseSleepTimeMs(), config.getMaxRetries()));
        client.start(); // 启动Curator客户端
        client.blockUntilConnected();  // 阻塞当前线程,等待连接成功

        // ② 创建并启动 ServiceDiscovery
        serviceDiscovery = ServiceDiscoveryBuilder
                .builder(ServerInfo.class)
                .client(client) // 依赖Curator客户端
                .basePath(root) // 管理的Zk路径
                .watchInstances(true) // 当ServiceInstance加载
                .serializer(serializer) // 序列化
                .build();
        serviceDiscovery.start(); // 启动ServiceDiscovery

        // ③ 创建ServiceCache,监Zookeeper相应节点的变化,也方便后续的读取
        serviceCache = serviceDiscovery.serviceCacheBuilder()
                .name(root)
                .build();
        serviceCache.start(); // 启动ServiceCache
    }

    // 服务注册:将ServiceInfo对象注册到ZK
    public void registerRemote(ServerInfo serverInfo) throws Exception {
        // 将ServerInfo对象转换成ServiceInstance对象
        ServiceInstance<ServerInfo> thisInstance = ServiceInstance.<ServerInfo>builder()
                .name(root)
                .id(UUID.randomUUID().toString()) // 随机生成的UUID
                .address(serverInfo.getHost()) // host
                .port(serverInfo.getPort()) // port
                .payload(serverInfo) // payload
                .build();

        // 将ServiceInstance写入到Zookeeper中
        serviceDiscovery.registerService(thisInstance);
    }

    // 服务发现:查找注册到ZK的服务节点列表
    public List<ServerInfo> queryRemoteNodes() {
        List<ServerInfo> ServerInfoDetails = new ArrayList<>();
        // 查询 ServiceCache 获取全部的 ServiceInstance 对象
        List<ServiceInstance<ServerInfo>> serviceInstances = serviceCache.getInstances();
        serviceInstances.forEach(serviceInstance -> {
            // 从每个ServiceInstance对象的playload字段中反序列化得到ServerInfo实例
            ServerInfo instance = serviceInstance.getPayload();
            ServerInfoDetails.add(instance);
        });
        return ServerInfoDetails;
    }
}

④ 服务启动测试ZookeeperCuratorDemoApplication(测试服务注册、服务发现功能)

@SpringBootApplication
public class ZookeeperCuratorDemoApplication {

    @Autowired
    ZooKeeperConfig config;

    @Bean
    public ZookeeperCuratorDiscovery setZookeeperCuratorDiscovery() throws Exception {
        ZookeeperCuratorDiscovery zookeeperCuratorDiscovery = new ZookeeperCuratorDiscovery(config);
        return zookeeperCuratorDiscovery;
    }

    public static void main(String[] args) throws Exception {
        // SpringApplication.run(ZookeeperCuratorDemoApplication.class, args);

        ConfigurableApplicationContext ctx = SpringApplication.run(ZookeeperCuratorDemoApplication.class, args);

        // 注册服务
        ZookeeperCuratorDiscovery zcd = ctx.getBean(ZookeeperCuratorDiscovery.class);
        zcd.registerRemote(new ServerInfo("127.0.0.1",8081,"本地服务测试"));

        // 服务发现
        while(true){
            List<ServerInfo> serverInfoList =  zcd.queryRemoteNodes();
            for (ServerInfo serverInfo : serverInfoList) {
                System.out.println(serverInfo.host + ":" + serverInfo.port + "/" + serverInfo.descr);
                Thread.sleep(3000);
            }
        }
    }

}

// 服务启动控制台日志:output
127.0.0.1:8081/本地服务测试
(6)curator-recipes 简介

​ Recipes 是 Curator 对常见分布式场景的解决方案

  • Queues:提供了多种的分布式队列解决方法,比如:权重队列、延迟队列等。在生产环境中,很少将 ZooKeeper 用作分布式队列,只适合在压力非常小的情况下,才使用该解决方案,所以建议要适度使用
  • Counters:全局计数器是分布式系统中很常用的工具,curator-recipes 提供了 SharedCount、DistributedAtomicLong 等组件,帮助开发人员实现分布式计数器功能
  • Locks:java.util.concurrent.locks 中提供的各种锁是语言基础,在微服务架构中,分布式锁也是一项非常基础的服务组件,curator-recipes 提供了多种基于 ZooKeeper 实现的分布式锁,满足日常工作中对分布式锁的需求
  • Barries:curator-recipes 提供的分布式栅栏可以实现多个服务之间协同工作,具体实现有 DistributedBarrier 和 DistributedDoubleBarrier
  • Elections:实现的主要功能是在多个参与者中选举出 Leader,然后由 Leader 节点作为操作调度、任务监控或是队列消费的执行者。curator-recipes 给出的实现是 LeaderLatch

⑥ 代理模式设计与实现

学习目标

  • 代理模式核心概念(静态代理、动态代理)
  • 设计模式之代理模式(静态代理实现):为每个接口实现类创建代理,屏蔽被代理对象的调用、实现方法增强
  • 动态代理
    • JDK 动态代理:基于接口的动态代理(核心InvocationHandler),具备局限性(由于Java单继承设定,结合动态代理原理(生成的代理类)限定了其只能代理接口)
    • CGLib 动态代理:基于的动态代理(核心MethodInterceptorEnhancer)(与JDK动态代理形成互补,应用在许多场景中)
    • Javassist 动态代理:基于类的动态代理(基于Java语言提供API实现动态代理,Dubbo默认的代理生成方式

1.代理模式

​ 动态代理机制在 Java 中有着广泛的应用,例如,Spring AOP、MyBatis、Hibernate 等常用的开源框架,都使用到了动态代理机制。当然,Dubbo 中也使用到了动态代理,在后面开发简易版 RPC 框架的时候,还会参考 Dubbo 使用动态代理机制来屏蔽底层的网络传输以及服务发现的相关实现。

​ 针对RPC中代理模式的实现,先从代理模式基础概念切入,掌握 JDK 动态代理的使用以及底层实现原理 以及 JDK 动态代理的一些局限性,扩展分析基于字节码生成的动态代理机制

⚽ 设计模式之静态代理模式

代理模式核心

  • Subject:抽象主题,业务逻辑接口定义
  • RealSubject:真实主题,被代理对象(接口实现)
  • Proxy:代理主题,代理对象(隐藏真实主题的调用,提供方法的增强)

image-20241230163802915

// 代理接口定义
public interface Subject {
    public void operation();
}

// 真正的执行对象
public class RealSubject implements Subject {
    @Override
    public void operation() {
        System.out.println("i am real subject ......");
    }
}

/**
 * 自定义代理类
 */
public class Proxy implements Subject {

    // 定义被代理对象
    public RealSubject realSubject = new RealSubject();

    @Override
    public void operation() {
        System.out.println("proxy start ......");
        realSubject.operation();
        System.out.println("proxy end ......");
    }

}

/**
 * 设计模式之代理模式测试
 */
public class ProxyDemo {

    public static void main(String[] args) {
        // Proxy proxy = new Proxy();
        Subject proxy = new Proxy();
        proxy.operation();
    }

}

​ 在程序中不会直接通过RealSubject调用接口方法,而是通过提供的代理对象Proxy来完成接口调用和方法增强,代理的作用体现在两个点:

  • ① 隐藏被代理对象的调用(对于客户端来说不需要知道要通过哪个被代理对象执行方法,而是关注要使用哪个代理对象)
  • ② 实现代理方法的增强(proxy 不仅正常指向业务逻辑,还可实现方法的增强,补充业务逻辑(做一些预处理或者后置处理))

使用代理模式可以控制程序对 RealSubject 对象的访问,如果发现异常的访问,可以直接限流或是返回,也可以在执行业务处理的前后进行相关的预处理和后置处理,帮助上层调用方屏蔽底层的细节。例如,在 RPC 框架中,代理可以完成序列化、网络 I/O 操作、负载均衡、故障恢复以及服务发现等一系列操作,而上层调用方只感知到了一次本地调用

代理模式还可以用于实现延迟加载的功能。例如数据库检索场景,查询数据库是一个耗时的操作,而有些时候查询到的数据也并没有真正被程序使用。延迟加载功能就可以有效地避免这种浪费,系统访问数据库时,首先可以得到一个代理对象,此时并没有执行任何数据库查询操作,代理对象中自然也没有真正的数据;当系统真正需要使用数据时,再调用代理对象完成数据库查询并返回数据。常见 ORM 框架(例如MyBatis、 Hibernate)中的延迟加载的原理大致也是如此。

⚽ JDK 动态代理

(1)JDK 动态代理示例

​ 上述的代理模式是一种静态代理模式,在编译阶段就要为每个RealSubject类创建相应的Proxy类进行代理。当需要代理的类很多的时候,这个时候就会出现大量的Proxy类。基于这种场景优化方向思考,可以引入JDK动态代理来解决这个问题

​ JDK 动态代理核心:InvocationHandler接口(通过实现InvocationHandler接口中的invoke方法实现动态注入)。对于相同代理逻辑的场景,只需要提供一个InvocationHandler实现类。在 Java 运行的过程中,JDK会为每个 RealSubject 类动态生成相应的代理类并加载到 JVM 中,然后创建对应的代理实例对象,返回给上层调用者。

  • InvocationHandler接口实现:填充代理逻辑,提供getProxy方法创建代理对象
/**
 * JDK 动态代理(实现InvocationHandler)
 */
public class JdkInvokerHandler implements InvocationHandler {

    // 被代理对象
    private Object target;

    // 初始化
    JdkInvokerHandler(Object target) {
        this.target = target;
    }

    // 创建代理实例并返回
    public Object getProxy() {
        // 创建代理对象
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), target.getClass().getInterfaces(), this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("proxy start ....."); // 前置处理
        Object res = method.invoke(target, args); // 执行业务方法逻辑
        System.out.println("proxy end ......"); // 后置处理
        return res;
    }
}
  • ② 模拟上层调用者:获取代理对象,执行业务逻辑
/**
 * JDK 动态代理 demo 测试
 */
public class JdkInvokerHandlerDemo {

    public static void main(String[] args) {
        Subject subject = new RealSubject();
        JdkInvokerHandler invokerHandler = new JdkInvokerHandler(subject);
        // 获取代理对象
        Subject proxy = (Subject) invokerHandler.getProxy();
        // 调用方法
        proxy.operation(); // 会调用DemoInvokerHandler.invoke()方法
    }

}
(2)JDK 动态代理原理剖析

​ 结合上述样例分析,JDK 动态代理相关实现的入口是 Proxy.newProxyInstance() 这个静态方法,它的三个参数分别是加载动态生成的代理类的类加载器、业务类实现的接口和上面介绍的InvocationHandler对象。Proxy.newProxyInstance()方法的具体实现如下(不同JDK版本实现有细微差别,但核心思路不变,此处以JDK1.8为例进行分析(省略try..catch...处理,关注核心代码实现)

@CallerSensitive
public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) throws IllegalArgumentException
{
    Objects.requireNonNull(h);

    final Class<?>[] intfs = interfaces.clone();
    final SecurityManager sm = System.getSecurityManager();
    if (sm != null) {
        checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
    }

    Class<?> cl = getProxyClass0(loader, intfs);


    if (sm != null) {
        checkNewProxyPermission(Reflection.getCallerClass(), cl);
    }

    final Constructor<?> cons = cl.getConstructor(constructorParams);
    final InvocationHandler ih = h;
    if (!Modifier.isPublic(cl.getModifiers())) {
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            public Void run() {
                cons.setAccessible(true);
                return null;
            }
        });
    }
    return cons.newInstance(new Object[]{h});

}

​ 通过 newProxyInstance()方法的实现可以看到,JDK 动态代理是在 getProxyClass0() 方法中完成代理类的生成和加载。getProxyClass0() 方法的具体实现如下。其中proxyClassCache是定义在 Proxy 类中的静态字段,主要用于缓存已经创建过的代理类

private static final WeakCache<ClassLoader, Class<?>[], Class<?>> proxyClassCache = new WeakCache<>(new KeyFactory(), new ProxyClassFactory());

private static Class<?> getProxyClass0(ClassLoader loader,
                                       Class<?>... interfaces) {
    if (interfaces.length > 65535) {
        throw new IllegalArgumentException("interface limit exceeded");
    }

    // 边界检查,限制接口数量(略)
    // 如果指定的类加载器中已经创建了实现指定接口的代理类,则查找缓存;
    // 否则通过ProxyClassFactory创建实现指定接口的代理类
    return proxyClassCache.get(loader, interfaces);
}

​ WeakCache.get() 方法会首先尝试从缓存中查找代理类,如果查找不到,则会创建 Factory 对象并调用其 get() 方法获取代理类。Factory 是 WeakCache 中的内部类,Factory.get() 方法会调用 ProxyClassFactory.apply() 方法创建并加载代理类。

​ ProxyClassFactory.apply() 方法首先会检测代理类需要实现的接口集合,然后确定代理类的名称,之后创建代理类并将其写入文件中,最后加载代理类,返回对应的 Class 对象用于后续的实例化代理类对象。该方法的具体实现如下:

image-20241230173451177

private static final class ProxyClassFactory implements BiFunction<ClassLoader, Class<?>[], Class<?>>
{
    // ....... 核心参数定义 ......

    @Override
    public Class<?> apply(ClassLoader loader, Class<?>[] interfaces) {

        // ...... 1.对interfaces集合进行一系列检测(略) ......

        // ...... 2.选择定义代理类的包名(略) ......

        // 代理类的名称是通过包名、代理类名称前缀及编号这三项组成的
        long num = nextUniqueNumber.getAndIncrement();
        String proxyName = proxyPkg + proxyClassNamePrefix + num;

        // 生成代理类并写入文件(此处将try...catch...省略)
        byte[] proxyClassFile = ProxyGenerator.generateProxyClass(proxyName, interfaces, accessFlags);
        // 加载代理类并返回Class对象
        return defineClass0(loader, proxyName, proxyClassFile, 0, proxyClassFile.length);
    }
}

ProxyGenerator.generateProxyClass() 方法会按照指定的名称和接口集合生成代理类的字节码,并根据条件决定是否保存到磁盘上。该方法的具体代码如下:

public static byte[] generateProxyClass(final String var0, Class<?>[] var1, int var2) {
    ProxyGenerator var3 = new ProxyGenerator(var0, var1, var2);
    // 动态生成代理类的字节码
    final byte[] var4 = var3.generateClassFile(); 
    // 如果saveGeneratedFiles值为true,会将生成的代理类的字节码保存到文件中
    if (saveGeneratedFiles) {
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            public Void run() {
                try {
                    int var1 = var0.lastIndexOf(46);
                    Path var2;
                    if (var1 > 0) {
                        Path var3 = Paths.get(var0.substring(0, var1).replace('.', File.separatorChar));
                        Files.createDirectories(var3);
                        var2 = var3.resolve(var0.substring(var1 + 1, var0.length()) + ".class");
                    } else {
                        var2 = Paths.get(var0 + ".class");
                    }

                    Files.write(var2, var4, new OpenOption[0]);
                    return null;
                } catch (IOException var4x) {
                    throw new InternalError("I/O exception saving generated file: " + var4x);
                }
            }
        });
    }
	// 返回上面生成的代理类的字节码
    return var4;
}
🔔 查看JDK动态代理生成的类的内容

​ 关注核心代码byte[] proxyClassFile = ProxyGenerator.generateProxyClass(proxyName, interfaces, accessFlags);,此处可自定义方法生成代理类。此处生成Subject接口的代理类,在指定目标路径可以跟踪到.class文件,随后借助反编译工具查看文件信息

/**
 * JDK 动态代理:将生成的代理类写入文件并输出
 */
public class JdkProxySourceClassExport {

    public static void writeClassToDisk(String path) {
        // 动态代理生成类信息
        byte[] classFile = ProxyGenerator.generateProxyClass("$proxy4", new Class[]{Subject.class});
        // 写入本地磁盘
        FileOutputStream fos = null;
        try {
            fos = new FileOutputStream(path);
            fos.write(classFile);
            fos.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (fos != null) {
                try {
                    fos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        // 动态生成代理类并输出到指定的目录文件
        JdkProxySourceClassExport.writeClassToDisk("D:/$Proxy4.class");
    }
}

D:/$Proxy4.class反编译文件信息查看:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

import com.noob.rpc.proxy.designer.Subject;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;

public final class $proxy4 extends Proxy implements Subject {
    private static Method m1;
    private static Method m3;
    private static Method m2;
    private static Method m0;

    public $proxy4(InvocationHandler var1) throws  {
        super(var1);
    }

    public final boolean equals(Object var1) throws  {
        try {
            return (Boolean)super.h.invoke(this, m1, new Object[]{var1});
        } catch (RuntimeException | Error var3) {
            throw var3;
        } catch (Throwable var4) {
            throw new UndeclaredThrowableException(var4);
        }
    }

    public final void operation() throws  {
        try {
            super.h.invoke(this, m3, (Object[])null);
        } catch (RuntimeException | Error var2) {
            throw var2;
        } catch (Throwable var3) {
            throw new UndeclaredThrowableException(var3);
        }
    }

    public final String toString() throws  {
        try {
            return (String)super.h.invoke(this, m2, (Object[])null);
        } catch (RuntimeException | Error var2) {
            throw var2;
        } catch (Throwable var3) {
            throw new UndeclaredThrowableException(var3);
        }
    }

    public final int hashCode() throws  {
        try {
            return (Integer)super.h.invoke(this, m0, (Object[])null);
        } catch (RuntimeException | Error var2) {
            throw var2;
        } catch (Throwable var3) {
            throw new UndeclaredThrowableException(var3);
        }
    }

    static {
        try {
            m1 = Class.forName("java.lang.Object").getMethod("equals", Class.forName("java.lang.Object"));
            m3 = Class.forName("com.noob.rpc.proxy.designer.Subject").getMethod("operation");
            m2 = Class.forName("java.lang.Object").getMethod("toString");
            m0 = Class.forName("java.lang.Object").getMethod("hashCode");
        } catch (NoSuchMethodException var2) {
            throw new NoSuchMethodError(var2.getMessage());
        } catch (ClassNotFoundException var3) {
            throw new NoClassDefFoundError(var3.getMessage());
        }
    }
}

​ 从反编译出来的源码中可以看到在静态代码块中得到了equals、toString、hashCode和Subject接口中operation方法的Method对象。当调用Subject中的operation方法的时候,实际上就会调用InvocationHandler中的invoke方法,并传入了之前获取到的对应的Method和参数

​ ==JDK动态代理为什么只能代理接口不能代理类?==根据所得的代理类源码分析public final class $proxy4 extends Proxy implements Subject,生成的代理类默认继承了Proxy这个类,而java是单继承机制所以此处只能代理接口(例如此处implements Subject)而不能代理类

🔔JDK 动态代理核心总结

​ JDK 动态代理的实现原理是动态创建代理类并通过指定类加载器进行加载,在创建代理对象时将InvocationHandler对象作为构造参数传入。当调用代理对象时,会调用 InvocationHandler.invoke() 方法,从而执行代理逻辑,并最终调用真正业务对象的相应方法

⚽ CGLib 动态代理

(1)CGLib 动态代理核心

​ 基于上述分析可知,JDK动态代理是Java原生支持的,不需要任何外部依赖。但JDK动态代理的局限性也在于其只能基于接口进行代理,那么对于没有继承任何接口的类则无法进行代理。因此,对于没有实现任何接口的类进行代理,可以考虑使用 CGLib

CGLib(Code Generation Library)是一个基于 ASM 的字节码生成库,它允许在运行时对字节码进行修改和动态生成。CGLib 采用字节码技术实现动态代理功能,其底层原理是通过字节码技术为目标类生成一个子类,并在该子类中采用方法拦截的方式拦截所有父类方法的调用,从而实现代理的功能。

​ 因为 CGLib 使用生成子类的方式实现动态代理,所以无法代理 final 关键字修饰的方法(因为final 方法是不能够被重写的)。基于此,CGLib 与 JDK 动态代理之间可以相互补充:在目标类实现接口时,使用 JDK 动态代理创建代理对象;当目标类没有实现接口时,使用 CGLib 实现动态代理的功能。在 Spring、MyBatis 等多种开源框架中,都可以看到JDK动态代理与 CGLib 结合使用的场景。

CGLib 的实现有两个重要的成员组成:

  • Enhancer:指定要代理的目标对象以及实际处理代理逻辑的对象,最终通过调用 create() 方法得到代理对象,对这个对象所有的非 final 方法的调用都会转发给 MethodInterceptor 进行处理
  • MethodInterceptor:动态代理对象的方法调用都会转发到intercept方法进行增强

这两个组件的使用与 JDK 动态代理中的 Proxy 和 InvocationHandler 概念相似

(2)CGLib 动态代理 示例 ①

构建步骤说明

  • ① 引入CGLib相关依赖
  • ② 构建代理类实现MethodInterceptor接口
  • ③ 构建测试模拟调用
① 引入CGLib相关依赖
<!-- 引入CGLib依赖 -->
<dependency>
    <groupId>cglib</groupId>
    <artifactId>cglib</artifactId>
    <version>3.3.0</version>
</dependency>
② 构建代理类实现MethodInterceptor接口

​ 接收被代理类并构建代理实例返回(和JDK动态代理实现InvocationHandler接口概念类似)

/**
 * 基于Cglib的动态代理(实现MethodInterceptor)
 */
public class CglibDynamicProxy implements MethodInterceptor {

    // 被代理对象
    private Object target;

    // 构造函数初始化
    public CglibDynamicProxy(Object target) {
        this.target = target;
    }

    // 创建并返回代理实例
    public Object getProxyInstance() {
        Enhancer enhancer = new Enhancer();
        enhancer.setSuperclass(target.getClass());
        enhancer.setCallback(this);
        return enhancer.create();
    }

    // 实现intercept方法
    @Override
    public Object intercept(Object object, Method method, Object[] args, MethodProxy proxy) throws Throwable {
        System.out.println("Before method invocation: " + method.getName());
        Object result = method.invoke(target, args);
        System.out.println("After method invocation: " + method.getName());
        return result;
    }
}
③ 构建测试模拟调用(接口、类)

接口代理

// 代理接口定义
public interface Subject {
    public void operation();
}

// 真正的执行对象
public class RealSubject implements Subject {
    @Override
    public void operation() {
        System.out.println("i am real subject ......");
    }
}

类代理

// 被代理对象(类)
public class Target {
    public String operation(String str) {
        System.out.println(str);
        return "do sth:" + str;
    }
}

代理测试

/**
 * 基于Cglib动态代理测试
 */
public class CglibDynamicProxyDemo {

    // 接口代理测试
    public static void testInterface() {
        // ① 接口代理
        RealSubject realSubject = new RealSubject();
        CglibDynamicProxy proxy = new CglibDynamicProxy(realSubject);
        Subject subject = (Subject) proxy.getProxyInstance();
        subject.operation();
    }

    // 类代理测试
    public static void testClass() {
        // ② 类代理
        CglibDynamicProxy proxy = new CglibDynamicProxy(new Target());
        Target target = (Target) proxy.getProxyInstance();
        target.operation("test ...");
    }

    public static void main(String[] args) {
        testInterface();
        System.out.println("---------------------------------------");
        testClass();
    }
}
(3)CGLib 动态代理示例 ②

​ 本质核心也是接收被代理对象并基于其构建代理,提供返回代理的实例,重写intercept方法实现代理业务逻辑

/**
 * CGLib 动态代理
 */
public class CGLibProxy implements MethodInterceptor {

    // ① 初始化Enhancer对象
    private Enhancer enhancer = new Enhancer();

    // ② 获取代理
    public Object getProxy(Class clazz) {
        enhancer.setSuperclass(clazz); // 指定生成的代理类的父类
        enhancer.setCallback(this); // 设置Callback对象
        return enhancer.create(); // 通过ASM字节码技术动态创建子类实例
    }

    // ③ 实现MethodInterceptor接口的intercept方法
    @Override
    public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
        System.out.println("前置处理......");
        Object result = methodProxy.invokeSuper(o, objects); // obj args 调用父类中的方法
        System.out.println("后置处理......");
        return result;
    }
}
/**
 * CGLib 动态代理
 */
public class CGLibProxyDemo {

    public static void main(String[] args) {
        // 创建代理
        CGLibProxy proxy = new CGLibProxy();

        // 生成Target的代理对象
        Target target = (Target)proxy.getProxy(Target.class);

        // 调用代理对象的method()方法
        String result = target.operation("test");
        System.out.println(result);

        /**
         * 输出:
         * 前置处理......
         * test
         * 后置处理......
         * do sth:test
         */
    }
}

2.Javassist

Javassist 是一个开源的生成 Java 字节码的类库,其主要优点在于简单、快速,直接使用Javassist 提供的 Java API 就能动态修改类的结构,或是动态生成类。

⚽ 基于 Javassist 提供的Java API动态创建类

基于 Javassist 提供的Java API动态创建类

  • ① 引入Javassist依赖

  • ② 构建demo测试基于Javassist 生成类

① 引入Javassist依赖

<!-- 引入 javassist 依赖 -->
<dependency>
    <groupId>org.javassist</groupId>
    <artifactId>javassist</artifactId>
    <version>3.29.0-GA</version>
</dependency>

② 构建demo测试基于Javassist 生成类

​ 测试通过之后会在指定目录(例如代码示例中D:\\)生成JavassistTemplate.class文件(com.noob.javassist.demo.JavassistTemplate

/**
 * Javassist demo 示例
 */
public class JavassistDemo1 {

    // 基于Javassist生成类(返回CtClass类型)
    public static CtClass generateClass(String savePath) throws Exception {
        // 创建ClassPool
        ClassPool cp = ClassPool.getDefault();

        // 指定生成的类名称
        CtClass clazz = cp.makeClass("com.noob.javassist.demo.JavassistTemplate");

        // 创建字段,指定字段基础属性:字段类型、字段名称、字段所属的类
        CtField field = new CtField(cp.get("java.lang.String"), "prop", clazz);
        field.setModifiers(Modifier.PRIVATE); // 指定该字段使用private修饰

        // 设置prop字段的getter/setter方法
        clazz.addMethod(CtNewMethod.setter("setProp", field));
        clazz.addMethod(CtNewMethod.getter("getProp", field));

        // 设置prop字段的初始化值,并将prop字段添加到clazz中
        clazz.addField(field, CtField.Initializer.constant("hello world"));

        // 创建构造方法,指定了构造方法的参数类型和构造方法所属的类
        CtConstructor ctConstructor = new CtConstructor(new CtClass[]{}, clazz);

        // 设置方法体
        StringBuffer body = new StringBuffer();
        body.append("{\n prop=\"dididi\";\n}");
        ctConstructor.setBody(body.toString());
        clazz.addConstructor(ctConstructor); // 将构造方法添加到clazz中

        // 创建execute()方法,指定了方法返回值、方法名称、方法参数列表以及方法所属的类
        CtMethod ctMethod = new CtMethod(CtClass.voidType, "execute", new CtClass[]{}, clazz);
        ctMethod.setModifiers(Modifier.PUBLIC); // 指定该方法使用public修饰

        // 设置方法体
        body = new StringBuffer(); // reset
        body.append("{\n System.out.println(\"execute():\" " + "+ this.prop);").append("\n}"); // 填充方法体内容
        ctMethod.setBody(body.toString()); // 设置方法体
        clazz.addMethod(ctMethod); // 将execute()方法添加到clazz中

        // 将定义好的JavassistDemo类保存到指定的目录
        clazz.writeFile(savePath);  // /Users/xxx/

        // 返回CtClass对象
        return clazz;
    }

    // 测试基于Javassist生成类
    public static void testJavassist(CtClass clazz) throws Exception {
        // 加载clazz类,并创建对象
        Class<?> c = clazz.toClass();
        Object o = c.newInstance();

        // 调用execute()方法
        Method method = o.getClass().getMethod("execute", new Class[]{});
        method.invoke(o, new Object[]{});
    }


    public static void main(String[] args) throws Exception {
        CtClass clazz = generateClass("D:\\");
        testJavassist(clazz);
    }
}

// output
execute():dididi

​ 生成的代码示例JavassistTemplate.class参考(在指定的生成目录可以跟踪到生成的JavassistTemplate.class文件,将其反编译后得到下述信息)

package com.noob.javassist.demo;

public class JavassistTemplate {
    private String prop = "hello world";

    public void setProp(String var1) {
        this.prop = var1;
    }

    public String getProp() {
        return this.prop;
    }

    public JavassistTemplate() {
        this.prop = "dididi";
    }

    public void execute() {
        System.out.println("execute():" + this.prop);
    }
}

⚽ 基于 Javassist 实现动态代理功能

​ Javassist 也可以实现动态代理功能,底层的原理也是通过创建目标类的子类的方式实现的。这里使用 Javassist 为上面生成的 JavassistTemplate类 创建一个代理对象并实现动态代理功能

/**
 * Javassist 动态代理
 */
public class JavassistDemo2 {

    public static void main(String[] args) throws Exception {
        ProxyFactory factory = new ProxyFactory();

        // 指定父类,ProxyFactory会动态生成继承该父类的子类
        factory.setSuperclass(JavassistTemplate.class);
        // 设置过滤器,判断哪些方法调用需要被拦截
        factory.setFilter(new MethodFilter() {
            public boolean isHandled(Method m) {
                if (m.getName().equals("execute")) {
                    return true;
                }
                return false;
            }
        });

        // 设置拦截处理
        factory.setHandler(new MethodHandler() {
            @Override
            public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable {
                System.out.println("前置处理");
                Object result = proceed.invoke(self, args);
                System.out.println("执行结果:" + result);
                System.out.println("后置处理");
                return result;
            }
        });

        // 创建JavassistTemplate的代理类,并创建代理对象
        Class<?> c = factory.createClass();
        JavassistTemplate javassistTemplate = (JavassistTemplate) c.newInstance();
        javassistTemplate.execute(); // 执行execute()方法,会被拦截
        System.out.println(javassistTemplate.getProp());
    }

}

// output
前置处理
execute():dididi
执行结果:null
后置处理
dididi

⑦ Netty 入门核心

学习目标

  • 原生JDK的NIO机制(API设计非常复杂,开发和维护成本极大,且JDK自身存在一些Bug(例如Epoll Bug))
  • Netty 切入背景:基于传统Java NIO机制编程的不足,需要一套NIO框架统一封装这些公共能力
  • Netty IO模型:BIO、NIO、IO多路复用机制
  • Netty 线程模型:
    • ① 单Reactor单线程(此处表示单线程是性能瓶颈)
      • 核心:Reactor监听客户端请求,然后通过Dispatch进行分发
        • 如果是【建立连接的事件】,通过Acceptor处理连接请求,然后创建一个 Handler 对象处理连接建立之后的业务请求
        • 如果是【数据的读写事件】(非建立连接事件),则 Reactor 会将事件分发对应的 Handler 来处理(由唯一的线程调用Handler对象完成完整流程(读取数据、业务处理、发送响应))。在这个过程中如果出现连接不可用或者不可写的情况,该单线程会去执行其他Handler逻辑,而不会阻塞等待
      • 不足:存在性能瓶颈,一个线程只能跑在一个CPU上,无法充分发挥多核CPU的优势(一旦某个业务逻辑耗时较长,这个线程就会卡在这里而无法处理其他请求,程序进入假死状态,可用性降低)
    • 单Reactor多线程(此处表示单Reactor是性能瓶颈)
      • 核心:基本流程和【单Reactor单线程】机制差不多,唯一不同的点在于其是执行 Handler 逻辑的线程隶属于一个线程池
      • 特点:单 Reactor 多线程的模型可以充分利用多核 CPU 的处理能力,提高整个系统的吞吐量,但引入多线程模型就要考虑线程并发、数据共享、线程调度等问题
      • 不足:在这个模型中,只有一个线程来处理(分发) Reactor 监听到的所有 I/O 事件,其中就包括连接建立事件以及读写事件(交由worker线程池中的线程处理),当连接数不断增大的时候,这个唯一的 Reactor 线程也会遇到瓶颈
    • ③ 主从Reactor多线程
      • 核心:为了解决【单Reactor多线程】中的问题,可以考虑引入多个Reactor(MainReactor 、SubReactor)
        • MainReactor 主线程负责通过 Acceptor 对象处理 MainReactor 监听到的连接建立事件,当Acceptor 完成网络连接的建立之后,MainReactor 会将建立好的连接分配给 SubReactor 进行后续监听
        • SubReactor 负责监听该连接上的读写事件,当有新事件触发就会调用对应的Handler读取数据并分发给Worker线程池中的线程进行处理并返回结果
      • 特点:
        • 多Reactor =》主从职责明确(主监听连接、从监听读写):主 Reactor 只负责监听连接建立事件,SubReactor只负责监听读写事件
        • 多线程 =》充分利用多核CPU的优势,可以支持扩展,而且与具体的业务逻辑充分解耦,复用性
      • 不足:不足之处体现在交互上略显复杂,需要一定的编程门槛
    • ④ Netty 线程模型
      • 核心:Netty 同时支持上述几种线程模式,Netty 针对服务器端的设计是在主从 Reactor 多线程模型的基础上进行的修改
        • 抽象出两组线程池:BossGroup 专门用于接收客户端的连接,WorkerGroup 专门用于网络的读写

1.Netty 概念核心

(1)Netty 切入背景

JDK 本身提供了一套 NIO 的 API,但是这一套原生的 API 存在一系列的问题:

  • Java NIO 的 API 非常复杂:要写出成熟可用的 Java NIO 代码,需要熟练掌握 JDK 中的 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等组件,还要理解其中一些反人类的设计以及底层原理,这对新手来说是非常不友好的
  • 如果直接使用 Java NIO 进行开发,难度和开发量会非常大:需要自己补齐很多可靠性方面的实现(例如网络波动导致的连接重连、半包读写等)。这就会导致一些本末倒置的情况出现:核心业务逻辑比较简单,但补齐其他公共能力的代码非常多,开发耗时比较长。这时就需要一个统一的 NIO 框架来封装这些公共能力
  • JDK 自身的 Bug:其中比较出名的就要属 Epoll Bug ,这个 Bug 会导致 Selector 空轮询,CPU 使用率达到 100%,这样就会导致业务逻辑无法执行,降低服务性能

Netty 在 JDK 自带的 NIO API 基础之上进行了封装,解决了 JDK 自身的一些问题,具备如下优点:

  • 入门简单,使用方便,文档齐全,无其他依赖(依赖 JDK 即可)
  • 高性能,高吞吐,低延迟,资源消耗少
  • 灵活的线程模型,支持阻塞和非阻塞的I/O 模型
  • 代码质量高,目前主流版本基本没有 Bug

​ 正因为 Netty 有以上优点,所以很多互联网公司以及开源的 RPC 框架都将其作为网络通信的基础库,例如,Apache Spark、Apache Flink、 Elastic Search、Dubbo 等。下述将从 I/O 模型和线程模型的角度详细梳理 Netty 的核心设计,进而辅助全面掌握 Netty 原理

(2)Netty I/O 模型设计

​ 在进行网络 I/O 操作的时候,用什么样的方式读写数据将在很大程度上决定了 I/O 的性能。作为一款优秀的网络基础库,Netty 就采用了 NIO 的 I/O 模型,这也是其高性能的重要原因之一

① 传统阻塞 I/O 模型(BIO,blocking IO)

​ 在传统阻塞型 I/O 模型(BIO)中,如下图所示,每个请求都需要独立的线程完成读数据、业务处理以及写回数据的完整操作

image-20241231163504637

​ 一个线程在同一时刻只能与一个连接绑定,如下图所示,当请求的并发量较大时,就需要创建大量线程来处理连接,这就会导致系统浪费大量的资源进行线程切换,降低程序的性能。网络数据的传输速度是远远慢于 CPU 的处理速度,连接建立后,并不总是有数据可读,连接也并不总是可写,那么线程就只能阻塞等待,CPU 的计算能力不能得到充分发挥,同时还会导致大量线程的切换,浪费资源。

image-20241231163645399
② I/O 多路复用模型

​ 针对传统的阻塞 I/O 模型的缺点,I/O 复用的模型在性能方面有不小的提升。I/O 复用模型中的多个连接会共用一个 Selector 对象,由 Selector 感知连接的读写事件,而此时的线程数并不需要和连接数一致,只需要很少的线程定期从 Selector 上查询连接的读写状态即可,无须大量线程阻塞等待连接。当某个连接有新的数据可以处理时,操作系统会通知线程,线程从阻塞状态返回,开始进行读写操作以及后续的业务逻辑处理。I/O 复用的模型如下图所示

image-20241231163744572

​ Netty 就是采用了上述 I/O 复用的模型。由于多路复用器 Selector 的存在,可以同时并发处理成百上千个网络连接,大大增加了服务器的处理能力。另外,Selector 并不会阻塞线程,也就是说当一个连接不可读或不可写的时候,线程可以去处理其他可读或可写的连接,这就充分提升了 I/O 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程切换。如下图所示

image-20241231163938658

​ 从数据处理的角度来看,传统的阻塞 I/O 模型处理的是字节流或字符流,也就是以流式的方式顺序地从一个数据流中读取一个或多个字节,并且不能随意改变读取指针的位置。而在 NIO 中则抛弃了这种传统的 I/O 流概念,引入了 ChannelBuffer 的概念,可以从 Channel 中读取数据到 Buffer 中或将数据从 Buffer 中写入到 Channel。Buffer 不像传统 I/O 中的流那样必须顺序操作,在 NIO 中可以读写 Buffer 中任意位置的数据

(3)Netty 线程模型设计

​ 服务器程序在读取到二进制数据之后,首先需要通过编解码,得到程序逻辑可以理解的消息,然后将消息传入业务逻辑进行处理,并产生相应的结果,返回给客户端。编解码逻辑、消息派发逻辑、业务处理逻辑以及返回响应的逻辑,是放到一个线程里面串行执行,还是分配到不同的线程中执行,会对程序的性能产生很大的影响。所以,优秀的线程模型对一个高性能网络库来说是至关重要的。

Netty 采用了 Reactor 线程模型的设计。 Reactor 模式,也被称为 Dispatcher 模式,核心原理是 Selector 负责监听 I/O 事件,在监听到 I/O 事件之后,分发(Dispatch)给相关线程进行处理

​ 为了更好地了解 Netty 线程模型的设计理念,此处从最基础的单 Reactor 单线程模型开始介绍,然后逐步增加模型的复杂度,最终到 Netty 目前使用的非常成熟的线程模型设计。

① 单Reactor单线程

​ Reactor 对象监听客户端请求事件,收到事件后通过 Dispatch 进行分发。如果是连接建立的事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接建立之后的业务请求。如果不是连接建立的事件,而是数据的读写事件,则 Reactor 会将事件分发对应的 Handler 来处理,由这里唯一的线程调用 Handler 对象来完成读取数据、业务处理、发送响应的完整流程。当然,该过程中也可能会出现连接不可读或不可写等情况,该单线程会去执行其他 Handler 的逻辑,而不是阻塞等待。具体情况如下图所示

image-20241231164352941

​ 单 Reactor 单线程的优点就是:线程模型简单,没有引入多线程,自然也就没有多线程并发和竞争的问题

​ 但其缺点也非常明显,那就是性能瓶颈问题,一个线程只能跑在一个 CPU 上,能处理的连接数是有限的,无法完全发挥多核 CPU 的优势。一旦某个业务逻辑耗时较长,这唯一的线程就会卡在上面,无法处理其他连接的请求,程序进入假死的状态,可用性也就降低了。正是由于这种限制,一般只会在客户端使用这种线程模型

②单Reactor多线程

​ 在单 Reactor 多线程的架构中,Reactor 监控到客户端请求之后,如果连接建立的请求,则由Acceptor 通过 accept 处理,然后创建一个 Handler 对象处理连接建立之后的业务请求。如果不是连接建立请求,则 Reactor 会将事件分发给调用连接对应的 Handler 来处理。到此为止,该流程与单 Reactor 单线程的模型基本一致,唯一的区别就是执行 Handler 逻辑的线程隶属于一个线程池

image-20241231164605776

​ 单 Reactor 多线程的模型可以充分利用多核 CPU 的处理能力,提高整个系统的吞吐量,但引入多线程模型就要考虑线程并发、数据共享、线程调度等问题。在这个模型中,只有一个线程来处理 Reactor 监听到的所有 I/O 事件,其中就包括连接建立事件以及读写事件,当连接数不断增大的时候,这个唯一的 Reactor 线程也会遇到瓶颈。

③主从Reactor多线程

​ 为了解决单 Reactor 多线程模型中的问题,可以引入多个 Reactor。其中,Reactor 主线程负责通过 Acceptor 对象处理 MainReactor 监听到的连接建立事件,当Acceptor 完成网络连接的建立之后,MainReactor 会将建立好的连接分配给 SubReactor 进行后续监听。

​ 当一个连接被分配到一个 SubReactor 之上时,会由 SubReactor 负责监听该连接上的读写事件。当有新的读事件(OP_READ)发生时,Reactor 子线程就会调用对应的 Handler 读取数据,然后分发给 Worker 线程池中的线程进行处理并返回结果。待处理结束之后,Handler 会根据处理结果调用 send 将响应返回给客户端,当然此时连接要有可写事件(OP_WRITE)才能发送数据。

image-20241231164753363

​ 主从 Reactor 多线程的设计模式解决了单一 Reactor 的瓶颈。主从 Reactor 职责明确,主 Reactor 只负责监听连接建立事件,SubReactor只负责监听读写事件。整个主从 Reactor 多线程架构充分利用了多核 CPU 的优势,可以支持扩展,而且与具体的业务逻辑充分解耦,复用性高。但不足的地方是,在交互上略显复杂,需要一定的编程门槛

④ Netty 线程模型

​ Netty 同时支持上述几种线程模式,Netty 针对服务器端的设计是在主从 Reactor 多线程模型的基础上进行的修改,如下图所示

image-20241231165002870

Netty 抽象出两组线程池:BossGroup 专门用于接收客户端的连接,WorkerGroup 专门用于网络的读写。BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup,相当于一个事件循环组,其中包含多个事件循环 ,每一个事件循环是 NioEventLoop

​ NioEventLoop 表示一个不断循环的、执行处理任务的线程,每个 NioEventLoop 都有一个Selector 对象与之对应,用于监听绑定在其上的连接,这些连接上的事件由 Selector 对应的这条线程处理。每个 NioEventLoopGroup 可以含有多个 NioEventLoop,也就是多个线程

​ 每个 Boss NioEventLoop 会监听 Selector 上连接建立的 accept 事件,然后处理 accept 事件与客户端建立网络连接,生成相应的 NioSocketChannel 对象,一个 NioSocketChannel 就表示一条网络连接。之后会将 NioSocketChannel 注册到某个 Worker NioEventLoop 上的 Selector 中

​ 每个 Worker NioEventLoop 会监听对应 Selector 上的 read/write 事件,当监听到 read/write 事件的时候,会通过 Pipeline 进行处理。一个 Pipeline 与一个 Channel 绑定,在 Pipeline 上可以添加多个 ChannelHandler,每个 ChannelHandler 中都可以包含一定的逻辑,例如编解码等。Pipeline 在处理请求的时候,会按照我们指定的顺序调用 ChannelHandler

2.Netty 框架核心组件

  • Netty 框架核心组件
    • I/O 模型设计相关抽象概念中的组件(如Selector
      • IO 多路复用构成核心:ChannelChannelFutureSelector
        • Channel对网络连接的抽象,核心是执行网络IO操作,支持同步或异步操作
        • ChannelFuture:一般实践中采用异步操作,通过向 ChannelFuture 上注册监听器来监听 I/O 操作的结果
        • Selector对多路复用的抽象,Netty 基于 Selector 对象实现 I/O 多路复用,Selector 内部会通过系统调用不断地查询这些注册在其上的 Channel 是否有已就绪的 I/O 事件,进而通过一个线程监听多个Channel上发生的事件,而无须使用用户线程进行轮询
    • 线程模型设计相关抽象概念中的组件(如 NioEventLoopNioEventLoopGroup
      • Netty 线程模型构成核心:NioEventLoopNioEventLoopGroup
    • Netty 处理数据的相关组件(如ByteBuf内存管理相关)
      • 从内存分配管理、内存碎片优化、并发分配内存等角度切入

(1)Channel

​ Channel 是 Netty 对网络连接的抽象,核心功能是执行网络 I/O 操作。不同协议、不同阻塞类型的连接对应不同的 Channel 类型。一般用的都是 NIO 的 Channel,下面是一些常用的 NIO Channel 类型:

  • NioSocketChannel:对应异步的 TCP Socket 连接。
  • NioServerSocketChannel:对应异步的服务器端 TCP Socket 连接。
  • NioDatagramChannel:对应异步的 UDP 连接。

​ 上述异步 Channel 主要提供了异步的网络 I/O 操作,例如:建立连接、读写操作等。异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用返回时所请求的 I/O 操作已完成。I/O 操作返回的是一个 ChannelFuture 对象,无论 I/O 操作是否成功,Channel 都可以通过监听器通知调用方,通过向 ChannelFuture 上注册监听器来监听 I/O 操作的结果。

​ Netty 也支持同步 I/O 操作,但在实践中几乎不使用。绝大多数情况下,使用的是 Netty 中异步 I/O 操作。虽然立即返回一个 ChannelFuture 对象,但不能立刻知晓 I/O 操作是否成功,这时就需要向 ChannelFuture 中注册一个监听器,当操作执行成功或失败时,监听器会自动触发注册的监听事件

​ 另外,Channel 还提供了检测当前网络连接状态等功能,这些功能的提供可以辅助实现网络异常断开后自动重连的功能

(2)Selector

Selector 是对多路复用器的抽象,也是 Java NIO 的核心基础组件之一。Netty 就是基于 Selector 对象实现 I/O 多路复用的,在 Selector 内部,会通过系统调用不断地查询这些注册在其上的 Channel 是否有已就绪的 I/O 事件,例如,可读事件(OP_READ)、可写事件(OP_WRITE)或是网络连接事件(OP_ACCEPT)等,而无须使用用户线程进行轮询。这样,我们就可以用一个线程监听多个 Channel 上发生的事件

(3)ChannelPipeline&ChannelHandler

​ 提到 Pipeline,可能最先想到的是 Linux 命令中的管道,它可以实现将一条命令的输出作为另一条命令的输入。Netty 中的 ChannelPipeline 也可以实现类似的功能:ChannelPipeline 会将一个 ChannelHandler 处理后的数据作为下一个 ChannelHandler 的输入

​ 下图引用了 Netty Javadoc 中对 ChannelPipeline 的说明,描述了 ChannelPipeline 中 ChannelHandler 通常是如何处理 I/O 事件的。Netty 中定义了两种事件类型:入站(Inbound)事件出站(Outbound)事件。这两种事件就像 Linux 管道中的数据一样,在 ChannelPipeline 中传递,事件之中也可能会附加数据。ChannelPipeline 之上可以注册多个 ChannelHandler(ChannelInboundHandler 或 ChannelOutboundHandler),在 ChannelHandler 注册的时候决定处理 I/O 事件的顺序,这就是典型的责任链模式

image-20250102210957356

​ 结合图示分析可知,I/O 事件不会在 ChannelPipeline 中自动传播,而是需要调用ChannelHandlerContext 中定义的相应方法进行传播,例如:fireChannelRead() 方法和 write() 方法等。

​ 此处简单举例说明如下所示,在该 ChannelPipeline 上,添加了 5 个 ChannelHandler 对象

ChannelPipeline p = socketChannel.pipeline(); 
p.addLast("1", new InboundHandlerA()); 
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
  • 入站(Inbound)与出站(Outbound)事件处理顺序正好相反:
    • ① 对于入站(Inbound)事件,处理序列为:1 -> 2 -> 5
    • ② 对于出站(Outbound)事件,处理序列为:5 -> 4 -> 3

入站(Inbound)事件一般由 I/O 线程触发。举个例子,自定义了一种消息协议,一条完整的消息是由消息头和消息体两部分组成,其中消息头会含有消息类型、控制位、数据长度等元数据,消息体则包含了真正传输的数据。在面对一块较大的数据时,客户端一般会将数据切分成多条消息发送,服务端接收到数据后,一般会先进行解码和缓存,待收集到长度足够的字节数据,组装成有固定含义的消息之后,才会传递给下一个 ChannelInboudHandler 进行后续处理。

​ 在 Netty 中就提供了很多 Encoder 的实现用来解码读取到的数据,Encoder 会处理多次 channelRead() 事件,等拿到有意义的数据之后,才会触发一次下一个 ChannelInboundHandler 的 channelRead() 方法

​ **出站(Outbound)事件与入站(Inbound)事件相反,一般是由用户触发的。**ChannelHandler 接口中并没有定义方法来处理事件,而是由其子类进行处理的,如下图所示,ChannelInboundHandler 拦截并处理入站事件,ChannelOutboundHandler 拦截并处理出站事件。

image-20250102211720795

​ Netty 提供的 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 主要是帮助完成事件流转功能的,即自动调用传递事件的相应方法。这样,在自定义 ChannelHandler 实现类的时候,就可以直接继承相应的 Adapter 类,并覆盖需要的事件处理方法,其他不关心的事件方法直接使用默认实现即可,从而提高开发效率。

​ ChannelHandler 中的很多方法都需要一个 ChannelHandlerContext 类型的参数,ChannelHandlerContext 抽象的是 ChannleHandler 之间的关系以及 ChannelHandler 与ChannelPipeline 之间的关系。ChannelPipeline 中的事件传播主要依赖于ChannelHandlerContext 实现,在 ChannelHandlerContext 中维护了 ChannelHandler 之间的关系,所以可以从 ChannelHandlerContext 中得到当前 ChannelHandler 的后继节点,从而将事件传播到后续的 ChannelHandler。

​ ChannelHandlerContext 继承了 AttributeMap,所以提供了 attr() 方法设置和删除一些状态属性信息,我们可将业务逻辑中所需使用的状态属性值存入到 ChannelHandlerContext 中,然后这些属性就可以随它传播了。Channel 中也维护了一个 AttributeMap,与 ChannelHandlerContext 中的 AttributeMap,从 Netty 4.1 开始,都是作用于整个 ChannelPipeline。

​ 通过上述分析,可以了解到,一个 Channel 对应一个 ChannelPipeline,一个 ChannelHandlerContext 对应一个ChannelHandler。 如下图所示:

image-20250102212157372

​ 最后,需要注意的是,如果要在 ChannelHandler 中执行耗时较长的逻辑,例如,操作 DB 、进行网络或磁盘 I/O 等操作,一般会在注册到 ChannelPipeline 的同时,指定一个线程池异步执行 ChannelHandler 中的操作

(4)NioEventLoop & NioEventLoopGroup

NioEventLoop

一个 EventLoop 对象由一个永远都不会改变的线程驱动,同时一个 NioEventLoop 包含了一个 Selector 对象,可以支持多个 Channel 注册在其上,该 NioEventLoop 可以同时服务多个 Channel,每个 Channel 只能与一个 NioEventLoop 绑定,这样就实现了线程与 Channel 之间的关联。

​ Channel 中的 I/O 操作是由 ChannelPipeline 中注册的 ChannelHandler 进行处理的,而 ChannelHandler 的逻辑都是由相应 NioEventLoop 关联的那个线程执行的。

除了与一个线程绑定之外,NioEvenLoop 中还维护了两个任务队列:

  • 普通任务队列:用户产生的普通任务可以提交到该队列中暂存,NioEventLoop 发现该队列中的任务后会立即执行。这是一个多生产者、单消费者的队列,Netty 使用该队列将外部用户线程产生的任务收集到一起,并在 Reactor 线程内部用单线程的方式串行执行队列中的任务。例如,外部非 I/O 线程调用了 Channel 的 write() 方法,Netty 会将其封装成一个任务放入 TaskQueue 队列中,这样,所有的 I/O 操作都会在 I/O 线程中串行执行。

  • 定时任务队列:当用户在非 I/O 线程产生定时操作时,Netty 将用户的定时操作封装成定时任务,并将其放入该定时任务队列中等待相应 NioEventLoop 串行执行

​ 基于上述分析可知,NioEventLoop 主要做三件事:监听 I/O 事件、执行普通任务以及执行定时任务。NioEventLoop 到底分配多少时间在不同类型的任务上,是可以配置的。另外,为了防止 NioEventLoop 长时间阻塞在一个任务上,一般会将耗时的操作提交到其他业务线程池处理

NioEventLoopGroup

NioEventLoopGroup 表示的是一组 NioEventLoop。Netty 为了能更充分地利用多核 CPU 资源,一般会有多个 NioEventLoop 同时工作,至于多少线程可由用户决定,Netty 会根据实际上的处理器核数计算一个默认值,具体计算公式是:CPU 的核心数 * 2,也可以根据实际情况手动调整。

​ 当一个 Channel 创建之后,Netty 会调用 NioEventLoopGroup 提供的 next() 方法,按照一定规则获取其中一个 NioEventLoop 实例,并将 Channel 注册到该 NioEventLoop 实例,之后,就由该 NioEventLoop 来处理 Channel 上的事件。EventLoopGroup、EventLoop 以及 Channel 三者的关联关系,如下图所示

image-20250102213109999

​ 在 Netty 服务器端中,会有 BossEventLoopGroup 和 WorkerEventLoopGroup 两个 NioEventLoopGroup。通常一个服务端口只需要一个ServerSocketChannel,对应一个 Selector 和一个 NioEventLoop 线程。

​ BossEventLoop 负责接收客户端的连接事件,即 OP_ACCEPT 事件,然后将创建的 NioSocketChannel 交给 WorkerEventLoopGroup; WorkerEventLoopGroup 会由 next() 方法选择其中一个 NioEventLoopGroup,并将这个 NioSocketChannel 注册到其维护的 Selector 并对其后续的I/O事件进行处理。

image-20250102213303963

​ 如上图,BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个 Selector 对象,其上注册了一个 ServerSocketChannel,BoosEventLoop 会不断轮询 Selector 监听连接事件,在发生连接事件时,通过 accept 操作与客户端创建连接,创建 SocketChannel 对象。然后将 accept 操作得到的 SocketChannel 交给 WorkerEventLoopGroup,在Reactor 模式中 WorkerEventLoopGroup 中会维护多个 EventLoop,而每个 EventLoop 都会监听分配给它的 SocketChannel 上发生的 I/O 事件,并将这些具体的事件分发给业务线程池处理。

(5)ByteBuf - 数据的容器

​ 在进行跨进程远程交互的时候,我们需要以字节的形式发送和接收数据,发送端和接收端都需要一个高效的数据容器来缓存字节数据,ByteBuf 就扮演了这样一个数据容器的角色。

​ ByteBuf 类似于一个字节数组,其中维护了一个读索引和一个写索引,分别用来控制对 ByteBuf 中数据的读写操作,两者符合不等式:0 <= readerIndex <= writerIndex <= capacity

image-20250102213504283

​ ByteBuf 提供的读写操作 API 主要操作底层的字节容器(byte[]、ByteBuffer 等)以及读写索引这两指针(可以查阅相关的 API 说明),Netty 中主要分为以下三大类 ByteBuf:

  • Heap Buffer(堆缓冲区):最常用的一种 ByteBuf,它将数据存储在 JVM 的堆空间,其底层实现是在 JVM 堆内分配一个数组,实现数据的存储。堆缓冲区可以快速分配,当不使用时也可以由 GC 轻松释放。它还提供了直接访问底层数组的方法,通过 ByteBuf.array() 来获取底层存储数据的 byte[] 。
  • Direct Buffer(直接缓冲区):直接缓冲区会使用堆外内存存储数据,不会占用 JVM 堆的空间,使用时应该考虑应用程序要使用的最大内存容量以及如何及时释放。直接缓冲区在使用 Socket 传递数据时性能很好,当然,它也是有缺点的,因为没有了 JVM GC 的管理,在分配内存空间和释放内存时,比堆缓冲区更复杂,Netty 主要使用内存池来解决这样的问题,这也是 Netty 使用内存池的原因之一
  • Composite Buffer(复合缓冲区):可以创建多个不同的 ByteBuf,然后提供一个这些 ByteBuf 组合的视图,也就是 CompositeByteBuf。它就像一个列表,可以动态添加和删除其中的 ByteBuf

(6)内存管理

Netty 使用 ByteBuf 对象作为数据容器,进行 I/O 读写操作,其实 Netty 的内存管理也是围绕着ByteBuf 对象高效地分配和释放。从内存管理角度来看,ByteBuf 可分为 Unpooled 和 Pooled 两类。

  • Unpooled,是指非池化的内存管理方式:每次分配时直接调用系统 API 向操作系统申请 ByteBuf,在使用完成之后,通过系统调用进行释放。Unpooled 将内存管理完全交给系统,不做任何特殊处理,使用起来比较方便,对于申请和释放操作不频繁、操作成本比较低的 ByteBuf 来说,是比较好的选择。
  • Pooled,是指池化的内存管理方式:该方式会预先申请一块大内存形成内存池,在需要申请 ByteBuf 空间的时候,会将内存池中一部分合理的空间封装成 ByteBuf 给服务使用,使用完成后回收到内存池中。前面提到 DirectByteBuf 底层使用的堆外内存管理比较复杂,池化技术很好地解决了这一问题。
Netty 的 ByteBuf 池化技术分析

​ 下面从如何高效分配和释放内存、如何减少内存碎片以及在多线程环境下如何减少锁竞争这三个方面介绍一下 Netty 提供的 ByteBuf 池化技术。

​ Netty 首先会向系统申请一整块连续内存,称为 Chunk(默认大小为 16 MB),这一块连续的内存通过 PoolChunk 对象进行封装。之后,Netty 将 Chunk 空间进一步拆分为 Page,每个 Chunk 默认包含 2048 个 Page,每个 Page 的大小为 8 KB

​ 在同一个 Chunk 中,Netty 将 Page 按照不同粒度进行分层管理。如下图所示,从下数第 1 层中每个分组的大小为 1 * PageSize,一共有 2048 个分组;第 2 层中每个分组大小为 2 * PageSize,一共有 1024 个组;第 3 层中每个分组大小为 4 * PageSize,一共有 512 个组;依次类推,直至最顶层

image-20250102213943062

① 内存分配 & 释放

​ 当服务向内存池请求内存时,Netty 会将请求分配的内存数向上取整到最接近的分组大小,然后在该分组的相应层级中从左至右寻找空闲分组。例如,服务请求分配 3 * PageSize 的内存,向上取整得到的分组大小为 4 * PageSize,在该层分组中找到完全空闲的一组内存进行分配即可

​ 当分组大小 4 * PageSize 的内存分配出去后,为了方便下次内存分配,分组被标记为全部已使用(图中红色标记),向上更粗粒度的内存分组被标记为部分已使用(图中黄色标记)。

​ Netty 使用完全平衡树的结构实现了上述算法,这个完全平衡树底层是基于一个 byte 数组构建的,图示如下

image-20250102214049018

② 大对象 & 小对象的处理

​ 当申请分配的对象是超过 Chunk 容量的大型对象,Netty 就不再使用池化管理方式了,在每次请求分配内存时单独创建特殊的非池化 PoolChunk 对象进行管理,当对象内存释放时整个PoolChunk 内存释放。

如果需要一定数量空间远小于 PageSize 的 ByteBuf 对象,例如,创建 256 Byte 的 ByteBuf,按照上述算法,就需要为每个小 ByteBuf 对象分配一个 Page,这就出现了很多内存碎片。Netty 通过再将 Page 细分的方式,解决这个问题。Netty 将请求的空间大小向上取最近的 16 的倍数(或 2 的幂),规整后小于 PageSize 的小 Buffer 可分为两类。

  • 微型对象:规整后的大小为 16 的整倍数,如 16、32、48、……、496,一共 31 种大小。
  • 小型对象:规整后的大小为 2 的幂,如 512、1024、2048、4096,一共 4 种大小。

Netty 的实现会先从 PoolChunk 中申请空闲 Page,同一个 Page 分为相同大小的小 Buffer 进行存储;这些 Page 用 PoolSubpage 对象进行封装,PoolSubpage 内部会记录它自己能分配的小 Buffer 的规格大小、可用内存数量,并通过 bitmap 的方式记录各个小内存的使用情况(如下图所示)。虽然这种方案不能完美消灭内存碎片,但是很大程度上还是减少了内存浪费。

image-20250102214149859

​ 为了解决单个 PoolChunk 容量有限的问题,Netty 将多个 PoolChunk 组成链表一起管理,然后用 PoolChunkList 对象持有链表的 head。

Netty 通过 PoolArena 管理 PoolChunkList 以及 PoolSubpage

​ PoolArena 内部持有 6 个 PoolChunkList,各个 PoolChunkList 持有的 PoolChunk 的使用率区间有所不同,如下图所示:

image-20250102214223948

​ 6 个 PoolChunkList 对象组成双向链表,当 PoolChunk 内存分配、释放,导致使用率变化,需要判断 PoolChunk 是否超过所在 PoolChunkList 的限定使用率范围,如果超出了,需要沿着 6 个 PoolChunkList 的双向链表找到新的合适的 PoolChunkList ,成为新的 head。同样,当新建 PoolChunk 分配内存或释放空间时,PoolChunk 也需要按照上面逻辑放入合适的PoolChunkList 中

image-20250102214308043

​ 从上图可以看出,这 6 个 PoolChunkList 额定使用率区间存在交叉,这样设计的原因是:如果使用单个临界值的话,当一个 PoolChunk 被来回申请和释放,内存使用率会在临界值上下徘徊,这就会导致它在两个 PoolChunkList 链表中来回移动。

​ PoolArena 内部持有 2 个 PoolSubpage 数组,分别存储微型 Buffer 和小型 Buffer 的PoolSubpage。相同大小的 PoolSubpage 组成链表,不同大小的 PoolSubpage 链表的 head 节点保存在 tinySubpagePools 或者 smallSubpagePools 数组中,如下图:

image-20250102214440892

③ 并发处理

​ 内存分配释放不可避免地会遇到多线程并发场景,PoolChunk 的完全平衡树标记以及 PoolSubpage 的 bitmap 标记都是多线程不安全的,都是需要加锁同步的。为了减少线程间的竞争,Netty 会提前创建多个 PoolArena(默认数量为 2 * CPU 核心数),当线程首次请求池化内存分配,会找被最少线程持有的 PoolArena,并保存线程局部变量 PoolThreadCache 中,实现线程与 PoolArena 的关联绑定

​ Netty 还提供了延迟释放的功能,来提升并发性能。当内存释放时,PoolArena 并没有马上释放,而是先尝试将该内存关联的 PoolChunk 和 Chunk 中的偏移位置等信息存入 ThreadLocal 的固定大小缓存队列中,如果该缓存队列满了,则马上释放内存。当有新的分配请求时,PoolArena 会优先访问线程本地的缓存队列,查询是否有缓存可用,如果有,则直接分配,提高分配效率

⑧ 简易版RPC框架实现

学习目标

1.RPC框架构建分析(基石-远程调用)

​ RPC 是“远程过程调用(Remote Procedure Call)”的缩写形式,比较通俗的解释是:像本地方法调用一样调用远程的服务。虽然 RPC 的定义非常简单,但是相对完整的、通用的 RPC 框架涉及很多方面的内容,例如注册发现、服务治理、负载均衡、集群容错、RPC 协议等

image-20250103104552138

​ 实现RPC框架的基石部分——远程调用,用于构建简易版本RPC框架一次远程调用核心流程:

  • ① Client 首先会调用本地的代理(图中的 Proxy)
  • ② Client 端 Proxy 会按照协议(Protocol),将调用中传入的数据序列化成字节流
  • ③ Client 会通过网络,将字节数据发送到 Server 端
  • ④ Server 端接收到字节数据之后,会按照协议进行反序列化,得到相应的请求信息
  • ⑤ Server 端 Proxy 会根据序列化后的请求信息,调用相应的业务逻辑
  • ⑥ Server 端业务逻辑的返回值,也会按照上述逻辑返回给 Client 端

核心代码结构说明

  • protocol:简易版 RPC 框架的自定义协议
  • serialization:提供了自定义协议对应的序列化、反序列化的相关工具类
  • codec:提供了自定义协议对应的编码器和解码器
  • transport:基于 Netty 提供了底层网络通信的功能,其中会使用到 codec 包中定义编码器和解码器,以及 serialization 包中的序列化器和反序列化器
  • registry:基于 ZooKeeper 和 Curator 实现了简易版本的注册中心功能
  • proxy:使用 JDK 动态代理实现了一层代理

简易版本rpc所需依赖

  • Netty:辅助构建网络通信
  • hessian:序列化
  • snappy:压缩算法
  • curator-x-discovery:zookeeper 客户端
  • ......
<!-- 引入rpc构建所需相关辅助依赖 -->

<!-- 引入Netty框架 -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.25.Final</version>
</dependency>

<!-- 序列化(hessian) -->
<dependency>
    <groupId>com.caucho</groupId>
    <artifactId>hessian</artifactId>
    <!--            <version>4.0.38</version>-->
    <version>3.1.5</version>
</dependency>

<!-- 压缩算法(snappy) -->
<dependency>
    <groupId>org.xerial.snappy</groupId>
    <artifactId>snappy-java</artifactId>
    <version>1.1.7.5</version>
</dependency>

<!-- 注册中心(zookeeper客户端curator) -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery-server</artifactId>
    <version>2.13.0</version>
</dependency>

(1)自定义协议(protocol

自定义协议相关实体

  • Header消息头(自定义协议定义)
  • RequestResponse 请求/响应实体定义
  • Message:消息体定义(包括消息头和消息内容)

​ 当前已经有很多成熟的协议了,例如 HTTP、HTTPS 等,那为什么还要自定义 RPC 协议呢?

​ 从功能角度考虑,HTTP 协议在 1.X 时代,只支持半双工传输模式,虽然支持长连接,但是不支持服务端主动推送数据。从效率角度来看,在一次简单的远程调用中,只需要传递方法名和加个简单的参数,此时,HTTP 请求中大部分数据都被 HTTP Header 占据,真正的有效负载非常少,效率就比较低。

​ 当然,HTTP 协议也有自己的优势,例如,天然穿透防火墙,大量的框架和开源软件支持 HTTP 接口,而且配合 REST 规范使用也是很便捷的,所以有很多 RPC 框架直接使用 HTTP 协议,尤其是在 HTTP 2.0 之后,如 gRPC、Spring Cloud 等

​ 此处自定义一个简易版本的rpc-simple-demo的自定义协议:

shortbytebytelongintn byte
magicversionextraInfomessageIdsizemessage body
魔数协议版本0(消息类型:请求/响应)
1~2(序列化方式)
3~4(压缩方式)
5~6(请求类型:正常请求、心跳请求)
消息ID消息体长度消息体

​ 在上述消息头中,包含了整个 RPC 消息的一些控制信息,例如,版本号、魔数、消息类型、附加信息、消息 ID 以及消息体的长度,在附加信息(extraInfo)中,按位进行划分,分别定义消息的类型、序列化方式、压缩方式以及请求类型。当然,也可以自己扩充 Demo RPC 协议,实现更加复杂的功能

​ 消息头Header定义(此处省略getter、setter定义)

public class Header {

    private short magic; // 魔数

    private byte version; // 协议版本

    private byte extraInfo; // 附加信息

    private Long messageId; // 消息ID

    private Integer size; // 消息体长度

    ... // 省略getter/setter方法

}

Request 实体 & Response 实体

​ 用 Request 和 Response 两个实体类来表示请求消息和响应消息的消息体

public class Request implements Serializable {

    private String serviceName; // 请求的Service类名

    private String methodName; // 请求的方法名称

    private Class[] argTypes; // 请求方法的参数类型

    private Object[] args; // 请求方法的参数

    ... // 省略getter/setter方法

}

public class Response implements Serializable {

    private int code = 0; // 响应的错误码,正常响应为0,非0表示异常响应

    private String errMsg; // 异常信息

    private Object result; // 响应结果

    ... // 省略getter/setter方法

}

(2)序列化(serialization

序列化与反序列化 构建核心

  • ① 序列化接口定义(Serialization

  • ② 不同的序列化方式实现(xxxSerialization):基于不同的序列化机制,实现Serialization接口,填充序列化逻辑

  • ③ 序列化工厂(SerializationFactory):根据不同的类型创建序列化实例对象

  • 序列化接口定义Serialization(定义Serialization 接口提供方法)

​ 为了让这两个类的对象能够在 Client 和 Server 之间跨进程传输,需要进行序列化和反序列化操作,这里定义一个 Serialization 接口,统一完成序列化相关的操作,可自定义方法实现序列化接口逻辑

/**
 * 序列化接口定义
 */
public interface Serialization {
    // 序列化
    <T> byte[] serialize(T obj) throws IOException;

    // 反序列化
    <T> T deserialize(byte[] data, Class<T> clz) throws IOException;
}

​ 序列化机制:JDK序列化、Hessian序列化等方式实现。不同的序列化方式只需要实现Serialization接口方法逻辑即可。例如此处定义HessianSerialization基于Hessian序列化方式实现Serialization接口

  • 基于Hessian的序列化方式实现(HessianSerialization作为rpc-simple-demo的默认序列化方式)
/**
 * 基于Hessian的序列化机制实现
 */
public class HessianSerialization implements Serialization {
    @Override
    public <T> byte[] serialize(T obj) throws IOException {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        HessianOutput hessianOutput = new HessianOutput(os);
        hessianOutput.writeObject(obj);
        return os.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] data, Class<T> clz) throws IOException {
        Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(data));
        return (T) input.readObject(clz);
    }
}
  • 序列化工厂(SerializationFactory):根据不同的类型创建序列化实例对象
/**
 * 对象工厂方法:根据指定类型返回序列化机制
 */
public class SerializationFactory {

    public static Serialization get(byte type) {
        switch (type & 0x7) {
            case 0x0:
                return new HessianSerialization();
            default:
                return new HessianSerialization();
        }
    }
}

(3)压缩方式(compress

压缩相关构建核心(定义Compressor接口)

  • ① 定义Compressor接口
  • ② 根据不同的压缩算法实现Compressor接口(xxxCompressor实现)
  • ③ 压缩工厂(CompressorFactory):根据不同的类型创建压缩实例对象

​ 在一些场景中,请求或者传输的数据比较大,直接传输比较消耗带宽,所以一般会采用压缩后再发送的方式。在前面介绍的消息头中的 extraInfo 字段中,就包含了标识消息体压缩方式的 bit 位。此处定义一个 Compressor 接口抽象所有压缩算法:

  • Compressor 压缩算法接口
/**
 * 压缩相关接口定义(压缩、解压)
 */
public interface Compressor {
    // 压缩对象
    byte[] compress(byte[] array) throws IOException;

    // 解压对象
    byte[] unCompress(byte[] array) throws IOException;
}
  • 基于Snappy压缩算法的实现(SnappyCompressor作为rpc-simple-demo的默认压缩方式)
/**
 * 基于Snappy实现的压缩机制
 */
public class SnappyCompressor implements Compressor {

    @Override
    public byte[] compress(byte[] array) throws IOException {
        if (array == null) {
            return null;
        }
        return Snappy.compress(array);
    }

    @Override
    public byte[] unCompress(byte[] array) throws IOException {
        if (array == null) {
            return null;
        }
        return Snappy.uncompress(array);
    }
}
  • 压缩工厂(CompressorFactory):根据不同的类型创建压缩实例对象
/**
 * 对象工厂方法:根据指定类型返回压缩机制
 */
public class CompressorFactory {
    public static Compressor get(byte extraInfo) {
        switch (extraInfo & 24) {
            case 0x0:
                return new SnappyCompressor();
            default:
                return new SnappyCompressor();
        }
    }
}

(4)编解码器(codec

​ 上述介绍 Netty 核心概念的时候提到过,Netty 每个 Channel 绑定一个 ChannelPipeline,并依赖 ChannelPipeline 中添加的 ChannelHandler 处理接收到(或要发送)的数据,其中就包括字节到消息(以及消息到字节)的转换。Netty 中提供了 ByteToMessageDecoder、 MessageToByteEncoder、MessageToMessageEncoder、MessageToMessageDecoder 等抽象类来实现 Message 与 ByteBuf 之间的转换以及 Message 之间的转换

​ 在 Netty 的源码中,可以看到对很多已有协议的序列化和反序列化都是基于上述抽象类实现的,例如,HttpServerCodec 中通过依赖 HttpServerRequestDecoder 和 HttpServerResponseEncoder 来实现 HTTP 请求的解码和 HTTP 响应的编码。如下图所示,HttpServerRequestDecoder 继承自 ByteToMessageDecoder,实现了 ByteBuf 到 HTTP 请求之间的转换;HttpServerResponseEncoder 继承自 MessageToMessageEncoder,实现 HTTP 响应到其他消息的转换(其中包括转换成 ByteBuf 的能力)

​ 在简易版 RPC 框架中,自定义请求暂时没有 HTTP 协议那么复杂,只要简单继承 ByteToMessageDecoder 和 MessageToMessageEncoder 即可

  • DemoRpcDecoder
public class RpcDecoder extends ByteToMessageDecoder {

    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        if (byteBuf.readableBytes() < Constants.HEADER_SIZE) {
            return; // 不到16字节的话无法解析消息头,暂不读取
        }
        // 记录当前readIndex指针的位置,方便重置
        byteBuf.markReaderIndex();
        // 尝试读取消息头的魔数部分
        short magic = byteBuf.readShort();
        if (magic != Constants.MAGIC) { // 魔数不匹配会抛出异常
            byteBuf.resetReaderIndex(); // 重置readIndex指针
            throw new RuntimeException("magic number error:" + magic);
        }
        // 依次读取消息版本、附加信息、消息ID以及消息体长度四部分
        byte version = byteBuf.readByte();
        byte extraInfo = byteBuf.readByte();
        long messageId = byteBuf.readLong();
        int size = byteBuf.readInt();
        Object body = null;
        // 心跳消息是没有消息体的,无需读取
        if (!Constants.isHeartBeat(extraInfo)) {
            // 对于非心跳消息,没有积累到足够的数据是无法进行反序列化的
            if (byteBuf.readableBytes() < size) {
                byteBuf.resetReaderIndex();
                return;
            }
            // 读取消息体并进行反序列化
            byte[] payload = new byte[size];
            byteBuf.readBytes(payload);
            // 这里根据消息头中的extraInfo部分选择相应的序列化和压缩方式
            Serialization serialization = SerializationFactory.get(extraInfo);
            Compressor compressor = CompressorFactory.get(extraInfo);
            if (Constants.isRequest(extraInfo)) {
                // 得到消息体
                body = serialization.deserialize(compressor.unCompress(payload), Request.class);
            } else {
                // 得到消息体
                body = serialization.deserialize(compressor.unCompress(payload), Response.class);
            }
        }
        // 将上面读取到的消息头和消息体拼装成完整的Message并向后传递
        Header header = new Header(magic, version, extraInfo, messageId, size);
        Message message = new Message(header, body);
        out.add(message);
    }
}
  • DemoRpcEncoder
public class RpcEncoder extends MessageToByteEncoder<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx,
                          Message message, ByteBuf byteBuf) throws Exception {
        Header header = message.getHeader();
        // 依次序列化消息头中的魔数、版本、附加信息以及消息ID
        byteBuf.writeShort(header.getMagic());
        byteBuf.writeByte(header.getVersion());
        byteBuf.writeByte(header.getExtraInfo());
        byteBuf.writeLong(header.getMessageId());
        Object content = message.getContent();
        if (Constants.isHeartBeat(header.getExtraInfo())) {
            byteBuf.writeInt(0); // 心跳消息,没有消息体,这里写入0
            return;
        }
        // 按照extraInfo部分指定的序列化方式和压缩方式进行处理
        Serialization serialization = SerializationFactory.get(header.getExtraInfo());
        Compressor compressor = CompressorFactory.get(header.getExtraInfo());
        byte[] payload = compressor.compress(serialization.serialize(content));
        byteBuf.writeInt(payload.length); // 写入消息体长度
        byteBuf.writeBytes(payload); // 写入消息体
    }
}

(5)客户端与服务端的通信(transport

​ 在介绍 Netty 线程模型的时候提到,不能在 Netty 的 I/O 线程中执行耗时的业务逻辑。在 rpc-simple-demo 框架的 Server 端接收到请求时,首先会通过 RpcDecoder 反序列化得到请求消息,之后会通过一个自定义的 ChannelHandler(RpcServerHandler)将请求提交给业务线程池进行处理

​ 在 rpc-simple-demo 框架的 Client 端接收到响应消息的时候,也是先通过 RpcDecoder 反序列化得到响应消息,之后通过一个自定义的 ChannelHandler(RpcClientHandler)将响应返回给上层业务

​ RpcServerHandler 和 RpcClientHandler 都继承自 SimpleChannelInboundHandler

​ RpcServerHandler 和 RpcClientHandler 参考实现如下,其中需要构建一些辅助类填充逻辑

① RpcServerHandler
public class RpcServerHandler extends SimpleChannelInboundHandler<Message<Request>> {

    // 业务线程池
    private static Executor executor = Executors.newCachedThreadPool();

    @Override
    protected void channelRead0(final ChannelHandlerContext channelHandlerContext, Message<Request> message) throws Exception {
        byte extraInfo = message.getHeader().getExtraInfo();
        if (Constants.isHeartBeat(extraInfo)) { // 心跳消息,直接返回即可
            channelHandlerContext.writeAndFlush(message);
            return;
        }
        // 非心跳消息,直接封装成Runnable提交到业务线程池
        executor.execute(new InvokeRunnable(message, channelHandlerContext));
    }
}

​ 其中RpcServerHandler中的InvokeRunnable是一个Runnable线程任务,在这个Runnable 任务中会根据请求的 serviceName、methodName 以及参数信息,调用相应的方法

class InvokeRunnable implements Runnable {

    private ChannelHandlerContext ctx;
    private Message<Request> message;

    public InvokeRunnable(Message<Request> message, ChannelHandlerContext ctx) {
        this.message = message;
        this.ctx = ctx;
    }

    @Override
    public void run() {
        Response response = new Response();
        Object result = null;
        try {
            Request request = message.getContent();
            String serviceName = request.getServiceName();
            // 提供BeanManager对所有业务Bean进行管理,其底层在内存中维护了一个业务Bean实例的集合(也可以尝试接入Spring等容器管理业务Bean)
            Object bean = BeanManager.getBean(serviceName);
            // 下面通过反射调用Bean中的相应方法
            Method method = bean.getClass().getMethod(request.getMethodName(), request.getArgTypes());
            result = method.invoke(bean, request.getArgs());
        } catch (Exception e) {
            // 省略异常处理
        } finally {
        }
        Header header = message.getHeader();
        header.setExtraInfo((byte) 1);
        response.setResult(result); // 设置响应结果
        // 将响应消息返回给客户端
        ctx.writeAndFlush(new Message(header, response));
    }

}
② RpcClientHandler
public class RpcClientHandler extends SimpleChannelInboundHandler<Message<Response>> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message<Response> message) throws Exception {
        NettyResponseFuture responseFuture = Connection.IN_FLIGHT_REQUEST_MAP.remove(message.getHeader().getMessageId());
        Response response = message.getContent();
        // 心跳消息特殊处理
        if (response == null && Constants.isHeartBeat(message.getHeader().getExtraInfo())) {
            response = new Response();
            response.setCode(Constants.HEARTBEAT_CODE);
        }
        responseFuture.getPromise().setSuccess(response.getResult());
    }
}

​ 其中RpcClientHandler相关 Client 端的 Connection,它是用来暂存已发送出去但未得到响应的请求,这样,在响应返回时,就可以查找到相应的请求以及 Future,从而将响应结果返回给上层业务逻辑

public class Connection implements Closeable {
    // 用于生成消息ID,全局唯一
    private final static AtomicLong ID_GENERATOR = new AtomicLong(0);

    // TODO 时间轮定时删除
    public final static Map<Long, NettyResponseFuture<Response>> IN_FLIGHT_REQUEST_MAP = new ConcurrentHashMap<>();

    private ChannelFuture future;

    private AtomicBoolean isConnected = new AtomicBoolean();

    public Connection() {
        this.isConnected.set(false);
        this.future = null;
    }

    public Connection(ChannelFuture future, boolean isConnected) {
        this.future = future;
        this.isConnected.set(isConnected);
    }

    public ChannelFuture getFuture() {
        return future;
    }

    public void setFuture(ChannelFuture future) {
        this.future = future;
    }

    public boolean isConnected() {
        return isConnected.get();
    }

    public void setIsConnected(boolean isConnected) {
        this.isConnected.set(isConnected);
    }

    public NettyResponseFuture<Response> request(Message<Request> message, long timeOut) {
        // 生成并设置消息ID
        long messageId = ID_GENERATOR.incrementAndGet();
        message.getHeader().setMessageId(messageId);
        // 创建消息关联的Future
        NettyResponseFuture responseFuture = new NettyResponseFuture(System.currentTimeMillis(),
                timeOut, message, future.channel(), new DefaultPromise(new DefaultEventLoop()));
        // 将消息ID和关联的Future记录到IN_FLIGHT_REQUEST_MAP集合中
        IN_FLIGHT_REQUEST_MAP.put(messageId, responseFuture);
        try {
            future.channel().writeAndFlush(message); // 发送请求
        } catch (Exception e) {
            // 发送请求异常时,删除对应的Future
            IN_FLIGHT_REQUEST_MAP.remove(messageId);
            throw e;
        }
        return responseFuture;
    }

    public boolean ping() {
        Header heartBeatHeader = new Header(Constants.MAGIC, Constants.VERSION_1);
        heartBeatHeader.setExtraInfo(Constants.HEART_EXTRA_INFO);
        Message message = new Message(heartBeatHeader, null);
        NettyResponseFuture<Response> request = request(message, Constants.DEFAULT_TIMEOUT);
        try {
            Promise<Response> await = request.getPromise().await();
            return await.get().getCode() == Constants.HEARTBEAT_CODE;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void close() throws IOException {
        future.channel().close();
    }
}

​ 此处 Connection 中没有定时清理 IN_FLIGHT_REQUEST_MAP 集合的操作,在无法正常获取响应的时候,就会导致 IN_FLIGHT_REQUEST_MAP 不断膨胀,最终 OOM。可以添加一个时间轮定时器,定时清理过期的请求消息。

③ 启动入口(服务端RpcClient、客户端RpcServer
  • NettyEventLoopFactory
  • RpcClient
  • RpcServer

NettyEventLoopFactory

public class NettyEventLoopFactory {
    public static EventLoopGroup eventLoopGroup(int threads, String threadFactoryName) {
        ThreadFactory threadFactory = new DefaultThreadFactory(threadFactoryName, true);
        return shouldEpoll() ? new EpollEventLoopGroup(threads, threadFactory) :
                new NioEventLoopGroup(threads, threadFactory);
    }

    public static Class<? extends SocketChannel> socketChannelClass() {
        return shouldEpoll() ? EpollSocketChannel.class : NioSocketChannel.class;
    }

    public static Class<? extends ServerSocketChannel> serverSocketChannelClass() {
        return shouldEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
    }

    private static boolean shouldEpoll() {
        return System.getProperty("os.name").toLowerCase().contains("linux");
    }
}

RpcClient

/**
 * 客户端入口
 */
public class RpcClient implements Closeable {

    protected Bootstrap clientBootstrap;
    protected EventLoopGroup group;
    private String host;
    private int port;

    public RpcClient(String host, int port) {
        this.host = host;
        this.port = port;
        // 创建并配置客户端Bootstrap
        clientBootstrap = new Bootstrap();
        group = NettyEventLoopFactory.eventLoopGroup(Constants.DEFAULT_IO_THREADS, "NettyClientWorker");
        clientBootstrap.group(group)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .channel(NioSocketChannel.class) // 创建的Channel类型
                // 指定ChannelHandler的顺序
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("demo-rpc-encoder", new RpcEncoder());
                        ch.pipeline().addLast("demo-rpc-decoder", new RpcDecoder());
                        ch.pipeline().addLast("client-handler", new RpcClientHandler());
                    }
                });
    }


    public ChannelFuture connect() {
        // 连接指定的地址和端口
        ChannelFuture connect = clientBootstrap.connect(host, port);
        connect.awaitUninterruptibly();
        return connect;
    }

    @Override
    public void close() {
        group.shutdownGracefully();
    }
}

RpcServer

/**
 * 服务端入口
 */
public class RpcServer {

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap serverBootstrap;
    private Channel channel;
    protected int port;

    public RpcServer(int port) throws InterruptedException {
        this.port = port;
        // 创建boss和worker两个EventLoopGroup,注意一些小细节,workerGroup 是按照中的线程数是按照 CPU 核数计算得到的
        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(Math.min(Runtime.getRuntime().availableProcessors() + 1, 32), "NettyServerWorker");
        serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                // 指定每个Channel上注册的ChannelHandler以及顺序
                .handler(new LoggingHandler(LogLevel.INFO)).childHandler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast("demp-rpc-decoder", new RpcDecoder());
                                ch.pipeline().addLast("demo-rpc-encoder", new RpcEncoder());
                                ch.pipeline().addLast("server-handler", new RpcServerHandler());
                            }
                        });
    }

    public ChannelFuture start() throws InterruptedException {
        // 监听指定的端口
        ChannelFuture channelFuture = serverBootstrap.bind(port);
        channel = channelFuture.channel();
        channel.closeFuture();
        return channelFuture;
    }


    public void startAndWait() throws InterruptedException {
        try {
            channel.closeFuture().await();
        } catch (InterruptedException e) {
            Thread.interrupted();
        }
    }


    public void shutdown() throws InterruptedException {
        channel.close().sync();
        if (bossGroup != null)
            bossGroup.shutdownGracefully().awaitUninterruptibly(15000);
        if (workerGroup != null)
            workerGroup.shutdownGracefully().awaitUninterruptibly(15000);
    }

}

​ 结合RpcClient/RpcServer代码理解ChannelHandler的执行顺序

image-20250103145007677

(6)注册中心(registry

​ 服务注册和服务发现能力是RPC框架的另一个基础能力,其对应registry包下的相关实现。registry 包主要是依赖 Apache Curator 实现了一个简易版本的 ZooKeeper 客户端,并基于 ZooKeeper 实现了注册中心最基本的两个功能:Provider 注册以及 Consumer 订阅

  • ServiceInfo:服务信息实体定义
  • Registry:提供注册和查询服务实例的方法
  • xxRegistry:实现Registry接口,例如此处提供ZookeeperRegistry是基于Zookeeper实现服务注册和发现,并引入ServiceCache缓存提高查询效率
  • ServiceInstanceListener:服务实例监听接口
  • AbstractServiceInstanceListener:服务实例接听接口实现

ServiceInfo

/**
 * 服务信息
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerInfo implements Serializable {

    private String host;

    private int port;

}

Registry

​ 注册相关接口方法定义:注册服务、取消注册服务、获取服务实例

public interface Registry<T> {

    void registerService(ServiceInstance<T> service) throws Exception;

    void unregisterService(ServiceInstance<T> service) throws Exception;

    List<ServiceInstance<T>> queryForInstances(String name) throws Exception;

}

ZookeeperRegistry

​ 基于Zookeeper实现的注册中心,填充服务注册、服务发现逻辑

public class ZookeeperRegistry<T> implements Registry<T> {

    private Map<String, List<ServiceInstanceListener<T>>> listeners = Maps.newConcurrentMap();

    private InstanceSerializer serializer = new JsonInstanceSerializer<>(ServerInfo.class);

    private ServiceDiscovery<T> serviceDiscovery;

    private ServiceCache<T> serviceCache;

    private String address = "localhost:2181";

    public void start() throws Exception {
        String root = "/demo/rpc";
        // 初始化CuratorFramework
        CuratorFramework client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));
        client.start();  // 启动Curator客户端
        // client.createContainers(root);

        // 初始化ServiceDiscovery
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerInfo.class)
                .client(client).basePath(root)
                .serializer(serializer)
                .build();
        serviceDiscovery.start(); // 启动ServiceDiscovery

        // 创建ServiceCache,监Zookeeper相应节点的变化,也方便后续的读取
        serviceCache = serviceDiscovery.serviceCacheBuilder()
                .name("/demoService")
                .build();
//        client.start(); // 启动Curator客户端
        client.blockUntilConnected();  // 阻塞当前线程,等待连接成功
        serviceDiscovery.start(); // 启动ServiceDiscovery
        serviceCache.start(); // 启动ServiceCache
    }

    @Override
    public void registerService(ServiceInstance<T> service) throws Exception {
        serviceDiscovery.registerService(service);
    }

    @Override
    public void unregisterService(ServiceInstance service) throws Exception {
        serviceDiscovery.unregisterService(service);
    }

    @Override
    public List<ServiceInstance<T>> queryForInstances(String name) throws Exception {
        // 直接根据name进行过滤ServiceCache中的缓存数据
        return serviceCache.getInstances().stream()
                .filter(s -> s.getName().equals(name))
                .collect(Collectors.toList());
    }
}

ServiceInstanceListener

// 服务实例监听接口
public interface ServiceInstanceListener<T> {

    void onRegister(ServiceInstance<T> serviceInstance);

    void onRemove(ServiceInstance<T> serviceInstance);

    void onUpdate(ServiceInstance<T> serviceInstance);

    void onFresh(ServiceInstance<T> serviceInstance, ServerInfoEvent event);

    enum ServerInfoEvent {
        ON_REGISTER,
        ON_UPDATE,
        ON_REMOVE
    }

}

AbstractServiceInstanceListener

// 服务实例接听接口实现
public abstract class AbstractServiceInstanceListener<T> implements ServiceInstanceListener<T> {

    public void onFresh(ServiceInstance<T> serviceInstance, ServerInfoEvent event) {
        switch (event) {
            case ON_REGISTER:
                onRegister(serviceInstance);
                break;
            case ON_UPDATE:
                onUpdate(serviceInstance);
                break;
            case ON_REMOVE:
                onRemove(serviceInstance);
                break;
        }
    }
}

​ ZooKeeperRegistry 是基于 Curator 中的 ServiceDiscovery 组件与 ZooKeeper 进行交互的,并且对 Registry 接口的实现也是通过直接调用 ServiceDiscovery 的相关方法实现的。在查询时,直接读取 ServiceCache 中的缓存数据,ServiceCache 底层在本地维护了一个 ConcurrentHashMap 缓存,通过 PathChildrenCache 监听 ZooKeeper 中各个子节点的变化,同步更新本地缓存。ServiceCache核心实现参考如下:

public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener { // 实现PathChildrenCacheListener接口
    private final ListenerContainer<ServiceCacheListener> listenerContainer;
    private final ServiceDiscoveryImpl<T> discovery; // 关联的ServiceDiscovery实例
    private final AtomicReference<State> state;
    private final PathChildrenCache cache; // 底层的PathChildrenCache,用于监听子节点的变化
    private final ConcurrentMap<String, ServiceInstance<T>> instances; // 本地缓存

    private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory) {
        Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null");
        return new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory));
    }

    // ...... 其他方法定义 ......

    // 返回本地缓存内容
    public List<ServiceInstance<T>> getInstances() {
        return Lists.newArrayList(this.instances.values());
    }

    ......

    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        boolean notifyListeners = false;
        switch (event.getType()) {
            case CHILD_ADDED:
            case CHILD_UPDATED:
                this.addInstance(event.getData(), false); // 监听事件,更新本地缓存
                notifyListeners = true;
                break;
            case CHILD_REMOVED: 
                this.instances.remove(this.instanceIdFromData(event.getData())); // 监听事件,更新本地缓存
                notifyListeners = true;
        }
        
        // 通知ServiceCache上注册的监听器
        if (notifyListeners) {
            this.listenerContainer.forEach(new Function<ServiceCacheListener, Void>() {
                public Void apply(ServiceCacheListener listener) {
                    listener.cacheChanged();
                    return null;
                }
            });
        }

    }
    ...... 
}

(7)代理(proxy

​ 在简易版 rpc-simple-demo 框架中,Proxy 主要是为 Client 端创建一个代理,帮助客户端程序屏蔽底层的网络操作以及与注册中心之间的交互。

​ 简易版 rpc-simple-demo 使用 JDK 动态代理的方式生成代理,编写一个 InvocationHandler 接口的实现,即下面的 RpcProxy。其中有两个核心方法:一个是 newInstance() 方法,用于生成代理对象;另一个是 invoke() 方法,当调用目标对象的时候,会执行 invoke() 方法中的代理逻辑

public class RpcProxy implements InvocationHandler {

    private String serviceName; // 需要代理的服务(接口)名称

    public Map<Method, Header> headerCache = new ConcurrentHashMap<>();

    // 用于与Zookeeper交互,其中自带缓存
    private Registry<ServerInfo> registry;

    public RpcProxy(String serviceName,
                    Registry<ServerInfo> registry) throws Exception {
        this.serviceName = serviceName;
        this.registry = registry;
    }

    public static <T> T newInstance(Class<T> clazz, Registry<ServerInfo> registry) throws Exception {
        // 创建代理对象
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clazz}, new RpcProxy("demoService", registry));
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 从Zookeeper缓存中获取可用的Server地址,并随机从中选择一个
        List<ServiceInstance<ServerInfo>> serviceInstances = registry.queryForInstances(serviceName);
        ServiceInstance<ServerInfo> serviceInstance = serviceInstances.get(ThreadLocalRandom.current().nextInt(serviceInstances.size()));
        // 创建请求消息,然后调用remoteCall()方法请求上面选定的Server端
        String methodName = method.getName();
        Header header = headerCache.computeIfAbsent(method, h -> new Header(MAGIC, VERSION_1));
        Message<Request> message = new Message(header, new Request(serviceName, methodName, args));
        return remoteCall(serviceInstance.getPayload(), message);
    }

    protected Object remoteCall(ServerInfo serverInfo, Message message) throws Exception {
        if (serverInfo == null) {
            throw new RuntimeException("get available server error");
        }
        Object result;
        try {
            // RpcClient连接指定的Server端
            RpcClient demoRpcClient = new RpcClient(serverInfo.getHost(), serverInfo.getPort());
            ChannelFuture channelFuture = demoRpcClient.connect().awaitUninterruptibly();
            // 创建对应的Connection对象,并发送请求
            Connection connection = new Connection(channelFuture, true);
            NettyResponseFuture responseFuture = connection.request(message, Constants.DEFAULT_TIMEOUT);
            // 等待请求对应的响应
            result = responseFuture.getPromise().get(Constants.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw e;
        }
        return result;
    }
}

2.rpc-simple测试

① 接口 & 接口实现定义

public interface DemoService {
    String sayHello(String param);
}

public class DemoServiceImpl implements DemoService {
    public String sayHello(String param) {
        return "hello:" + param;
    }
}

② 服务提供方定义

public class Provider {
    public static void main(String[] args) throws Exception {
        // 创建DemoServiceImpl,并注册到BeanManager中
        BeanManager.registerBean("demoService", new DemoServiceImpl());

        // 创建ZookeeperRegistry,并将Provider的地址信息封装成ServerInfo对象注册到Zookeeper
        ZookeeperRegistry<ServerInfo> discovery = new ZookeeperRegistry<>();
        discovery.start();
        ServerInfo serverInfo = new ServerInfo("127.0.0.1", 20880);

        discovery.registerService(ServiceInstance.<ServerInfo>builder().name("demoService").payload(serverInfo).build());

        // 启动RpcServer,等待Client的请求
        RpcServer rpcServer = new RpcServer(20880);
        rpcServer.start();
        
        // 保持服务状态,让程序不要执行完就立刻退出
        Thread.sleep(100000000L);
    }
}

③ 服务消费方定义

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 创建ZookeeperRegistry对象
        ZookeeperRegistry<ServerInfo> discovery = new ZookeeperRegistry<>();
        discovery.start();

        // 创建代理对象,通过代理调用远端Server
        DemoService demoService = RpcProxy.newInstance(DemoService.class, discovery);

        // 调用sayHello()方法,并输出结果
        String result = demoService.sayHello("hello");
        System.out.println(result);

        /*
        // 服务发现测试
        ZookeeperRegistry<ServerInfo> zcd = new ZookeeperRegistry<>();
        zcd.start();

        while(true){
            List<ServiceInstance<ServerInfo>> serverInfoList =  zcd.queryForInstances("demoService");
            for (ServiceInstance serverInfo : serverInfoList) {
                System.out.println("dddddddddddddddddd");
                System.out.println(serverInfo.getName() + serverInfo.getPort());
                Thread.sleep(3000);
            }
        }
         */
    }

}

测试说明

​ 启动Provider,随后启动Consumer,分别查看控制台输出日志,确认接口是否调用成功

常见问题 & 解决方案

① zookeeper 服务端 和 客户端版本兼容问题(zookeeper3.4.14版本,对照curator-x-discovery-server2.13.0版本解决兼容性问题)

image-20250103153300496

② ZookeeperRegistry出现NPE,需要注意主动调用discovery.start();,否则提示服务发现相关缓存异常

Exception in thread "main" java.lang.NullPointerException
at com.noob.rpc.simple.registry.ZookeeperRegistry.queryForInstances(ZookeeperRegistry.java:67)
at com.noob.rpc.simple.proxy.RpcProxy.invoke(RpcProxy.java:50)
at com.sun.proxy.$Proxy2.sayHello(Unknown Source)
at com.noob.rpc.simple.demo.test.Consumer.main(Consumer.java:21)

③ 保持 Provider 状态

​ Server 启动(如果直接启动程序注册完成就会退出),需要保证Provider服务端正常运行才可以正常接收响应,此处通过在Provider端启动中设定Thread.sleep(100000000L);保持服务提供者状态。否则客户端启动找不到可用的服务提供方

Exception in thread "main" java.lang.IllegalArgumentException: bound must be positive
	at java.util.concurrent.ThreadLocalRandom.nextInt(ThreadLocalRandom.java:351)
	at com.noob.rpc.simple.proxy.RpcProxy.invoke(RpcProxy.java:51)
	at com.sun.proxy.$Proxy2.sayHello(Unknown Source)
	at com.noob.rpc.simple.demo.test.Consumer.main(Consumer.java:21)

​ 定位发现在指定的代码行报错,是因为可用服务列表为空,直接通过迭代获取就会出现参数越界异常相关问题

ServiceInstance<ServerInfo> serviceInstance = serviceInstances.get(ThreadLocalRandom.current().nextInt(serviceInstances.size()));

​ 或者将Consumer都放在同一个main方法中测试,此处为了更好解释Provier和Consumer作用,单独拆开分别各自启动测试

io.netty.handler.codec.DecoderException: com.caucho.hessian.io.HessianProtocolException: expected integer at 0x74

​ 由于 hessinan 版本问题导致的错误,调整为hessian - 3.1.5

io.netty.handler.codec.DecoderException: com.caucho.hessian.io.HessianProtocolException: expected integer at 0x74
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:647)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:582)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.caucho.hessian.io.HessianProtocolException: expected integer at 0x74
	at com.caucho.hessian.io.Hessian2Input.error(Hessian2Input.java:2943)
	at com.caucho.hessian.io.Hessian2Input.expect(Hessian2Input.java:2891)
	at com.caucho.hessian.io.Hessian2Input.readInt(Hessian2Input.java:825)
	at com.caucho.hessian.io.Hessian2Input.readType(Hessian2Input.java:2438)
	at com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:1694)
	at com.noob.rpc.simple.serialization.HessianSerialization.deserialize(HessianSerialization.java:26)
	at com.noob.rpc.simple.codec.RpcDecoder.decode(RpcDecoder.java:53)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
	... 16 common frames omitted

io.netty.handler.codec.DecoderException: com.caucho.hessian.io.HessianProtocolException: 'com.noob.rpc.simple.protocol.Request' could not be instantiated

​ 切换到hessian - 3.1.5后提示下述错误,是由于相关实体定义缺少无参构造函数导致序列化异常(此处也需注意排查是否由于Lombok导致序列化异常问题)

io.netty.handler.codec.DecoderException: com.caucho.hessian.io.HessianProtocolException: 'com.noob.rpc.simple.protocol.Request' could not be instantiated
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:647)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:582)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.caucho.hessian.io.HessianProtocolException: 'com.noob.rpc.simple.protocol.Request' could not be instantiated
	at com.caucho.hessian.io.JavaDeserializer.instantiate(JavaDeserializer.java:275)
	at com.caucho.hessian.io.JavaDeserializer.readMap(JavaDeserializer.java:139)
	at com.caucho.hessian.io.Hessian2Input.readObject(Hessian2Input.java:1671)
	at com.noob.rpc.simple.serialization.HessianSerialization.deserialize(HessianSerialization.java:25)
	at com.noob.rpc.simple.codec.RpcDecoder.decode(RpcDecoder.java:53)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.1.3