侧边栏壁纸
博主头像
Leokoの小破站博主等级

行动起来,活在当下

  • 累计撰写 18 篇文章
  • 累计创建 10 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

LoadBalancer 源码分析

Leoko
2024-10-24 / 0 评论 / 0 点赞 / 116 阅读 / 59410 字

1. 使用

一般都是配置 RestTemplate 使用来达到负载均衡的效果。

<!--maven 依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>
 @Configuration
 public class MainConfiguration {
     @Bean
     @LoadBalanced 
     public RestTemplate restTemplate() {
         return new RestTemplate();
     }
 }

加了 @LoadBalanced 注解后 RestTemplate 就不能使用 ip:port 的格式去访问服务了,而是需要使用 http://serviceName/路径 的格式,根据 serviceName 去注册中心获取实际的服务信息,最后转换为要请求的地址。

2. @LoadBalanced 注解

进入注解查看注释:

/**
 * Annotation to mark a RestTemplate or WebClient bean to be configured to use a
 * LoadBalancerClient.
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
​
}

注释的意思是:用于将 RestTemplateWebClientbean 标记为使用 LoadBalancerClient 的注解。即最终使用的是 LoadBalancerClient

还有一个要注意的,@LoadBalanced 中包含了 @Qualifier 注解,@Qualifier 注解是当 SpringIOC 容器中有多个同类型的 bean 时,按名称注入,放到 @LoadBalanced 有什么用呢?这个后面再分析哦。

3. LoadBalancerClient

顾名思义:负载均衡的客户端。

public interface LoadBalancerClient extends ServiceInstanceChooser {
​
    /**
     * 根据服务名发送请求
     */
    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
​
    /**
     * 根据服务名和服务实例发送请求
     */
    <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
​
    /**
     * URI 转换,将 serviceName 转换为 ip:port
     */
    URI reconstructURI(ServiceInstance instance, URI original);
​
}

根据接口分析,LoadBalancerClient 可以向特定的服务实例发送请求。且它继承自 ServiceInstanceChooser 即服务实例选择器,我们照样看下源码:

public interface ServiceInstanceChooser {
​
    /**
     * 根据服务名选择实例.
     */
    ServiceInstance choose(String serviceId);
​
    /**
     * 根据服务名和请求选择实例.
     */
    <T> ServiceInstance choose(String serviceId, Request<T> request);
}

即:ServiceInstanceChooser 用来选择实例,LoadBalancerClient 用来向实例发送请求。

4. 自动配置类

阅读 Spring 系列源码,一般都是从两个地方入手:

  • 注解

  • 自动配置类

基本注解前面讲过,接下来看一下 loadBalancer 中的自动配置类,它的自动配置类主要在两个地方。

spring-cloud-commons 包下:

image-20240108195238843

spring-cloud-loadBalancer 包下:

image-20240108195636855

4.1 spring-cloud-commons 中的 LoadBalancerAutoConfiguration

这里只关注和 RestTemplate 相关的简单配置,异步和 reactive 的配置这里不分析,有兴趣的可以自行研究。

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {
​
    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();
​
    @Autowired(required = false)
    private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
​
    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
            final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
    }
​
    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }
​
    @Configuration(proxyBeanMethods = false)
    @Conditional(RetryMissingOrDisabledCondition.class)
    static class LoadBalancerInterceptorConfig {
​
        @Bean
        public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }
​
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
​
    }
    
    // 还有重试配置
}

4.1.1 @ConditionalOnBean(LoadBalancerClient.class)

存在 LoadBalancerClientbean 的情况下负载均衡才会生效。

没有 LoadBalancerClient 的话,也就不能根据服务名发送请求,那还要负载均衡干什么。

4.1.2 注入 RestTemplate

筛选出所有添加了@LoadBalancedRestTemplatebean ,且 requiredfalse,即使容器中没有,也不会报错。

4.1.3 loadBalancedRestTemplateInitializerDeprecated

@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
    final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
    return () -> restTemplateCustomizers.ifAvailable(customizers -> {
        for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
            for (RestTemplateCustomizer customizer : customizers) {
                customizer.customize(restTemplate);
            }
        }
    });
}

注意这段代码,将 SmartInitializingSingleton 对象注册为一个 BeanSmartInitializingSingleton 实现了一个回调接口,在容器中所有单例 Bean 都初始化完成之后,Spring 会自动回调该接口的 afterSingletonsInstantiated() 方法,以执行一些额外的初始化操作,这里就是为每个 RestTemplate 添加 LoadBalancerInterceptor 拦截器。这里的方法体就是afterSingletonsInstantiated() 的方法逻辑,这里用的是 lamda 写法。

