前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Vertx-client框架-高性能 HTTP 请求框架

Vertx-client框架-高性能 HTTP 请求框架

原创
作者头像
用户5271782
发布2024-11-14 20:02:58
140
发布2024-11-14 20:02:58

在当今的软件开发中,高效的网络通信是至关重要的。本文将介绍一种基于高性能底层 Netty 的 Vertx 框架的简单 HTTP 请求封装方法,它能够承载更多的网络 IO 请求,为我们的应用程序提供更强大的网络通信能力。

一、背景介绍

随着互联网的发展,应用程序对网络通信的要求越来越高。传统的同步阻塞式 IO 模型在处理大量并发请求时往往会出现性能瓶颈,虽然有些支持异步获取结果的框架,但是在高并发情况负载较高的情况下也会出现一些问题。而 Vertx 框架基于 Netty 实现,采用异步非阻塞的 IO 模型,能够有效地处理大量并发连接,提高系统的吞吐量和响应速度,之前在系统中使用的okhttp同步/异步模式,但在系统负载较大的情况下基于 Netty 底层的http客户端表现更为出色,当然还有其他底层是Netty的http请求框架,这里就不多介绍了,主要介绍的是Vertx框架。 官网介绍:vertx.io/docs/vertx-…

二、使用示例

代码语言:javascript
复制
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-core</artifactId>
    <version>4.5.10</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web-client</artifactId>
    <version>4.5.10</version>
</dependency>


构建全局的 Vertx:通过配置VertxOptions参数,调整事件循环线程池大小,以适应阻塞型 IO 的需求。
同时,创建HttpClientOptions配置客户端的连接超时、读超时、写超时等参数,并使用WebClient.wrap方法创建WebClient,解决配置不生效的问题。
GET 请求封装:接收请求地址、请求头和请求参数,构建请求 URL,设置请求头,发送 GET 请求。在请求成功时,检查 HTTP 响应状态码,获取响应体并封装成VertxResponse对象返回。在请求失败时,抛出异常。
POST 请求封装:接收请求地址、请求头、请求体和请求体类型,设置请求头和请求体类型,发送 POST 请求。在请求成功时,检查 HTTP 响应状态码,获取响应体并封装成VertxResponse对象返回。在请求失败时,抛出异常。
请求体转换为 Buffer:根据请求体的类型,将其转换为io.vertx.core.buffer.Buffer对象,以便发送请求。
构建请求头:接收请求头参数,创建HeadersMultiMap对象,添加默认的User-Agent,并将传入的请求头添加到对象中。
构建请求 URL:接收请求地址和请求参数,将请求参数拼接成查询字符串,并添加到请求地址中。

三、代码实现

以下是使用 Java 实现的基于 Vertx 框架的简单 HTTP 请求封装代码,没有完全的测试哈,生产环境不建议使用,只建议学习探索。

