跳至主要內容

Dubbo源码解读与实战

holic-x...大约 32 分钟架构RPC

Dubbo源码解读与实战

学习核心

  • ① 本地缓存:降低Zookeeper压力的常用手段
  • ② 重试机制:网络操作的基本保证
  • ③ Zookeeper 注册中心(官方推荐注册中心实践)

学习资料

① 本地缓存:降低Zookeeper压力的常用手段

学习核心

  • 注册中心Registry在Dubbo架构中的核心位置,引入本地缓存概念
  • 核心接口功能说明:NodeRegistryRegistryServiceRegistryFactory核心接口功能
  • AbstractRegistry抽象类核心(提供的公共能力):本地缓存、注册/订阅、恢复/销毁

1.Dubbo 架构体系

​ Dubbo 架构图 核心主要包括四大部分:提供者、消费者、注册中心、监控中心

image-20250107150838264
  • Provider 从容器启动后的初始化阶段便会向注册中心完成注册操作,在 Provider 发生变化时,需要通知监听的 Consumer
  • Consumer 启动初始化阶段会完成对所需 Prov·ider 的订阅操作;

​ Registry 只是 Consumer 和 Provider 感知彼此状态变化的一种便捷途径而已,它们彼此的实际通讯交互过程是直接进行的,对于 Registry 来说是透明无感的。Provider 状态发生变化了,会由 Registry 主动推送订阅了该 Provider 的所有 Consumer,这保证了 Consumer 感知 Provider 状态变化的及时性,也将和具体业务需求逻辑交互解耦,提升了系统的稳定性。

​ Dubbo 中存在很多概念,但有些理解起来就特别费劲,如此处的 Registry,翻译过来的意思是“注册中心”,但它其实是应用本地的注册中心客户端真正的“注册中心”服务是其他独立部署的进程,或进程组成的集群,比如 ZooKeeper 集群。本地的 Registry 通过和 ZooKeeper 等进行实时的信息同步,维持这些内容的一致性,从而实现了注册中心这个特性。另外,就 Registry 而言,Consumer 和 Provider 只是个用户视角的概念,它们被抽象为了一条 URL 。

2.Registry 核心源码分析

​ 关注dubbo-registry源码实现:

image-20250107151338745

(1)核心接口

Node

​ 在 Dubbo 中,一般使用 Node 这个接口来抽象节点的概念。Node不仅可以表示 Provider 和 Consumer 节点,还可以表示注册中心节点。Node 接口中定义了三个非常基础的方法

/**
 * Node. (API/SPI, Prototype, ThreadSafe)
 */
public interface Node {

    URL getUrl(); // getUrl() 方法返回表示当前节点的 URL

    boolean isAvailable(); // isAvailable() 检测当前节点是否可用

    void destroy(); // destroy() 方法负责销毁当前节点并释放底层资源

}
RegistryService 接口
  • ① register() 方法和 unregister() 方法分别表示注册取消注册一个 URL
  • ② subscribe() 方法和 unsubscribe() 方法分别表示订阅取消订阅一个 URL
    • 订阅成功之后,当订阅的数据发生变化时,注册中心会主动通知第二个参数指定的 NotifyListener 对象,NotifyListener 接口中定义的 notify() 方法就是用来接收该通知的
  • ③ lookup() 方法能够查询符合条件的注册数据,它与 subscribe() 方法有一定的区别:
    • subscribe() 方法采用的是 push 模式,lookup() 方法采用的是 pull 模式
public interface RegistryService {

    void register(URL url); // 注册一个url

    void unregister(URL url); // 取消注册一个url

    void subscribe(URL url, NotifyListener listener); // 订阅一个url

    void unsubscribe(URL url, NotifyListener listener); // 取消订阅一个url

    List<URL> lookup(URL url); // 查询符合条件的注册数据

}
Registry 接口

Registry 接口继承了 RegistryService 接口和 Node 接口,它表示的就是一个拥有注册中心能力的节点,其中的 reExportRegister() 和 reExportUnregister() 方法都是委托给 RegistryService 中的相应方法

public interface Registry extends Node, RegistryService {
    default void reExportRegister(URL url) {
        register(url);
    }

    default void reExportUnregister(URL url) {
        unregister(url);
    }
}
RegistryFactory 工厂接口

RegistryFactory 接口是 Registry 的工厂接口,负责创建 Registry 对象,具体定义如下所示,其中 @SPI 注解指定了默认的扩展名为 dubbo,@Adaptive 注解表示会生成适配器类并根据 URL 参数中的 protocol 参数值选择相应的实现

@SPI("dubbo")
public interface RegistryFactory {

    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

}

​ 每个 Registry 实现类都有对应的 RegistryFactory 工厂实现,每个 RegistryFactory 工厂实现只负责创建对应的 Registry 对象。RegistryFactory 继承关系图如下所示

​ Registry 继承关系图如下所示

​ 其中,RegistryFactoryWrapper 是 RegistryFactory 接口的 Wrapper 类,它在底层 RegistryFactory 创建的 Registry 对象外层封装了一个 ListenerRegistryWrapper ,ListenerRegistryWrapper 中维护了一个 RegistryServiceListener 集合,会将 register()、subscribe() 等事件通知到 RegistryServiceListener 监听器。

​ AbstractRegistryFactory 是一个实现了 RegistryFactory 接口的抽象类,提供了规范 URL 的操作以及缓存 Registry 对象的公共能力。其中,缓存 Registry 对象是使用 HashMap 集合实现的(REGISTRIES 静态字段)。在规范 URL 的实现逻辑中,AbstractRegistryFactory 会将 RegistryService 的类名设置为 URL path 和 interface 参数,同时删除 export 和 refer 参数

