前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一次数据库连接数被打满的优化方案

一次数据库连接数被打满的优化方案

作者头像
用户5271782
发布2024-11-14 19:59:35
210
发布2024-11-14 19:59:35

背景

在一次压测中,2000多个请求直接将云MySQL的连接数打爆了(MySQL配置的最大连接数为5000多,但实际上在压测中这台云数据库1800左右已经是极限)。这导致请求获取连接超时且无法回收,配置的连接超时时间为3秒,还涉及到了RPC远程调用消费能力不足,导致RPC的线程池被打满,系统已经不可用,返回时间最长达几十秒。

问题定位

通过链路追踪、观察服务器性能和错误日志,我们发现主要瓶颈在于某个服务的MySQL数据库连接数被打满,新的请求无法获取连接,导致连接超时和事务超时错误日志频发。云数据库的CPU利用率并不高,主要问题是连接数被打满,有两块业务涉及到的写入场景较多。

解决方案

为了解决这个问题,在不考虑接入新的中间件,也不考虑扩容的情况下,‘我们主要从以下几个方面进行了优化: 业务流程优化:允许在某些情况下出现脏数据,对于失败的结果采用不同的处理方案。 数据存储策略:成功的数据实时落库,失败的数据使用JVM队列并通过定时器批量处理。 连接管理和超时时间调整:设置较小的连接时间和事务超时时间。 PRC线程池参数优化,远程调用显式设置timeout时间参数。 后期规划:分库,限流,MQ消峰,优化远程调用的连接数。 具体来说,我们将失败的数据推送到队列中作为生产者,并使用一个单线程定时器(每隔0.5秒)作为消费者来批量处理队列中的数据。每次批处理的数量是可配置的,可在Nacos上进行动态配置。经过批处理数据库的CPU的利用率也起来了。

优化效果

经过优化后,连接数明显降低,系统稳定性得到了显著提升。同时,我们也对类似的业务线进行了同样的优化,并进行了简单的封装。 以下批处理代码示例只提供了大概流程说明,未有动态参数配置,未达到到最低消费数不load数据的操作,将队列给Spring容器管理等这些都细节。 以下是批处理代码的一个简单的示例: mysql支持批处理的配置: url = jdbc:mysql://localhost:3306/db?rewriteBatchedStatements=true 如果你在使用 MySQL 5.7 或更高版本,并且使用的是较新的 JDBC 驱动(如 5.x 版本),那么通常不需要显式加上这个参数。 如果你在使用较旧的 MySQL 版本或 JDBC 驱动,或者遇到了性能问题,可能需要显式加上这个参数 rewriteBatchedStatements=true

代码语言:javascript
复制
public class DataProcessor {

    //示例队列
    public static volatile BlockingQueue<Product> dataQueue = new LinkedBlockingQueue<>(30000);

    public static void pushData(Product product) {
       boolean offered=  dataQueue.offer(product);
       if(!offered){
           throw new IllegalStateException("队列已满");
       }
    }
}
代码语言:javascript
复制
生产者:失败的数据
public class DataProducer  {
    static AtomicLong atomicLong = new AtomicLong(1);
    public static void main(String[] args) {
        // 启动单线程池
        start(args);
    }

    public static void start(String[] args) {
        // 启动单线程池
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
         //模拟每秒业务失败的数据
        scheduler.scheduleAtFixedRate(() -> {
            Random random = new Random();
            IntStream.range(1,random.nextInt(10000)).forEach(value -> {
                try {
                    Product product = generateRandomProduct(random);
                    DataProcessor.pushData(product);
                } catch (IllegalStateException e) {
                    System.err.println("Queue is full: " + e.getMessage());
                }
            });
        }, 0, 500, TimeUnit.MILLISECONDS);
    }

