解决网络型inputStream无法反复read的问题。mark/reset not supported

2022-07-26,,,,

# 一、基础知识说明

对于大部分inputStream反复读写的问题,我们可以采用mark(int readLimit)、reset()方法解决。但是网络型的inputStream却不支持这种方法。

# 二、问题的产生

由于业务需求,需要对restTemplate添加一个拦截器,以实现根据第三方服务的response中的code字段 来判断后续进行哪一步操作。

我这里的真实场景是,需要判断第三方的token是否过期。如果过期,则获取token后,重新访问第三方服务。拦截器简易代码如下:

@Component
public class TokenRefreshInterceptor implements ClientHttpRequestInterceptor {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private TokenServer tokenServer;

    @Override
    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution execution) throws IOException {
        logger.info("进入拦截器TokenRefreshInterceptor。");
        ClientHttpResponse response = execution.execute(httpRequest, bytes);
        //读取网络inputStream
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(response.getBody()));
        StringBuilder builder = new StringBuilder();
        String temp;
        while ((temp = bufferedReader.readLine()) != null){
            builder.append(temp);
        }

        logger.info("TokenRefresh拦截器,收到response。httpRequest={}, response={}, body={}",httpRequest,response,builder.toString());

        //如果过期,重新生成token
        if(isTokenExpired(builder.toString())){
            //重新设置token请求头,并请求
            httpRequest.getHeaders().set("X-Access-Token",refreshToken);
            return execution.execute(httpRequest,bytes);
        }
        //把response返回给业务代码,继续操作
        return response;
    }

 

在上面代码中,已经对网络型的inputStream进行了读操作。所以在后续的业务代码中获取responseBody为null。

通过查阅资料得知:inputStream的read操作,底层实现是通过控制stream上的指针,来读取数据的。由于在拦截器中已经read过数据,指针已经处于stream的末端,业务代码中就无法再读取到数据了。

# 三、尝试解决1

既然问题找到了,那么重新改变stream上的指针位置即可解决。于是查阅资料,发现可以通过mark(int readLimit)、reset()方法解决。

修改代码如下:

....
ClientHttpResponse response = execution.execute(httpRequest, bytes);
InputStream bodyStream = response.getBody();
//标记位置
bodyStream.mark(bodyStream.available());
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(bodyStream));
StringBuilder builder = new StringBuilder();
String temp;
while ((temp = bufferedReader.readLine()) != null){
    builder.append(temp);
}
//重置指针
bodyStream.reset();
....

 

测试过程中报错如下:

Caused by: java.io.IOException: mark/reset not supported
    at java.io.InputStream.reset(InputStream.java:348)
    at java.io.FilterInputStream.reset(FilterInputStream.java:226)
    at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.reset(HttpURLConnection.java:3408)
    at cn.thecover.job.interceptor.TokenRefreshInterceptor.intercept(TokenRefreshInterceptor.java:43)
    at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:92)
    at org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:76)
    at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
    at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:735)

 

通过查阅资料和实践 得出结论:网络型inputStream不支持mark/reset

# 四、尝试解决2

在查阅资料的过程中,有文章说针对不支持mark/reset的inputStream,可以在外层封装BufferedInputStream。

于是修改代码:

...
//使用bufferedReader进行封装
BufferedInputStream bufferedInputStream = new BufferedInputStream(bodyStream);
bufferedInputStream.mark(bodyStream.available());
StringBuilder builder = new StringBuilder();
String temp;
while ((temp = bufferedReader.readLine()) != null){
    builder.append(temp);
}
//使用bufferedReader重置指针
bufferedInputStream.reset();
...

话外音:当时觉得不管怎么封装,其本质还是对网络型的inputStream的读写,真的能够成功?

抱着疑惑测试了一下,业务代码中获取的body果然还是为null。

# 五:尝试解决3

拦截器中复写方法的返回值为ClientHttpResponse,这是个接口。那么返回的实例哪一个呢?写demo进行debug。

进入SimpleClientHttpResponse源码

final class SimpleClientHttpResponse extends AbstractClientHttpResponse {

    private final HttpURLConnection connection;

    @Nullable
    private HttpHeaders headers;

    @Nullable
    private InputStream responseStream;


    SimpleClientHttpResponse(HttpURLConnection connection) {
        this.connection = connection;
    }


    @Override
    public int getRawStatusCode() throws IOException {
        return this.connection.getResponseCode();
    }