(2)AbstractRegistry

​ AbstractRegistry 实现了 Registry 接口,虽然 AbstractRegistry 本身在内存中实现了注册数据的读写功能,也没有什么抽象方法,但它依然被标记成了抽象类,从前面的Registry 继承关系图中可以看出,Registry 接口的所有实现类都继承了 AbstractRegistry。为了减轻注册中心组件的压力,AbstractRegistry 会把当前节点订阅的 URL 信息缓存到本地的 Properties 文件中,其核心字段如下:

  • registryUrl(URL类型)。 该 URL 包含了创建该 Registry 对象的全部配置信息,是 AbstractRegistryFactory 修改后的产物。
  • properties(Properties 类型)、file(File 类型)。 本地的 Properties 文件缓存,properties 是加载到内存的 Properties 对象,file 是磁盘上对应的文件,两者的数据是同步的。在 AbstractRegistry 初始化时,会根据 registryUrl 中的 file.cache 参数值决定是否开启文件缓存。如果开启文件缓存功能,就会立即将 file 文件中的 KV 缓存加载到 properties 字段中。当 properties 中的注册数据发生变化时,会写入本地的 file 文件进行同步。properties 是一个 KV 结构,其中 Key 是当前节点作为 Consumer 的一个 URL,Value 是对应的 Provider 列表,包含了所有 Category(例如,providers、routes、configurators 等) 下的 URL。properties 中有一个特殊的 Key 值为 registies,对应的 Value 是注册中心列表,其他记录的都是 Provider 列表。
  • syncSaveFile(boolean 类型)。 是否同步保存文件的配置,对应的是 registryUrl 中的 save.file 参数。
  • registryCacheExecutor(ExecutorService 类型)。 这是一个单线程的线程池,在一个 Provider 的注册数据发生变化的时候,会将该 Provider 的全量数据同步到 properties 字段和缓存文件中,如果 syncSaveFile 配置为 false,就由该线程池异步完成文件写入。
  • lastCacheChanged(AtomicLong 类型)。 注册数据的版本号,每次写入 file 文件时,都是全覆盖写入,而不是修改文件,所以需要版本控制,防止旧数据覆盖新数据。
  • registered(Set 类型)。 这个比较简单,它是注册的 URL 集合。
  • subscribed(ConcurrentMap 类型)。 表示订阅 URL 的监听器集合,其中 Key 是被监听的 URL, Value 是相应的监听器集合。
  • notified(ConcurrentMap>类型)。 该集合第一层 Key 是当前节点作为 Consumer 的一个 URL,表示的是该节点的某个 Consumer 角色(一个节点可以同时消费多个 Provider 节点);Value 是一个 Map 集合,该 Map 集合的 Key 是 Provider URL 的分类(Category),例如 providers、routes、configurators 等,Value 就是相应分类下的 URL 集合。

介绍完 AbstractRegistry 的核心字段之后,进一步关注 AbstractRegistry 依赖这些字段都提供了哪些公共能力:

  • ① 本地缓存
  • ② 注册/订阅
  • ③ 恢复/销毁
① 本地缓存

​ 作为一个 RPC 框架,Dubbo 在微服务架构中解决了各个服务间协作的难题;作为 Provider 和 Consumer 的底层依赖,它会与服务一起打包部署。dubbo-registry 也仅仅是其中一个依赖包,负责完成与 ZooKeeper、etcd、Consul 等服务发现组件的交互。当 Provider 端暴露的 URL 发生变化时,ZooKeeper 等服务发现组件会通知 Consumer 端的 Registry 组件,Registry 组件会调用 notify() 方法,被通知的 Consumer 能匹配到所有 Provider 的 URL 列表并写入 properties 集合中。

​ 查看AbstractRegistry#notify方法

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    
    // ...... 一系列边界检查(此处省略) ......
    
    // keep every provider's category.
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) { // 需要校验 Consumer URL与Provider URL是否匹配
            String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY); // 根据Provider URL中的catrgory参数进行分类
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList); // 更新notified
        listener.notify(categoryList); // 调用NotifyListener
        saveProperties(url); // 更新properties集合以及底层的文件缓存
    }
}

​ 在 saveProperties() 方法中会取出 Consumer 订阅的各个分类的 URL 连接起来(中间以空格分隔),然后以 Consumer 的 ServiceKey 为键值写到 properties 中,同时 lastCacheChanged 版本号会自增。完成 properties 字段的更新之后,会根据 syncSaveFile 字段值来决定是在当前线程同步更新 file 文件,还是向 registryCacheExecutor 线程池提交任务,异步完成 file 文件的同步。本地缓存文件的具体路径是/.dubbo/dubbo-registry-[当前应用名]-[当前Registry所在的IP地址].cache

细节分析

(1)UrlUtils.isMatch():完成 Consumer URL 与 Provider URL 的匹配,依次匹配部分分析如下所示:

  • 匹配 Consumer 和 Provider 的接口(优先取 interface 参数,其次再取 path)。双方接口相同或者其中一方为“*”,则匹配成功,执行下一步
  • 匹配 Consumer 和 Provider 的 category
  • 检测 Consumer URL 和 Provider URL 中的 enable 参数是否符合条件
  • 检测 Consumer 和 Provider 端的 group、version 以及 classifier 是否符合条件

(2)URL.getServiceKey():该方法返回的 ServiceKey 是 properties 集合以及相应缓存文件中的 Key

  • ServiceKey 的格式如下:[group]/{interface(或path)}[:version]

(3)基于本地缓存提供的容错机制,保证服务的可靠性