    private static Product generateRandomProduct(Random random) {
        Product product = new Product();
        product.setId(atomicLong.incrementAndGet());
        product.setProductName("Product_" + random.nextInt(1000));
        product.setProductDescription("Description_" + random.nextInt(1000));
        product.setImageUrl("https://example.com/image_" + random.nextInt(100) + ".jpg");
        product.setPrice(BigDecimal.valueOf(random.nextDouble() * 1000).setScale(2, BigDecimal.ROUND_HALF_UP));
        product.setReleaseDate(LocalDate.now().minusDays(random.nextInt(365)));
        product.setManufacturer("Manufacturer_" + random.nextInt(100));
        product.setCategory("Category_" + random.nextInt(100));
        product.setSupplier("Supplier_" + random.nextInt(100));
        product.setModelNumber("Model_" + random.nextInt(100));
        product.setColor("Color_" + random.nextInt(100));
        product.setWeight("Weight_" + random.nextInt(100));
        product.setDimensions("Dimensions_" + random.nextInt(100));
        product.setWarrantyPeriod("Warranty_" + random.nextInt(100));
        product.setShippingDetails("Shipping_" + random.nextInt(100));
        product.setCountryOfOrigin("Country_" + random.nextInt(100));
        product.setMaterial("Material_" + random.nextInt(100));
        product.setProductionMethod("Production_" + random.nextInt(100));
        product.setPackagingDetails("Packaging_" + random.nextInt(100));
        product.setAdditionalInfo("Additional_" + random.nextInt(100));
        product.setNotes("Notes_" + random.nextInt(100));
        product.setTechnicalSpecs("TechSpecs_" + random.nextInt(100));
        product.setUsageInstructions("Usage_" + random.nextInt(100));
        return product;
    }
}
代码语言:javascript
复制
public class DataConsumer {
    //批处理数据
    private static volatile int batchSize = 10000;
    // 构建类型到sql方法的映射
    private static Map<Class<?>, ParameterConsumer<PreparedStatement, Integer, Object>> setterMethods;
    // 构建类型到sql方法的映射
    static {
        setterMethods =  new ParameterSetter().getSetterMethods();
    }
    //缓存中的获取PreparedStatementTable
     private static Map<Class<?>, PreparedStatementTable> preparedStatementTables = new ConcurrentHashMap<>();
    public static void startConsumer(String[] args) {
        // 启动单线程池x
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // 定时获取数据将jvm中的业务失败数据落库
        scheduler.scheduleAtFixedRate(() -> {
            batchInsert(Product.class);
        }, 0, 500, TimeUnit.MILLISECONDS);
    }


  