为什么使用 SmartInitializingSingleton

  • afterSingletonsInstantiated(),该方法会在容器中所有单例 Bean 初始化完成后被调用。通过实现这个接口,可以确保在所有单例 Bean 初始化完成之后再进行相关的初始化操作,以避免可能的依赖问题。

  • 简化配置:通过将初始化代码封装在一个 SmartInitializingSingleton 对象中,可以将其声明为一个 Spring Bean,并通过 @Bean 注解进行配置。这样,Spring 在启动过程中会自动执行初始化操作,无需手动调用。除了这种,还可以用 @PostConstructBeanPostProcessor 实现,但是相比于在配置类中使用 SmartInitializingSingleton 更为繁琐。

4.1.4 LoadBalancerRequestFactory

public class LoadBalancerRequestFactory {
​
    private LoadBalancerClient loadBalancer;
​
    private List<LoadBalancerRequestTransformer> transformers;
​
    public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
            List<LoadBalancerRequestTransformer> transformers) {
        this.loadBalancer = loadBalancer;
        this.transformers = transformers;
    }
​
    public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
        this.loadBalancer = loadBalancer;
    }
​
    public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) {
        return instance -> {
            HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
            if (this.transformers != null) {
                for (LoadBalancerRequestTransformer transformer : this.transformers) {
                    serviceRequest = transformer.transformRequest(serviceRequest, instance);
                }
            }
            return execution.execute(serviceRequest, body);
        };
    }
​
}

看到这个 createRequest 方法是不是很熟悉,和 RestTemplate 中创建请求的工厂中的方法类似,如果还没看过 RestTemplate 源码的小伙伴建议看一下哦,不看的话下面还有一些源码分析可能一知半解。

所以 LoadBalancerRequestFactory 就是创建负载均衡请求的工厂。

4.1.5 拦截器配置

@Configuration(proxyBeanMethods = false)
@Conditional(RetryMissingOrDisabledCondition.class)
static class LoadBalancerInterceptorConfig {
​
    @Bean
    public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
                                                           LoadBalancerRequestFactory requestFactory) {
        return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
    }
​
    @Bean
    @ConditionalOnMissingBean
    public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
        return restTemplate -> {
            List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
            list.add(loadBalancerInterceptor);
            restTemplate.setInterceptors(list);
        };
    }
​
}
  • 先创建 LoadBalancerInterceptor 拦截器并注入到容器。

  • 创建 RestTemplateCustomizer 将拦截器设置到 RestTemplate 中,而 RestTemplateCustomizer 是一个函数式接口,这里用的也是 lamda 写法。相当于 new 了一个 RestTemplateCustomizer 的实现类并返回。因此这里并没有真正的将拦截器设置到 RestTemplate,就想你写了一个方法,没有执行的话肯定不能生效。而真正执行的地方需要回到 4.3 章节处。

    img

注意这个内部配置类上还有一个 @Conditional(RetryMissingOrDisabledCondition.class) 注解,即在不开启重试机制的情况下负载均衡在会生效。为什么这样设计?我的理解是有两个原因:

  • 请求幂等问题:如果请求是幂等的,那么每次重试的结果应该是一样的。在这种情况下,负载均衡和重试之间应该不会存在冲突。但是如果不是幂等,在进行重试操作时同时进行了负载均衡,可能会导致请求被发送到不同的服务实例上,进而产生不一致的结果。

  • 网络延迟问题:如果网络延迟较高,请求在较长时间内没有响应,重试操作可能会导致多个请求同时发送到服务实例上,增加了服务端的负载压力。

但是,loadBalancer 本身也有重试机制,这不会冲突吗?这部分后面分析,关于重试的配置依旧在 LoadBalancerAutoConfiguration 中,前面的代码并没有贴出来。

// 下面两个条件任意一个满足时,RetryMissingOrDisabledCondition条件才会被满足
private static class RetryMissingOrDisabledCondition extends AnyNestedCondition {
​
    RetryMissingOrDisabledCondition() {
        super(ConfigurationPhase.REGISTER_BEAN);
    }
​
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class RetryTemplateMissing {
​
    }
​
    @ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", havingValue = "false")
    static class RetryDisabled {
​
    }
​
}

总结:spring-cloud-commons 中的 LoadBalancerAutoConfiguration 是用来配置 RestTemplate 拦截器相关的。

4.2 spring-cloud-loadBalancer 包下的 LoadBalancerAutoConfiguration

还有一个相同名字的 LoadBalancerAutoConfiguration 在这个包下面,来看下和 commons 包下的有什么不同。

