1. 使用
一般都是配置 RestTemplate
使用来达到负载均衡的效果。
<!--maven 依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>
@Configuration
public class MainConfiguration {
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
加了 @LoadBalanced
注解后 RestTemplate
就不能使用 ip:port
的格式去访问服务了,而是需要使用 http://serviceName/路径
的格式,根据 serviceName
去注册中心获取实际的服务信息,最后转换为要请求的地址。
2. @LoadBalanced 注解
进入注解查看注释:
/**
* Annotation to mark a RestTemplate or WebClient bean to be configured to use a
* LoadBalancerClient.
*/
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
注释的意思是:用于将 RestTemplate
或 WebClient
的 bean
标记为使用 LoadBalancerClient
的注解。即最终使用的是 LoadBalancerClient
。
还有一个要注意的,@LoadBalanced
中包含了 @Qualifier
注解,@Qualifier
注解是当 SpringIOC
容器中有多个同类型的 bean
时,按名称注入,放到 @LoadBalanced
有什么用呢?这个后面再分析哦。
3. LoadBalancerClient
顾名思义:负载均衡的客户端。
public interface LoadBalancerClient extends ServiceInstanceChooser {
/**
* 根据服务名发送请求
*/
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
/**
* 根据服务名和服务实例发送请求
*/
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
/**
* URI 转换,将 serviceName 转换为 ip:port
*/
URI reconstructURI(ServiceInstance instance, URI original);
}
根据接口分析,LoadBalancerClient
可以向特定的服务实例发送请求。且它继承自 ServiceInstanceChooser
即服务实例选择器,我们照样看下源码:
public interface ServiceInstanceChooser {
/**
* 根据服务名选择实例.
*/
ServiceInstance choose(String serviceId);
/**
* 根据服务名和请求选择实例.
*/
<T> ServiceInstance choose(String serviceId, Request<T> request);
}
即:ServiceInstanceChooser
用来选择实例,LoadBalancerClient
用来向实例发送请求。
4. 自动配置类
阅读 Spring
系列源码,一般都是从两个地方入手:
注解
自动配置类
基本注解前面讲过,接下来看一下 loadBalancer
中的自动配置类,它的自动配置类主要在两个地方。
spring-cloud-commons
包下:
spring-cloud-loadBalancer
包下:
4.1 spring-cloud-commons 中的 LoadBalancerAutoConfiguration
这里只关注和 RestTemplate
相关的简单配置,异步和 reactive
的配置这里不分析,有兴趣的可以自行研究。
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
@Configuration(proxyBeanMethods = false)
@Conditional(RetryMissingOrDisabledCondition.class)
static class LoadBalancerInterceptorConfig {
@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
// 还有重试配置
}
4.1.1 @ConditionalOnBean(LoadBalancerClient.class)
存在 LoadBalancerClient
的 bean
的情况下负载均衡才会生效。
没有
LoadBalancerClient
的话,也就不能根据服务名发送请求,那还要负载均衡干什么。
4.1.2 注入 RestTemplate
筛选出所有添加了@LoadBalanced
的 RestTemplate
的 bean
,且 required
为 false
,即使容器中没有,也不会报错。
4.1.3 loadBalancedRestTemplateInitializerDeprecated
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
注意这段代码,将 SmartInitializingSingleton
对象注册为一个 Bean
,SmartInitializingSingleton
实现了一个回调接口,在容器中所有单例 Bean
都初始化完成之后,Spring
会自动回调该接口的 afterSingletonsInstantiated()
方法,以执行一些额外的初始化操作,这里就是为每个 RestTemplate
添加 LoadBalancerInterceptor
拦截器。这里的方法体就是afterSingletonsInstantiated()
的方法逻辑,这里用的是 lamda
写法。
为什么使用
SmartInitializingSingleton
?
afterSingletonsInstantiated()
,该方法会在容器中所有单例Bean
初始化完成后被调用。通过实现这个接口,可以确保在所有单例Bean
初始化完成之后再进行相关的初始化操作,以避免可能的依赖问题。简化配置:通过将初始化代码封装在一个
SmartInitializingSingleton
对象中,可以将其声明为一个Spring Bean
,并通过@Bean
注解进行配置。这样,Spring
在启动过程中会自动执行初始化操作,无需手动调用。除了这种,还可以用@PostConstruct
、BeanPostProcessor
实现,但是相比于在配置类中使用SmartInitializingSingleton
更为繁琐。
4.1.4 LoadBalancerRequestFactory
public class LoadBalancerRequestFactory {
private LoadBalancerClient loadBalancer;
private List<LoadBalancerRequestTransformer> transformers;
public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
List<LoadBalancerRequestTransformer> transformers) {
this.loadBalancer = loadBalancer;
this.transformers = transformers;
}
public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
this.loadBalancer = loadBalancer;
}
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
return instance -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest, instance);
}
}
return execution.execute(serviceRequest, body);
};
}
}
看到这个 createRequest
方法是不是很熟悉,和 RestTemplate
中创建请求的工厂中的方法类似,如果还没看过 RestTemplate
源码的小伙伴建议看一下哦,不看的话下面还有一些源码分析可能一知半解。
所以 LoadBalancerRequestFactory
就是创建负载均衡请求的工厂。
4.1.5 拦截器配置
@Configuration(proxyBeanMethods = false)
@Conditional(RetryMissingOrDisabledCondition.class)
static class LoadBalancerInterceptorConfig {
@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
先创建
LoadBalancerInterceptor
拦截器并注入到容器。创建
RestTemplateCustomizer
将拦截器设置到RestTemplate
中,而RestTemplateCustomizer
是一个函数式接口,这里用的也是lamda
写法。相当于new
了一个RestTemplateCustomizer
的实现类并返回。因此这里并没有真正的将拦截器设置到RestTemplate
,就想你写了一个方法,没有执行的话肯定不能生效。而真正执行的地方需要回到4.3
章节处。
注意这个内部配置类上还有一个
@Conditional(RetryMissingOrDisabledCondition.class)
注解,即在不开启重试机制的情况下负载均衡在会生效。为什么这样设计?我的理解是有两个原因:
请求幂等问题:如果请求是幂等的,那么每次重试的结果应该是一样的。在这种情况下,负载均衡和重试之间应该不会存在冲突。但是如果不是幂等,在进行重试操作时同时进行了负载均衡,可能会导致请求被发送到不同的服务实例上,进而产生不一致的结果。
网络延迟问题:如果网络延迟较高,请求在较长时间内没有响应,重试操作可能会导致多个请求同时发送到服务实例上,增加了服务端的负载压力。
但是,
loadBalancer
本身也有重试机制,这不会冲突吗?这部分后面分析,关于重试的配置依旧在LoadBalancerAutoConfiguration
中,前面的代码并没有贴出来。
// 下面两个条件任意一个满足时,RetryMissingOrDisabledCondition条件才会被满足
private static class RetryMissingOrDisabledCondition extends AnyNestedCondition {
RetryMissingOrDisabledCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class RetryTemplateMissing {
}
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", havingValue = "false")
static class RetryDisabled {
}
}
总结:
spring-cloud-commons
中的LoadBalancerAutoConfiguration
是用来配置RestTemplate
拦截器相关的。
4.2 spring-cloud-loadBalancer 包下的 LoadBalancerAutoConfiguration
还有一个相同名字的 LoadBalancerAutoConfiguration
在这个包下面,来看下和 commons
包下的有什么不同。
@Configuration(proxyBeanMethods = false)
@LoadBalancerClients
@EnableConfigurationProperties(LoadBalancerProperties.class)
@AutoConfigureBefore({ ReactorLoadBalancerClientAutoConfiguration.class,
LoadBalancerBeanPostProcessorAutoConfiguration.class })
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.enabled", havingValue = "true", matchIfMissing = true)
public class LoadBalancerAutoConfiguration {
private final ObjectProvider<List<LoadBalancerClientSpecification>> configurations;
public LoadBalancerAutoConfiguration(ObjectProvider<List<LoadBalancerClientSpecification>> configurations) {
this.configurations = configurations;
}
@Bean
@ConditionalOnMissingBean
public LoadBalancerZoneConfig zoneConfig(Environment environment) {
return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
}
@ConditionalOnMissingBean
@Bean
public LoadBalancerClientFactory loadBalancerClientFactory() {
LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory();
clientFactory.setConfigurations(this.configurations.getIfAvailable(Collections::emptyList));
return clientFactory;
}
}
4.2.1 @LoadBalancerClients 注解
@Configuration(proxyBeanMethods = false)
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
@Documented
@Import(LoadBalancerClientConfigurationRegistrar.class)
public @interface LoadBalancerClients {
LoadBalancerClient[] value() default {};
/**
* 所有负载均衡客户端的默认配置
*/
Class<?>[] defaultConfiguration() default {};
}
@Configuration(proxyBeanMethods = false)
@Import(LoadBalancerClientConfigurationRegistrar.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LoadBalancerClient {
/**
* 负载均衡客户端名称,即服务名
*/
@AliasFor("name")
String value() default "";
@AliasFor("value")
String name() default "";
/**
* 对应服务使用的配置
*/
Class<?>[] configuration() default {};
}
这个注解导入了一个 LoadBalancerClientConfigurationRegistrar
类,另外注解的 value
属性的类型又是另外一个注解 @LoadBalancerClient
。这个注解时用来干嘛的呢?
在微服务架构中,通常会有多个服务,
5. RestTemplate 调用过程分析
前面部分相同的逻辑这里就不再分析,不清楚的小伙伴去看下 RestTemplate
源码分析的的文章哦。这里直接进入到 InterceptingRequestExecution # execute
方法处,这里会遍历所有的拦截器并执行。
5.1 LoadBalancerInterceptor # intercept
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
使用负载均衡的 RestTemplate
使用服务名调用,所以 originalUri.getHost()
得到的就是服务名,然后调用 LoadBalancerClient
的 execute
方法执行请求,注意 request
参数是通过 requestFactory
生成的。
5.2 LoadBalancerRequestFactory # createRequest
// LoadBalancerRequestFactory
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
return instance -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest, instance);
}
}
return execution.execute(serviceRequest, body);
};
}
// LoadBalancerRequest
public interface LoadBalancerRequest<T> {
T apply(ServiceInstance instance) throws Exception;
}
又是一个函数式接口,在调用其 apply
方法时,进行回调。
5.3 LoadBalancerClient # execute
LoadBalancerClient
的唯一实现类是 BlockingLoadBalancerClient
。
// BlockingLoadBalancerClient
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
String hint = getHint(serviceId);
LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request,new DefaultRequestContext(request, hint));
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
// 根据服务名称以及请求获取服务实例
ServiceInstance serviceInstance = choose(serviceId, lbRequest);
if (serviceInstance == null) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
throw new IllegalStateException("No instances available for " + serviceId);
}
return execute(serviceId, serviceInstance, lbRequest);
}
前面看 LoadBalancerAutoConfiguration
时需要使用到 LoadBalancerClient
,那 LoadBalancerClient
是何时注入的呢?
@Configuration(proxyBeanMethods = false)
@LoadBalancerClients
@AutoConfigureAfter(LoadBalancerAutoConfiguration.class)
@AutoConfigureBefore({ org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.class,
AsyncLoadBalancerAutoConfiguration.class })
@ConditionalOnClass(RestTemplate.class)
public class BlockingLoadBalancerClientAutoConfiguration {
@Bean
@ConditionalOnBean(LoadBalancerClientFactory.class)
@ConditionalOnMissingBean
public LoadBalancerClient blockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory,
LoadBalancerProperties properties) {
return new BlockingLoadBalancerClient(loadBalancerClientFactory, properties);
}
.....
}
@AutoConfigureBefore
指定了改配置必须在 LoadBalancerAutoConfiguration
之前注入,保证 LoadBalancerAutoConfiguration
中使用的 LoadBalancerClient
不为空。
而 LoadBalancerClient
需要用到 LoadBalancerClientFactory
, LoadBalancerClientFactory
的注入则在前面提到过的 spring-cloud-loadBalancer
包下的 LoadBalancerAutoConfiguration
。
5.3.1 LoadBalancerLifecycle
顾名思义,负载均衡器的生命周期,进入到这个接口看看方法:
public interface LoadBalancerLifecycle<RC, RES, T> {
/**
* 负载均衡前执行
*/
void onStart(Request<RC> request);
/**
* 服务实例选择后,发送请求前执行
*/
void onStartRequest(Request<RC> request, Response<T> lbResponse);
/**
* 负载均衡后执行
*/
void onComplete(CompletionContext<RES, T, RC> completionContext);
}
onStart
方法在负载均衡(即 choose
方法)前执行, onComplete
方法在负载均衡后执行。默认的实现是 MicrometerStatsLoadBalancerLifecycle
,用于监控指标收集。
5.3.2 服务实例选择
// BlockingLoadBalancerClient
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
// 负载均衡策略选择
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
// 实例选择
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}
loadBalancerClientFactory.getInstance
底层使用的NamedContextFactory
这个类,用来实现子容器隔离,即SpringCloud LoadBalancer
为每个serviceName
都分配了一个 ApplicationContext ,为什么需要这样做呢?
不同的微服务应用配置可能需要不一样。不同服务的接口响应速度不一致,则需要配置不同的超时时间;还有对应服务发现的注册中心可能不一样,比如服务
A
用的是Nacos
,服务B
用的是Eureka
。在这种需求下,不同微服务的客户端有不同的以及相同的配置,有不同的Bean
,也有相同的Bean
。所以,我们可以针对每一个微服务将他们的Bean
所处于ApplicationContext
独立开来,不同微服务客户端使用不同的ApplicationContext
。NamedContextFactory
就是用来实现这种机制的。关于
NamedContextFactory
的使用可以看我的另一篇文章()。
ReactiveLoadBalancer
是一个接口,默认提供了两种负载均衡策略:随机 和 轮询。这里 getServiceInstance
方法传入了服务名,说明是要根据服务名选择负载策略,不同的服务选择的负载策略可以不一样(怎么选择的我们后续分析,这里先了解一下就行)。
默认是轮询策略,具体配置在:
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {
private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;
@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RoundRobinLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}
接下来看一下 RoundRobinLoadBalancer
是怎么轮询的。
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
// serviceInstances 是服务列表
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
// 真正选择
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
int pos = Math.abs(this.position.incrementAndGet());
// 取模算法轮询
ServiceInstance instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
}
先通过 ServiceInstanceListSupplier
获得一个服务实例列表,然后从服务列表中通过一定的策略(对于RoundRobinLoadBalancer
来说,就是轮询策略)选择一个具体的服务实例。而 ServiceInstanceListSupplier
还是在 LoadBalancerClientConfiguration
中注入的,
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {
.....
@Configuration(proxyBeanMethods = false)
@ConditionalOnReactiveDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
public static class ReactiveSupportConfiguration {
@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default",
matchIfMissing = true)
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
}
}
}
withBlockingDiscoveryClient()
会创建一个 DiscoveryClientServiceInstanceListSupplier
的实现类,如果用的是 Nacos
注册中心的话,就是从 Nacos
获取服务实例列表。
5.3.3 请求发送
服务实例选择完后,就可以发送请求了。
// BlockingLoadBalancerClient
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
throws IOException {
// ......
try {
T response = request.apply(serviceInstance);
Object clientResponse = getClientResponse(response);
supportedLifecycleProcessors
.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, defaultResponse, clientResponse)));
return response;
}// catch .....
}
代码有一行 request.apply(serviceInstance)
执行的就是 5.2
提到的函数式接口。
将原始请求包装为 ServiceRequestWrapper
,下面的执行逻辑还是 RestTemplate
那一套。
6. 重试机制下的流程
前面说过开启重试后 LoadBalancerInterceptorConfig
不会生效,拦截器也将不会配置到 RestTemplate
里面,那重试机制开启的情况下如何进行负载均衡呢?答案还是在 common
包下的 LoadBalancerAutoConfiguration
,就在 LoadBalancerInterceptorConfig
这个静态类的下面。
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {
....
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RetryTemplate.class)
public static class RetryAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public LoadBalancedRetryFactory loadBalancedRetryFactory() {
return new LoadBalancedRetryFactory() {};
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RetryTemplate.class)
@ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", matchIfMissing = true)
public static class RetryInterceptorAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RetryLoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
LoadBalancerProperties properties, LoadBalancerRequestFactory requestFactory,
LoadBalancedRetryFactory loadBalancedRetryFactory,
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, requestFactory,
loadBalancedRetryFactory, loadBalancerFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
}
逻辑和之前讲过的差不多,配置了一个重试的工厂和一个重试的拦截器。
6.1 LoadBalancedRetryFactory
先来看一个这个接口:
public interface LoadBalancedRetryFactory {
default LoadBalancedRetryPolicy createRetryPolicy(String service, ServiceInstanceChooser serviceInstanceChooser) {
return null;
}
default RetryListener[] createRetryListeners(String service) {
return new RetryListener[0];
}
default BackOffPolicy createBackOffPolicy(String service) {
return new NoBackOffPolicy();
}
}
根据方法名,可以判断是根据服务名创建对应的重试策略、重试监听器和退避策略。
6.2 RetryLoadBalancerInterceptor
跟 LoadBalancerInterceptor
相比,RetryLoadBalancerInterceptor
的构造方法多了三个参数:
LoadBalancerProperties
:负载均衡配置LoadBalancedRetryFactory
:重试工厂ReactiveLoadBalancer.Factory
:负载策略的工厂
进去看 intercept
方法,方法里面大部分代码都是在最后一行中的 lamda
方法,这里拆开来看:
// RetryLoadBalancerInterceptor
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
final String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
final LoadBalancedRetryPolicy retryPolicy = lbRetryFactory.createRetryPolicy(serviceName, loadBalancer);
RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
// 注意这里简写了retryCallback 和 recoveryCallback,实际上是两个函数式接口
return template.execute(retryCallback, recoveryCallback);
}
前几行都是一样的,而第四行使用重试工厂创建了一个重试策略。
6.2.1 重试配置
LoadBalancerProperties
里面包含重试相关的配置,可以通过配置文件修改:
@ConfigurationProperties("spring.cloud.loadbalancer")
public class LoadBalancerProperties {
.....
private Retry retry = new Retry();
public static class Retry {
// 重试开关
private boolean enabled = true;
//
private boolean retryOnAllOperations = false;
// 在相同实例上的最大重试次数
private int maxRetriesOnSameServiceInstance = 0;
// 在下一个实例上重试的最大次数
private int maxRetriesOnNextServiceInstance = 1;
private Set<Integer> retryableStatusCodes = new HashSet<>();
private Backoff backoff = new Backoff();
}
}
6.2.2 创建重试策略
// 默认实现
public class BlockingLoadBalancedRetryFactory implements LoadBalancedRetryFactory {
private final LoadBalancerProperties loadBalancerProperties;
public BlockingLoadBalancedRetryFactory(LoadBalancerProperties loadBalancerProperties){
this.loadBalancerProperties = loadBalancerProperties;
}
@Override
public LoadBalancedRetryPolicy createRetryPolicy(String serviceId, ServiceInstanceChooser serviceInstanceChooser) {
return new BlockingLoadBalancedRetryPolicy(loadBalancerProperties);
}
}
这个直接 new
了一个 BlockingLoadBalancedRetryPolicy
返回。
6.2.3 创建RetryTemplate
private RetryTemplate createRetryTemplate(String serviceName, HttpRequest request,
LoadBalancedRetryPolicy retryPolicy) {
RetryTemplate template = new RetryTemplate();
// 创建退避策略,默认是 NoBackOffPolicy--发生异常后立即重试,不会等待一段时间
BackOffPolicy backOffPolicy = lbRetryFactory.createBackOffPolicy(serviceName);
template.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
// 重试次数达到上限而仍然失败,则抛出最后一次重试过程中产生的异常
template.setThrowLastExceptionOnExhausted(true);
// 重试监听器,默认是空
RetryListener[] retryListeners = lbRetryFactory.createRetryListeners(serviceName);
if (retryListeners != null && retryListeners.length != 0) {
template.setListeners(retryListeners);
}
// 将前面创建的 retryPolicy 封装为 InterceptorRetryPolicy
template.setRetryPolicy(!properties.getRetry().isEnabled() || retryPolicy == null ? new NeverRetryPolicy(): new InterceptorRetryPolicy(request, retryPolicy, loadBalancer, serviceName));
return template;
}
InterceptorRetryPolicy
实现了 spring-retry
包下的 RetryPolicy
接口 ,我们看一下它判断是否需要进行重试的方法:
// InterceptorRetryPolicy
public boolean canRetry(RetryContext context) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
if (lbContext.getRetryCount() == 0 && lbContext.getServiceInstance() == null) {
lbContext.setServiceInstance(null);
return true;
}
// 调用 BlockingLoadBalancedRetryFactory 的 canRetryNextServer 方法
return policy.canRetryNextServer(lbContext);
}
// BlockingLoadBalancedRetryFactory
public boolean canRetryNextServer(LoadBalancedRetryContext context) {
return nextServerCount <= properties.getRetry().getMaxRetriesOnNextServiceInstance() && canRetry(context);
}
public boolean canRetrySameServer(LoadBalancedRetryContext context) {
return sameServerCount < properties.getRetry().getMaxRetriesOnSameServiceInstance() && canRetry(context);
}
public boolean canRetry(LoadBalancedRetryContext context) {
HttpMethod method = context.getRequest().getMethod();
return HttpMethod.GET.equals(method) || properties.getRetry().isRetryOnAllOperations();
}
可以清晰的看到 LoadBalancer
默认的重试条件是 :
没有在下一个实例达到最大重试次数
GET
请求或retryOnAllOperations
为 true。
当
retryOnAllOperations
为true
时,会对所有请求(PUT
等)都执行重试。当retryOnAllOperations
为false
时,只会对标记为可重试的请求执行重试。
6.2.4 执行请求
最后一行执行请求,execute
方法的两个参数都是回调接口,RetryLoadBalancerInterceptor
使用的是 lamda
函数。
// RetryTemplate
public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) throws E {
return this.doExecute(retryCallback, recoveryCallback, (RetryState)null);
}
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
RetryPolicy retryPolicy = this.retryPolicy;
BackOffPolicy backOffPolicy = this.backOffPolicy;
// 创建重试上下文
RetryContext context = open(retryPolicy, state);
........
RetrySynchronizationManager.register(context);
Throwable lastException = null;
boolean exhausted = false;
try {
// 执行监听器的 open 方法
boolean running = doOpenInterceptors(retryCallback, context);
// 每个监听器都返回 true 才继续执行
if (!running) {
throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
}
......
// 满足重试策略的情况下进行重试
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
......
lastException = null;
return retryCallback.doWithRetry(context);
}
catch (Throwable e) {
lastException = e;
// 记录异常
try {
registerThrowable(retryPolicy, state, context, e);
}
catch (Exception ex) {
throw new TerminatedRetryException("Could not register throwable", ex);
}
finally {
doOnErrorInterceptors(retryCallback, context, e);
}
// 执行退避策略
if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
backOffPolicy.backOff(backOffContext);
} catch ......
}
......
if (shouldRethrow(retryPolicy, context, state)) {
....
throw RetryTemplate.<E>wrapIfNecessary(e);
}
}
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}
}
......
exhausted = true;
return handleRetryExhausted(recoveryCallback, context, state);
}
catch (Throwable e) {
throw RetryTemplate.<E>wrapIfNecessary(e);
}
finally {
close(retryPolicy, context, state, lastException == null || exhausted);
doCloseInterceptors(retryCallback, context, lastException);
RetrySynchronizationManager.clear();
}
}
方法虽然长,但是逻辑比较清晰,首先在循环中调用拦截到的目标方法,前提是要满足执行条件(满足重试条件并且重试资格没有耗尽),如果发生异常,在上下文中记录异常,然后通知监听器发生异常,如果发生异常后仍旧满足重试条件,则执行退避策略(比如到下次重试之前休眠一段时间),接着检查是否需要重新抛出异常中断重试逻辑(有状态并且遇到需要回滚异常),如果是则中断重试流程,然后重试执行完毕后执行恢复操作,如果没有恢复操作则重新抛出异常到主线程,最后清楚缓存并关闭上下文、关闭监听器和清理线程缓存。
6.2.4.1 创建重试上下文
第一步就是调用 open
方法创建重试上下文,来看一下这个 RetryContext
,它是一个接口,查看接口实现,可以发现它是跟重试策略绑定的。
这些 RetryContext
都继承自 RetryContextSupport
。
public class RetryContextSupport extends AttributeAccessorSupport implements RetryContext {
private final RetryContext parent;
private volatile boolean terminate = false;
// 重试次数
private volatile int count;
// 最后一次异常信息
private volatile Throwable lastException;
// 记录异常
public void registerThrowable(Throwable throwable) {
this.lastException = throwable;
if (throwable != null)
count++;
}
.......
}
6.2.4.2 监听器 open
执行监听器的 open
方法,且都返回 true
时才继续往下执行,否则抛出异常。前面说过默认的监听器集合中没有元素,所以会继续往下执行。
6.2.4.3 判断重试条件
重试条件有两个:canRetry(retryPolicy, context)
和 !context.isExhaustedOnly()
。先看第一个:
// InterceptorRetryPolicy
public boolean canRetry(RetryContext context) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
if (lbContext.getRetryCount() == 0 && lbContext.getServiceInstance() == null) {
lbContext.setServiceInstance(null);
return true;
}
// 调用 BlockingLoadBalancedRetryPolicy 的方法
return policy.canRetryNextServer(lbContext);
}
第一次调用时重试次数为 0 且还未选择服务实例,所以会进 if
中的逻辑,直接返回 true
,代表可以重试。后续重试时就走的最下面一行,下面的代码前面讲过。
// BlockingLoadBalancedRetryPolicy
public boolean canRetryNextServer(LoadBalancedRetryContext context) {
return nextServerCount <= properties.getRetry().getMaxRetriesOnNextServiceInstance() && canRetry(context);
}
注意这里重试的判断是直接判断是否能在下一个服务实例上重试而不是当前服务实例。因为当前服务实例重试次数耗尽后,就会选择下一个服务实例,无论配置的下一个服务实例重试次数是0还是大于0,起到的效果和先判断当前服务实例重试次数是一样的。
6.2.4.4 retryCallback 执行
满足重试条件后,执行请求,对应的就是前面说到过的函数式接口,看一下它的具体逻辑(RetryLoadBalancerInterceptor#intercept
最后一行方法调用的第一个参数)。方法较长,这里拆开来分析较为清楚,具体的逻辑和非重试的逻辑相似,只不过多加了一点代码。
ServiceInstance serviceInstance = null;
if (context instanceof LoadBalancedRetryContext) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
serviceInstance = lbContext.getServiceInstance();
// ....logger
}
// 获取负载均衡器的生命周期处理器
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
.getSupportedLifecycleProcessors(
loadBalancerFactory.getInstances(serviceName, LoadBalancerLifecycle.class),
RetryableRequestContext.class, ResponseData.class, ServiceInstance.class);
String hint = getHint(serviceName);
先判断上下文类型是否是 LoadBalancedRetryContext
,前面在 重试上下文
章节分析过上下文的类型就是 LoadBalancedRetryContext
,所以会进行 if
方法体。从上下文中取出实例信息进行重试,由于当前是第一次,所以实例信息为空,非第一次且在当前服务实例进行重试才能取到。
// 第一次调用时,serviceInstance 必为 null
if (serviceInstance == null) {
// ...logger
ServiceInstance previousServiceInstance = null;
if (context instanceof LoadBalancedRetryContext) {
// 获取前一个实例,达到当前实例重试次数前均为 null
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
previousServiceInstance = lbContext.getPreviousServiceInstance();
}
DefaultRequest<RetryableRequestContext> lbRequest = new DefaultRequest<>(
new RetryableRequestContext(previousServiceInstance, new RequestData(request), hint));
// 执行生命周期接口的 onstart 方法
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
// 选择服务实例
serviceInstance = loadBalancer.choose(serviceName, lbRequest);
// ....logger
if (context instanceof LoadBalancedRetryContext) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
lbContext.setServiceInstance(serviceInstance);
}
Response<ServiceInstance> lbResponse = new DefaultResponse(serviceInstance);
if (serviceInstance == null) {
// 获取的实例信息为 null,异常情况
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
.onComplete(new CompletionContext<ResponseData, ServiceInstance, RetryableRequestContext>(
CompletionContext.Status.DISCARD,
new DefaultRequest<>(
new RetryableRequestContext(null, new RequestData(request), hint)),
lbResponse)));
}
}
总体来说也很清晰:
设置上下文;
回调负载均衡生命周期接口的
onStart
方法;服务实例的选择,
choose
方法,和前面讲过的非重试的流程一样;回调负载均衡生命周期接口的
onComplete
方法;
// 创建请求
LoadBalancerRequestAdapter<ClientHttpResponse, RetryableRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(
requestFactory.createRequest(request, body, execution),
new RetryableRequestContext(null, new RequestData(request), hint));
ServiceInstance finalServiceInstance = serviceInstance;
ClientHttpResponse response =
// 执行请求
RetryLoadBalancerInterceptor.this.loadBalancer.execute(serviceName,finalServiceInstance,lbRequest);
int statusCode = response.getRawStatusCode();
if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
// ...logger
byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
response.close();
throw new ClientHttpResponseStatusCodeException(serviceName, response, bodyCopy);
}
这段逻辑和就是 5.3.3
章节的内容,不再赘述。如果请求成功,直接返回请求结果。
6.2.5 异常情况
异常逻辑看 catch
块里面的代码:
catch (Throwable e) {
lastException = e;
try {
// 记录异常信息
registerThrowable(retryPolicy, state, context, e);
}
catch (Exception ex) {
throw new TerminatedRetryException("Could not register throwable", ex);
}
finally {
// 调用监听器的 error 方法
doOnErrorInterceptors(retryCallback, context, e);
}
if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
// 退避策略
backOffPolicy.backOff(backOffContext);
}
catch (BackOffInterruptedException ex) {
lastException = e;
// ....... logger
throw ex;
}
}
// ....... logger
if (shouldRethrow(retryPolicy, context, state)) {
// ....... logger
throw RetryTemplate.<E>wrapIfNecessary(e);
}
}
registerThrowable
会在当前上下文记录异常信息和重试次数。而由于外层是 while
循环,所以重试次数耗尽前执行的都是 6.2.4.4
这段逻辑。
其中可以看到有一个
state
变量,这个就涉及到了 有状态重试,这里先跳过,感兴趣的可以先自行研究。
6.2.6 recoveryCallback 执行
当重试次数耗尽后,会触发 recoveryCallback
的执行,也是一个回调。
protected <T> T handleRetryExhausted(RecoveryCallback<T> recoveryCallback, RetryContext context, RetryState state) throws Throwable {
context.setAttribute(RetryContext.EXHAUSTED, true);
if (state != null && !context.hasAttribute(GLOBAL_STATE)) {
this.retryContextCache.remove(state.getKey());
}
if (recoveryCallback != null) {
T recovered = recoveryCallback.recover(context);
context.setAttribute(RetryContext.RECOVERED, true);
return recovered;
}
if (state != null) {
this.logger.debug("Retry exhausted after last attempt with no recovery path.");
rethrow(context, "Retry exhausted after last attempt with no recovery path");
}
throw wrapIfNecessary(context.getLastThrowable());
}
就是在上下文中更新相关属性,并回调 recoveryCallback
的 recover
接口:
// LoadBalancedRecoveryCallback
public T recover(RetryContext context) throws Exception {
Throwable lastThrowable = context.getLastThrowable();
if (lastThrowable != null) {
if (lastThrowable instanceof RetryableStatusCodeException) {
RetryableStatusCodeException ex = (RetryableStatusCodeException) lastThrowable;
return createResponse((R) ex.getResponse(), ex.getUri());
}
else if (lastThrowable instanceof Exception) {
throw (Exception) lastThrowable;
}
}
throw new RetryException("Could not recover", lastThrowable);
}
如果最后一次异常信息不为空,且异常类型是 RetryableStatusCodeException
,则调用 createResponse
方法创建恢复响应对象。否则直接抛出异常。
使用
createResponse
方法创建恢复响应对象的主要目的是为了在发生可重试的状态码异常时,提供一种用于恢复请求的方法。恢复响应对象将包含有关恢复操作的信息,以帮助系统更好地处理重试过程。恢复响应对象将包含一些重试信息和恢复操作的建议,例如重试间隔时间、恢复操作是否成功等。这些信息可以帮助系统更好地处理重试操作。
我们看一下 RetryableStatusCodeException
是在哪里抛出的,它有一个子类 ClientHttpResponseStatusCodeException
,在 retryCallback
中(6.2.4.4
最后),会判断返回的响应码是否可重试,响应码则是在配置文件中指定。
if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
response.close();
throw new ClientHttpResponseStatusCodeException(serviceName, response, bodyCopy);
}
// BlockingLoadBalancedRetryPolicy
public boolean retryableStatusCode(int statusCode) {
return properties.getRetry().getRetryableStatusCodes().contains(statusCode);
}
评论区