Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊PowerJob的AbstractSqlProcessor

聊聊PowerJob的AbstractSqlProcessor

原创
作者头像
code4it
发布于 2024-01-03 01:17:34
发布于 2024-01-03 01:17:34
15000
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下PowerJob的AbstractSqlProcessor

AbstractSqlProcessor

tech/powerjob/official/processors/impl/sql/AbstractSqlProcessor.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
public abstract class AbstractSqlProcessor extends CommonBasicProcessor {

    /**
     * 默认超时时间
     */
    protected static final int DEFAULT_TIMEOUT = 60;
    /**
     * name => SQL validator
     * 注意 :
     * - 返回 true 表示验证通过
     * - 返回 false 表示 SQL 非法,将被拒绝执行
     */
    protected final Map<String, Predicate<String>> sqlValidatorMap = Maps.newConcurrentMap();
    /**
     * 自定义 SQL 解析器
     */
    protected SqlParser sqlParser;

    private static final Joiner JOINER = Joiner.on("|").useForNull("-");


    @Override
    public ProcessResult process0(TaskContext taskContext) {

        OmsLogger omsLogger = taskContext.getOmsLogger();
        // 解析参数
        SqlParams sqlParams = extractParams(taskContext);
        omsLogger.info("origin sql params: {}", JSON.toJSON(sqlParams));
        // 校验参数
        validateParams(sqlParams);

        StopWatch stopWatch = new StopWatch(this.getClass().getSimpleName());
        // 解析
        stopWatch.start("Parse SQL");
        if (sqlParser != null) {
            omsLogger.info("before parse sql: {}", sqlParams.getSql());
            String newSQL = sqlParser.parse(sqlParams.getSql(), taskContext);
            sqlParams.setSql(newSQL);
            omsLogger.info("after parse sql: {}", newSQL);
        }
        stopWatch.stop();

        // 校验 SQL
        stopWatch.start("Validate SQL");
        validateSql(sqlParams.getSql(), omsLogger);
        stopWatch.stop();

        // 执行
        stopWatch.start("Execute SQL");
        omsLogger.info("final sql params: {}", JSON.toJSON(sqlParams));
        executeSql(sqlParams, taskContext);
        stopWatch.stop();

        omsLogger.info(stopWatch.prettyPrint());
        String message = String.format("execute successfully, used time: %s millisecond", stopWatch.getTotalTimeMillis());
        return new ProcessResult(true, message);
    }

    abstract Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException;

    public void setSqlParser(SqlParser sqlParser) {
        this.sqlParser = sqlParser;
    }

    public void registerSqlValidator(String validatorName, Predicate<String> sqlValidator) {
        sqlValidatorMap.put(validatorName, sqlValidator);
        log.info("register sql validator({})' successfully.", validatorName);
    }

    //......
}    

AbstractSqlProcessor继承了CommonBasicProcessor,其process0先将入参解析为SqlParams,然后调用validateParams进行参数校验,针对sqlParser不为null的会通过sqlParser进行解析,接着通过validateSql校验sql,最后通过executeSql执行sql;它定义了getConnection抽象方法,提供了setSqlParser、registerSqlValidator方法

SqlParams

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @Data
    public static class SqlParams {
        /**
         * 数据源名称
         */
        private String dataSourceName;
        /**
         * 需要执行的 SQL
         */
        private String sql;
        /**
         * 超时时间
         */
        private Integer timeout;
        /**
         * jdbc url
         * 具体格式可参考 https://www.baeldung.com/java-jdbc-url-format
         */
        private String jdbcUrl;
        /**
         * 是否展示 SQL 执行结果
         */
        private boolean showResult;
    }

SqlParams定义了dataSourceName、sql、timeout、jdbcUrl、showResult属性

validateSql

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private void validateSql(String sql, OmsLogger omsLogger) {
        if (sqlValidatorMap.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Predicate<String>> entry : sqlValidatorMap.entrySet()) {
            Predicate<String> validator = entry.getValue();
            if (!validator.test(sql)) {
                omsLogger.error("validate sql by validator[{}] failed, skip to process!", entry.getKey());
                throw new IllegalArgumentException("illegal sql, can't pass the validation of " + entry.getKey());
            }
        }
    }

validateSql遍历sqlValidatorMap,挨个执行test方法,验证不通过抛出IllegalArgumentException