AbstractRegistry 的核心是本地文件缓存的功能。 在 AbstractRegistry 的构造方法中,会调用 loadProperties() 方法将上面写入的本地缓存文件,加载到 properties 对象中。

​ 在网络抖动等原因而导致订阅失败时,Consumer 端的 Registry 就可以调用 getCacheUrls() 方法获取本地缓存,从而得到最近注册的 Provider URL。可见,AbstractRegistry 通过本地缓存提供了一种容错机制,保证了服务的可靠性

② 注册/订阅

​ AbstractRegistry 实现了 Registry 接口,四个方法都是集合操作:

  • registry() 方法会将当前节点要注册的 URL 缓存到 registered 集合

  • unregistry() 方法会从 registered 集合删除指定的 URL,例如当前节点下线的时候

  • subscribe() 方法会将当前节点作为 Consumer 的 URL 以及相关的 NotifyListener 记录到 subscribed 集合

  • unsubscribe() 方法会将当前节点的 URL 以及关联的 NotifyListener 从 subscribed 集合删除

​ 单看 AbstractRegistry 的实现,上述四个基础的注册、订阅方法都是内存操作,但是 Java 有继承和多态的特性,AbstractRegistry 的子类会覆盖上述四个基础的注册、订阅方法进行增强。

image-20250107160325591

③ 恢复/销毁

​ AbstractRegistry 中还有另外两个需要关注的方法:recover() 方法destroy() 方法

​ 在 Provider 因为网络问题与注册中心断开连接之后,会进行重连,重新连接成功之后,会调用 recover() 方法将 registered 集合中的全部 URL 重新走一遍 register() 方法,恢复注册数据。同样,recover() 方法也会将 subscribed 集合中的 URL 重新走一遍 subscribe() 方法,恢复订阅监听器

​ 在当前节点下线的时候,会调用 Node.destroy() 方法释放底层资源。AbstractRegistry 实现的 destroy() 方法会调用 unregister() 方法和 unsubscribe() 方法将当前节点注册的 URL 以及订阅的监听全部清理掉,其中不会清理非动态注册的 URL(即 dynamic 参数明确指定为 false)

② 重试机制:网络操作的基本保证

学习核心

  • AbstractRegistry 的 实现类 FailbackRegistry 的核心实现,其在AbstractRegistry 基础上提供了重试机制
  • 基于模板方法设计概念,FailbackRegistry 基于时间轮,在 register()/ unregister()、subscribe()/ unsubscribe() 等核心方法失败时,捕获异常并添加重试定时任务、实现重试机制,同时也在相关入口添加了相应的定时任务清理逻辑(避免无限重试)

1.重试机制核心设计

​ 在真实的微服务系统中, ZooKeeper、etcd 等服务发现组件一般会独立部署成一个集群,业务服务通过网络连接这些服务发现节点,完成注册和订阅操作。但即使是机房内部的稳定网络,也无法保证两个节点之间的请求一定成功,因此 Dubbo 这类 RPC 框架在稳定性和容错性方面,就受到了比较大的挑战。为了保证服务的可靠性,重试机制就变得必不可少了

​ 所谓的 “重试机制”就是在请求失败时,客户端重新发起一个一模一样的请求,尝试调用相同或不同的服务端,完成相应的业务操作。能够使用重试机制的业务接口得是“幂等”的,也就是无论请求发送多少次,得到的结果都是一样的,例如查询操作。

dubbo-registry 将重试机制的相关实现放到了 AbstractRegistry 的子类—— FailbackRegistry 中。如下图所示,接入 ZooKeeper、etcd 等开源服务发现组件的 Registry 实现,都继承了 FailbackRegistry,也就都拥有了失败重试的能力

FailbackRegistry 设计核心是:覆盖了 AbstractRegistry 中 register()/unregister()、subscribe()/unsubscribe() 以及 notify() 这五个核心方法,基于时间轮实现失败重试的能力;真正与服务发现组件的交互能力则是放到了 doRegister()/doUnregister()、doSubscribe()/doUnsubscribe() 以及 doNotify() 这五个抽象方法中,由具体子类实现。这是一种典型的模板方法模式的应用

public abstract class FailbackRegistry extends AbstractRegistry {

    @Override
    public void register(URL url) {
        ...... 
        try {
            // Sending a registration request to the server side
            doRegister(url);  // 与服务发现组件进行交互,具体由子类实现
        } catch (Exception e) {
            ......
        }
    }	
    
    @Override
    public void unregister(URL url) {
        ......
        try {
            // Sending a cancellation request to the server side
            doUnregister(url);
        } catch (Exception e) {
            ......
        }
    }
    
    
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        ......
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            ......
        }
    }
    
    
    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        ......
        try {
            // Sending a canceling subscription request to the server side
            doUnsubscribe(url, listener);
        } catch (Exception e) {
            ......
        }
    }

    @Override
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        ......
        try {
            // FailbackRegistry.doNotify()方法实际上就是调用父类
            // AbstractRegistry.notify()方法,没有其他逻辑
            doNotify(url, listener, urls);
        } catch (Exception t) {
           ......
        }
    }

}
(1)FailbackRegistry 核心字段
  • retryTimer(HashedWheelTimer 类型):用于定时执行失败重试操作的时间轮
  • retryPeriod(int 类型):重试操作的时间间隔
  • failedRegistered(ConcurrentMap类型):注册失败的 URL 集合,其中 Key 是注册失败的 URL,Value 是对应的重试任务
  • failedUnregistered(ConcurrentMap类型):取消注册失败的 URL 集合,其中 Key 是取消注册失败的 URL,Value 是对应的重试任务
  • failedSubscribed(ConcurrentMap类型):订阅失败 URL 集合,其中 Key 是订阅失败的 URL + Listener 集合,Value 是相应的重试任务
  • failedUnsubscribed(ConcurrentMap类型):取消订阅失败的 URL 集合,其中 Key 是取消订阅失败的 URL + Listener 集合,Value 是相应的重试任务
  • failedNotified(ConcurrentMap类型):通知失败的 URL 集合,其中 Key 是通知失败的 URL + Listener 集合,Value 是相应的重试任务

