package com.aps.gateway.config.loadBalancer;
|
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.http.util.Asserts;
|
import org.springframework.cloud.client.ServiceInstance;
|
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
|
import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
|
import org.springframework.cloud.client.loadbalancer.Response;
|
import org.springframework.cloud.gateway.config.GatewayLoadBalancerProperties;
|
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
|
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
|
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
|
import org.springframework.cloud.gateway.support.NotFoundException;
|
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
|
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
|
import org.springframework.stereotype.Component;
|
import org.springframework.web.server.ServerWebExchange;
|
import reactor.core.publisher.Mono;
|
|
import java.net.URI;
|
|
|
@Slf4j
|
@Component
|
public class GrayReactiveLoadBalancerClientFilter extends ReactiveLoadBalancerClientFilter {
|
|
private final static String SCHEME = "lb";
|
|
private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;
|
private final GrayLoadBalancer grayLoadBalancer;
|
private final GatewayLoadBalancerProperties loadBalancerProperties;
|
|
public GrayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, GatewayLoadBalancerProperties loadBalancerProperties, GrayLoadBalancer grayLoadBalancer) {
|
super(clientFactory, loadBalancerProperties);
|
this.loadBalancerProperties = loadBalancerProperties;
|
this.grayLoadBalancer = grayLoadBalancer;
|
}
|
|
@Override
|
public int getOrder() {
|
return LOAD_BALANCER_CLIENT_FILTER_ORDER;
|
}
|
|
@Override
|
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
|
URI url = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
|
String schemePrefix = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
|
|
// 直接放行
|
if (url == null || (!SCHEME.equals(url.getScheme()) && !SCHEME.equals(schemePrefix))) {
|
return chain.filter(exchange);
|
}
|
// 保留原始url
|
ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
|
|
if (log.isTraceEnabled()) {
|
log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
|
}
|
|
return choose(exchange).doOnNext(response -> {
|
|
if (!response.hasServer()) {
|
throw NotFoundException.create(loadBalancerProperties.isUse404(),
|
"Unable to find instance for " + url.getHost());
|
}
|
|
URI uri = exchange.getRequest().getURI();
|
|
// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
|
// if the loadbalancer doesn't provide one.
|
String overrideScheme = null;
|
if (schemePrefix != null) {
|
overrideScheme = url.getScheme();
|
}
|
|
DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(response.getServer(),
|
overrideScheme);
|
|
URI requestUrl = LoadBalancerUriTools.reconstructURI(serviceInstance, uri);
|
|
if (log.isTraceEnabled()) {
|
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
|
}
|
exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
|
}).then(chain.filter(exchange));
|
}
|
|
/**
|
* 获取实例
|
* @param exchange ServerWebExchange
|
* @return ServiceInstance
|
*/
|
private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
|
URI uri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
|
Asserts.notNull(uri, "uri");
|
ServiceInstance serviceInstance = grayLoadBalancer.choose(uri.getHost(), exchange.getRequest());
|
return Mono.just(new DefaultResponse(serviceInstance));
|
}
|
}
|