executeSql

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @SneakyThrows
    private void executeSql(SqlParams sqlParams, TaskContext ctx) {

        OmsLogger omsLogger = ctx.getOmsLogger();

        boolean originAutoCommitFlag ;
        try (Connection connection = getConnection(sqlParams, ctx)) {
            originAutoCommitFlag = connection.getAutoCommit();
            connection.setAutoCommit(false);
            try (Statement statement = connection.createStatement()) {
                statement.setQueryTimeout(sqlParams.getTimeout() == null ? DEFAULT_TIMEOUT : sqlParams.getTimeout());
                statement.execute(sqlParams.getSql());

                connection.commit();

                if (sqlParams.showResult) {
                    outputSqlResult(statement, omsLogger);
                }
            } catch (Throwable e) {
                omsLogger.error("execute sql failed, try to rollback", e);
                connection.rollback();
                throw e;
            } finally {
                connection.setAutoCommit(originAutoCommitFlag);
            }
        }
    }

executeSql通过getConnection获取连接,设置为手动提交,然后创建Statement,设置queryTimeout,执行,最后提交,针对showResult的执行outputSqlResult

outputSqlResult

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private void outputSqlResult(Statement statement, OmsLogger omsLogger) throws SQLException {
        omsLogger.info("====== SQL EXECUTE RESULT ======");

        for (int index = 0; index < Integer.MAX_VALUE; index++) {

            // 某一个结果集
            ResultSet resultSet = statement.getResultSet();
            if (resultSet != null) {
                try (ResultSet rs = resultSet) {
                    int columnCount = rs.getMetaData().getColumnCount();
                    List<String> columnNames = Lists.newLinkedList();
                    //column – the first column is 1, the second is 2, ...
                    for (int i = 1; i <= columnCount; i++) {
                        columnNames.add(rs.getMetaData().getColumnName(i));
                    }
                    omsLogger.info("[Result-{}] [Columns] {}" + System.lineSeparator(), index, JOINER.join(columnNames));
                    int rowIndex = 0;
                    List<Object> row = Lists.newLinkedList();
                    while (rs.next()) {
                        for (int i = 1; i <= columnCount; i++) {
                            row.add(rs.getObject(i));
                        }
                        omsLogger.info("[Result-{}] [Row-{}] {}" + System.lineSeparator(), index, rowIndex++, JOINER.join(row));
                    }
                }
            } else {
                int updateCount = statement.getUpdateCount();
                if (updateCount != -1) {
                    omsLogger.info("[Result-{}] update count: {}", index, updateCount);
                }
            }
            if (((!statement.getMoreResults()) && (statement.getUpdateCount() == -1))) {
                break;
            }
        }
        omsLogger.info("====== SQL EXECUTE RESULT ======");
    }

outputSqlResult从statement获取resultSet,然后打印columnName,在打印每行数据,对于更新操作则打印updateCount

SqlParser

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @FunctionalInterface
    public interface SqlParser {
        /**
         * 自定义 SQL 解析逻辑
         *
         * @param sql         原始 SQL 语句
         * @param taskContext 任务上下文
         * @return 解析后的 SQL
         */
        String parse(String sql, TaskContext taskContext);
    }

SqlParser接口定义了parse方法

DynamicDatasourceSqlProcessor

tech/powerjob/official/processors/impl/sql/DynamicDatasourceSqlProcessor.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class DynamicDatasourceSqlProcessor extends AbstractSqlProcessor {

    @Override
    protected void validateParams(SqlParams sqlParams) {
        if (StringUtils.isEmpty(sqlParams.getJdbcUrl())) {
            throw new IllegalArgumentException("jdbcUrl can't be empty in DynamicDatasourceSqlProcessor!");
        }
    }

    @Override
    Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException {

        JSONObject params = JSONObject.parseObject(CommonUtils.parseParams(taskContext));
        Properties properties = new Properties();

        // normally at least a "user" and "password" property should be included
        params.forEach((k, v) -> properties.setProperty(k, String.valueOf(v)));

        return DriverManager.getConnection(sqlParams.getJdbcUrl(), properties);
    }

    @Override
    protected String getSecurityDKey() {
        return SecurityUtils.ENABLE_DYNAMIC_SQL_PROCESSOR;
    }
}