    @Override
    public String getStatusText() throws IOException {
        String result = this.connection.getResponseMessage();
        return (result != null) ? result : "";
    }

    @Override
    public HttpHeaders getHeaders() {
        if (this.headers == null) {
            this.headers = new HttpHeaders();
            // Header field 0 is the status line for most HttpURLConnections, but not on GAE
            String name = this.connection.getHeaderFieldKey(0);
            if (StringUtils.hasLength(name)) {
                this.headers.add(name, this.connection.getHeaderField(0));
            }
            int i = 1;
            while (true) {
                name = this.connection.getHeaderFieldKey(i);
                if (!StringUtils.hasLength(name)) {
                    break;
                }
                this.headers.add(name, this.connection.getHeaderField(i));
                i++;
            }
        }
        return this.headers;
    }

    @Override
    public InputStream getBody() throws IOException {
        InputStream errorStream = this.connection.getErrorStream();
        this.responseStream = (errorStream != null ? errorStream : this.connection.getInputStream());
        return this.responseStream;
    }

    @Override
    public void close() {
        try {
            if (this.responseStream == null) {
                getBody();
            }
            StreamUtils.drain(this.responseStream);
            this.responseStream.close();
        }
        catch (Exception ex) {
            // ignore
        }
    }

}

发现其中封装了HttpURLConnection、HttpHeaders、InputStream。

同时复写了方法:getRawStatusCode、getStatusText、getHeaders、getBody、close。

由于我这里主要是获取responseBody,故仔细阅读getBody(), 发现是通过connection来读取的。(关于restTemplate如何对inputStream进行读取操作,并转换为对应的body这里不做展开,可自行阅读源码)

但是connection在拦截器中已经使用过了,并且这里封装的inputStream在拦截器中也已经读取过了。我第一反应是偷梁换柱 把封装的inputStream给改了。但是这个类中又没有针对inputStream的set方法。

于是准备自定义个response来继承SimpleClientHttpResponse,然后改写其getBody()。

但是当我准备继承这个SimpleClientHttpResponse的时候,发现它是用final修饰的,无法进行继承。于是退而求其次:继承父类AbstractClientHttpResponse。

public class MyResponse extends AbstractClientHttpResponse {

    private Integer rawStatusCode;
    private String statusText;
    private InputStream body;
    private HttpHeaders headers;


    public MyResponse(Integer rawStatusCode, String statusText, InputStream body, HttpHeaders headers/*, String bodyString*/) {
        this.rawStatusCode = rawStatusCode;
        this.statusText = statusText;
        this.body = body;
        this.headers = headers;
    }

    @Override
    public int getRawStatusCode() throws IOException {
        return rawStatusCode;
    }

    @Override
    public String getStatusText() throws IOException {
        return statusText;
    }

    @Override
    public void close() {
        try {
            this.body.close();
        }
        catch (Exception ex) {
            // ignore
        }
    }

    @Override
    public InputStream getBody() throws IOException {
        return body;
    }

    @Override
    public HttpHeaders getHeaders() {
        return headers;
    }

}

为了不影响其他方法(需要复写的其他方法)。
这个类通过构造方法,约束rawStatusCode、statusText、headers的返回值。

最重要的getBody()同样通过构造方法传入,不过在这里,我们就可以进行偷梁换柱 将inputStream改写为我们需要的inputStream。

以下为重新改写的拦截器代码:

ClientHttpResponse response = execution.execute(httpRequest, bytes);
InputStream bodyStream = response.getBody();

BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(bodyStream));
StringBuilder builder = new StringBuilder();
String temp;
while ((temp = bufferedReader.readLine()) != null){
    builder.append(temp);
}

logger.info("TokenRefresh拦截器,收到response。httpRequest={}, response={}, body={}",httpRequest,response,builder.toString());

ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
//通过构造函数传入SimpleClientHttpResponse中的相关结果,保证数据统一
//使用ByteArrayInputStream进行偷梁换柱
return newMyResponse(response.getRawStatusCode()response.getStatusText(),byteArrayInpuStream,response.getHeaders());

 

业务代码获得body如下:

 

    

本文地址:https://blog.csdn.net/qq_34019552/article/details/110855319

《解决网络型inputStream无法反复read的问题。mark/reset not supported.doc》

下载本文的Word格式文档,以方便收藏与打印。