package com.aps.gateway.config.loadBalancer; import lombok.extern.slf4j.Slf4j; import org.apache.http.util.Asserts; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.DefaultResponse; import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools; import org.springframework.cloud.client.loadbalancer.Response; import org.springframework.cloud.gateway.config.GatewayLoadBalancerProperties; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter; import org.springframework.cloud.gateway.support.DelegatingServiceInstance; import org.springframework.cloud.gateway.support.NotFoundException; import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; import java.net.URI; @Slf4j @Component public class GrayReactiveLoadBalancerClientFilter extends ReactiveLoadBalancerClientFilter { private final static String SCHEME = "lb"; private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150; private final GrayLoadBalancer grayLoadBalancer; private final GatewayLoadBalancerProperties loadBalancerProperties; public GrayReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, GatewayLoadBalancerProperties loadBalancerProperties, GrayLoadBalancer grayLoadBalancer) { super(clientFactory, loadBalancerProperties); this.loadBalancerProperties = loadBalancerProperties; this.grayLoadBalancer = grayLoadBalancer; } @Override public int getOrder() { return LOAD_BALANCER_CLIENT_FILTER_ORDER; } @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI url = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); String schemePrefix = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR); // 直接放行 if (url == null || (!SCHEME.equals(url.getScheme()) && !SCHEME.equals(schemePrefix))) { return chain.filter(exchange); } // 保留原始url ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url); if (log.isTraceEnabled()) { log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url); } return choose(exchange).doOnNext(response -> { if (!response.hasServer()) { throw NotFoundException.create(loadBalancerProperties.isUse404(), "Unable to find instance for " + url.getHost()); } URI uri = exchange.getRequest().getURI(); // if the `lb:` mechanism was used, use `` as the default, // if the loadbalancer doesn't provide one. String overrideScheme = null; if (schemePrefix != null) { overrideScheme = url.getScheme(); } DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(response.getServer(), overrideScheme); URI requestUrl = LoadBalancerUriTools.reconstructURI(serviceInstance, uri); if (log.isTraceEnabled()) { log.trace("LoadBalancerClientFilter url chosen: " + requestUrl); } exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl); }).then(chain.filter(exchange)); } /** * 获取实例 * @param exchange ServerWebExchange * @return ServiceInstance */ private Mono> choose(ServerWebExchange exchange) { URI uri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); Asserts.notNull(uri, "uri"); ServiceInstance serviceInstance = grayLoadBalancer.choose(uri.getHost(), exchange.getRequest()); return Mono.just(new DefaultResponse(serviceInstance)); } }