DynamicDatasourceSqlProcessor继承了AbstractSqlProcessor,其validateParams要求jdbcUrl不能为空,其getConnection方法会从taskContext提取properties作为DriverManager.getConnection的属性,其getSecurityDKey返回的是powerjob.official-processor.dynamic-datasource.enable配置

SpringDatasourceSqlProcessor

tech/powerjob/official/processors/impl/sql/SpringDatasourceSqlProcessor.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
public class SpringDatasourceSqlProcessor extends AbstractSqlProcessor {
    /**
     * 默认的数据源名称
     */
    private static final String DEFAULT_DATASOURCE_NAME = "default";
    /**
     * name => data source
     */
    private final Map<String, DataSource> dataSourceMap;

    /**
     * 指定默认的数据源
     *
     * @param defaultDataSource 默认数据源
     */
    public SpringDatasourceSqlProcessor(DataSource defaultDataSource) {
        dataSourceMap = Maps.newConcurrentMap();
        registerDataSource(DEFAULT_DATASOURCE_NAME, defaultDataSource);
    }

    @Override
    Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException {
        return dataSourceMap.get(sqlParams.getDataSourceName()).getConnection();
    }

    /**
     * 校验参数,如果校验不通过直接抛异常
     *
     * @param sqlParams SQL 参数信息
     */
    @Override
    protected void validateParams(SqlParams sqlParams) {
        // 检查数据源
        if (StringUtils.isEmpty(sqlParams.getDataSourceName())) {
            // use the default data source when current data source name is empty
            sqlParams.setDataSourceName(DEFAULT_DATASOURCE_NAME);
        }
        dataSourceMap.computeIfAbsent(sqlParams.getDataSourceName(), dataSourceName -> {
            throw new IllegalArgumentException("can't find data source with name " + dataSourceName);
        });
    }

    /**
     * 注册数据源
     *
     * @param dataSourceName 数据源名称
     * @param dataSource     数据源
     */
    public void registerDataSource(String dataSourceName, DataSource dataSource) {
        Objects.requireNonNull(dataSourceName, "DataSource name must not be null");
        Objects.requireNonNull(dataSource, "DataSource must not be null");
        dataSourceMap.put(dataSourceName, dataSource);
        log.info("register data source({})' successfully.", dataSourceName);
    }

    /**
     * 移除数据源
     *
     * @param dataSourceName 数据源名称
     */
    public void removeDataSource(String dataSourceName) {
        DataSource remove = dataSourceMap.remove(dataSourceName);
        if (remove != null) {
            log.warn("remove data source({})' successfully.", dataSourceName);
        }
    }
}

SpringDatasourceSqlProcessor继承了AbstractSqlProcessor,其构造器注册名为default的DataSource,其getConnection根据sqlParams的dataSourceName来获取连接,validateParams会先校验指定的dataSource是否存在;它提供了registerDataSource、removeDataSource方法

小结