@Configuration(proxyBeanMethods = false)
@LoadBalancerClients
@EnableConfigurationProperties(LoadBalancerProperties.class)
@AutoConfigureBefore({ ReactorLoadBalancerClientAutoConfiguration.class,
        LoadBalancerBeanPostProcessorAutoConfiguration.class })
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.enabled", havingValue = "true", matchIfMissing = true)
public class LoadBalancerAutoConfiguration {
​
    private final ObjectProvider<List<LoadBalancerClientSpecification>> configurations;
​
    public LoadBalancerAutoConfiguration(ObjectProvider<List<LoadBalancerClientSpecification>> configurations) {
        this.configurations = configurations;
    }
​
    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerZoneConfig zoneConfig(Environment environment) {
        return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
    }
​
    @ConditionalOnMissingBean
    @Bean
    public LoadBalancerClientFactory loadBalancerClientFactory() {
        LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory();
        clientFactory.setConfigurations(this.configurations.getIfAvailable(Collections::emptyList));
        return clientFactory;
    }
​
}

4.2.1 @LoadBalancerClients 注解

@Configuration(proxyBeanMethods = false)
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
@Documented
@Import(LoadBalancerClientConfigurationRegistrar.class)
public @interface LoadBalancerClients {
​
    LoadBalancerClient[] value() default {};
​
    /**
     * 所有负载均衡客户端的默认配置
     */
    Class<?>[] defaultConfiguration() default {};
​
}
​
@Configuration(proxyBeanMethods = false)
@Import(LoadBalancerClientConfigurationRegistrar.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LoadBalancerClient {
​
    /**
     * 负载均衡客户端名称,即服务名
     */
    @AliasFor("name")
    String value() default "";
​
    @AliasFor("value")
    String name() default "";
​
    /**
     * 对应服务使用的配置
     */
    Class<?>[] configuration() default {};
​
}
​

这个注解导入了一个 LoadBalancerClientConfigurationRegistrar 类,另外注解的 value 属性的类型又是另外一个注解 @LoadBalancerClient。这个注解时用来干嘛的呢?

在微服务架构中,通常会有多个服务,

5. RestTemplate 调用过程分析

前面部分相同的逻辑这里就不再分析,不清楚的小伙伴去看下 RestTemplate 源码分析的的文章哦。这里直接进入到 InterceptingRequestExecution # execute 方法处,这里会遍历所有的拦截器并执行。

image-20231208152018512

5.1 LoadBalancerInterceptor # intercept

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
​
    private LoadBalancerClient loadBalancer;
​
    private LoadBalancerRequestFactory requestFactory;
​
    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }
​
    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }
​
    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
    }
​
}

使用负载均衡的 RestTemplate 使用服务名调用,所以 originalUri.getHost() 得到的就是服务名,然后调用 LoadBalancerClientexecute 方法执行请求,注意 request 参数是通过 requestFactory 生成的。

5.2 LoadBalancerRequestFactory # createRequest

// LoadBalancerRequestFactory 
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
    return instance -> {
        HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
        if (this.transformers != null) {
            for (LoadBalancerRequestTransformer transformer : this.transformers) {
                serviceRequest = transformer.transformRequest(serviceRequest, instance);
            }
        }
        return execution.execute(serviceRequest, body);
    };
}
​
// LoadBalancerRequest
public interface LoadBalancerRequest<T> {
​
    T apply(ServiceInstance instance) throws Exception;
}

又是一个函数式接口,在调用其 apply 方法时,进行回调。

5.3 LoadBalancerClient # execute

LoadBalancerClient 的唯一实现类是 BlockingLoadBalancerClient

// BlockingLoadBalancerClient
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
    String hint = getHint(serviceId);
    LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest = new        LoadBalancerRequestAdapter<>(request,new DefaultRequestContext(request, hint));
    Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
    // 根据服务名称以及请求获取服务实例
    ServiceInstance serviceInstance = choose(serviceId, lbRequest);
    if (serviceInstance == null) {
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
            new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    return execute(serviceId, serviceInstance, lbRequest);
}

前面看 LoadBalancerAutoConfiguration 时需要使用到 LoadBalancerClient ,那 LoadBalancerClient 是何时注入的呢?

@Configuration(proxyBeanMethods = false)
@LoadBalancerClients
@AutoConfigureAfter(LoadBalancerAutoConfiguration.class)
@AutoConfigureBefore({ org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.class,
        AsyncLoadBalancerAutoConfiguration.class })
@ConditionalOnClass(RestTemplate.class)
public class BlockingLoadBalancerClientAutoConfiguration {
​
    @Bean
    @ConditionalOnBean(LoadBalancerClientFactory.class)
    @ConditionalOnMissingBean
    public LoadBalancerClient blockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory,
            LoadBalancerProperties properties) {
        return new BlockingLoadBalancerClient(loadBalancerClientFactory, properties);
    }
    .....
}

@AutoConfigureBefore 指定了改配置必须在 LoadBalancerAutoConfiguration 之前注入,保证 LoadBalancerAutoConfiguration 中使用的 LoadBalancerClient 不为空。

LoadBalancerClient 需要用到 LoadBalancerClientFactoryLoadBalancerClientFactory 的注入则在前面提到过的 spring-cloud-loadBalancer 包下的 LoadBalancerAutoConfiguration