​ 在 FailbackRegistry 的构造方法中,首先会调用父类 AbstractRegistry 的构造方法完成本地缓存相关的初始化操作,然后从传入的 URL 参数中获取重试操作的时间间隔(即retry.period 参数)来初始化 retryPeriod 字段,最后初始化 retryTimer 时间轮

public FailbackRegistry(URL url) {
    super(url);
    this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);

    // since the retry task will not be very much. 128 ticks is enough.
    retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
(2)核心方法实现分析

​ FailbackRegistry 对 register()/unregister() 方法和 subscribe()/unsubscribe() 方法的具体实现非常类似,因此此处择选其中的register() 方法的具体实现流程进行分析

  • ① 根据 registryUrl 中 accepts 参数指定的匹配模式,决定是否接受当前要注册的 Provider URL
  • ② 调用父类 AbstractRegistry 的 register() 方法,将 Provider URL 写入 registered 集合中
  • ③ 调用 removeFailedRegistered() 方法和 removeFailedUnregistered() 方法,将该 Provider URL 从 failedRegistered 集合和 failedUnregistered 集合中删除,并停止相关的重试任务
  • ④ 调用 doRegister() 方法,与服务发现组件进行交互。该方法由子类实现,每个子类只负责接入一个特定的服务发现组件
  • ⑤ 在 doRegister() 方法出现异常的时候,会根据 URL 参数以及异常的类型,进行分类处理:待注册 URL 的 check 参数为 true(默认值为 true);待注册的 URL 不是 consumer 协议;registryUrl 的 check 参数也为 true(默认值为 true)。若满足这三个条件或者抛出的异常为 SkipFailbackWrapperException,则直接抛出异常。否则,就会创建重试任务并添加到 failedRegistered 集合中

明确 register() 方法的核心流程之后,继续关注 register() 方法的具体代码实现

@Override
public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        return;
    }
    super.register(url);  // 完成本地文件缓存的初始化
    // 清理failedRegistered集合和failedUnregistered集合,并取消相关任务
    removeFailedRegistered(url); // 清理FailedRegisteredTask定时任务
    removeFailedUnregistered(url); // 清理FailedUnregisteredTask定时任务
    try {
        // Sending a registration request to the server side
        doRegister(url);  // 与服务发现组件进行交互,具体由子类实现
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        // 检测check参数,决定是否直接抛出异常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
            && url.getParameter(Constants.CHECK_KEY, true)
            && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // 如果抛出异常,则创建失败重试的任务,并添加到failedRegistered集合中
        addFailedRegistered(url);
    }
}

​ 从以上代码可以看出,当 Provider 向 Registry 注册 URL 的时候,如果注册失败,且未设置 check 属性,则创建一个定时任务,添加到时间轮中(调用addFailedRegistered(url);):

private void addFailedRegistered(URL url) {
    FailedRegisteredTask oldOne = failedRegistered.get(url);
    if (oldOne != null) { // 已经存在重试任务,则无须创建,直接返回
        return;
    }
    FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
    oldOne = failedRegistered.putIfAbsent(url, newTask);
    if (oldOne == null) {
        // 如果是新建的重试任务,则提交到时间轮中,等待retryPeriod毫秒后执行
        retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
    }
}

2.重试任务

FailbackRegistry.addFailedRegistered() 方法中创建的 FailedRegisteredTask 任务以及其他的重试任务,都继承了 AbstractRetryTask 抽象类,如下图所示

​ 关注 dubbo-registry-api # org\apache\dubbo\registry # retry包下的相关实现

/**
 * FailedRegisteredTask
 */
public final class FailedRegisteredTask extends AbstractRetryTask {

    private static final String NAME = "retry register";

    public FailedRegisteredTask(URL url, FailbackRegistry registry) {
        super(url, registry, NAME);
    }

    @Override
    protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
        registry.doRegister(url); // 重新注册
        registry.removeFailedRegisteredTask(url); // 删除重试任务
    }
}

​ 在AbstractRetryTask抽象类中维护了当前任务关联的 URL、当前重试的次数等信息,在其 run() 方法中,会根据重试 URL 中指定的重试次数(retry.times 参数,默认值为 3)、任务是否被取消以及时间轮的状态,决定此次任务的 doRetry() 方法是否正常执行。

@Override
public void run(Timeout timeout) throws Exception {
    // 检测定时任务状态和时间轮状态
    if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) { 
        return;
    }
    if (times > retryTimes) { // 检查重试次数
        logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times.");
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info(taskName + " : " + url);
    }
    try {
        doRetry(url, registry, timeout); // 执行重试
    } catch (Throwable t) { 
        reput(timeout, retryPeriod);  // 重新添加定时任务,等待重试
    }
}

​ 如果任务的 doRetry() 方法执行出现异常,AbstractRetryTask 会通过 reput() 方法将当前任务重新放入时间轮中,并递增当前任务的执行次数

protected void reput(Timeout timeout, long tick) {
    if (timeout == null) { // 边界检查
        throw new IllegalArgumentException();
    }

    Timer timer = timeout.timer(); // 检查定时任务
    if (timer.isStop() || timeout.isCancelled() || isCancel()) {
        return;
    }
    times++; // 递增times
    // 添加定时任务
    timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}

