Spring Cloud Gateway-使用自定义过滤器通过Hystrix实现降级处理

前提

在微服务架构中,下游依赖出现问题如果上游调用方不做请求降级处理,下游的异常依赖没有被隔离,很有可能出现因为一两个服务或者小到一两个接口异常导致上游所有服务不可用,甚至影响整个业务线。请求降级处理目前比较主流的依然是Netfilx出品的HystrixHystrix的工作原理是:

  • 把请求基于线程池或者信号量隔离,一旦下游服务在指定配置的超时时间内无法响应会进入预设或者默认的降级实现。
  • 每个请求的状态都会记录下来,在一个滑动窗口内处理失败的比率超过设定的阈值就会触发熔断器(Circle Breaker)开启,熔断器开启之后所有请求都会直接进入预设或者默认的降级逻辑。
  • 熔断器打开后,且距离熔断器打开的时间或上一次试探请求放行的时间超过设定值,熔断器器进入半开状态,允许放行一个试探请求。
  • 请求成功率提高后,基于统计数据确定对熔断器进行关闭,所有请求正常放行。

这里不对Hystrix的细节做更深入分析,而是接着谈谈Spring Cloud Gateway中如何使用Hystrix,主要包括内置的Hystrix过滤器和定制过滤器结合Hystrix实现我们想要的功能。除了要引入spring-cloud-starter-gateway依赖之外,还需要引入spring-cloud-starter-netflix-hystrix

<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
</dependencies>

使用内置的Hystrix过滤器

内置的Hystrix过滤器是HystrixGatewayFilterFactory,它支持的配置是:

public static class Config {
// 如果下面的Setter配置为null的时候,name会作为Hystrix的HystrixCommandKey
private String name;
// Hystrix的Setter属性,主要用来配置命令的KEY和其他属性
private Setter setter;
// 降级的目标URI,必须以forward开头,URI会匹配到网关应用的控制器方法
private URI fallbackUri;

public String getName() {
return name;
}

public Config setName(String name) {
this.name = name;
return this;
}

public Config setFallbackUri(String fallbackUri) {
if (fallbackUri != null) {
setFallbackUri(URI.create(fallbackUri));
}
return this;
}

public URI getFallbackUri() {
return fallbackUri;
}

// 注意这个方法,配置的fallbackUri要以forward开始作为schema,否则会抛异常
public void setFallbackUri(URI fallbackUri) {
if (fallbackUri != null && !"forward".equals(fallbackUri.getScheme())) {
throw new IllegalArgumentException("Hystrix Filter currently only supports 'forward' URIs, found " + fallbackUri);
}
this.fallbackUri = fallbackUri;
}

public Config setSetter(Setter setter) {
this.setter = setter;
return this;
}
}

另外,(1)全局的Hystrix配置也会对HystrixGatewayFilterFactory生效;(2)HystrixGatewayFilterFactory可以作为默认过滤器(default-filters)对所有的路由配置作为兜底过滤器并发挥作用。

对于第(1)点,我们如果在application.yaml中配置如下:

// 执行超时时间为1秒,会对下面路由order_route绑定的HystrixGatewayFilterFactory生效
hystrix.command.fallbackcmd.execution.isolation.thread.timeoutInMilliseconds: 1000