5.3.1 LoadBalancerLifecycle

顾名思义,负载均衡器的生命周期,进入到这个接口看看方法:

public interface LoadBalancerLifecycle<RC, RES, T> {
    
    /**
     * 负载均衡前执行
     */
    void onStart(Request<RC> request);
    
    /**
     * 服务实例选择后,发送请求前执行
     */
    void onStartRequest(Request<RC> request, Response<T> lbResponse);
    
    /**
     * 负载均衡后执行
     */
    void onComplete(CompletionContext<RES, T, RC> completionContext);
}

onStart 方法在负载均衡(choose 方法)前执行, onComplete 方法在负载均衡后执行。默认的实现是 MicrometerStatsLoadBalancerLifecycle,用于监控指标收集。

5.3.2 服务实例选择

// BlockingLoadBalancerClient
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
    // 负载均衡策略选择
    ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
    if (loadBalancer == null) {
        return null;
    }
    // 实例选择
    Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
    if (loadBalancerResponse == null) {
        return null;
    }
    return loadBalancerResponse.getServer();
}

loadBalancerClientFactory.getInstance 底层使用的 NamedContextFactory 这个类,用来实现子容器隔离,即 SpringCloud LoadBalancer 为每个 serviceName 都分配了一个 ApplicationContext ,为什么需要这样做呢?

  • 不同的微服务应用配置可能需要不一样。不同服务的接口响应速度不一致,则需要配置不同的超时时间;还有对应服务发现的注册中心可能不一样,比如服务 A 用的是 Nacos,服务 B 用的是 Eureka。在这种需求下,不同微服务的客户端有不同的以及相同的配置有不同的 Bean,也有相同的 Bean。所以,我们可以针对每一个微服务将他们的 Bean 所处于 ApplicationContext 独立开来,不同微服务客户端使用不同的 ApplicationContextNamedContextFactory 就是用来实现这种机制的。

关于 NamedContextFactory 的使用可以看我的另一篇文章()。

ReactiveLoadBalancer 是一个接口,默认提供了两种负载均衡策略:随机 和 轮询。这里 getServiceInstance 方法传入了服务名,说明是要根据服务名选择负载策略,不同的服务选择的负载策略可以不一样(怎么选择的我们后续分析,这里先了解一下就行)。

image-20231208160011412

默认是轮询策略,具体配置在:

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {
​
    private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;
​
    @Bean
    @ConditionalOnMissingBean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
            LoadBalancerClientFactory loadBalancerClientFactory) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RoundRobinLoadBalancer(
                loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
    }
}

接下来看一下 RoundRobinLoadBalancer 是怎么轮询的。

public Mono<Response<ServiceInstance>> choose(Request request) {
    ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
        .getIfAvailable(NoopServiceInstanceListSupplier::new);
    // serviceInstances 是服务列表
    return supplier.get(request).next()
        .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
​
// 真正选择
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) {
    Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
    if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
        ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
    }
    return serviceInstanceResponse;
}
​
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
    if (instances.isEmpty()) {
        if (log.isWarnEnabled()) {
            log.warn("No servers available for service: " + serviceId);
        }
        return new EmptyResponse();
    }
    int pos = Math.abs(this.position.incrementAndGet());
    // 取模算法轮询
    ServiceInstance instance = instances.get(pos % instances.size());
​
    return new DefaultResponse(instance);
}

先通过 ServiceInstanceListSupplier 获得一个服务实例列表,然后从服务列表中通过一定的策略(对于RoundRobinLoadBalancer来说,就是轮询策略)选择一个具体的服务实例。而 ServiceInstanceListSupplier 还是在 LoadBalancerClientConfiguration 中注入的,

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {
    .....
        
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnReactiveDiscoveryEnabled
    @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
    public static class ReactiveSupportConfiguration {
​
        @Bean
        @ConditionalOnBean(ReactiveDiscoveryClient.class)
        @ConditionalOnMissingBean
        @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default",
                matchIfMissing = true)
        public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
                ConfigurableApplicationContext context) {
            return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
        }
    }
}

withBlockingDiscoveryClient() 会创建一个 DiscoveryClientServiceInstanceListSupplier 的实现类,如果用的是 Nacos 注册中心的话,就是从 Nacos 获取服务实例列表。

5.3.3 请求发送

服务实例选择完后,就可以发送请求了。

// BlockingLoadBalancerClient
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
            throws IOException {
    // ......
    try {
        T response = request.apply(serviceInstance);
        Object clientResponse = getClientResponse(response);
        supportedLifecycleProcessors
            .forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, defaultResponse, clientResponse)));
        return response;
    }// catch ..... 
}

代码有一行 request.apply(serviceInstance) 执行的就是 5.2 提到的函数式接口。

image-20231213110505489

将原始请求包装为 ServiceRequestWrapper,下面的执行逻辑还是 RestTemplate 那一套。

6. 重试机制下的流程

