前言
Dubbo3.0中有个特别的机制“服务自省”,对于这个机制很多人都很陌生,我用自己的理解来讲讲这个机制。
版本说明
Dubbo版本:3.0.9
定义
所谓的服务自省就是一个元数据同步机制。在Dubbo3.0中,原来面向RPC定义的注册中心数据被拆分为服务元数据和服务地址信息,Dubbo的流量管理能力依赖服务的元数据,因此需要一个机制能很好的去把Provider端的元数据同步到Consumer端。
机制讲解
服务自省是Consumer端获取Provider端服务元数据的一个机制,那么肯定Provider端和Consumer端都有对应的处理。
Provider暴露MetadataService
Provider需要提供服务的元数据,所以在Provider端启动时会先暴露所有的业务服务,当所有业务服务都准备好之后(启动并且已经向注册中心注册),暴露MetadataService供Consumer端来查询元数据(此时业务服务也元数据服务都能正常调用,注册中心也有了元数据和应用地址数据)。
Consumer监听MetadataService
Consumer端需要在调用时决定调用哪个后端服务(也就是说要进行流量管理),因此在Consumer端启动时先通过应用名查询注册中心得到后端Provider的地址,并完成订阅。拿到Provider地址后,直接发起元数据查询,通过MetadataService得到服务端元数据,并且会有一个监听器,监听服务端元数据的变化(即reversion机制),一旦服务端元数据发生变化,Consumer就会重新发起MetadataService调用获取元数据。
源码解析
Provider暴露MetadataService
Dubbo暴露服务的起点是org.apache.dubbo.config.ServiceConfig#export
,当然实际的暴露动作是委托给org.apache.dubbo.rpc.Protocol#export
。但这是单个服务的暴露,MetadataService肯定是要在所有业务服务暴露完之后再暴露,因此我们需要从DubboBootstrap中去寻找答案(因为现在Dubbo的启动都是由这个类完成)。
DubboBootstrap这个类的结构我们这次不讲,我们需要了解的是Dubbo3.0有两个Deployer类负责启动。ApplicationDeployer负责应用的初始化和启动,ModuleDeployer负责服务的暴露和引用。因此我们可以到ModuleDeployer中找答案。
org.apache.dubbo.config.deploy.DefaultModuleDeployer#start
从这个模板方法里面我们可以看到在暴露和引用服务之后我们会调用org.apache.dubbo.config.deploy.DefaultModuleDeployer#onModuleStarted
后续的调用链路如下。
org.apache.dubbo.config.deploy.DefaultApplicationDeployer#notifyModuleChanged
-> org.apache.dubbo.config.deploy.DefaultApplicationDeployer#prepareApplicationInstance
-> org.apache.dubbo.config.deploy.DefaultApplicationDeployer#exportMetadataService
-> org.apache.dubbo.config.metadata.ExporterDeployListener#onModuleStarted
-> org.apache.dubbo.config.metadata.ConfigurableMetadataServiceExporter#export
最终暴露元数据服务的动作就在org.apache.dubbo.config.metadata.ConfigurableMetadataServiceExporter#export
中。
Consumer监听MetadataService
这里和Provider的分析过程刚好相反,因为Consumer需要让每个ReferenceConfig都要监听服务端的元数据变化,否则流量管理能力会失效。所以,我们直接从org.apache.dubbo.config.ReferenceConfig#get
看就好。
首先初始化过程会调用init
方法,我们直接跳过一些其他的配置代码,我们知道代理类的创建其实也是委托给org.apache.dubbo.rpc.Protocol#refer
,具体来说调用链路如下。
org.apache.dubbo.registry.integration.RegistryProtocol#refer
-> org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
-> org.apache.dubbo.registry.integration.RegistryProtocol#interceptInvoker
-> org.apache.dubbo.registry.client.migration.MigrationRuleListener#onRefer
-> org.apache.dubbo.registry.client.migration.MigrationRuleHandler#doMigrate
-> org.apache.dubbo.registry.client.migration.MigrationRuleHandler#refreshInvoker
-> org.apache.dubbo.registry.client.migration.ServiceDiscoveryMigrationInvoker#migrateToApplicationFirstInvoker
-> org.apache.dubbo.registry.client.migration.MigrationInvoker#refreshServiceDiscoveryInvoker
-> org.apache.dubbo.registry.integration.RegistryProtocol#getServiceDiscoveryInvoker
-> org.apache.dubbo.registry.integration.RegistryProtocol#doCreateInvoker
-> org.apache.dubbo.registry.integration.DynamicDirectory#subscribe
->org.apache.dubbo.registry.client.ServiceDiscoveryRegistry#subscribe
-> org.apache.dubbo.registry.client.ServiceDiscoveryRegistry#doSubscribe
-> org.apache.dubbo.registry.client.ServiceDiscoveryRegistry#subscribeURLs
最终在org.apache.dubbo.registry.client.ServiceDiscoveryRegistry#subscribeURLs
中我们能看到整个Consumer端的服务自省过程。
关键代码如下:
public class ServiceDiscoveryRegistry {
protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
serviceNames = toTreeSet(serviceNames);
String serviceNamesKey = toStringKeys(serviceNames);
//...
Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
try {
appSubscriptionLock.lock();
ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
if (serviceInstancesChangedListener == null) {
serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
serviceInstancesChangedListener.setUrl(url);
for (String serviceName : serviceNames) {
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
if (CollectionUtils.isNotEmpty(serviceInstances)) {
serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
}
}
serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
}
if (!serviceInstancesChangedListener.isDestroyed()) {
serviceInstancesChangedListener.setUrl(url);
listener.addServiceListener(serviceInstancesChangedListener);
serviceInstancesChangedListener.addListenerAndNotify(protocolServiceKey, listener);
serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
} else {
logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
serviceListeners.remove(serviceNamesKey);
}
} finally {
appSubscriptionLock.unlock();
}
}
}
从上面看到org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener
被创建,并且在监听ServiceInstancesChangedEvent
,通过org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener#onEvent
-> org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener#doOnEvent
这里就是Consumer端发生服务自省的地方。从代码中可以看到对MetadataService的调用是通过org.apache.dubbo.registry.client.ServiceDiscovery#getRemoteMetadata(java.lang.String, java.util.List<org.apache.dubbo.registry.client.ServiceInstance>)