spring:
cloud:
gateway:
routes:
- id: order_route
uri: http://localhost:9091
predicates:
- Path=/order/**
filters:
- name: Hystrix
args:
name: HystrixCommand
fallbackUri: forward:/fallback

配置的hystrix.command.fallbackcmd.execution.isolation.thread.timeoutInMilliseconds会对绑定在路由order_route中的HystrixGatewayFilterFactory生效。

对于第(2)点,我们可以把HystrixGatewayFilterFactory配置为默认过滤器,这样子所有的路由都会关联此过滤器,但是非必要时建议不要这样做

spring:
cloud:
gateway:
routes:
- id: order_route
uri: http://localhost:9091
predicates:
- Path=/order/**
default-filters:
- name: Hystrix
args:
name: HystrixCommand
fallbackUri: forward:/fallback

笔者在测试的时候,发现上面提到的Setter无法配置,估计是由于HystrixSetter对象是经过多重包装,暂时没有办法设置该属性。接着我们要在网关服务加一个控制器方法用于处理重定向的/fallback请求:

@RestController
public class FallbackController {

@RequestMapping(value = "/fallback")
@ResponseStatus
public Mono<Map<String, Object>> fallback(ServerWebExchange exchange, Throwable throwable) {
Map<String, Object> result = new HashMap<>(8);
ServerHttpRequest request = exchange.getRequest();
result.put("path", request.getPath().pathWithinApplication().value());
result.put("method", request.getMethodValue());
if (null != throwable.getCause()) {
result.put("message", throwable.getCause().getMessage());
} else {
result.put("message", throwable.getMessage());
}
return Mono.just(result);
}
}

控制器方法入参会被Spring Cloud Gateway的内部组件处理,可以回调一些有用的类型例如ServerWebExchange实例、具体的异常实例等等。

使用Hystrix定制过滤器

HystrixGatewayFilterFactory在大多数情况下应该可以满足业务需要,但是这里也做一次定制一个整合Hystrix的过滤器,实现的功能如下:

  • 基于每个请求URL创建一个新的Hystrix命令实例进行调用。
  • 每个URL可以指定特有的线程池配置,如果不指定则使用默认的。
  • 每个URL可以配置单独的Hystrix超时时间。

也就是通过Hystrix使用线程池对每种不同的外部请求URL进行隔离。当然,这样的过滤器仅仅在外部请求的不同URL的数量有限的情况下才比较合理,否则有可能创建过多的线程池造成系统性能的下降,适得其反。改造如下:

@Component
public class CustomHystrixFilter extends AbstractGatewayFilterFactory<CustomHystrixFilter.Config> {

private static final String FORWARD_KEY = "forward";
private static final String NAME = "CustomHystrix";
private static final int TIMEOUT_MS = 1000;
private final ObjectProvider<DispatcherHandler> dispatcherHandlerProvider;
private volatile DispatcherHandler dispatcherHandler;
private boolean processConfig = false;

public CustomHystrixFilter(ObjectProvider<DispatcherHandler> dispatcherHandlerProvider) {
super(Config.class);
this.dispatcherHandlerProvider = dispatcherHandlerProvider;
}

private DispatcherHandler getDispatcherHandler() {
if (dispatcherHandler == null) {
dispatcherHandler = dispatcherHandlerProvider.getIfAvailable();
}

return dispatcherHandler;
}

@Override
public List<String> shortcutFieldOrder() {
return Collections.singletonList(NAME_KEY);
}


@Override
public GatewayFilter apply(Config config) {
processConfig(config);
return (exchange, chain) -> {
ServerHttpRequest request = exchange.getRequest();
String path = request.getPath().pathWithinApplication().value();
int timeout = config.getTimeout().getOrDefault(path, TIMEOUT_MS);
CustomHystrixCommand command = new CustomHystrixCommand(config.getFallbackUri(), exchange, chain, timeout, path);
return Mono.create(s -> {
Subscription sub = command.toObservable().subscribe(s::success, s::error, s::success);
s.onCancel(sub::unsubscribe);
}).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> {
if (throwable instanceof HystrixRuntimeException) {
HystrixRuntimeException e = (HystrixRuntimeException) throwable;
HystrixRuntimeException.FailureType failureType = e.getFailureType();
switch (failureType) {
case TIMEOUT:
return Mono.error(new TimeoutException());
case COMMAND_EXCEPTION: {
Throwable cause = e.getCause();
if (cause instanceof ResponseStatusException || AnnotatedElementUtils
.findMergedAnnotation(cause.getClass(), ResponseStatus.class) != null) {
return Mono.error(cause);
}
}
default:
break;
}
}
return Mono.error(throwable);
}).then();
};
}

/**
* YAML解析的时候MAP的KEY不支持'/',这里只能用'-'替代
*
* @param config config
*/
private void processConfig(Config config) {
if (!processConfig) {
processConfig = true;
if (null != config.getTimeout()) {
Map<String, Integer> timeout = new HashMap<>(8);
config.getTimeout().forEach((k, v) -> {
String key = k.replace("-", "/");
if (!key.startsWith("/")) {
key = "/" + key;
}
timeout.put(key, v);
});
config.setTimeout(timeout);
}
}
}