前面说过开启重试后 LoadBalancerInterceptorConfig 不会生效,拦截器也将不会配置到 RestTemplate 里面,那重试机制开启的情况下如何进行负载均衡呢?答案还是在 common 包下的 LoadBalancerAutoConfiguration,就在 LoadBalancerInterceptorConfig 这个静态类的下面。

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {
    
    ....
        
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(RetryTemplate.class)
    public static class RetryAutoConfiguration {
​
        @Bean
        @ConditionalOnMissingBean
        public LoadBalancedRetryFactory loadBalancedRetryFactory() {
            return new LoadBalancedRetryFactory() {};
        }
​
    }
    
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(RetryTemplate.class)
    @ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
    @ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", matchIfMissing = true)
    public static class RetryInterceptorAutoConfiguration {
​
        @Bean
        @ConditionalOnMissingBean
        public RetryLoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
                LoadBalancerProperties properties, LoadBalancerRequestFactory requestFactory,
                LoadBalancedRetryFactory loadBalancedRetryFactory,
                ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
            return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, requestFactory,
                    loadBalancedRetryFactory, loadBalancerFactory);
        }
​
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
​
    }
}

逻辑和之前讲过的差不多,配置了一个重试的工厂和一个重试的拦截器。

6.1 LoadBalancedRetryFactory

先来看一个这个接口:

public interface LoadBalancedRetryFactory {
    
    default LoadBalancedRetryPolicy createRetryPolicy(String service, ServiceInstanceChooser serviceInstanceChooser) {
        return null;
    }
​
    default RetryListener[] createRetryListeners(String service) {
        return new RetryListener[0];
    }
    
    default BackOffPolicy createBackOffPolicy(String service) {
        return new NoBackOffPolicy();
    }
​
}

根据方法名,可以判断是根据服务名创建对应的重试策略、重试监听器和退避策略。

6.2 RetryLoadBalancerInterceptor

LoadBalancerInterceptor 相比,RetryLoadBalancerInterceptor 的构造方法多了三个参数:

  • LoadBalancerProperties:负载均衡配置

  • LoadBalancedRetryFactory:重试工厂

  • ReactiveLoadBalancer.Factory:负载策略的工厂

进去看 intercept 方法,方法里面大部分代码都是在最后一行中的 lamda 方法,这里拆开来看:

// RetryLoadBalancerInterceptor
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
    final URI originalUri = request.getURI();
    final String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    final LoadBalancedRetryPolicy retryPolicy = lbRetryFactory.createRetryPolicy(serviceName, loadBalancer);
    RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
    // 注意这里简写了retryCallback 和 recoveryCallback,实际上是两个函数式接口
    return template.execute(retryCallback, recoveryCallback);
}

前几行都是一样的,而第四行使用重试工厂创建了一个重试策略。

6.2.1 重试配置

LoadBalancerProperties 里面包含重试相关的配置,可以通过配置文件修改:

@ConfigurationProperties("spring.cloud.loadbalancer")
public class LoadBalancerProperties {
    .....
    private Retry retry = new Retry();
    
    public static class Retry {
​
        // 重试开关
        private boolean enabled = true;
        // 
        private boolean retryOnAllOperations = false;
        // 在相同实例上的最大重试次数
        private int maxRetriesOnSameServiceInstance = 0;
        // 在下一个实例上重试的最大次数
        private int maxRetriesOnNextServiceInstance = 1;
​
        private Set<Integer> retryableStatusCodes = new HashSet<>();
        
        private Backoff backoff = new Backoff();
    }   
}

6.2.2 创建重试策略

// 默认实现
public class BlockingLoadBalancedRetryFactory implements LoadBalancedRetryFactory {
​
    private final LoadBalancerProperties loadBalancerProperties;
​
    public BlockingLoadBalancedRetryFactory(LoadBalancerProperties loadBalancerProperties){
        this.loadBalancerProperties = loadBalancerProperties;
    }
​
    @Override
    public LoadBalancedRetryPolicy createRetryPolicy(String serviceId, ServiceInstanceChooser serviceInstanceChooser) {
        return new BlockingLoadBalancedRetryPolicy(loadBalancerProperties);
    }
​
}

这个直接 new 了一个 BlockingLoadBalancedRetryPolicy 返回。

6.2.3 创建RetryTemplate

