同步阻塞代码,http 请求返回响应才继续执行。
1.基于 Reactor 和 Netty。
2.响应式 web 客户端。异步执行不阻塞代码,少量的线程数处理高并发的 Http 请求。
3.集成 Spring WebFlux 框架,可与其他 Spring 组件无缝协作。
4.可通过自定义 ExchangeFilterFunction 对请求和响应进行拦截和处理。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
HTTP 底库
// 选择 HTTP 底库; 默认底层用Netty,切换Jetty。
WebClient
.builder()
.clientConnector(new JettyClientHttpConnector())
.build();
全局的请求配置
// 设置基础的全局的web请求配置,如cookie、header、baseUrl。
WebClient
.builder()
.defaultCookie("kl","kl")
.defaultUriVariables(ImmutableMap.of("name","kl"))
.defaultHeader("header","kl")
.defaultHeaders(httpHeaders -> {
httpHeaders.add("header1","kl");
httpHeaders.add("header2","kl");
})
.defaultCookies(cookie ->{
cookie.add("cookie1","kl");
cookie.add("cookie2","kl");
})
.baseUrl("http://www.kailing.pub")
.build();
Filter
// Filter 过滤器,统一修改拦截请求。
WebClient
.builder()
.baseUrl("http://www.kailing.pub")
.filter((request, next) -> {
ClientRequest filtered = ClientRequest.from(request)
.header("foo", "bar")
.build();
return next.exchange(filtered);
})
.filters(filters ->{
filters.add(ExchangeFilterFunctions.basicAuthentication("username","password"));
filters.add(ExchangeFilterFunctions.limitResponseSize(800));
})
.build().get()
.uri("/article/index/arcid/{id}.html", 254)
.retrieve()
.bodyToMono(String.class)
.subscribe(System.err::println);
Netty 库配置
// 配置动态连接池 ConnectionProvider provider = ConnectionProvider.elastic("elastic pool"); 配置固定大小连接池,如最大连接数、连接获取超时、空闲连接死亡时间等
ConnectionProvider provider = ConnectionProvider.fixed("fixed", 45, 4000, Duration.ofSeconds(6));
HttpClient httpClient = HttpClient.create(provider)
.secure(sslContextSpec -> {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().trustManager(new File("E://server.truststore"));
sslContextSpec.sslContext(sslContextBuilder);
}).tcpConfiguration(tcpClient -> {
// 指定Netty的 select 和 work 线程数量
LoopResources loop = LoopResources.create("kl-event-loop", 1, 4, true);
return tcpClient.doOnConnected(connection -> {
// 读写超时设置
connection
.addHandlerLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS))
.addHandlerLast(new WriteTimeoutHandler(10));
})
// 连接超时设置
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.TCP_NODELAY, true)
.runOn(loop);
});
WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
uriBuilderFactory:自定义UriBuilderFactory用作基本URL(BaseUrl)。
defaultHeader:每个请求的标题。
defaultCookie:针对每个请求的Cookie。
defaultRequest:Consumer自定义每个请求。
filter:针对每个请求的客户端过滤器。
exchangeStrategies:HTTP消息读取器/写入器定制。
clientConnector:HTTP客户端库设置。
.block()
阻塞当前程序等待结果
.retrieve()
直接获取响应body
.exchange()
可访问整个ClientResponse
// 简单传参。默认异步,通过 Mono 的 subscribe 订阅响应值。block() 阻塞当前线程获取返回值。
WebClient client = WebClient.create("http://www.kailing.pub");
Mono<String> result = client.get()
.uri("/article/arcid/{id}", 256)
.acceptCharset(StandardCharsets.UTF_8)
.accept(MediaType.TEXT_HTML)
.retrieve() // 同步
.bodyToMono(String.class);
result.subscribe(System.err::println);
// 复杂传参 — MultiValueMap。默认异步,通过 Mono 的 subscribe 订阅响应值。block() 阻塞当前线程获取返回值。
MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
params.add("name", "kl");
params.add("age", "19");
// 定义 url 参数
Map<String, Object> uriVariables = new HashMap<>();
uriVariables.put("id", 200);
String uri = UriComponentsBuilder.fromUriString("/article/arcid/{id}")
.queryParams(params)
.uriVariables(uriVariables)
.toUriString();
Mono<String> result = client.get()
.uri(uri)
.acceptCharset(StandardCharsets.UTF_8)
.accept(MediaType.TEXT_HTML)
.retrieve()
.bodyToMono(String.class);
result.subscribe(System.err::println);
// 复杂传参 — UriBuilder。默认异步,通过 Mono 的 subscribe 订阅响应值。block() 阻塞当前线程获取返回值。
Mono<String> resp = WebClient.create()
.get()
.uri(uriBuilder -> uriBuilder
.scheme("http")
.host("www.baidu.com")
.path("/s")
.queryParam("wd", "北京天气")
.queryParam("other", "test")
.build())
.retrieve()
.bodyToMono(String.class);
// 表单 默认异步,通过 Mono 的 subscribe 订阅响应值。block() 阻塞当前线程获取返回值。
MultiValueMap<String, String> formData = new LinkedMultiValueMap<>();
formData.add("name1","value1");
formData.add("name2","value2");
Mono<String> resp = WebClient.create().post()
.uri("http://www.w3school.com.cn/test/demo_form.asp")
.contentType(MediaType.APPLICATION_FORM_URLENCODED)
.body(BodyInserters.fromFormData(formData))
.retrieve().bodyToMono(String.class);
LOGGER.info("result:{}",resp.block());
// FormInserter 表单:参数、文件。默认异步,通过 Mono 的 subscribe 订阅响应值。block() 阻塞当前线程获取返回值。
WebClient client = WebClient.create("http://www.kailing.pub");
FormInserter formInserter = fromMultipartData("name","kl")
.with("age",19)
.with("map",ImmutableMap.of("xx","xx"))
.with("file",new File("C://xxx.doc"));
Mono<String> result = client.post()
.uri("/article/index/arcid/{id}.html", 256)
.contentType(MediaType.APPLICATION_JSON)
.body(formInserter)
//.bodyValue(ImmutableMap.of("name","kl"))
.retrieve()
.bodyToMono(String.class);
result.subscribe(System.err::println);
// json — 实体类。默认异步,通过 Mono 的 subscribe 订阅响应值。block() 阻塞当前线程获取返回值。
User user = new User();
user.setName("aaa");
user.setTitle("AAAAAA");
Mono<String> resp = WebClient.create()
.post()
.uri("http://localhost:8080/demo/json")
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(Mono.just(user),User.class)
.retrieve().bodyToMono(String.class);
System.out.println("---resp.block(): "+resp.block());
// json — raw。默认异步,通过 Mono 的 subscribe 订阅响应值。block() 阻塞当前线程获取返回值。
Mono<String> resp = WebClient.create()
.post()
.uri("http://localhost:8080/demo/json")
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(BodyInserters.fromObject("{\n" +
" \"title\" : \"this is title\",\n" +
" \"author\" : \"this is author\"\n" +
"}"))
.retrieve().bodyToMono(String.class);
System.out.println("---resp.block(): "+resp.block());
// 二进制上传文件
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.IMAGE_PNG);
HttpEntity<ClassPathResource> entity = new HttpEntity<>(new ClassPathResource("parallel.png"), headers);
MultiValueMap<String, Object> parts = new LinkedMultiValueMap<>();
parts.add("file", entity);
Mono<String> resp = WebClient.create()
.post()
.uri("http://localhost:8080/upload")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(parts))
.retrieve().bodyToMono(String.class);
System.out.println("---resp.block(): "+resp.block());
WebSocketClient?client?=?new?ReactorNettyWebSocketClient();
URI?url?=?new?URI("ws://localhost:8080/path");
client.execute(url,?session?->session.receive().doOnNext(System.out::println).then());
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
/**
*
*/
public class WebClientUtils {
private WebClient webClient;
public WebClientUtils(String baseUrl) {
this.webClient = WebClient.builder()
.baseUrl(baseUrl)
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.filter(logRequest())
.build();
}
public <T> Mono<T> get(String uri, Class<T> responseType) {
return webClient.get()
.uri(uri)
.retrieve()
.bodyToMono(responseType);
}
public <T> Mono<T> post(String uri, Object request, Class<T> responseType) {
return webClient.post()
.uri(uri)
.body(BodyInserters.fromValue(request))
.retrieve()
.bodyToMono(responseType);
}
public <T> Mono<T> put(String uri, Object request, Class<T> responseType) {
return webClient.put()
.uri(uri)
.body(BodyInserters.fromValue(request))
.retrieve()
.bodyToMono(responseType);
}
public <T> Mono<T> delete(String uri, Class<T> responseType) {
return webClient.delete()
.uri(uri)
.retrieve()
.bodyToMono(responseType);
}
private ExchangeFilterFunction logRequest() {
return (clientRequest, next) -> {
logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers().forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
return next.exchange(clientRequest);
};
}
}
public class Test001 {
public static void main(String[] args) {
WebClientUtils webClientUtils = new WebClientUtils("https://api.example.com");
// 发起 GET 请求
webClientUtils.get("/users/1", User.class).subscribe(user -> System.out.println("GET response: " + user));
// 发起 POST 请求
User newUser = new User("John", "Doe");
webClientUtils.post("/users", newUser, User.class).subscribe(user -> System.out.println("POST response: " + user));
// 发起 PUT 请求
User updatedUser = new User("Jane", "Doe");
webClientUtils.put("/users/1", updatedUser, User.class).subscribe(user -> System.out.println("PUT response: " + user));
// 发起 DELETE 请求
webClientUtils.delete("/users/1", Void.class).subscribe(response -> System.out.println("DELETE response: " + response));
}
}