​ AbstractRetryTask 将 doRetry() 方法作为抽象方法,留给子类实现具体的重试逻辑,这也是模板方法的使用。在子类 FailedRegisteredTask 的 doRetry() 方法实现中,会再次执行关联 Registry 的 doRegister() 方法,完成与服务发现组件交互。如果注册成功,则会调用 removeFailedRegisteredTask() 方法将当前关联的 URL 以及当前重试任务从 failedRegistered 集合中删除。如果注册失败,则会抛出异常,执行 reput ()方法重试。

/**
 * FailedRegisteredTask
 */
public final class FailedRegisteredTask extends AbstractRetryTask {

    private static final String NAME = "retry register";

    public FailedRegisteredTask(URL url, FailbackRegistry registry) {
        super(url, registry, NAME);
    }

    @Override
    protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
        registry.doRegister(url); // 重新注册
        registry.removeFailedRegisteredTask(url); // 删除重试任务
    }
}

​ 继续回到FailbackRegistry#registry方法,可以看到在registry方法入口处会主动调用 removeFailedRegistered() 方法和 removeFailedUnregistered() 方法来清理指定 URL 关联的定时任务(核心代码说明如下所示)

public void register(URL url) {

    super.register(url);
    removeFailedRegistered(url); // 清理FailedRegisteredTask定时任务
    removeFailedUnregistered(url); // 清理FailedUnregisteredTask定时任务

    try {
        doRegister(url); // 与服务发现组件交互逻辑
    } catch (Exception e) {
        addFailedRegistered(url); // 异常情况下触发重试机制(创建重试任务)
    }

}

3.其他核心方法

(1)subscribe 方法

​ unregister() 方法以及 unsubscribe() 方法的实现方式与 register() 方法类似,只是调用的 do*() 抽象方法、依赖的 AbstractRetryTask 有所不同而已,但实现思路都是大同小异。

​ 上述中提到 AbstractRegistry 通过本地文件缓存实现的容错机制。FailbackRegistry.subscribe() 方法在处理异常的时候,会先获取缓存的订阅数据并调用 notify() 方法,如果没有缓存相应的订阅数据,才会检查 check 参数决定是否抛出异常。其核心逻辑之一就是回调 NotifyListener,此处进一步查看 FailbackRegistry 对 notify() 方法的覆盖,核心代码如下:

@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    // 检查url和listener不为空
    ......
        
    try {
        // FailbackRegistry.doNotify()方法实际上就是调用父类
        // AbstractRegistry.notify()方法,没有其他逻辑
        doNotify(url, listener, urls);
    } catch (Exception t) {
        // doNotify()方法出现异常,则会添加一个定时任务
        addFailedNotified(url, listener, urls);
    }
}

​ addFailedNotified() 方法会创建相应的 FailedNotifiedTask 任务,添加到 failedNotified 集合中,同时也会添加到时间轮中等待执行。如果已存在相应的 FailedNotifiedTask 重试任务,则会更新任务需要处理的 URL 集合。在 FailedNotifiedTask 中维护了一个 URL 集合,用来记录当前任务一次运行需要通知的 URL,每执行完一次任务,就会清空该集合,具体实现如下:

@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
    // 如果urls集合为空,则会通知所有Listener,该任务也就啥都不做了
    if (CollectionUtils.isNotEmpty(urls)) {
        listener.notify(urls);
        urls.clear();
    }
    reput(timeout, retryPeriod); // 将任务重新添加到时间轮中等待执行
}

​ 从上面的代码可以看出,FailedNotifiedTask 重试任务一旦被添加,就会一直运行下去,但真的是这样吗?在 FailbackRegistry 的 subscribe()、unsubscribe() 方法中,可以看到 removeFailedNotified() 方法的调用,这里就是清理 FailedNotifiedTask 任务的地方。以 FailbackRegistry.subscribe() 方法为例进行介绍:

public void subscribe(URL url, NotifyListener listener) {

    super.subscribe(url, listener);

    removeFailedSubscribed(url, listener); // 关注这个方法

    try {

        doSubscribe(url, listener);

    } catch (Exception e) {

        addFailedSubscribed(url, listener);

    }

}

// removeFailedSubscribed()方法中会清理FailedSubscribedTask、FailedUnsubscribedTask、FailedNotifiedTask三类定时任务

private void removeFailedSubscribed(URL url, NotifyListener listener) {

    Holder h = new Holder(url, listener); // 清理FailedSubscribedTask

    FailedSubscribedTask f = failedSubscribed.remove(h);

    if (f != null) {

        f.cancel();

    }

    removeFailedUnsubscribed(url, listener);// 清理FailedUnsubscribedTask

    removeFailedNotified(url, listener); // 清理FailedNotifiedTask

}

(2)recover 和 destroy 方法

​ recover 方法会直接通过 FailedRegisteredTask 任务处理 registered 集合中的全部 URL,通过 FailedSubscribedTask 任务处理 subscribed 集合中的 URL 以及关联的 NotifyListener。

​ FailbackRegistry 在生命周期结束时,会调用自身的 destroy() 方法,其中除了调用父类的 destroy() 方法之外,还会调用时间轮(即 retryTimer 字段)的 stop() 方法,释放时间轮相关的资源

③ Zookeeper 注册中心(官方推荐注册中心实践)

学习核心

  • Dubbo 接入 Zookeeper 作为注册中心的核心实现
  • 存储相关:ZookeeperRegistryFactory 核心
  • 组件使用相关:ZookeeperTransporter 、ZookeeperClient 核心,其底层依赖Apache Curator与 Zookeeper 完成交互
  • ZookeeperRegistry 是如何通过 ZookeeperClient 接入 Zookeeper,实现 Registry 的相关功能