private RetryTemplate createRetryTemplate(String serviceName, HttpRequest request,
            LoadBalancedRetryPolicy retryPolicy) {
    RetryTemplate template = new RetryTemplate();
    // 创建退避策略,默认是 NoBackOffPolicy--发生异常后立即重试,不会等待一段时间
    BackOffPolicy backOffPolicy = lbRetryFactory.createBackOffPolicy(serviceName);
    template.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
    // 重试次数达到上限而仍然失败,则抛出最后一次重试过程中产生的异常
    template.setThrowLastExceptionOnExhausted(true);
    // 重试监听器,默认是空
    RetryListener[] retryListeners = lbRetryFactory.createRetryListeners(serviceName);
    if (retryListeners != null && retryListeners.length != 0) {
        template.setListeners(retryListeners);
    }
    // 将前面创建的 retryPolicy 封装为 InterceptorRetryPolicy
    template.setRetryPolicy(!properties.getRetry().isEnabled() || retryPolicy == null ? new NeverRetryPolicy(): new InterceptorRetryPolicy(request, retryPolicy, loadBalancer, serviceName));
    return template;
}

InterceptorRetryPolicy 实现了 spring-retry 包下的 RetryPolicy 接口 ,我们看一下它判断是否需要进行重试的方法:

// InterceptorRetryPolicy
public boolean canRetry(RetryContext context) {
    LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
    if (lbContext.getRetryCount() == 0 && lbContext.getServiceInstance() == null) {
        lbContext.setServiceInstance(null);
        return true;
    }
    // 调用 BlockingLoadBalancedRetryFactory 的 canRetryNextServer 方法
    return policy.canRetryNextServer(lbContext);
}
​
// BlockingLoadBalancedRetryFactory
public boolean canRetryNextServer(LoadBalancedRetryContext context) {
    return nextServerCount <= properties.getRetry().getMaxRetriesOnNextServiceInstance() && canRetry(context);
}
​
public boolean canRetrySameServer(LoadBalancedRetryContext context) {
    return sameServerCount < properties.getRetry().getMaxRetriesOnSameServiceInstance() && canRetry(context);
}
​
public boolean canRetry(LoadBalancedRetryContext context) {
    HttpMethod method = context.getRequest().getMethod();
    return HttpMethod.GET.equals(method) || properties.getRetry().isRetryOnAllOperations();
}

可以清晰的看到 LoadBalancer 默认的重试条件是 :

  • 没有在下一个实例达到最大重试次数

  • GET 请求或 retryOnAllOperations 为 true。

retryOnAllOperationstrue 时,会对所有请求(PUT等)都执行重试。当retryOnAllOperationsfalse 时,只会对标记为可重试的请求执行重试。

6.2.4 执行请求

最后一行执行请求,execute 方法的两个参数都是回调接口,RetryLoadBalancerInterceptor 使用的是 lamda 函数。

// RetryTemplate
public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) throws E {
    return this.doExecute(retryCallback, recoveryCallback, (RetryState)null);
}
​
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
            RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
​
    RetryPolicy retryPolicy = this.retryPolicy;
    BackOffPolicy backOffPolicy = this.backOffPolicy;
    // 创建重试上下文
    RetryContext context = open(retryPolicy, state);
    ........
    RetrySynchronizationManager.register(context);
​
    Throwable lastException = null;
​
    boolean exhausted = false;
    try {
        // 执行监听器的 open 方法
        boolean running = doOpenInterceptors(retryCallback, context);
        // 每个监听器都返回 true 才继续执行
        if (!running) {
            throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
        }
        ......
        // 满足重试策略的情况下进行重试
        while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
            try {
                ......
                lastException = null;
                return retryCallback.doWithRetry(context);
            }
            catch (Throwable e) {
                lastException = e;
                // 记录异常
                try {
                    registerThrowable(retryPolicy, state, context, e);
                }
                catch (Exception ex) {
                    throw new TerminatedRetryException("Could not register throwable", ex);
                }
                finally {
                    doOnErrorInterceptors(retryCallback, context, e);
                }
                // 执行退避策略
                if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                    try {
                        backOffPolicy.backOff(backOffContext);
                    } catch ......
                }
                ......
                if (shouldRethrow(retryPolicy, context, state)) {
                    ....
                    throw RetryTemplate.<E>wrapIfNecessary(e);
                }
​
            }
            if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                break;
            }
        }
        ......
        exhausted = true;
        return handleRetryExhausted(recoveryCallback, context, state);
​
    }
    catch (Throwable e) {
        throw RetryTemplate.<E>wrapIfNecessary(e);
    }
    finally {
        close(retryPolicy, context, state, lastException == null || exhausted);
        doCloseInterceptors(retryCallback, context, lastException);
        RetrySynchronizationManager.clear();
    }
​
}

方法虽然长,但是逻辑比较清晰,首先在循环中调用拦截到的目标方法,前提是要满足执行条件(满足重试条件并且重试资格没有耗尽),如果发生异常,在上下文中记录异常,然后通知监听器发生异常,如果发生异常后仍旧满足重试条件,则执行退避策略(比如到下次重试之前休眠一段时间),接着检查是否需要重新抛出异常中断重试逻辑(有状态并且遇到需要回滚异常),如果是则中断重试流程,然后重试执行完毕后执行恢复操作,如果没有恢复操作则重新抛出异常到主线程,最后清楚缓存并关闭上下文、关闭监听器和清理线程缓存。

