在当今的软件开发中,高效的网络通信是至关重要的。本文将介绍一种基于高性能底层 Netty 的 Vertx 框架的简单 HTTP 请求封装方法,它能够承载更多的网络 IO 请求,为我们的应用程序提供更强大的网络通信能力。
随着互联网的发展,应用程序对网络通信的要求越来越高。传统的同步阻塞式 IO 模型在处理大量并发请求时往往会出现性能瓶颈,虽然有些支持异步获取结果的框架,但是在高并发情况负载较高的情况下也会出现一些问题。而 Vertx 框架基于 Netty 实现,采用异步非阻塞的 IO 模型,能够有效地处理大量并发连接,提高系统的吞吐量和响应速度,之前在系统中使用的okhttp同步/异步模式,但在系统负载较大的情况下基于 Netty 底层的http客户端表现更为出色,当然还有其他底层是Netty的http请求框架,这里就不多介绍了,主要介绍的是Vertx框架。 官网介绍:vertx.io/docs/vertx-…
二、使用示例
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.5.10</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
<version>4.5.10</version>
</dependency>
构建全局的 Vertx:通过配置VertxOptions参数,调整事件循环线程池大小,以适应阻塞型 IO 的需求。
同时,创建HttpClientOptions配置客户端的连接超时、读超时、写超时等参数,并使用WebClient.wrap方法创建WebClient,解决配置不生效的问题。
GET 请求封装:接收请求地址、请求头和请求参数,构建请求 URL,设置请求头,发送 GET 请求。在请求成功时,检查 HTTP 响应状态码,获取响应体并封装成VertxResponse对象返回。在请求失败时,抛出异常。
POST 请求封装:接收请求地址、请求头、请求体和请求体类型,设置请求头和请求体类型,发送 POST 请求。在请求成功时,检查 HTTP 响应状态码,获取响应体并封装成VertxResponse对象返回。在请求失败时,抛出异常。
请求体转换为 Buffer:根据请求体的类型,将其转换为io.vertx.core.buffer.Buffer对象,以便发送请求。
构建请求头:接收请求头参数,创建HeadersMultiMap对象,添加默认的User-Agent,并将传入的请求头添加到对象中。
构建请求 URL:接收请求地址和请求参数,将请求参数拼接成查询字符串,并添加到请求地址中。
以下是使用 Java 实现的基于 Vertx 框架的简单 HTTP 请求封装代码,没有完全的测试哈,生产环境不建议使用,只建议学习探索。
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SimpleHttpClient {
// 构建全局的Vertx
private static WebClient client ;
//全局超时时间
private static final long TimeOut = 3000L;
static {
//配置vertxOptions参数
VertxOptions vertxOptions = new VertxOptions();
//The default number of event loop threads to be used = 2 * number of cores on the machine
//public static final int DEFAULT_EVENT_LOOP_POOL_SIZE = 2 * CpuCoreSensor.availableProcessors();
vertxOptions.setEventLoopPoolSize(100); //默认是cpu核心数*2,
Vertx vertx = Vertx.vertx(vertxOptions);
//配置客户端的配置信息
HttpClientOptions options = new HttpClientOptions();
options.setConnectTimeout(2500); // 设置连接时间
options.setReadIdleTimeout(2500); //设置读时间
options.setWriteIdleTimeout(2500); //设置写超时时间
options.setName("Vertx");
options.setDecompressionSupported(true); //压缩支持
options.setPoolEventLoopSize(350); //设置事件循环事件池大小
options.setMaxPoolSize(600);//如果是网络阻塞型IO这里我们可以适当的调整pool线程数,这里设置个600。
options.setMaxWaitQueueSize(15000); //最大的队列大小,这个队列不宜设置过大,目前演示1.2万请求专门这么设置的。
HttpClient vertxHttpClient = vertx.createHttpClient(options);
//使用wrap方法创建WebClient,解决WebClientOptions配置setPoolEventLoopSize不生效的问题。
client = WebClient.wrap(vertxHttpClient);
}
/**
* GET请求
*
* @param url 请求地址
* @param headers 请求头
* @param params 请求参数
* @return
*/
public static VertxResponse get(String url, Map<String, String> headers, Map<String, Object> params) throws ExecutionException, InterruptedException, TimeoutException {
final String buildUrl = buildUrlByParams(url, params);
HeadersMultiMap headersMultiMap = buildHeaders(headers);
CompletableFuture<VertxResponse> future = new CompletableFuture<>();
Number startTime = System.currentTimeMillis();
client.getAbs(buildUrl)
.putHeaders(headersMultiMap)
.send()
.onSuccess(response -> {
HttpResponse<Buffer> httpResponse = response;
Number endTime = System.currentTimeMillis();
Number elapsedTime = (endTime.longValue() - startTime.longValue());
// 检查 HTTP 响应状态码
if (httpResponse.statusCode() >= 400) {
future.completeExceptionally(new RuntimeException("HTTP Error: " + httpResponse.statusCode()));
} else {
// 获取响应体
String responseBody = httpResponse.body() == null ? "" : httpResponse.body().toString();
VertxResponse vertxResponseObj = new VertxResponse(httpResponse.statusCode(), responseBody, httpResponse.headers(), elapsedTime.longValue());
future.complete(vertxResponseObj);
}
})
.onFailure(err -> {
future.completeExceptionally(err);
});
VertxResponse vertxResponse = future.get(TimeOut, TimeUnit.MILLISECONDS);
return vertxResponse;
}
/**
* POST请求
*
* @param url 请求地址
* @param headers 请求头
* @param requestBody 请求体
* @param contentType 请求体类型
* @return
*/
public static VertxResponse post(String url, Map<String, String> headers, Object requestBody, String contentType) throws ExecutionException, InterruptedException, TimeoutException {
HeadersMultiMap headersMultiMap = buildHeaders(headers);
CompletableFuture<VertxResponse> future = new CompletableFuture<>();
Number startTime = System.currentTimeMillis();
client.postAbs(url)
.putHeaders(headersMultiMap)
.putHeader("Content-Type", contentType)
.sendBuffer(convertRequestBodyToBuffer(requestBody), response -> {
if (response.succeeded()) {
HttpResponse<Buffer> httpResponse = response.result();
Number endTime = System.currentTimeMillis();
Number elapsedTime = (endTime.longValue() - startTime.longValue());
// 检查 HTTP 响应状态码
if (httpResponse.statusCode() >= 400) {
future.completeExceptionally(new RuntimeException("HTTP Error: " + httpResponse.statusCode()));
} else {
// 获取响应体
String responseBody = httpResponse.body() == null ? "" : httpResponse.body().toString();
VertxResponse vertxResponseObj = new VertxResponse(httpResponse.statusCode(), responseBody, httpResponse.headers(), elapsedTime.longValue());
future.complete(vertxResponseObj);
}
} else {
future.completeExceptionally(response.cause());
}
});
VertxResponse vertxResponse = future.get(TimeOut, TimeUnit.MILLISECONDS);
return vertxResponse;
}
/**
* 将请求体转换为Buffer
*
* @param requestBody
* @return
*/
private static io.vertx.core.buffer.Buffer convertRequestBodyToBuffer(Object requestBody) {
if (requestBody instanceof String) {
return io.vertx.core.buffer.Buffer.buffer((String) requestBody, "UTF-8");
} else if (requestBody instanceof byte[]) {
return io.vertx.core.buffer.Buffer.buffer((byte[]) requestBody);
} else if (requestBody instanceof JsonObject) {
return io.vertx.core.buffer.Buffer.buffer(((JsonObject) requestBody).encodePrettily());
} else {
return io.vertx.core.buffer.Buffer.buffer();
}
}
/**
* 构建请求头
*
* @param headers
* @return
*/
private static HeadersMultiMap buildHeaders(Map<String, String> headers) {
HeadersMultiMap headersMultiMap = new HeadersMultiMap();
if (Objects.isNull(headers)) {
headers = new HashMap<>();
}
headersMultiMap.addAll(headers);
headersMultiMap.add("User-Agent", "SimpleHttpClient");
return headersMultiMap;
}
/**
* 构建请求URL
*
* @param url
* @param params
* @return
*/
private static String buildUrlByParams(String url, Map<String, Object> params) {
if (Objects.nonNull(params) && !params.isEmpty()) {
StringBuilder queryParams = new StringBuilder();
if (params != null && !params.isEmpty()) {
params.forEach((key, value) -> {
if (queryParams.length() > 0) {
queryParams.append("&");
}
queryParams.append(key).append("=").append(value);
});
if (queryParams.length() > 0) {
url += "?" + queryParams;
}
}
}
return url;
}
static class VertxResponse {
private int statusCode;
private String body;
private MultiMap headers;
private long elapsedTime;
public VertxResponse(int statusCode, String body, MultiMap headers, long elapsedTime) {
this.elapsedTime = elapsedTime;
this.statusCode = statusCode;
this.headers = headers;
this.body = body;
}
public int getStatusCode() {
return statusCode;
}
public String getBody() {
return body;
}
public MultiMap getHeaders() {
return headers;
}
public void setStatusCode(int statusCode) {
this.statusCode = statusCode;
}
public void setBody(String body) {
this.body = body;
}
public void setHeaders(MultiMap headers) {
this.headers = headers;
}
public long getElapsedTime() {
return elapsedTime;
}
public void setElapsedTime(long elapsedTime) {
this.elapsedTime = elapsedTime;
}
@Override
public String toString() {
return "VertxResponse{" +
"statusCode=" + statusCode +
", body='" + body + ''' +
", headers=" + headers +
", elapsedTime=" + elapsedTime +
'}';
}
}
}
以下是使用封装好的SimpleHttpClient
类发送 GET 和 POST 请求的示例代码,我们模拟向获取时间的api发送请求。
// 发送 GET 请求
Map<String, String> headers = new HashMap<>();
headers.put("Authorization", "Bearer your_token");
Map<String, Object> params = new HashMap<>();
params.put("param1", "value1");
params.put("param2", "value2");
SimpleHttpClient.VertxResponse getResponse = SimpleHttpClient.get("https://vv.video.qq.com/checktime?otype=json", headers, params);
System.out.println("GET Response: " + getResponse);
// 发送 POST 请求
headers = new HashMap<>();
headers.put("Content-Type", "application/json");
headers.put("Authorization", "Bearer your_token");
String requestBody = "{"key":"value"}";
SimpleHttpClient.VertxResponse postResponse = SimpleHttpClient.post("https://vv.video.qq.com/checktime?otype=json", headers, requestBody, "application/json");
System.out.println("POST Response: " + postResponse);
模拟主线程600并发,子线程20并发,总共12000请求网络IO表现,1.7s获取所有的结果,比之前使用异步编排+okhttp同步请求快了2-5S。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class TestExample {
java 代码解读复制代码private static final int INITIAL_CORE_POOL_SIZE = 500;
private static final int INITIAL_MAX_POOL_SIZE = 500;
private static final long KEEP_ALIVE_TIME = 0L;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
private static final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingQueue<>(12000);
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
INITIAL_CORE_POOL_SIZE,
INITIAL_MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TIME_UNIT,
WORK_QUEUE,
new MyNamedThreadFactory()
);
public static void main(String[] args) throws InterruptedException {
//主线程600并发,子线程20并发,总共12000请求网络IO表现
simulateLoadVariation();
}
private static void simulateLoadVariation() throws InterruptedException {
int numMainTasks = 600; // 主线程数量
// 创建一个 CompletableFuture 来异步启动所有任务
CompletableFuture<Void> allTasksFuture = CompletableFuture.allOf(
IntStream.range(0, numMainTasks)
.mapToObj(i -> CompletableFuture.runAsync(DynamicThreadPoolExample::runConcurrentTest1))
.toArray(CompletableFuture[]::new)
);
// 等待所有任务完成
allTasksFuture.join();
}
private static void runConcurrentTest1(){
List<String> list = new ArrayList<>();
IntStream.range(0, 20).forEach(item -> list.add("111"));
//模拟子线程20个并发
List<CompletableFuture<String>> decoratedFutures = list.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {
try {
SimpleHttpClient.VertxResponse futureResponse = SimpleHttpClient.get("http://vv.video.qq.com/checktime?otype=json", null, null);
System.out.println("response:" + Thread.currentThread().getName() + ":" + futureResponse.getBody());
return "0";
} catch (Exception e) {
e.printStackTrace();
return "-1";
}
}, executor))
.collect(Collectors.toList());
}
static class MyNamedThreadFactory implements ThreadFactory {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
@Override
public Thread newThread(Runnable r) {
Thread thread = defaultFactory.newThread(r);
thread.setName("MyNamedThread-" + thread.getId());
return thread;
}
}
}
通过对 Vertx 框架的简单封装,我们实现了一个高性能的 HTTP 请求工具类。它能够有效地处理大量并发请求,提高系统的吞吐量和响应速度。在实际应用中,我们可以根据具体需求对代码进行进一步优化和扩展,以满足不同场景下的网络通信需求。
文章地址: https://blog.golong.uk/archives/java/36.html
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。