    /**
     * @Description 批处理数据
     * @param:
     * @return:
     * @date: 2024/8/31 03:06
     * @author xjg
     */
    private static void batchInsert(Class<?> clz) {
        Queue<Object> objectQueues = new LinkedBlockingQueue<>(batchSize);
        System.out.println("队列数量:" + DataProcessor.dataQueue.size());
        if (DataProcessor.dataQueue.size() < (batchSize / 2)) {
            System.out.println("队列中的数据或者不足以新增数据!");
            return;
        }
        int pollSize = DataProcessor.dataQueue.drainTo(objectQueues, batchSize);
        long startTime = System.currentTimeMillis();  // 记录开始时间
        if (pollSize > 0) {
            try (Connection connection = DatabaseConnection.createDataSource().getConnection()) {
                //connection.setAutoCommit(false);  // 开启事务
                String sql = "INSERT INTO  %s  (%s) VALUES (%s) ";
                PreparedStatementTable preparedStatementTable = getFieldTable(clz);
                sql = String.format(sql, preparedStatementTable.getTableName(), preparedStatementTable.getFieldTable(), preparedStatementTable.getSegmentation());
                Map<Field, Class<?>> filedTypeMaps = preparedStatementTable.getFieldTableType();
                List<Field> sortedFields = preparedStatementTable.getFields();
                PreparedStatement statement = connection.prepareStatement(sql);
                for (Object object : objectQueues) {
                    AtomicInteger atomicInteger = new AtomicInteger(0);
                    for (Field sortedField : sortedFields) {
                        Class<?> fieldType = filedTypeMaps.get(sortedField);
                        sortedField.setAccessible(true);  // 设置私有字段可访问
                        Object fieldValue = sortedField.get(object); // 获取字段的实际值
                        ParameterConsumer<PreparedStatement, Integer, Object> setterMethod = setterMethods.get(fieldType);
                        if (setterMethod != null) {
                            setterMethod.accept(statement, atomicInteger.incrementAndGet(), fieldValue);
                        } else {
                            throw new IllegalArgumentException("不支持的类型 " + fieldType);
                        }
                    }
                    statement.addBatch();
                }
                statement.executeBatch();
                // connection.commit();  // 提交事务
                long endTime = System.currentTimeMillis();  // 记录结束时间
                long elapsedTime = endTime - startTime;  // 计算执行时间
                System.out.println("批处理" + pollSize + "条数据,耗时:" + elapsedTime + " ms");
            } catch (SQLException | IllegalAccessException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("暂时没有可以消耗的");
        }
    }
代码语言:javascript
复制
启动类
public class StartProcess {
    public static void main(String[] args) {
        //生产者
        DataProducer.start(args);
        //消费者
        DataConsumer.startConsumer(args);
    }
}
代码语言:javascript
复制
 public static PreparedStatementTable getFieldTable(Class<?> clz) {
    if (preparedStatementTables.containsKey(clz)) {
        return preparedStatementTables.get(clz);
    }
    PreparedStatementTable preparedStatementTable = new PreparedStatementTable();
    Annotation tableNameAnnotation = clz.getAnnotation(BatchTableName.class);
    String tableName = "";
     if (Objects.nonNull(tableNameAnnotation)) {
          tableName = ((BatchTableName) tableNameAnnotation).TableName();
     } else {
         tableName = clz.getSimpleName();
     }
    preparedStatementTable.setTableName(tableName);
    //获取我们的字段
    Field[] fields = clz.getDeclaredFields();
    String fieldTable = "";
    String segmentation = "";
    List<Field> sortField = new ArrayList<>(35);
    for (Field field : fields) {
        Annotation annotation = field.getAnnotation(Batch.class);
        String tableField = "";
        if (Objects.nonNull(annotation)) {
            tableField = ((Batch)annotation).TableField();
        } else {
            tableField = field.getName();
        }
        sortField.add(field);
        preparedStatementTable.getFieldTableType().put(field, field.getType());
        fieldTable += tableField + ",";
        segmentation += "?,";
    }
    preparedStatementTable.setFieldTable(fieldTable.substring(0, fieldTable.length() - 1));
    preparedStatementTable.setSegmentation(segmentation.substring(0, segmentation.length() - 1));
    preparedStatementTable.setFields(sortField);
    //放入缓存
    preparedStatementTables.put(clz, preparedStatementTable);
    return preparedStatementTable;
}
代码语言:javascript
复制
/**
 * @Description  泛型T为PreparedStatement接口,U是批处理参数index,V为对象值
 * @param:
 * @return:
 * @date: 2024/8/31 14:17
 * @author xjg
 */
@FunctionalInterface
interface ParameterConsumer<T, U, V> {
    void accept(T t, U u, V v) throws SQLException;
}

public class ParameterSetter {

    private static final Map<Class<?>, ParameterConsumer<PreparedStatement, Integer, Object>> setterMethods = new HashMap<>();

    public ParameterSetter() {
        setterMethods.put(String.class, this::setString);
        setterMethods.put(Integer.class, this::setInt);
        setterMethods.put(int.class, this::setInt);
        setterMethods.put(Long.class, this::setLong);
        setterMethods.put(long.class, this::setLong);
        setterMethods.put(Double.class, this::setDouble);
        setterMethods.put(double.class, this::setDouble);
        setterMethods.put(Float.class, this::setFloat);
        setterMethods.put(float.class, this::setFloat);
        setterMethods.put(BigDecimal.class, this::setBigDecimal);
        setterMethods.put(LocalDate.class, this::setDate);
        setterMethods.put(LocalDateTime.class, this::setTimestamp);
        setterMethods.put(Boolean.class, this::setBoolean);
        setterMethods.put(boolean.class, this::setBoolean);
                System.out.println("初始化我们的类型对应映射的preparedStatement接口方法!");
    }

    public  Map<Class<?>, ParameterConsumer<PreparedStatement, Integer, Object>> getSetterMethods() {
        return setterMethods;
    }

    private void setString(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.VARCHAR);
        } else if (obj instanceof String) {
            preparedStatement.setString(index, (String) obj);
        } else {
            throw new IllegalArgumentException("不支持的类型 " + obj.getClass().getName());
        }
    }