1.Zookeeper 在 Dubbo中的应用

​ Dubbo 支持 ZooKeeper 作为注册中心服务,这也是 Dubbo 推荐使用的注册中心。ZooKeeper 是为分布式应用所设计的高可用且一致性的开源协调服务。它是一个树型的目录服务,支持变更推送,非常适合应用在生产环境中。

image-20250107170204519

Zookeeper 存储的 Dubbo 数据

  • 图中的"dubbo"节点是 Dubbo 在 Zookeeper 中的根节点,"dubbo"是这个根节点的默认名称,也可以通过配置进行修改
  • 图中 Service 这一层的节点名称是服务接口的全名(例如 demo 示例中,该节点的名称为org.apache.dubbo.demo.DemoService
  • 图中 Type 这一层的节点是 URL 的分类,一共有四种分类,分别是:providers(服务提供者列表)、consumers(服务消费者列表)、routes(路由规则列表)和 configurations(配置规则列表)
    • 根据不同的 Type 节点,图中 URL 这一层中的节点包括:Provider URL 、Consumer URL 、Routes URL 和 Configurations URL

2.dubbo-registry-zookeeper 源码解读

(1)ZookeeperRegistryFactory

​ 在讲解注册中心相关逻辑的时候介绍到,AbstractRegistryFactory 仅仅是提供了缓存 Registry 对象的功能,并未真正实现 Registry 的创建,具体的创建逻辑是由子类完成的。在 dubbo-registry-zookeeper 模块中的 SPI 配置文件(目录位置如下图所示)中,指定了RegistryFactory 的实现类—— ZookeeperRegistryFactory

image-20250107171500467

​ ZookeeperRegistryFactory 实现了 AbstractRegistryFactory,其中的 createRegistry() 方法会创建 ZookeeperRegistry 实例,后续将由该 ZookeeperRegistry 实例完成与 Zookeeper 的交互。

​ 另外,ZookeeperRegistryFactory 中还提供了一个 setZookeeperTransporter() 方法,基于 Dubbo SPI 机制,这个方法会通过 SPI 或 Spring Ioc 的方式完成自动装载

(2)ZookeeperTransporter

dubbo-remoting-zookeeper 模块是 dubbo-remoting 模块的子模块,但它并不依赖 dubbo-remoting 中的其他模块,是相对独立的

​ 简单来说,dubbo-remoting-zookeeper 模块是在 Apache Curator 的基础上封装了一套 Zookeeper 客户端,将与 Zookeeper 的交互融合到 Dubbo 的体系之中。

dubbo-remoting-zookeeper 模块中有两个核心接口:ZookeeperTransporter 接口和 ZookeeperClient 接口

image-20250107172926772

🚀 ZookeeperTransporter

ZookeeperTransporter

​ ZookeeperTransporter 只负责一件事情,那就是创建 ZookeeperClient 对象

@SPI("curator")
public interface ZookeeperTransporter {

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);

}

​ 从代码中可以看到,ZookeeperTransporter 接口被 @SPI 注解修饰,成为一个扩展点,默认选择扩展名 “curator” 的实现,其中的 connect() 方法用于创建 ZookeeperClient 实例(该方法被 @Adaptive 注解修饰,可以通过 URL 参数中的 client 或 transporter 参数覆盖 @SPI 注解指定的默认扩展名)。

​ 按照前面对 Registry 分析的思路,作为一个抽象实现,AbstractZookeeperTransporter 肯定是实现了创建 ZookeeperClient 之外的其他一些增强功能,然后由子类继承。不然的话,直接由 CuratorZookeeperTransporter 实现 ZookeeperTransporter 接口创建 ZookeeperClient 实例并返回即可,没必要在继承关系中再增加一层抽象类

public class CuratorZookeeperTransporter extends AbstractZookeeperTransporter {
    @Override
    public ZookeeperClient createZookeeperClient(URL url) {
        // 创建CuratorZookeeperClient实例
        return new CuratorZookeeperClient(url);
    }
}

AbstractZookeeperTransporter

​ AbstractZookeeperTransporter 的核心功能有如下:

  • ① 缓存 ZookeeperClient 实例;
  • ② 在某个 Zookeeper 节点无法连接时,切换到备用 Zookeeper 地址;

在配置 Zookeeper 地址的时候,可以配置多个 Zookeeper 节点的地址,这样的话,当一个 Zookeeper 节点宕机之后,Dubbo 就可以主动切换到其他 Zookeeper 节点。例如,下述 URL 配置:

zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:8989,127.0.0.1:9999

​ AbstractZookeeperTransporter 的 connect() 方法首先会得到上述 URL 中配置的 127.0.0.1:2181127.0.0.1:8989127.0.0.1:9999 这三个 Zookeeper 节点地址,然后从 ZookeeperClientMap 缓存(这是一个 Map,Key 为 Zookeeper 节点地址,Value 是相应的 ZookeeperClient 实例)中查找一个可用 ZookeeperClient 实例。如果查找成功,则复用 ZookeeperClient 实例;如果查找失败,则创建一个新的 ZookeeperClient 实例返回并更新 ZookeeperClientMap 缓存

​ ZookeeperClient 实例连接到 Zookeeper 集群之后,就可以了解整个 Zookeeper 集群的拓扑,后续再出现 Zookeeper 节点宕机的情况,就是由 Zookeeper 集群本身以及 Apache Curator 共同完成故障转移

(3)ZookeeperClient

ZookeeperClient 核心