6.2.4.1 创建重试上下文

第一步就是调用 open 方法创建重试上下文,来看一下这个 RetryContext ,它是一个接口,查看接口实现,可以发现它是跟重试策略绑定的

image-20231216162609734

这些 RetryContext 都继承自 RetryContextSupport

public class RetryContextSupport extends AttributeAccessorSupport implements RetryContext {
​
    private final RetryContext parent;
​
    private volatile boolean terminate = false;
    // 重试次数
    private volatile int count;
    // 最后一次异常信息
    private volatile Throwable lastException;
    // 记录异常
    public void registerThrowable(Throwable throwable) {
        this.lastException = throwable;
        if (throwable != null)
            count++;
    }
    .......
}
6.2.4.2 监听器 open

执行监听器的 open 方法,且都返回 true 时才继续往下执行,否则抛出异常。前面说过默认的监听器集合中没有元素,所以会继续往下执行。

6.2.4.3 判断重试条件

重试条件有两个:canRetry(retryPolicy, context)!context.isExhaustedOnly()。先看第一个:

// InterceptorRetryPolicy
public boolean canRetry(RetryContext context) {
    LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
    if (lbContext.getRetryCount() == 0 && lbContext.getServiceInstance() == null) {
        lbContext.setServiceInstance(null);
        return true;
    }
    // 调用 BlockingLoadBalancedRetryPolicy 的方法
    return policy.canRetryNextServer(lbContext);
}

第一次调用时重试次数为 0 且还未选择服务实例,所以会进 if 中的逻辑,直接返回 true,代表可以重试。后续重试时就走的最下面一行,下面的代码前面讲过。

// BlockingLoadBalancedRetryPolicy
public boolean canRetryNextServer(LoadBalancedRetryContext context) {
    return nextServerCount <= properties.getRetry().getMaxRetriesOnNextServiceInstance() && canRetry(context);
}

注意这里重试的判断是直接判断是否能在下一个服务实例上重试而不是当前服务实例。因为当前服务实例重试次数耗尽后,就会选择下一个服务实例,无论配置的下一个服务实例重试次数是0还是大于0,起到的效果和先判断当前服务实例重试次数是一样的。

6.2.4.4 retryCallback 执行

满足重试条件后,执行请求,对应的就是前面说到过的函数式接口,看一下它的具体逻辑(RetryLoadBalancerInterceptor#intercept 最后一行方法调用的第一个参数)。方法较长,这里拆开来分析较为清楚,具体的逻辑和非重试的逻辑相似,只不过多加了一点代码。

ServiceInstance serviceInstance = null;
if (context instanceof LoadBalancedRetryContext) {
    LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
    serviceInstance = lbContext.getServiceInstance();
    // ....logger
}
// 获取负载均衡器的生命周期处理器
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
    .getSupportedLifecycleProcessors(
    loadBalancerFactory.getInstances(serviceName, LoadBalancerLifecycle.class),
    RetryableRequestContext.class, ResponseData.class, ServiceInstance.class);
String hint = getHint(serviceName);

先判断上下文类型是否是 LoadBalancedRetryContext,前面在 重试上下文 章节分析过上下文的类型就是 LoadBalancedRetryContext,所以会进行 if 方法体。从上下文中取出实例信息进行重试,由于当前是第一次,所以实例信息为空,非第一次且在当前服务实例进行重试才能取到。

// 第一次调用时,serviceInstance 必为 null
if (serviceInstance == null) {
    // ...logger
    ServiceInstance previousServiceInstance = null;
    if (context instanceof LoadBalancedRetryContext) {
        // 获取前一个实例,达到当前实例重试次数前均为 null
        LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
        previousServiceInstance = lbContext.getPreviousServiceInstance();
    }
    DefaultRequest<RetryableRequestContext> lbRequest = new DefaultRequest<>(
        new RetryableRequestContext(previousServiceInstance, new RequestData(request), hint));
    // 执行生命周期接口的 onstart 方法
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
    // 选择服务实例
    serviceInstance = loadBalancer.choose(serviceName, lbRequest);
    // ....logger
    if (context instanceof LoadBalancedRetryContext) {
        LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
        lbContext.setServiceInstance(serviceInstance);
    }
    Response<ServiceInstance> lbResponse = new DefaultResponse(serviceInstance);
    if (serviceInstance == null) {
        // 获取的实例信息为 null,异常情况
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                                             .onComplete(new CompletionContext<ResponseData, ServiceInstance, RetryableRequestContext>(
                                                 CompletionContext.Status.DISCARD,
                                                 new DefaultRequest<>(
                                                     new RetryableRequestContext(null, new RequestData(request), hint)),
                                                 lbResponse)));
    }
}