    private void setInt(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.INTEGER);
        } else if (obj instanceof Integer) {
            preparedStatement.setInt(index, (Integer) obj);
        } else if (obj instanceof int[]) {
            preparedStatement.setInt(index, (int) obj);
        } else {
            throw new IllegalArgumentException("不支持的类型 " + obj.getClass().getName());
        }
    }

    private void setLong(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.BIGINT);
        } else if (obj instanceof Long) {
            preparedStatement.setLong(index, (Long) obj);
        } else if (obj instanceof long[]) {
            preparedStatement.setLong(index, (long) obj);
        } else {
            throw new IllegalArgumentException("不支持的类型 " + obj.getClass().getName());
        }
    }

    private void setDouble(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.DOUBLE);
        } else if (obj instanceof Double) {
            preparedStatement.setDouble(index, (Double) obj);
        } else if (obj instanceof double[]) {
            preparedStatement.setDouble(index, (double) obj);
        } else {
            throw new IllegalArgumentException("不支持的类型 " + obj.getClass().getName());
        }
    }

    private void setFloat(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.FLOAT);
        } else if (obj instanceof Float) {
            preparedStatement.setFloat(index, (Float) obj);
        } else if (obj instanceof float[]) {
            preparedStatement.setFloat(index, (float) obj);
        } else {
            throw new IllegalArgumentException("不支持的类型 " + obj.getClass().getName());
        }
    }

    private void setBigDecimal(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.DECIMAL);
        } else if (obj instanceof BigDecimal) {
            preparedStatement.setBigDecimal(index, (BigDecimal) obj);
        } else {
            throw new IllegalArgumentException("不支持的类型 " + obj.getClass().getName());
        }
    }

    private void setDate(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.DATE);
        } else if (obj instanceof LocalDate) {
            preparedStatement.setDate(index, Date.valueOf((LocalDate) obj));
        } else {
            throw new IllegalArgumentException("不支持的类型 " + obj.getClass().getName());
        }
    }

    private void setTimestamp(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.TIMESTAMP);
        } else if (obj instanceof LocalDateTime) {
            preparedStatement.setTimestamp(index, Timestamp.valueOf((LocalDateTime) obj));
        } else {
            throw new IllegalArgumentException("不支持的类型 " + obj.getClass().getName());
        }
    }

    private void setBoolean(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.BOOLEAN);
        } else if (obj instanceof Boolean) {
            preparedStatement.setBoolean(index, (Boolean) obj);
        } else if (obj instanceof boolean[]) {
            preparedStatement.setBoolean(index, (boolean) obj);
        } else {
            throw new IllegalArgumentException(" 不支持的类型" + obj.getClass().getName());
        }
    }

    // 默认处理方法
    private void defaultSet(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        throw new IllegalArgumentException("不支持的类型: " + obj.getClass().getName());
    }

    public void setParameter(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.OTHER);
        } else {
            Class<?> clazz = obj.getClass();
            ParameterConsumer<PreparedStatement, Integer, Object> setter = setterMethods.get(clazz);
            if (setter != null) {
                setter.accept(preparedStatement, index, obj);
            } else {
                defaultSet(preparedStatement, index, obj);
            }
        }
    }
}
public class PreparedStatementTable {
    private Field tableId;
    private String tableName;
    private String fieldTable;
    private String segmentation;
    private Map<Field, Class<?>> fieldTableType = new HashMap<>();
    private List<Field> fields;
    }
    ```
    
/**
 * 表字段-批处理使用的,如果是mybatisplus 可以用它的注解也是可以的,如果每个字段的属性都是遵循标准的可以不用注解。
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface BatchTableField {
    String value();
}```
/**
 * 表名-批处理使用的
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface BatchTableName {
    String value();
}
代码语言:java
复制
产品示例
@BatchTableName(TableName = "product")
public class Product {
    @Batch(TableField = "id")
    private Long id;
    @Batch(TableField = "product_name")
    private String productName;
    @Batch(TableField = "product_description")
    private String productDescription;
    @Batch(TableField = "image_url")
    private String imageUrl;
    @Batch(TableField = "price")
    private BigDecimal price;
    @Batch(TableField = "release_date")
    private LocalDate releaseDate;
    @Batch(TableField = "manufacturer")
    private String manufacturer;
    @Batch(TableField = "category")
    private String category;
    @Batch(TableField = "supplier")
    private String supplier;
    @Batch(TableField = "model_number")
    private String modelNumber;
    @Batch(TableField = "color")
    private String color;
    @Batch(TableField = "size")
    private String size;
    @Batch(TableField = "weight")
    private String weight;
    @Batch(TableField = "dimensions")
    private String dimensions;
    @Batch(TableField = "warranty_period")
    private String warrantyPeriod;
    @Batch(TableField = "shipping_details")
    private String shippingDetails;
    @Batch(TableField = "country_of_origin")
    private String countryOfOrigin;
    @Batch(TableField = "material")
    private String material;
    @Batch(TableField = "production_method")
    private String productionMethod;
    @Batch(TableField = "packaging_details")
    private String packagingDetails;
    @Batch(TableField = "additional_info")
    private String additionalInfo;
    @Batch(TableField = "notes")
    private String notes;
    @Batch(TableField = "technical_specs")
    private String technicalSpecs;
    @Batch(TableField = "usage_instructions")
    private String usageInstructions;
```}
真实数据跑起来处理耗时短,不用占用过多连接,适合对数据完整要求不高的处理办法。要求高的话建议 分库+MQ削峰 每个库的连接数毕竟是有限的。
2024-09-02 19:20:49 [pool-23-thread-4] INFO  c.h.o.m.s.d.b.c.BatchDataConsumer - class com.har.org.***.domain.***批处理4589条数据,耗时:13 ms
2024-09-02 19:20:52 [pool-23-thread-1] INFO  c.h.o.m.s.d.b.c.BatchDataConsumer - class com.har.org.***.domain.***批处理5000条数据,耗时:31 ms

文章地址: https://blog.golong.uk/archives/java/32.html

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 问题定位
  • 解决方案
  • 优化效果
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档