从名字就可以看出,ZookeeperClient 接口是 Dubbo 封装的 Zookeeper 客户端,该接口定义了大量的方法,都是用来与 Zookeeper 进行交互的

  • create() 方法:创建 ZNode 节点,还提供了创建临时 ZNode 节点的重载方法
  • getChildren() 方法:获取指定节点的子节点集合
  • getContent() 方法:获取某个节点存储的内容
  • delete() 方法:删除节点
  • addListener() / removeListener() 方法:添加/删除监听器
  • close() 方法:关闭当前 ZookeeperClient 实例

AbstractZookeeperClient

AbstractZookeeperClient 作为 ZookeeperClient 接口的抽象实现,主要提供了如下几项能力:

  • ① 缓存当前 ZookeeperClient 实例创建的持久 ZNode 节点;

  • ② 管理当前 ZookeeperClient 实例添加的各类监听器;

  • ③ 管理当前 ZookeeperClient 的运行状态;

  • AbstractZookeeperClient 核心字段分析:

    • Set<String> persistentExistNodePath = new ConcurrentHashSet<>();:缓存了当前 ZookeeperClient 创建的持久 ZNode 节点路径,在创建 ZNode 节点之前,会先查这个缓存,而不是与 Zookeeper 交互来判断持久 ZNode 节点是否存在,这就减少了一次与 Zookeeper 的交互

    • Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();:dubbo-remoting-zookeeper 对外提供了 StateListener、DataListener 和 ChildListener 三种类型的监听器

      • StateListener:主要负责监听 Dubbo 与 Zookeeper 集群的连接状态,包括 SESSION_LOST、CONNECTED、RECONNECTED、SUSPENDED 和 NEW_SESSION_CREATED

      • DataListener:主要监听某个节点存储的数据变化

      • **ChildListener:**主要监听某个 ZNode 节点下的子节点变化

      • AbstractZookeeperClient 中维护了 stateListeners、listeners 以及 childListeners 三个集合,分别管理上述三种类型的监听器。虽然监听内容不同,但是它们的管理方式是类似的,以addDataListener为例进行分析

        @Override
        public void addDataListener(String path, DataListener listener) {
            this.addDataListener(path, listener, null);
        }
        
        @Override
        public void addDataListener(String path, DataListener listener, Executor executor) {
            // 获取指定path上的DataListener集合
            ConcurrentMap<DataListener, TargetDataListener> dataListenerMap =
                listeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
            // 查询该DataListener关联的TargetDataListener
            TargetDataListener targetListener =
                dataListenerMap.computeIfAbsent(listener, k -> createTargetDataListener(path, k));
            // 通过TargetDataListener在指定的path上添加监听
            addTargetDataListener(path, targetListener, executor);
        }
        

        ​ 这里的 createTargetDataListener() 方法和 addTargetDataListener() 方法都是抽象方法,由 AbstractZookeeperClient 的子类实现,TargetDataListener 是 AbstractZookeeperClient 中标记的一个泛型。

        ​ 为什么 AbstractZookeeperClient 要使用泛型定义?这是因为不同的 ZookeeperClient 实现可能依赖不同的 Zookeeper 客户端组件,不同 Zookeeper 客户端组件的监听器实现也有所不同,而整个 dubbo-remoting-zookeeper 模块对外暴露的监听器是统一的。因此,这时就需要一层转换进行解耦,这层解耦就是通过 TargetDataListener 完成的。虽然在 Dubbo 2.7.7 版本中只支持 Curator,但是在 Dubbo 2.6.5 版本的源码中可以看到,ZookeeperClient 还有使用 ZkClient 的实现

CuratorZookeeperClient

CuratorZookeeperClient构造器:

public CuratorZookeeperClient(URL url) {

    super(url);
    int timeout = url.getParameter("timeout", 5000);
    int sessionExpireMs = url.getParameter("zk.session.expire", 60000);
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
        .connectString(url.getBackupAddress())//zk地址(包括备用地址)
        .retryPolicy(new RetryNTimes(1, 1000)) // 重试配置
        .connectionTimeoutMs(timeout) // 连接超时时长
        .sessionTimeoutMs(sessionExpireMs); // session过期时间
    ... // 省略处理身份验证的逻辑
        client = builder.build();

    // 添加连接状态的监听
    client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
    client.start();
    boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
    ... // 检测connected这个返回值,连接失败抛出异常
}

​ CuratorZookeeperClient 与 Zookeeper 交互的全部操作,都是围绕着这个 Apache Curator 客户端展开的,内部类 CuratorWatcherImpl 就是 CuratorZookeeperClient 实现 AbstractZookeeperClient 时指定的泛型类,它实现了 TreeCacheListener 接口,可以添加到 TreeCache 上监听自身节点以及子节点的变化。在 childEvent() 方法的实现中我们可以看到,当 TreeCache 关注的树型结构发生变化时,会将触发事件的路径、节点内容以及事件类型传递给关联的 DataListener 实例进行回调

@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
    if (dataListener != null) {
        if (logger.isDebugEnabled()) {
            logger.debug("listen the zookeeper changed. The changed data:" + event.getData());
        }
        TreeCacheEvent.Type type = event.getType();
        EventType eventType = null;
        String content = null;
        String path = null;
        switch (type) { // 监听各类事件
            case NODE_ADDED:
                eventType = EventType.NodeCreated;
                path = event.getData().getPath();
                content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
                break;
            case NODE_UPDATED:
                eventType = EventType.NodeDataChanged;
                path = event.getData().getPath();
                content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
                break;
            case NODE_REMOVED:
                path = event.getData().getPath();
                eventType = EventType.NodeDeleted;
                break;
            case INITIALIZED:
                eventType = EventType.INITIALIZED;
                break;
            case CONNECTION_LOST:
                eventType = EventType.CONNECTION_LOST;
                break;
            case CONNECTION_RECONNECTED:
                eventType = EventType.CONNECTION_RECONNECTED;
                break;
            case CONNECTION_SUSPENDED:
                eventType = EventType.CONNECTION_SUSPENDED;
                break;

        }
        // 回调DataListener,传递触发事件的path、节点内容以及事件类型
        dataListener.dataChanged(path, content, eventType);
    }
}

