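While load-testing an API recently, I found that my endpoint would hang. At first I assumed the problem was in my own service, but load-testing the service directly (bypassing the gateway) at 200 QPS for 10 loops worked fine. With the gateway (Zuul 1.x) in front of it, the endpoint became basically unreachable after two loops, and every other endpoint behind the gateway stopped responding as well. That pointed to Zuul, and so the investigation began.
At that stage Zuul was only a suspect, because it had no fallback or circuit breaker configured, and the suspicion still had to be confirmed. In the test environment I used arthas to inspect memory and threads and found a large number of WAITING threads. Looking at those threads in detail showed many HTTP request connections that were never woken up; they stayed in WAITING, and the root cause was that connections were not being closed. I then reproduced the problem with a local load test and located it with jconsole, as shown in the figure.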
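The local run showed the same large number of blocked threads as the test environment. To understand why they block, we need to look at how Zuul and Ribbon interact. The analysis follows.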
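Before going into the analysis: the thread-state check done above with arthas and jconsole can also be approximated from inside the JVM. The following is only a minimal sketch using the standard `java.lang.management` API; the class name `ThreadStateSummary` is made up for illustration and is not part of arthas, jconsole, or Zuul.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical helper: counts live threads per state, roughly what a thread dashboard shows.
class ThreadStateSummary {

    static Map<Thread.State, Integer> countByState() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        Map<Thread.State, Integer> counts = new EnumMap<>(Thread.State.class);
        // false, false: skip locked-monitor and locked-synchronizer details, we only need states
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null) {
                continue; // defensive: skip entries for threads that are no longer alive
            }
            counts.merge(info.getThreadState(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        countByState().forEach((state, n) -> System.out.println(state + ": " + n));
    }
}
```

A WAITING or TIMED_WAITING count that keeps growing under load is the kind of signal that led to the connection pool in this case.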
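Based on my earlier understanding of Zuul 1.x, the execution flow is as shown in the flow chart.
As the chart shows, it is the route stage that establishes the HTTP connection and sends the request. According to the source code, only two things can happen after that: the post stage runs on success, or the error stage runs when an exception is thrown.
The problem shows up mainly in SendErrorFilter's run method.
When no exception is thrown, SendResponseFilter is in principle the last filter, and it performs the close: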
private void writeResponse() throws Exception {
    RequestContext context = RequestContext.getCurrentContext();
    // there is no body to send
    if (context.getResponseBody() == null
            && context.getResponseDataStream() == null) {
        return;
    }
    HttpServletResponse servletResponse = context.getResponse();
    if (servletResponse.getCharacterEncoding() == null) { // only set if not set
        servletResponse.setCharacterEncoding("UTF-8");
    }
    String servletResponseContentEncoding = getResponseContentEncoding(context);
    OutputStream outStream = servletResponse.getOutputStream();
    InputStream is = null;
    try {
        if (context.getResponseBody() != null) {
            String body = context.getResponseBody();
            is = new ByteArrayInputStream(
                    body.getBytes(servletResponse.getCharacterEncoding()));
        }
        else {
            is = context.getResponseDataStream();
            if (is != null && context.getResponseGZipped()) {
                // if origin response is gzipped, and client has not requested gzip,
                // decompress stream before sending to client
                // else, stream gzip directly to client
                if (isGzipRequested(context)) {
                    servletResponseContentEncoding = "gzip";
                }
                else {
                    servletResponseContentEncoding = null;
                    is = handleGzipStream(is);
                }
            }
        }
        if (servletResponseContentEncoding != null) {
            servletResponse.setHeader(ZuulHeaders.CONTENT_ENCODING,
                    servletResponseContentEncoding);
        }
        if (is != null) {
            writeResponse(is, outStream);
        }
    }
    finally {
        /**
         * We must ensure that the InputStream provided by our upstream pooling
         * mechanism is ALWAYS closed even in the case of wrapped streams, which are
         * supplied by pooled sources such as Apache's
         * PoolingHttpClientConnectionManager. In that particular case, the underlying
         * HTTP connection will be returned back to the connection pool iif either
         * close() is explicitly called, a read error occurs, or the end of the
         * underlying stream is reached. If, however a write error occurs, we will end
         * up leaking a connection from the pool without an explicit close()
         *
         * @author Johannes Edmeier
         */
        if (is != null) {
            try {
                // Close the stream; org.apache.http.conn.EofSensorInputStream also releases the HTTP connection
                is.close();
            }
            catch (Exception ex) {
                log.warn("Error while closing upstream input stream", ex);
            }
        }
        // cleanup ThreadLocal when we are all done
        if (buffers != null) {
            buffers.remove();
        }
        try {
            Object zuulResponse = context.get("zuulResponse");
            if (zuulResponse instanceof Closeable) {
                ((Closeable) zuulResponse).close();
            }
            outStream.flush();
            // The container will close the stream for us
        }
        catch (IOException ex) {
            log.warn("Error while sending response to client: " + ex.getMessage());
        }
    }
}
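Closing the EofSensorInputStream also returns the HTTP connection to the pool.
From the code above: an exception occurred during the load test, so execution always ended up in SendErrorFilter's run method, which forwards the request: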
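To make the "close returns the connection" idea concrete, here is a minimal sketch that models it with plain JDK classes. It is not how Apache HttpClient is implemented: `TinyPool` and `ReleasingStream` are hypothetical names, a `Semaphore` permit stands in for a pooled connection, and the sketch only releases on `close()`, whereas the real stream also releases on EOF or a read error.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class PooledStreamDemo {

    // Hypothetical stand-in for a connection pool: one permit = one connection.
    static class TinyPool {
        private final Semaphore permits;

        TinyPool(int size) {
            permits = new Semaphore(size);
        }

        // Leases a "connection" and hands back its response body stream.
        InputStream lease(byte[] body) {
            if (!permits.tryAcquire()) {
                throw new IllegalStateException("pool exhausted");
            }
            return new ReleasingStream(new ByteArrayInputStream(body), permits);
        }

        int available() {
            return permits.availablePermits();
        }
    }

    // Gives its permit back exactly once, on the first close().
    static class ReleasingStream extends FilterInputStream {
        private final Semaphore permits;
        private final AtomicBoolean released = new AtomicBoolean(false);

        ReleasingStream(InputStream in, Semaphore permits) {
            super(in);
            this.permits = permits;
        }

        @Override
        public void close() throws IOException {
            try {
                super.close();
            } finally {
                if (released.compareAndSet(false, true)) {
                    permits.release();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        TinyPool pool = new TinyPool(2);
        InputStream leaked = pool.lease("a".getBytes()); // never closed: its permit stays taken
        try (InputStream ok = pool.lease("b".getBytes())) {
            ok.read(); // closed by try-with-resources, so its permit is returned
        }
        System.out.println("available = " + pool.available());
    }
}
```

Every stream that is handed out and never closed permanently shrinks the pool by one, which is the leak discussed in the rest of this post.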
dispatcher.forward(request, ctx.getResponse());
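This in turn runs the service method of ZuulServlet again and sends the request to the same microservice endpoint once more. So the blocking in our load-test scenario comes about as follows: once the number of concurrent threads exceeds the configured resources, the Ribbon HTTP connection pool has no connections left. Requests cannot get a connection, no circuit breaker or fallback is configured, so an exception is thrown and execution ends up in SendErrorFilter, which never closes the stream returned by the following accessor: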
public InputStream getResponseDataStream() {
return (InputStream) get("responseDataStream");
}
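Because that stream is never closed, connections leak and threads block, which is why the page hangs.
The different cases are analyzed below.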
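The effect of an error path that skips cleanup can be sketched without Zuul at all. The example below is a simplified model under my own assumptions: `ErrorPathLeakDemo` is a hypothetical class, a `Semaphore` plays the role of the HTTP connection pool, and a 50 ms timeout replaces the indefinite blocking seen in the real system so that the demo terminates.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class ErrorPathLeakDemo {

    private final Semaphore pool;         // hypothetical stand-in for the HTTP connection pool
    private final boolean cleanupOnError; // whether the error path gives the connection back

    ErrorPathLeakDemo(int poolSize, boolean cleanupOnError) {
        this.pool = new Semaphore(poolSize);
        this.cleanupOnError = cleanupOnError;
    }

    // Returns "ok", "error", or "timeout" (no connection obtained within 50 ms).
    String handle(boolean downstreamFails) throws InterruptedException {
        if (!pool.tryAcquire(50, TimeUnit.MILLISECONDS)) {
            return "timeout";
        }
        if (downstreamFails) {
            // Error path: without cleanup the connection never goes back to the pool.
            if (cleanupOnError) {
                pool.release();
            }
            return "error";
        }
        // Normal path: the post stage closes the stream, which returns the connection.
        pool.release();
        return "ok";
    }

    int available() {
        return pool.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        ErrorPathLeakDemo leaky = new ErrorPathLeakDemo(2, false);
        leaky.handle(true);
        leaky.handle(true);
        System.out.println(leaky.handle(false)); // prints "timeout": both connections leaked

        ErrorPathLeakDemo fixed = new ErrorPathLeakDemo(2, true);
        fixed.handle(true);
        fixed.handle(true);
        System.out.println(fixed.handle(false)); // prints "ok": the error path released them
    }
}
```

With a pool of two, two failed requests are enough to starve every later request, including healthy ones, which matches the observation that other endpoints behind the gateway stopped responding too.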
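import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.netflix.zuul.filters.route.FallbackProvider;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.ClientHttpResponse;

@Slf4j
public class CustomFallbackProvider implements FallbackProvider {

    /** Apply this fallback to every route. */
    @Override
    public String getRoute() {
        return "*";
    }

    @Override
    public ClientHttpResponse fallbackResponse(String route, Throwable cause) {
        return new ClientHttpResponse() {

            /** Status code of the fallback response, as an HttpStatus. */
            @Override
            public HttpStatus getStatusCode() throws IOException {
                return HttpStatus.INTERNAL_SERVER_ERROR;
            }

            /** Status code of the fallback response, as an int. */
            @Override
            public int getRawStatusCode() throws IOException {
                return this.getStatusCode().value();
            }

            /** Status text of the fallback response, as a String. */
            @Override
            public String getStatusText() throws IOException {
                return this.getStatusCode().getReasonPhrase();
            }

            /** Nothing to release: the body is an in-memory stream. */
            @Override
            public void close() {
            }

            /** Response body. */
            @Override
            public InputStream getBody() {
                String content = "Network error, please try again later!";
                // Encode with the same charset that getHeaders() declares
                return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
            }

            /** Response headers. */
            @Override
            public HttpHeaders getHeaders() {
                HttpHeaders headers = new HttpHeaders();
                MediaType mediaType = new MediaType("application", "json", StandardCharsets.UTF_8);
                headers.setContentType(mediaType);
                return headers;
            }
        };
    }
}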
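Why does adding a fallback reduce the thread blocking (it reduces it greatly, but does not eliminate it)? The code explains it:
Our custom FallbackProvider returns a ClientHttpResponse, so execution no longer reaches SendErrorFilter. It ends in SendResponseFilter's run method instead, which closes the stream and returns the connection.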
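Extend ZuulFilter, set the filter type to error and the order to -1, so that SendErrorFilter does not run when an exception occurs (after context.remove("throwable"), its shouldFilter returns false, so it is skipped). The core code is as follows: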
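import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import javax.servlet.http.HttpServletResponse;

import com.alibaba.fastjson.JSON; // assumption: JSON.toJSONString below comes from fastjson
import com.netflix.client.ClientException;
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import com.netflix.zuul.exception.ZuulException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.netflix.zuul.util.ZuulRuntimeException;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;

import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.ERROR_TYPE;

@Slf4j
@Component
public class ErrorFilter extends ZuulFilter {

    protected static final String SEND_ERROR_FILTER_RAN = "sendErrorFilter.ran";

    @Override
    public String filterType() {
        return ERROR_TYPE;
    }

    /** Run before SendErrorFilter (order 0). */
    @Override
    public int filterOrder() {
        return -1;
    }

    @Override
    public boolean shouldFilter() {
        RequestContext ctx = RequestContext.getCurrentContext();
        return ctx.getThrowable() != null && !ctx.getBoolean(SEND_ERROR_FILTER_RAN, false);
    }

    @Override
    public Object run() {
        RequestContext context = RequestContext.getCurrentContext();
        // Read the throwable before removing it; afterwards getThrowable() returns null
        Throwable throwable = context.getThrowable();
        // Fetch the upstream stream up front so it is closed even if writing the error body fails
        InputStream is = context.getResponseDataStream();
        PrintWriter writer = null;
        try {
            // Removing the throwable and setting the flag makes SendErrorFilter.shouldFilter() return false
            context.remove("throwable");
            context.set(SEND_ERROR_FILTER_RAN, true);
            ZuulException exception = findZuulException(throwable);
            HttpServletResponse response = context.getResponse();
            response.setContentType("application/json; charset=UTF-8");
            response.setStatus(exception.nStatusCode);
            writer = response.getWriter();
            Map<String, Object> map = new HashMap<>();
            map.put("code", exception.nStatusCode);
            map.put("msg", exception.errorCause);
            map.put("detail", exception.getMessage());
            String retStr = JSON.toJSONString(map);
            writer.print(retStr);
            writer.flush();
        } catch (Exception e) {
            log.error("Failed to write error response", e);
        } finally {
            // Closing the upstream stream is what returns the HTTP connection to the pool
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) {
                    log.warn("Error while closing upstream input stream", e);
                }
            }
            if (writer != null) {
                writer.close();
            }
        }
        return null;
    }

    /** Unwraps the nested cause chain to find the ZuulException that describes the failure. */
    protected ZuulException findZuulException(Throwable throwable) {
        if (Objects.isNull(throwable)) {
            return null;
        }
        if (throwable.getCause() instanceof ZuulRuntimeException) {
            Throwable cause = null;
            if (throwable.getCause().getCause() != null) {
                cause = throwable.getCause().getCause().getCause();
            }
            // A ClientException wrapping a SocketTimeoutException is reported as 504
            if (cause instanceof ClientException && cause.getCause() != null
                    && cause.getCause().getCause() instanceof SocketTimeoutException) {
                return new ZuulException("", 504,
                        ZuulException.class.getName() + ": Hystrix read timed out");
            }
            if (throwable.getCause().getCause() instanceof ZuulException) {
                return (ZuulException) throwable.getCause().getCause();
            }
        }
        if (throwable.getCause() instanceof ZuulException) {
            return (ZuulException) throwable.getCause();
        }
        if (throwable instanceof ZuulException) {
            return (ZuulException) throwable;
        }
        // Anything else becomes a generic 500
        return new ZuulException(throwable, HttpStatus.INTERNAL_SERVER_ERROR.value(), null);
    }
}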
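The fixed-depth getCause() chains in findZuulException depend on the exact nesting of the wrapped exceptions. As an alternative idea, not what the filter above does, the lookup can be written as a generic walk over the cause chain. This is a sketch using only JDK classes; `CauseChain` and `find` are names I made up, and the depth limit of 32 is an arbitrary guard against cyclic chains.

```java
import java.net.SocketTimeoutException;

// Hypothetical helper, not part of Zuul or Spring Cloud.
class CauseChain {

    // Returns the first throwable of the given type in the cause chain, or null if there is none.
    static <T extends Throwable> T find(Throwable start, Class<T> type) {
        int depth = 0;
        for (Throwable t = start; t != null && depth < 32; t = t.getCause(), depth++) {
            if (type.isInstance(t)) {
                return type.cast(t);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Throwable nested = new RuntimeException("outer",
                new IllegalStateException("middle",
                        new SocketTimeoutException("Read timed out")));
        SocketTimeoutException timeout = find(nested, SocketTimeoutException.class);
        System.out.println(timeout != null ? timeout.getMessage() : "no timeout in chain");
    }
}
```

A walk like this keeps working when an extra wrapper exception appears in the chain, at the cost of no longer checking which layer the exception came from.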
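With the fallback and the rewritten error filter in place, connection leaks under high concurrency are basically prevented. If you need higher performance, consider Zuul 2.x or another Netty-based gateway framework.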