代码语言:javascript
复制
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SimpleHttpClient {
    // 构建全局的Vertx
    private static WebClient client ;
    //全局超时时间
    private static final long TimeOut = 3000L;
    static {
        //配置vertxOptions参数
        VertxOptions vertxOptions = new VertxOptions();
        //The default number of event loop threads to be used  = 2 * number of cores on the machine
        //public static final int DEFAULT_EVENT_LOOP_POOL_SIZE = 2 * CpuCoreSensor.availableProcessors();
        vertxOptions.setEventLoopPoolSize(100); //默认是cpu核心数*2,
        Vertx vertx = Vertx.vertx(vertxOptions);

        //配置客户端的配置信息
        HttpClientOptions options = new HttpClientOptions();
        options.setConnectTimeout(2500); // 设置连接时间
        options.setReadIdleTimeout(2500); //设置读时间
        options.setWriteIdleTimeout(2500); //设置写超时时间
        options.setName("Vertx");
        options.setDecompressionSupported(true); //压缩支持
        options.setPoolEventLoopSize(350); //设置事件循环事件池大小 
        options.setMaxPoolSize(600);//如果是网络阻塞型IO这里我们可以适当的调整pool线程数,这里设置个600。
        options.setMaxWaitQueueSize(15000); //最大的队列大小,这个队列不宜设置过大,目前演示1.2万请求专门这么设置的。
        HttpClient vertxHttpClient = vertx.createHttpClient(options);

        //使用wrap方法创建WebClient,解决WebClientOptions配置setPoolEventLoopSize不生效的问题。
        client = WebClient.wrap(vertxHttpClient);

    }
    /**
     * GET请求
     *
     * @param url       请求地址
     * @param headers   请求头
     * @param params    请求参数
     * @return
     */
    public static VertxResponse get(String url, Map<String, String> headers, Map<String, Object> params) throws ExecutionException, InterruptedException, TimeoutException {
        final String buildUrl = buildUrlByParams(url, params);
        HeadersMultiMap headersMultiMap = buildHeaders(headers);
        CompletableFuture<VertxResponse> future = new CompletableFuture<>();
        Number startTime = System.currentTimeMillis();
        client.getAbs(buildUrl)
                .putHeaders(headersMultiMap)
                .send()
                .onSuccess(response -> {
                    HttpResponse<Buffer> httpResponse = response;
                    Number endTime = System.currentTimeMillis();
                    Number elapsedTime = (endTime.longValue() - startTime.longValue());
                    // 检查 HTTP 响应状态码
                    if (httpResponse.statusCode() >= 400) {
                        future.completeExceptionally(new RuntimeException("HTTP Error: " + httpResponse.statusCode()));
                    } else {
                        // 获取响应体
                        String responseBody = httpResponse.body() == null ? "" : httpResponse.body().toString();
                        VertxResponse vertxResponseObj = new VertxResponse(httpResponse.statusCode(), responseBody, httpResponse.headers(), elapsedTime.longValue());
                        future.complete(vertxResponseObj);
                    }
                })
                .onFailure(err -> {
                    future.completeExceptionally(err);
                });
        VertxResponse vertxResponse = future.get(TimeOut, TimeUnit.MILLISECONDS);
        return vertxResponse;
    }
    /**
     * POST请求
     *
     * @param url          请求地址
     * @param headers      请求头
     * @param requestBody  请求体
     * @param contentType  请求体类型
     * @return
     */
    public static VertxResponse post(String url, Map<String, String> headers, Object requestBody, String contentType) throws ExecutionException, InterruptedException, TimeoutException {
        HeadersMultiMap headersMultiMap = buildHeaders(headers);
        CompletableFuture<VertxResponse> future = new CompletableFuture<>();
        Number startTime = System.currentTimeMillis();
        client.postAbs(url)
                .putHeaders(headersMultiMap)
                .putHeader("Content-Type", contentType)
                .sendBuffer(convertRequestBodyToBuffer(requestBody), response -> {
                    if (response.succeeded()) {
                        HttpResponse<Buffer> httpResponse = response.result();
                        Number endTime = System.currentTimeMillis();
                        Number elapsedTime = (endTime.longValue() - startTime.longValue());
                        // 检查 HTTP 响应状态码
                        if (httpResponse.statusCode() >= 400) {
                            future.completeExceptionally(new RuntimeException("HTTP Error: " + httpResponse.statusCode()));
                        } else {
                            // 获取响应体
                            String responseBody = httpResponse.body() == null ? "" : httpResponse.body().toString();
                            VertxResponse vertxResponseObj = new VertxResponse(httpResponse.statusCode(), responseBody, httpResponse.headers(), elapsedTime.longValue());
                            future.complete(vertxResponseObj);
                        }
                    } else {
                        future.completeExceptionally(response.cause());
                    }
                });
        VertxResponse vertxResponse = future.get(TimeOut, TimeUnit.MILLISECONDS);
        return vertxResponse;
    }
    /**
     * 将请求体转换为Buffer
     *
     * @param requestBody
     * @return
     */
    private static io.vertx.core.buffer.Buffer convertRequestBodyToBuffer(Object requestBody) {
        if (requestBody instanceof String) {
            return io.vertx.core.buffer.Buffer.buffer((String) requestBody, "UTF-8");
        } else if (requestBody instanceof byte[]) {
            return io.vertx.core.buffer.Buffer.buffer((byte[]) requestBody);
        } else if (requestBody instanceof JsonObject) {
            return io.vertx.core.buffer.Buffer.buffer(((JsonObject) requestBody).encodePrettily());
        } else {
            return io.vertx.core.buffer.Buffer.buffer();
        }
    }


    /**
     * 构建请求头
     *
     * @param headers
     * @return
     */
    private static HeadersMultiMap buildHeaders(Map<String, String> headers) {
        HeadersMultiMap headersMultiMap = new HeadersMultiMap();
        if (Objects.isNull(headers)) {
            headers = new HashMap<>();
        }
        headersMultiMap.addAll(headers);
        headersMultiMap.add("User-Agent", "SimpleHttpClient");
        return headersMultiMap;

    }
    /**
     * 构建请求URL
     *
     * @param url
     * @param params
     * @return
     */
    private static String buildUrlByParams(String url, Map<String, Object> params) {
        if (Objects.nonNull(params) && !params.isEmpty()) {
            StringBuilder queryParams = new StringBuilder();
            if (params != null && !params.isEmpty()) {
                params.forEach((key, value) -> {
                    if (queryParams.length() > 0) {
                        queryParams.append("&");
                    }
                    queryParams.append(key).append("=").append(value);
                });
                if (queryParams.length() > 0) {
                    url += "?" + queryParams;
                }
            }
        }
        return url;
    }


    static class VertxResponse {
        private int statusCode;
        private String body;
        private MultiMap headers;
        private long elapsedTime;

        public VertxResponse(int statusCode, String body, MultiMap headers, long elapsedTime) {
            this.elapsedTime = elapsedTime;
            this.statusCode = statusCode;
            this.headers = headers;
            this.body = body;
        }


        public int getStatusCode() {
            return statusCode;
        }

        public String getBody() {
            return body;
        }

        public MultiMap getHeaders() {
            return headers;
        }

        public void setStatusCode(int statusCode) {
            this.statusCode = statusCode;
        }

        public void setBody(String body) {
            this.body = body;
        }

        public void setHeaders(MultiMap headers) {
            this.headers = headers;
        }

        public long getElapsedTime() {
            return elapsedTime;
        }

        public void setElapsedTime(long elapsedTime) {
            this.elapsedTime = elapsedTime;
        }

        @Override
        public String toString() {
            return "VertxResponse{" +
                    "statusCode=" + statusCode +
                    ", body='" + body + ''' +
                    ", headers=" + headers +
                    ", elapsedTime=" + elapsedTime +
                    '}';
        }
    }

}

四、使用方法

以下是使用封装好的SimpleHttpClient类发送 GET 和 POST 请求的示例代码,我们模拟向获取时间的api发送请求。

代码语言:javascript
复制
     // 发送 GET 请求
        Map<String, String> headers = new HashMap<>();
        headers.put("Authorization", "Bearer your_token");
        Map<String, Object> params = new HashMap<>();
        params.put("param1", "value1");
        params.put("param2", "value2");
        SimpleHttpClient.VertxResponse getResponse = SimpleHttpClient.get("https://vv.video.qq.com/checktime?otype=json", headers, params);
        System.out.println("GET Response: " + getResponse);

        // 发送 POST 请求
        headers = new HashMap<>();
        headers.put("Content-Type", "application/json");
        headers.put("Authorization", "Bearer your_token");
        String requestBody = "{"key":"value"}";
        SimpleHttpClient.VertxResponse postResponse = SimpleHttpClient.post("https://vv.video.qq.com/checktime?otype=json", headers, requestBody, "application/json");
        System.out.println("POST Response: " + postResponse);
代码语言:java
复制
模拟主线程600并发,子线程20并发,总共12000请求网络IO表现,1.7s获取所有的结果,比之前使用异步编排+okhttp同步请求快了2-5S。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class TestExample {
java 代码解读复制代码private static final int INITIAL_CORE_POOL_SIZE = 500;
private static final int INITIAL_MAX_POOL_SIZE = 500;
private static final long KEEP_ALIVE_TIME = 0L;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
private static final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingQueue<>(12000);

private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        INITIAL_CORE_POOL_SIZE,
        INITIAL_MAX_POOL_SIZE,
        KEEP_ALIVE_TIME,
        TIME_UNIT,
        WORK_QUEUE,
        new MyNamedThreadFactory()
);



public static void main(String[] args) throws InterruptedException {
     //主线程600并发,子线程20并发,总共12000请求网络IO表现
    simulateLoadVariation();
    }

private static void simulateLoadVariation() throws InterruptedException {
    int numMainTasks = 600; // 主线程数量
    // 创建一个 CompletableFuture 来异步启动所有任务
    CompletableFuture<Void> allTasksFuture = CompletableFuture.allOf(
            IntStream.range(0, numMainTasks)
                    .mapToObj(i -> CompletableFuture.runAsync(DynamicThreadPoolExample::runConcurrentTest1))
                    .toArray(CompletableFuture[]::new)
    );
    // 等待所有任务完成
    allTasksFuture.join();
}
private static void runConcurrentTest1(){

    List<String> list = new ArrayList<>();
    IntStream.range(0, 20).forEach(item -> list.add("111"));
    //模拟子线程20个并发
    List<CompletableFuture<String>> decoratedFutures = list.stream()
            .map(item -> CompletableFuture.supplyAsync(() -> {
                try {
                    SimpleHttpClient.VertxResponse futureResponse = SimpleHttpClient.get("http://vv.video.qq.com/checktime?otype=json", null, null);
                    System.out.println("response:" + Thread.currentThread().getName() + ":" + futureResponse.getBody());
                    return "0";
                } catch (Exception e) {
                    e.printStackTrace();
                    return "-1";
                }
            }, executor))
            .collect(Collectors.toList());
}



static class MyNamedThreadFactory implements ThreadFactory {
    private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = defaultFactory.newThread(r);
        thread.setName("MyNamedThread-" + thread.getId());
        return thread;
    }
}
}
通过对 Vertx 框架的简单封装,我们实现了一个高性能的 HTTP 请求工具类。它能够有效地处理大量并发请求,提高系统的吞吐量和响应速度。在实际应用中,我们可以根据具体需求对代码进行进一步优化和扩展,以满足不同场景下的网络通信需求。

文章地址: https://blog.golong.uk/archives/java/36.html

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、背景介绍
  • 三、代码实现
  • 四、使用方法
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档