总体来说也很清晰:

  • 设置上下文;

  • 回调负载均衡生命周期接口的 onStart 方法;

  • 服务实例的选择,choose 方法,和前面讲过的非重试的流程一样;

  • 回调负载均衡生命周期接口的 onComplete 方法;

// 创建请求
LoadBalancerRequestAdapter<ClientHttpResponse, RetryableRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(
                    requestFactory.createRequest(request, body, execution),
                    new RetryableRequestContext(null, new RequestData(request), hint));
ServiceInstance finalServiceInstance = serviceInstance;
ClientHttpResponse response = 
// 执行请求  
RetryLoadBalancerInterceptor.this.loadBalancer.execute(serviceName,finalServiceInstance,lbRequest);
int statusCode = response.getRawStatusCode();
if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
    // ...logger
    byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
    response.close();
    throw new ClientHttpResponseStatusCodeException(serviceName, response, bodyCopy);
}

这段逻辑和就是 5.3.3 章节的内容,不再赘述。如果请求成功,直接返回请求结果。

6.2.5 异常情况

异常逻辑看 catch 块里面的代码:

catch (Throwable e) {
    lastException = e;
    try {
        // 记录异常信息
        registerThrowable(retryPolicy, state, context, e);
    }
    catch (Exception ex) {
        throw new TerminatedRetryException("Could not register throwable", ex);
    }
    finally {
        // 调用监听器的 error 方法
        doOnErrorInterceptors(retryCallback, context, e);
    }
​
    if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
        try {
            // 退避策略
            backOffPolicy.backOff(backOffContext);
        }
        catch (BackOffInterruptedException ex) {
            lastException = e;
            // ....... logger
            throw ex;
        }
    }
    // ....... logger
    if (shouldRethrow(retryPolicy, context, state)) {
        // ....... logger
        throw RetryTemplate.<E>wrapIfNecessary(e);
    }
​
}

registerThrowable 会在当前上下文记录异常信息和重试次数。而由于外层是 while 循环,所以重试次数耗尽前执行的都是 6.2.4.4 这段逻辑。

其中可以看到有一个 state 变量,这个就涉及到了 有状态重试,这里先跳过,感兴趣的可以先自行研究。

6.2.6 recoveryCallback 执行

当重试次数耗尽后,会触发 recoveryCallback 的执行,也是一个回调。

protected <T> T handleRetryExhausted(RecoveryCallback<T> recoveryCallback, RetryContext context, RetryState state) throws Throwable {
    context.setAttribute(RetryContext.EXHAUSTED, true);
    if (state != null && !context.hasAttribute(GLOBAL_STATE)) {
        this.retryContextCache.remove(state.getKey());
    }
    if (recoveryCallback != null) {
        T recovered = recoveryCallback.recover(context);
        context.setAttribute(RetryContext.RECOVERED, true);
        return recovered;
    }
    if (state != null) {
        this.logger.debug("Retry exhausted after last attempt with no recovery path.");
        rethrow(context, "Retry exhausted after last attempt with no recovery path");
    }
    throw wrapIfNecessary(context.getLastThrowable());
}

就是在上下文中更新相关属性,并回调 recoveryCallbackrecover 接口:

// LoadBalancedRecoveryCallback
public T recover(RetryContext context) throws Exception {
    Throwable lastThrowable = context.getLastThrowable();
    if (lastThrowable != null) {
        if (lastThrowable instanceof RetryableStatusCodeException) {
            RetryableStatusCodeException ex = (RetryableStatusCodeException) lastThrowable;
            return createResponse((R) ex.getResponse(), ex.getUri());
        }
        else if (lastThrowable instanceof Exception) {
            throw (Exception) lastThrowable;
        }
    }
    throw new RetryException("Could not recover", lastThrowable);
}

如果最后一次异常信息不为空,且异常类型是 RetryableStatusCodeException,则调用 createResponse 方法创建恢复响应对象。否则直接抛出异常。

image-20231225185311617

使用 createResponse 方法创建恢复响应对象的主要目的是为了在发生可重试的状态码异常时,提供一种用于恢复请求的方法。恢复响应对象将包含有关恢复操作的信息,以帮助系统更好地处理重试过程。恢复响应对象将包含一些重试信息和恢复操作的建议,例如重试间隔时间、恢复操作是否成功等。这些信息可以帮助系统更好地处理重试操作。

我们看一下 RetryableStatusCodeException 是在哪里抛出的,它有一个子类 ClientHttpResponseStatusCodeException,在 retryCallback 中(6.2.4.4 最后),会判断返回的响应码是否可重试,响应码则是在配置文件中指定。

if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
    byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
    response.close();
    throw new ClientHttpResponseStatusCodeException(serviceName, response, bodyCopy);
}
​
// BlockingLoadBalancedRetryPolicy
public boolean retryableStatusCode(int statusCode) {
    return properties.getRetry().getRetryableStatusCodes().contains(statusCode);
}

0

评论区