AbstractSqlProcessor继承了CommonBasicProcessor,其process0先将入参解析为SqlParams,然后调用validateParams进行参数校验,针对sqlParser不为null的会通过sqlParser进行解析,接着通过validateSql校验sql,最后通过executeSql执行sql;它定义了getConnection抽象方法,提供了setSqlParser、registerSqlValidator方法。它有两个实现类分别是DynamicDatasourceSqlProcessor(通过jdbcUrl来构造连接)、SpringDatasourceSqlProcessor(通过给定的dataSource获取连接)。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
聊聊PowerJob的AbstractSqlProcessor
tech/powerjob/official/processors/impl/sql/AbstractSqlProcessor.java
code4it
2024/01/03
1530
聊聊PowerJob的AbstractSqlProcessor
聊聊PowerJob的AbstractScriptProcessor
tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java
code4it
2024/01/04
1100
聊聊PowerJob日志的上报及存储
tech/powerjob/worker/log/OmsLoggerFactory.java
code4it
2024/02/03
3410
聊聊powerjob的单机线程并发度
本文主要研究一下powerjob的单机线程并发度(threadConcurrency)
code4it
2024/03/12
1570
聊聊PowerJob的MapProcessor
tech/powerjob/worker/core/processor/sdk/MapProcessor.java
code4it
2024/01/24
2170
聊聊PowerJob的MapReduceProcessor
tech/powerjob/worker/core/processor/TaskResult.java
code4it
2024/01/26
1990
聊聊PowerJob的StoreStrategy
tech/powerjob/worker/common/constants/StoreStrategy.java
code4it
2023/12/29
1740
面试官问我:看过sharding-jdbc的源码吗?我吧啦吧啦说了一通!!
作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址:
冰河
2020/10/29
5060
面试官问我:看过sharding-jdbc的源码吗?我吧啦吧啦说了一通!!
数据库中间件 Sharding-JDBC 源码分析 —— JDBC实现与读写分离
本文主要基于 Sharding-JDBC 1.5.0 正式版 1. 概述 2. unspported 包 3. adapter 包 3.1 WrapperAdapter 3.2 AbstractDataSourceAdapter 3.3 AbstractConnectionAdapter 3.4 AbstractStatementAdapter 3.5 AbstractPreparedStatementAdapter 3.6 AbstractResultSetAdapter 4. 插入流程 5. 查询流程
芋道源码
2018/03/02
1.6K0
数据库中间件 Sharding-JDBC 源码分析 —— JDBC实现与读写分离
聊聊PowerJob的StoreStrategy
tech/powerjob/worker/common/constants/StoreStrategy.java
code4it
2024/01/03
1880
聊聊PowerJob的StoreStrategy
聊聊PowerJob的HttpProcessor
tech/powerjob/worker/core/processor/sdk/BasicProcessor.java
code4it
2024/01/01
2280
Sharding-Jdbc之读写分离导读 原
      Sharding-JDBC是一个开源的分布式数据库中间件,它无需额外部署和依赖,完全兼容JDBC和各种ORM框架。Sharding-JDBC作为面向开发的微服务云原生基础类库,完整的实现了分库分表、读写分离和分布式主键功能,并初步实现了柔性事务。
用户2603479
2018/08/16
1.4K0
数据源管理 | 基于JDBC模式,适配和管理动态数据源
不同厂商的关系型数据库,提供的链接方式,驱动包,驱动类名都是不一样的,Java数据库连接API,JDBC是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法,且适配大部分关系型数据库。
知了一笑
2020/04/10
1.4K0
数据源管理 | 基于JDBC模式,适配和管理动态数据源
聊聊nacos的MysqlHealthCheckProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/MysqlHealthCheckProcessor.java
code4it
2019/09/21
3760
聊聊nacos的MysqlHealthCheckProcessor
分布式任务调度:PowerJob 高级特性
PowerJob 的容器技术允许开发者开发独立于 Worker 项目之外 Java 处理器,简单来说,就是以 Maven 工程项目的维度去组织一堆 Java 文件(开发者开发的众多脚本处理器),进而兼具开发效率和可维护性。
Freedom123
2024/03/29
8580
分布式任务调度:PowerJob 高级特性
Sharding-Jdbc分库分表的导读
Sharding-JDBC是一个开源的分布式数据库中间件,它无需额外部署和依赖,完全兼容JDBC和各种ORM框架。Sharding-JDBC作为面向开发的微服务云原生基础类库,完整的实现了分库分表、读写分离和分布式主键功能,并初步实现了柔性事务。
用户2603479
2018/08/16
2.9K0
Sharding-Jdbc分库分表的导读
聊聊PowerJob的FileCleanupProcessor
tech/powerjob/worker/core/processor/sdk/BroadcastProcessor.java
code4it
2024/01/02
1630
聊聊PowerJob的HeavyTaskTracker的dispatchTask
本文主要研究一下PowerJob的HeavyTaskTracker的dispatchTask
code4it
2024/01/03
1850
聊聊PowerJob的HeavyTaskTracker的dispatchTask
sharding-jdbc4.0使用方式
Sharding-JDBC是ShardingSphere的第一个产品,也是ShardingSphere的前身。 它定位为轻量级Java框架,在Java的JDBC层提供的额外服务。它使用客户端直连数据库,以jar包形式提供服务,无需额外部署和依赖,可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM框架。
用户2603479
2019/12/10
2.9K0
聊聊powerjob的maxResultLength
powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java
code4it
2024/05/13
1130
聊聊powerjob的maxResultLength
相关推荐
聊聊PowerJob的AbstractSqlProcessor
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验