bluejay
2025-04-08 2aaef5e1d0cca4dd422920aec5c210d9c0b49de0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.aps.gateway.config.loadBalancer;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.http.util.Asserts;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.gateway.config.GatewayLoadBalancerProperties;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
 
import java.net.URI;
 
 
@Slf4j
@Component
public class GrayReactiveLoadBalancerClientFilter extends ReactiveLoadBalancerClientFilter  {
 
    private final static String SCHEME = "lb";
 
    private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;
    private final GrayLoadBalancer grayLoadBalancer;
    private final GatewayLoadBalancerProperties loadBalancerProperties;
 
    public GrayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, GatewayLoadBalancerProperties loadBalancerProperties, GrayLoadBalancer grayLoadBalancer) {
        super(clientFactory, loadBalancerProperties);
        this.loadBalancerProperties = loadBalancerProperties;
        this.grayLoadBalancer = grayLoadBalancer;
    }
 
    @Override
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }
 
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
 
        // 直接放行
        if (url == null || (!SCHEME.equals(url.getScheme()) && !SCHEME.equals(schemePrefix))) {
            return chain.filter(exchange);
        }
        // 保留原始url
        ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
 
        if (log.isTraceEnabled()) {
            log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
        }
 
        return choose(exchange).doOnNext(response -> {
 
            if (!response.hasServer()) {
                throw NotFoundException.create(loadBalancerProperties.isUse404(),
                        "Unable to find instance for " + url.getHost());
            }
 
            URI uri = exchange.getRequest().getURI();
 
            // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
            // if the loadbalancer doesn't provide one.
            String overrideScheme = null;
            if (schemePrefix != null) {
                overrideScheme = url.getScheme();
            }
 
            DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(response.getServer(),
                    overrideScheme);
 
            URI requestUrl = LoadBalancerUriTools.reconstructURI(serviceInstance, uri);
 
            if (log.isTraceEnabled()) {
                log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
            }
            exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
        }).then(chain.filter(exchange));
    }
 
    /**
     * 获取实例
     * @param exchange ServerWebExchange
     * @return ServiceInstance
     */
    private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
        URI uri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        Asserts.notNull(uri, "uri");
        ServiceInstance serviceInstance = grayLoadBalancer.choose(uri.getHost(), exchange.getRequest());
        return Mono.just(new DefaultResponse(serviceInstance));
    }
}