​ 在 CuratorZookeeperClient 的 addTargetDataListener() 方法实现中,可以看到 TreeCache 的创建、启动逻辑以及添加 CuratorWatcherImpl 监听的逻辑

@Override
protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener) {
    this.addTargetDataListener(path, treeCacheListener, null);
}

@Override
protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) {
    try {
        // 创建TreeCache
        TreeCache treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build();
        // 缓存TreeCache对象
        treeCacheMap.putIfAbsent(path, treeCache);

        if (executor == null) {
            // 添加监听
            treeCache.getListenable().addListener(treeCacheListener);
        } else {
            treeCache.getListenable().addListener(treeCacheListener, executor);
        }

        treeCache.start(); // 启动TreeCache
    } catch (Exception e) {
        throw new IllegalStateException("Add treeCache listener for path:" + path, e);
    }
}

(4)ZookeeperRegistry

​ 回归dubbo-registry-zookeeper模块,分析基于Zookeeper的注册中心实现。

​ 在 ZookeeperRegistry 的构造方法中,会通过 ZookeeperTransporter 创建 ZookeeperClient 实例并连接到 Zookeeper 集群,同时还会添加一个连接状态的监听器。在该监听器中主要关注 RECONNECTED 状态 和 NEW_SESSION_CREATED 状态,在当前 Dubbo 节点与 Zookeeper 的连接恢复或是 Session 恢复的时候,会重新进行注册/订阅,防止数据丢失

​ doRegister() 方法和 doUnregister() 方法的实现都是通过 ZookeeperClient 找到合适的路径,然后创建(或删除)相应的 ZNode 节点。这里唯一需要注意的是,doRegister() 方法注册 Provider URL 的时候,会根据 dynamic 参数决定创建临时 ZNode 节点还是持久 ZNode 节点(默认创建临时 ZNode 节点),这样当 Provider 端与 Zookeeper 会话关闭时,可以快速将变更推送到 Consumer 端

​ 此处注意一下 toUrlPath() 这个方法得到的路径,是由下图中展示的方法拼装而成的,其中每个方法对应 Zookeeper 节点层级图中的一层

image-20250109145635450

​ doSubscribe() 方法的核心是通过 ZookeeperClient 在指定的 path 上添加 ChildListener 监听器,当订阅的节点发现变化的时候,会通过 ChildListener 监听器触发 notify() 方法,在 notify() 方法中会触发传入的 NotifyListener 监听器。

​ 从 doSubscribe() 方法的代码结构可看出,doSubscribe() 方法的逻辑分为了两个大的分支:

​ ① 一个分支是处理:订阅 URL 中明确指定了 Service 层接口的订阅请求。该分支会从 URL 拿到 Consumer 关注的 category 节点集合,然后在每个 category 节点上添加 ChildListener 监听器。下面是 Demo 示例中 Consumer 订阅的三个 path,图中展示了构造 path 各个部分的相关方法:

List<URL> urls = new ArrayList<>();

for (String path : toCategoriesPath(url)) { // 要订阅的所有path
    // 一个NotifyListener关联一个ChildListener,这个ChildListener会回调
    // ZookeeperRegistry.notify()方法,其中会回调当前NotifyListener
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
    ChildListener zkListener = listeners.computeIfAbsent(listener,
                                  k -> (parentPath, currentChilds) ->
                                  ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
    // 尝试创建持久节点,主要是为了确保当前path在Zookeeper上存在
    zkClient.create(path, false);
    // 这一个ChildListener会添加到多个path上
    List<String> children = zkClient.addChildListener(path, zkListener);
    if (children != null) {
        // 如果没有Provider注册,toUrlsWithEmpty()方法会返回empty协议的URL
        urls.addAll(toUrlsWithEmpty(url, path, children));
    }
}
// 初次订阅的时候,会主动调用一次notify()方法,通知NotifyListener处理当前已有的
// URL等注册数据
notify(url, listener, urls);

​ ② 另一个分支是处理:监听所有 Service 层节点的订阅请求,例如,Monitor 就会发出这种订阅请求,因为它需要监控所有 Service 节点的变化。这个分支的处理逻辑是在根节点上添加一个 ChildListener 监听器,当有 Service 层的节点出现的时候,会触发这个 ChildListener,其中会重新触发 doSubscribe() 方法执行上一个分支的逻辑(即前面分析的针对确定的 Service 层接口订阅分支)

String root = toRootPath(); // 获取根节点
// 获取NotifyListener对应的ChildListener
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
    for (String child : currentChilds) {
        child = URL.decode(child);
        if (!anyServices.contains(child)) {
            anyServices.add(child); // 记录该节点已经订阅过
            // 该ChildListener要做的就是触发对具体Service节点的订阅
            subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                                       Constants.CHECK_KEY, String.valueOf(false)), k);
        }
    }
});
zkClient.create(root, false); // 保证根节点存在
// 第一次订阅的时候,要处理当前已有的Service层节点
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
    for (String service : services) {
        service = URL.decode(service);
        anyServices.add(service);
        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                                     Constants.CHECK_KEY, String.valueOf(false)), listener);
    }
}

​ ZookeeperRegistry 提供的 doUnsubscribe() 方法实现会将 URL 和 NotifyListener 对应的 ChildListener 从相关的 path 上删除,从而达到不再监听该 path 的效果

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.1.3