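During one load test, a little over 2,000 requests exhausted the connections of our cloud MySQL instance. MySQL's configured maximum was just over 5,000 connections, but in the test this cloud database hit its practical limit at around 1,800. Requests then timed out while acquiring a connection (the configured connection timeout was 3 seconds) and connections could not be reclaimed. The RPC remote calls also lacked consumer capacity, so the RPC thread pool filled up. The system became unavailable, with the slowest responses taking tens of seconds.
Using distributed tracing, server performance metrics, and error logs, we found that the main bottleneck was one service whose MySQL connections were exhausted. New requests could not obtain a connection, so connection-timeout and transaction-timeout errors flooded the logs. CPU utilization on the cloud database was not high; the problem was the connection count, driven mainly by two business areas with many write paths.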
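To fix this without introducing new middleware and without scaling out, we optimized in the following areas:
Business flow optimization: tolerate dirty data in certain cases, and handle failed results with a separate strategy.
Data storage strategy: persist successful records in real time; put failed records into an in-JVM queue and let a timer write them in batches.
Connection management and timeout tuning: use shorter connection and transaction timeouts.
RPC thread pool tuning: adjust the pool parameters and set an explicit timeout on every remote call.
Future plans: database sharding, rate limiting, MQ peak shaving, and tuning the number of connections used for remote calls.

Specifically, the code paths that produce failed records act as producers and push them into the queue, while a single-threaded timer (firing every 0.5 seconds) acts as the consumer and processes the queue in batches. The batch size is configurable and can be changed dynamically in Nacos. With batching in place, CPU utilization on the database also went up.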
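One of the items above is setting an explicit timeout on remote calls so that a slow downstream service cannot hold worker threads indefinitely. The following is a minimal sketch of that idea using only JDK classes. `TimedCall`, the pool sizes, and the fallback value are assumptions made for illustration; a real RPC framework would normally expose its own timeout setting, which is not shown here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedCall {
    // Bounded pool: fixed thread count plus a bounded work queue, so a slow
    // downstream cannot pile up unlimited pending calls (sizes are illustrative).
    static final ExecutorService POOL = new ThreadPoolExecutor(
            4, 4, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(100),
            r -> {
                Thread t = new Thread(r, "remote-call");
                t.setDaemon(true); // do not keep the JVM alive in this sketch
                return t;
            },
            new ThreadPoolExecutor.AbortPolicy());

    /** Runs the call on the pool and waits at most timeoutMs; returns fallback otherwise. */
    static <T> T callWithTimeout(Callable<T> call, long timeoutMs, T fallback) {
        Future<T> future;
        try {
            future = POOL.submit(call);
        } catch (RejectedExecutionException e) {
            return fallback; // pool and queue are full: fail fast
        }
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker so its thread is freed
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the call itself failed
        }
    }
}
```

A caller would wrap the remote invocation, for example `TimedCall.callWithTimeout(() -> client.query(id), 300, null)`, where `client.query` stands in for whatever remote method is being called.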
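After the optimization, the connection count dropped noticeably and system stability improved significantly. We applied the same optimization to similar business lines and wrapped it in a simple abstraction.
The batch-processing sample below only illustrates the rough flow. It leaves out details such as dynamic parameter configuration, skipping the load when the queue is below a minimum count, and having the Spring container manage the queue.
Below is a simple example of the batch-processing code.
MySQL configuration that enables batch rewriting: url = jdbc:mysql://localhost:3306/db?rewriteBatchedStatements=true
In MySQL Connector/J, rewriteBatchedStatements defaults to false in both the 5.1.x and 8.x driver lines, so add it explicitly regardless of the MySQL server version. Without it, executeBatch() still works, but the driver sends the batched statements to the server one at a time and most of the performance benefit is lost.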
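To make the effect of that flag more concrete: when it is enabled, Connector/J can send a batch of single-row INSERT statements as one multi-row INSERT, which saves network round trips. The sketch below builds such a statement by hand, purely as an illustration of the resulting SQL shape. `MultiRowInsert` is a hypothetical helper and is not how the driver is implemented internally.

```java
import java.util.Collections;

final class MultiRowInsert {
    /** Builds e.g. "INSERT INTO t (a,b) VALUES (?,?),(?,?)" for the given number of rows. */
    static String build(String table, String[] columns, int rows) {
        if (rows < 1 || columns.length == 0) {
            throw new IllegalArgumentException("need at least one row and one column");
        }
        String columnList = String.join(",", columns);
        // One "(?,?,...)" group per row, with one placeholder per column
        String oneRow = "(" + String.join(",", Collections.nCopies(columns.length, "?")) + ")";
        return "INSERT INTO " + table + " (" + columnList + ") VALUES "
                + String.join(",", Collections.nCopies(rows, oneRow));
    }
}
```

With the flag set on the JDBC URL there is no need to build this string in application code; plain `addBatch()` and `executeBatch()` calls are enough.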
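import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DataProcessor {
    // Example queue, bounded so that a backlog cannot exhaust the heap
    public static final BlockingQueue<Product> dataQueue = new LinkedBlockingQueue<>(30000);

    public static void pushData(Product product) {
        // offer() returns false instead of blocking when the queue is full
        boolean offered = dataQueue.offer(product);
        if (!offered) {
            throw new IllegalStateException("queue is full");
        }
    }
}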
Producer: failed records
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDate;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class DataProducer {
    static final AtomicLong atomicLong = new AtomicLong(1);

    public static void main(String[] args) {
        start(args);
    }

    public static void start(String[] args) {
        // Start a single-thread scheduler
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Simulate failed business records: every 500 ms push a random number of them
        scheduler.scheduleAtFixedRate(() -> {
            Random random = new Random();
            IntStream.range(1, random.nextInt(10000)).forEach(value -> {
                try {
                    Product product = generateRandomProduct(random);
                    DataProcessor.pushData(product);
                } catch (IllegalStateException e) {
                    System.err.println("Queue is full: " + e.getMessage());
                }
            });
        }, 0, 500, TimeUnit.MILLISECONDS);
    }

    private static Product generateRandomProduct(Random random) {
        Product product = new Product();
        product.setId(atomicLong.incrementAndGet());
        product.setProductName("Product_" + random.nextInt(1000));
        product.setProductDescription("Description_" + random.nextInt(1000));
        product.setImageUrl("https://example.com/image_" + random.nextInt(100) + ".jpg");
        // RoundingMode replaces the deprecated BigDecimal.ROUND_HALF_UP constant
        product.setPrice(BigDecimal.valueOf(random.nextDouble() * 1000).setScale(2, RoundingMode.HALF_UP));
        product.setReleaseDate(LocalDate.now().minusDays(random.nextInt(365)));
        product.setManufacturer("Manufacturer_" + random.nextInt(100));
        product.setCategory("Category_" + random.nextInt(100));
        product.setSupplier("Supplier_" + random.nextInt(100));
        product.setModelNumber("Model_" + random.nextInt(100));
        product.setColor("Color_" + random.nextInt(100));
        product.setWeight("Weight_" + random.nextInt(100));
        product.setDimensions("Dimensions_" + random.nextInt(100));
        product.setWarrantyPeriod("Warranty_" + random.nextInt(100));
        product.setShippingDetails("Shipping_" + random.nextInt(100));
        product.setCountryOfOrigin("Country_" + random.nextInt(100));
        product.setMaterial("Material_" + random.nextInt(100));
        product.setProductionMethod("Production_" + random.nextInt(100));
        product.setPackagingDetails("Packaging_" + random.nextInt(100));
        product.setAdditionalInfo("Additional_" + random.nextInt(100));
        product.setNotes("Notes_" + random.nextInt(100));
        product.setTechnicalSpecs("TechSpecs_" + random.nextInt(100));
        product.setUsageInstructions("Usage_" + random.nextInt(100));
        return product;
    }
}
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DataConsumer {
    // Maximum number of rows per batch
    private static volatile int batchSize = 10000;
    // Maps a Java field type to the PreparedStatement setter that binds it
    private static final Map<Class<?>, ParameterConsumer<PreparedStatement, Integer, Object>> setterMethods =
            new ParameterSetter().getSetterMethods();
    // Cache of per-class table metadata (PreparedStatementTable)
    private static final Map<Class<?>, PreparedStatementTable> preparedStatementTables = new ConcurrentHashMap<>();

    public static void startConsumer(String[] args) {
        // Start a single-thread scheduler
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Periodically persist the failed business records held in the JVM queue
        scheduler.scheduleAtFixedRate(() -> {
            try {
                batchInsert(Product.class);
            } catch (RuntimeException e) {
                // An uncaught exception would silently cancel all future runs of this task
                e.printStackTrace();
            }
        }, 0, 500, TimeUnit.MILLISECONDS);
    }

    /**
     * Drains up to batchSize records from the queue and inserts them as one JDBC batch.
     *
     * @param clz entity class whose table metadata is used to build the INSERT
     * @date 2024/8/31 03:06
     * @author xjg
     */
    private static void batchInsert(Class<?> clz) {
        int size = batchSize; // read the volatile once so the list capacity and drain limit agree
        System.out.println("Queue size: " + DataProcessor.dataQueue.size());
        // Skip this round until the queue holds at least half a batch
        if (DataProcessor.dataQueue.size() < (size / 2)) {
            System.out.println("Not enough data in the queue to insert yet");
            return;
        }
        // An unbounded list avoids an IllegalStateException if the drain target fills up
        List<Object> batch = new ArrayList<>(size);
        int pollSize = DataProcessor.dataQueue.drainTo(batch, size);
        long startTime = System.currentTimeMillis(); // record the start time
        if (pollSize > 0) {
            PreparedStatementTable preparedStatementTable = getFieldTable(clz);
            String sql = String.format("INSERT INTO %s (%s) VALUES (%s)", preparedStatementTable.getTableName(),
                    preparedStatementTable.getFieldTable(), preparedStatementTable.getSegmentation());
            Map<Field, Class<?>> filedTypeMaps = preparedStatementTable.getFieldTableType();
            List<Field> sortedFields = preparedStatementTable.getFields();
            // DatabaseConnection is the author's helper and is not shown in this article.
            // try-with-resources closes both the statement and the connection.
            try (Connection connection = DatabaseConnection.createDataSource().getConnection();
                 PreparedStatement statement = connection.prepareStatement(sql)) {
                // connection.setAutoCommit(false); // begin a transaction
                for (Object object : batch) {
                    int index = 0; // JDBC parameter indexes are 1-based
                    for (Field sortedField : sortedFields) {
                        Class<?> fieldType = filedTypeMaps.get(sortedField);
                        sortedField.setAccessible(true); // allow access to private fields
                        Object fieldValue = sortedField.get(object); // read the field's actual value
                        ParameterConsumer<PreparedStatement, Integer, Object> setterMethod = setterMethods.get(fieldType);
                        if (setterMethod != null) {
                            setterMethod.accept(statement, ++index, fieldValue);
                        } else {
                            throw new IllegalArgumentException("unsupported type " + fieldType);
                        }
                    }
                    statement.addBatch();
                }
                statement.executeBatch();
                // connection.commit(); // commit the transaction
                long elapsedTime = System.currentTimeMillis() - startTime;
                System.out.println("Batch processed " + pollSize + " rows in " + elapsedTime + " ms");
            } catch (SQLException | IllegalAccessException e) {
                // The drained records are not re-queued, so a failed batch is dropped
                e.printStackTrace();
            }
        } else {
            System.out.println("Nothing to consume right now");
        }
    }

    // Builds (and caches) the table name, column list, and placeholders for a class.
    // This method belongs to DataConsumer, which owns the preparedStatementTables cache.
    public static PreparedStatementTable getFieldTable(Class<?> clz) {
        PreparedStatementTable cached = preparedStatementTables.get(clz);
        if (cached != null) {
            return cached;
        }
        PreparedStatementTable preparedStatementTable = new PreparedStatementTable();
        BatchTableName tableNameAnnotation = clz.getAnnotation(BatchTableName.class);
        // Fall back to the simple class name when the annotation is absent
        String tableName = tableNameAnnotation != null ? tableNameAnnotation.TableName() : clz.getSimpleName();
        preparedStatementTable.setTableName(tableName);
        // Collect the declared fields
        Field[] fields = clz.getDeclaredFields();
        StringBuilder fieldTable = new StringBuilder();
        StringBuilder segmentation = new StringBuilder();
        List<Field> sortField = new ArrayList<>(35);
        for (Field field : fields) {
            // Static and compiler-generated fields are not table columns
            if (java.lang.reflect.Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            Batch annotation = field.getAnnotation(Batch.class);
            // Fall back to the field name when the annotation is absent
            String tableField = annotation != null ? annotation.TableField() : field.getName();
            sortField.add(field);
            preparedStatementTable.getFieldTableType().put(field, field.getType());
            fieldTable.append(tableField).append(",");
            segmentation.append("?,");
        }
        // Drop the trailing commas
        preparedStatementTable.setFieldTable(fieldTable.substring(0, fieldTable.length() - 1));
        preparedStatementTable.setSegmentation(segmentation.substring(0, segmentation.length() - 1));
        preparedStatementTable.setFields(sortField);
        // Put the result into the cache
        preparedStatementTables.put(clz, preparedStatementTable);
        return preparedStatementTable;
    }
}
Startup class
public class StartProcess {
    public static void main(String[] args) {
        // Producer
        DataProducer.start(args);
        // Consumer
        DataConsumer.startConsumer(args);
    }
}
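The consumer in this sample skips a round whenever the queue holds fewer than `batchSize / 2` records, so a slow trickle of failures could sit in memory for a long time. One possible refinement, sketched below under the assumption that a bounded delay is acceptable, is to also flush once a maximum wait has passed since the previous flush. `BatchDrainer`, `minSize`, and `maxWaitMillis` are hypothetical names and do not appear in the original code; the current time is passed in as a parameter only to keep the sketch easy to test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

final class BatchDrainer<T> {
    private final BlockingQueue<T> queue;
    private final int batchSize;      // upper bound on rows per batch
    private final int minSize;        // flush immediately once this many rows wait
    private final long maxWaitMillis; // flush anyway after this long since the last flush
    private long lastFlushMillis;

    BatchDrainer(BlockingQueue<T> queue, int batchSize, int minSize, long maxWaitMillis, long nowMillis) {
        this.queue = queue;
        this.batchSize = batchSize;
        this.minSize = minSize;
        this.maxWaitMillis = maxWaitMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Returns the next batch, or an empty list when flushing is not yet worthwhile. */
    List<T> nextBatch(long nowMillis) {
        boolean enough = queue.size() >= minSize;
        boolean waitedTooLong = !queue.isEmpty() && nowMillis - lastFlushMillis >= maxWaitMillis;
        if (!enough && !waitedTooLong) {
            return new ArrayList<>();
        }
        List<T> batch = new ArrayList<>(Math.min(batchSize, queue.size()));
        queue.drainTo(batch, batchSize); // removes at most batchSize elements
        lastFlushMillis = nowMillis;
        return batch;
    }
}
```

The scheduled task would call `nextBatch(System.currentTimeMillis())` from its single consumer thread and hand any non-empty result to the batch insert.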
/**
 * Binds one value to a PreparedStatement parameter.
 * T is the PreparedStatement, U is the 1-based parameter index, and V is the value to bind.
 *
 * @date 2024/8/31 14:17
 * @author xjg
 */
@FunctionalInterface
interface ParameterConsumer<T, U, V> {
    void accept(T t, U u, V v) throws SQLException;
}
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

public class ParameterSetter {
    // Instance-level map: each entry is bound to a method of this instance
    private final Map<Class<?>, ParameterConsumer<PreparedStatement, Integer, Object>> setterMethods = new HashMap<>();

    public ParameterSetter() {
        // Primitive and wrapper types share a setter because Field.get() boxes primitives
        setterMethods.put(String.class, this::setString);
        setterMethods.put(Integer.class, this::setInt);
        setterMethods.put(int.class, this::setInt);
        setterMethods.put(Long.class, this::setLong);
        setterMethods.put(long.class, this::setLong);
        setterMethods.put(Double.class, this::setDouble);
        setterMethods.put(double.class, this::setDouble);
        setterMethods.put(Float.class, this::setFloat);
        setterMethods.put(float.class, this::setFloat);
        setterMethods.put(BigDecimal.class, this::setBigDecimal);
        setterMethods.put(LocalDate.class, this::setDate);
        setterMethods.put(LocalDateTime.class, this::setTimestamp);
        setterMethods.put(Boolean.class, this::setBoolean);
        setterMethods.put(boolean.class, this::setBoolean);
        System.out.println("Initialized the mapping from Java types to PreparedStatement setter methods");
    }

    public Map<Class<?>, ParameterConsumer<PreparedStatement, Integer, Object>> getSetterMethods() {
        return setterMethods;
    }

    private void setString(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.VARCHAR);
        } else if (obj instanceof String) {
            preparedStatement.setString(index, (String) obj);
        } else {
            throw new IllegalArgumentException("unsupported type " + obj.getClass().getName());
        }
    }

    // The original int[]/long[]/double[]/float[]/boolean[] branches were removed from the
    // numeric and boolean setters: casting an array to a primitive throws ClassCastException.
    private void setInt(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.INTEGER);
        } else if (obj instanceof Integer) {
            preparedStatement.setInt(index, (Integer) obj);
        } else {
            throw new IllegalArgumentException("unsupported type " + obj.getClass().getName());
        }
    }

    private void setLong(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.BIGINT);
        } else if (obj instanceof Long) {
            preparedStatement.setLong(index, (Long) obj);
        } else {
            throw new IllegalArgumentException("unsupported type " + obj.getClass().getName());
        }
    }

    private void setDouble(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.DOUBLE);
        } else if (obj instanceof Double) {
            preparedStatement.setDouble(index, (Double) obj);
        } else {
            throw new IllegalArgumentException("unsupported type " + obj.getClass().getName());
        }
    }

    private void setFloat(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.FLOAT);
        } else if (obj instanceof Float) {
            preparedStatement.setFloat(index, (Float) obj);
        } else {
            throw new IllegalArgumentException("unsupported type " + obj.getClass().getName());
        }
    }

    private void setBigDecimal(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.DECIMAL);
        } else if (obj instanceof BigDecimal) {
            preparedStatement.setBigDecimal(index, (BigDecimal) obj);
        } else {
            throw new IllegalArgumentException("unsupported type " + obj.getClass().getName());
        }
    }

    private void setDate(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.DATE);
        } else if (obj instanceof LocalDate) {
            preparedStatement.setDate(index, Date.valueOf((LocalDate) obj));
        } else {
            throw new IllegalArgumentException("unsupported type " + obj.getClass().getName());
        }
    }

    private void setTimestamp(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.TIMESTAMP);
        } else if (obj instanceof LocalDateTime) {
            preparedStatement.setTimestamp(index, Timestamp.valueOf((LocalDateTime) obj));
        } else {
            throw new IllegalArgumentException("unsupported type " + obj.getClass().getName());
        }
    }

    private void setBoolean(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.BOOLEAN);
        } else if (obj instanceof Boolean) {
            preparedStatement.setBoolean(index, (Boolean) obj);
        } else {
            throw new IllegalArgumentException("unsupported type " + obj.getClass().getName());
        }
    }

    // Default handler for types without a registered setter
    private void defaultSet(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        throw new IllegalArgumentException("unsupported type: " + obj.getClass().getName());
    }

    // Binds a value by looking up the setter for its runtime class
    public void setParameter(PreparedStatement preparedStatement, int index, Object obj) throws SQLException {
        if (obj == null) {
            preparedStatement.setNull(index, Types.OTHER);
        } else {
            Class<?> clazz = obj.getClass();
            ParameterConsumer<PreparedStatement, Integer, Object> setter = setterMethods.get(clazz);
            if (setter != null) {
                setter.accept(preparedStatement, index, obj);
            } else {
                defaultSet(preparedStatement, index, obj);
            }
        }
    }
}
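public class PreparedStatementTable {
    private Field tableId;
    private String tableName;
    // Comma-separated column list, e.g. "id,product_name"
    private String fieldTable;
    // Comma-separated placeholders, e.g. "?,?"
    private String segmentation;
    private Map<Field, Class<?>> fieldTableType = new HashMap<>();
    private List<Field> fields;
    // Getters and setters are omitted here; the code above calls them
    // (getTableName(), getFieldTableType(), setFields(), and so on)
}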
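```
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Column mapping used by the batch insert. With MyBatis-Plus its own annotations
 * would also work, and fields that follow a standard naming convention can do
 * without the annotation. The name and attribute match the usage in the code:
 * @Batch(TableField = "...").
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface Batch {
    String TableField();
}

/**
 * Table name used by the batch insert, as in @BatchTableName(TableName = "...").
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface BatchTableName {
    String TableName();
}
```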
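The annotation comment notes that fields following a standard naming convention can do without the annotation. In the sample, however, `getFieldTable` falls back to the raw field name, so an unannotated `productName` would not match a `product_name` column. A hypothetical fallback that converts camelCase field names to snake_case is sketched below; `ColumnNames` is an assumed helper name, and the conversion treats every uppercase letter as a word boundary, so acronyms such as `URL` would need special handling.

```java
final class ColumnNames {
    /** Converts a camelCase field name such as "productName" to "product_name". */
    static String toSnakeCase(String fieldName) {
        StringBuilder sb = new StringBuilder(fieldName.length() + 4);
        for (int i = 0; i < fieldName.length(); i++) {
            char c = fieldName.charAt(i);
            if (Character.isUpperCase(c)) {
                if (i > 0) {
                    sb.append('_'); // word boundary before each uppercase letter
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
```

In `getFieldTable`, the fallback branch could then use `ColumnNames.toSnakeCase(field.getName())` in place of `field.getName()`.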
Product example
@BatchTableName(TableName = "product")
public class Product {
    @Batch(TableField = "id")
    private Long id;
    @Batch(TableField = "product_name")
    private String productName;
    @Batch(TableField = "product_description")
    private String productDescription;
    @Batch(TableField = "image_url")
    private String imageUrl;
    @Batch(TableField = "price")
    private BigDecimal price;
    @Batch(TableField = "release_date")
    private LocalDate releaseDate;
    @Batch(TableField = "manufacturer")
    private String manufacturer;
    @Batch(TableField = "category")
    private String category;
    @Batch(TableField = "supplier")
    private String supplier;
    @Batch(TableField = "model_number")
    private String modelNumber;
    @Batch(TableField = "color")
    private String color;
    @Batch(TableField = "size")
    private String size;
    @Batch(TableField = "weight")
    private String weight;
    @Batch(TableField = "dimensions")
    private String dimensions;
    @Batch(TableField = "warranty_period")
    private String warrantyPeriod;
    @Batch(TableField = "shipping_details")
    private String shippingDetails;
    @Batch(TableField = "country_of_origin")
    private String countryOfOrigin;
    @Batch(TableField = "material")
    private String material;
    @Batch(TableField = "production_method")
    private String productionMethod;
    @Batch(TableField = "packaging_details")
    private String packagingDetails;
    @Batch(TableField = "additional_info")
    private String additionalInfo;
    @Batch(TableField = "notes")
    private String notes;
    @Batch(TableField = "technical_specs")
    private String technicalSpecs;
    @Batch(TableField = "usage_instructions")
    private String usageInstructions;
}
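With real data, each batch completes quickly and the approach does not tie up many connections. It suits cases where data-integrity requirements are not strict. If they are strict, sharding plus MQ peak shaving is the better choice, since the number of connections each database can serve is limited after all. In the production log lines below, "批处理N条数据,耗时" means "batch processed N rows, elapsed time".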
2024-09-02 19:20:49 [pool-23-thread-4] INFO c.h.o.m.s.d.b.c.BatchDataConsumer - class com.har.org.***.domain.***批处理4589条数据,耗时:13 ms
2024-09-02 19:20:52 [pool-23-thread-1] INFO c.h.o.m.s.d.b.c.BatchDataConsumer - class com.har.org.***.domain.***批处理5000条数据,耗时:31 ms
Article URL: https://blog.golong.uk/archives/java/32.html
This article is a repost.
In case of infringement, please contact cloudcommunity@tencent.com to have it removed.