@Override
public String name() {
return NAME;
}

private class CustomHystrixCommand extends HystrixObservableCommand<Void> {

private final URI fallbackUri;
private final ServerWebExchange exchange;
private final GatewayFilterChain chain;

public CustomHystrixCommand(URI fallbackUri,
ServerWebExchange exchange,
GatewayFilterChain chain,
int timeout,
String key) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(key))
.andCommandKey(HystrixCommandKey.Factory.asKey(key))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(timeout)));
this.fallbackUri = fallbackUri;
this.exchange = exchange;
this.chain = chain;
}

@Override
protected Observable<Void> construct() {
return RxReactiveStreams.toObservable(this.chain.filter(exchange));
}

@Override
protected Observable<Void> resumeWithFallback() {
if (null == fallbackUri) {
return super.resumeWithFallback();
}
URI uri = exchange.getRequest().getURI();
boolean encoded = containsEncodedParts(uri);
URI requestUrl = UriComponentsBuilder.fromUri(uri)
.host(null)
.port(null)
.uri(this.fallbackUri)
.build(encoded)
.toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
ServerHttpRequest request = this.exchange.getRequest().mutate().uri(requestUrl).build();
ServerWebExchange mutated = exchange.mutate().request(request).build();
return RxReactiveStreams.toObservable(getDispatcherHandler().handle(mutated));
}
}

public static class Config {

private String id;
private URI fallbackUri;
/**
* url -> timeout ms
*/
private Map<String, Integer> timeout;

public String getId() {
return id;
}

public Config setId(String id) {
this.id = id;
return this;
}

public URI getFallbackUri() {
return fallbackUri;
}

public Config setFallbackUri(URI fallbackUri) {
if (fallbackUri != null && !FORWARD_KEY.equals(fallbackUri.getScheme())) {
throw new IllegalArgumentException("Hystrix Filter currently only supports 'forward' URIs, found " + fallbackUri);
}
this.fallbackUri = fallbackUri;
return this;
}

public Map<String, Integer> getTimeout() {
return timeout;
}

public Config setTimeout(Map<String, Integer> timeout) {
this.timeout = timeout;
return this;
}
}
}

其实大部分代码和内置的Hystrix过滤器差不多,只是改了命令改造函数部分和配置加载处理的部分。配置文件如下:

spring:
cloud:
gateway:
routes:
- id: hystrix_route
uri: http://localhost:9091
predicates:
- Host=localhost:9090
filters:
- name: CustomHystrix
args:
id: CustomHystrix
fallbackUri: forward:/fallback
timeout:
# 这里暂时用-分隔URL,因为/不支持
order-remote: 2000
application:
name: route-server
server:
port: 9090

网关添加一个/fallback处理控制器如下:

@RestController
public class FallbackController {

@RequestMapping(value = "/fallback")
@ResponseStatus
public Mono<Map<String, Object>> fallback(ServerWebExchange exchange, Throwable throwable) {
Map<String, Object> result = new HashMap<>(8);
ServerHttpRequest request = exchange.getRequest();
result.put("path", request.getPath().pathWithinApplication().value());
result.put("method", request.getMethodValue());
if (null != throwable.getCause()) {
result.put("message", throwable.getCause().getMessage());
} else {
result.put("message", throwable.getMessage());
}
return Mono.just(result);
}
}

故意在下游服务打断点:

curl http://localhost:9090/order/remote

响应结果:
{
"path": "/fallback",
"method": "GET",
"message": null # <== 这里由于是超时异常,message就是null
}

刚好符合预期结果。

小结

这篇文章仅仅是对Hystrix和过滤器应用提供一个可用的例子和解决问题的思路,具体如何使用还是需要针对真实的场景。

(本文完 c-2-d e-a-20190522)

文章作者: throwable
文章链接: http://www.throwable.club/2019/05/25/spring-cloud-gateway-hystrix/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Throwable
❤支付宝打赏❤
❤微信打赏❤