前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MyBatis源码阅读(八) --- Executor执行器

MyBatis源码阅读(八) --- Executor执行器

作者头像
终有救赎
发布2024-01-30 09:00:30
1920
发布2024-01-30 09:00:30
举报
文章被收录于专栏:多线程多线程
一、概述

Executor 是一个接口,包含更新,查询,事务等一系列方法。在前面分析SqlSession创建过程的时候,我们知道每个SqlSession对象都会有一个Executor对象,SqlSession的操作都会交由Executor执行器执行。

我们先看看Executor类的继承图:

图片.png
图片.png

Executor接口有两个实现,

  • 一个是BaseExecutor抽象类,BaseExecutor又有四个子类:
    • SimpleExecutor:简单类型的执行器,也是默认的执行器,每次执行update或者select操作,都会创建一个Statement对象,执行结束后关闭Statement对象;
    • ReuseExecutor:可重用的执行器,重用的是Statement对象,第一次执行一条sql,会将这条sql的Statement对象缓存在key-value结构的map缓存中。下一次执行,就可以从缓存中取出Statement对象,减少了重复编译的次数,从而提高了性能。每个SqlSession对象都有一个Executor对象,因此这个缓存是SqlSession级别的,所以当SqlSession销毁时,缓存也会销毁;
    • BatchExecutor:批量执行器,默认情况是每次执行一条sql,MyBatis都会发送一条sql。而批量执行器的操作是,每次执行一条sql,不会立马发送到数据库,而是批量一次性发送多条sql;
    • ClosedExecutor: ResultLoaderMap的内部类,用来进行处理懒加载相关功能;
  • 另外一个是CachingExecutor实现类,在缓存的时候用到,使用到了装饰者模式对executor进行二次包装,动态增强了executor的功能;

Executor 接口采用了模版方法的设计模式,定义了一些模版方法,交给子类去实现。定义的方法如下:

代码语言:javascript
复制
public interface Executor {
 
  ResultHandler NO_RESULT_HANDLER = null;
  // 更新
  int update(MappedStatement ms, Object parameter) throws SQLException;
 
  // 先查询缓存,在查询数据库
  <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;
 
  // 查询
  <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;
 
  // 返回游标对象
  <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException;
 
  // 释放Statement
  List<BatchResult> flushStatements() throws SQLException;
 
  // 事务提交
  void commit(boolean required) throws SQLException;
 
  // 事务回滚
  void rollback(boolean required) throws SQLException;
 
  // 创建缓存的键值对
  CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);
 
  // 缓存是否存在
  boolean isCached(MappedStatement ms, CacheKey key);
 
  // 清除一级缓存
  void clearLocalCache();
 
  void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);
 
  // 获取事务对象
  Transaction getTransaction();
 
  void close(boolean forceRollback);
 
  boolean isClosed();
 
  void setExecutorWrapper(Executor executor);
 
}
二、Executor执行器的创建过程

在前面分析SqlSession的获取过程的时候,我们当时暂且先跳过了executor执行器的创建过程这一部分的分析。很显然,executor的创建就是在获取SqlSession的时候。

代码语言:javascript
复制
//org.apache.ibatis.session.defaults.DefaultSqlSessionFactory#openSessionFromDataSource
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
  //事务对象
  Transaction tx = null;
  try {
    //从configuration对象中获取到我们之前解析的environment环境信息
    final Environment environment = configuration.getEnvironment();
    //事务工厂,这里是JbdcTransactionFactory工厂类
    final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
    //通过事务工厂创建JbdcTransaction事务,传入数据源等信息
    tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
    //创建Executor执行器
    final Executor executor = configuration.newExecutor(tx, execType);
    //创建DefaultSqlSession会话,传入Configuration、Executor对象
    return new DefaultSqlSession(configuration, executor, autoCommit);
  } catch (Exception e) {
    closeTransaction(tx); // may have fetched a connection so lets call close()
    throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
  } finally {
    ErrorContext.instance().reset();
  }
}
  • final Executor executor = configuration.newExecutor(tx, execType);

通过configuration对象创建executor执行器,传入前面创建好的事务tx,以及指定的执行器类型。

继续追踪一下newExecutor()的源码:

代码语言:javascript
复制
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
  executorType = executorType == null ? defaultExecutorType : executorType;
  //没有配置的话,默认创建SimpleExecutor
  executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
  Executor executor;
  //第一步:根据ExecutorType来创建不同类型的执行器
  if (ExecutorType.BATCH == executorType) {
    executor = new BatchExecutor(this, transaction);
  } else if (ExecutorType.REUSE == executorType) {
    executor = new ReuseExecutor(this, transaction);
  } else {
    executor = new SimpleExecutor(this, transaction);
  }
  //第二步:如果开启了一级缓存,使用装饰者模式对executor二次包装成CachingExecutor
  if (cacheEnabled) {
    executor = new CachingExecutor(executor);
  }
  //这里使用了责任链设计模式,在插件篇幅里面会详细介绍
  //第三步:加载插件
  executor = (Executor) interceptorChain.pluginAll(executor);
  return executor;
}

我们可以看到,newExecutor()方法根据ExecutorType来创建不同类型的执行器,默认创建的是SimpleExecutor简单类型执行器。

接下来我们看一下各种执行器的构造方法:

代码语言:javascript
复制
public SimpleExecutor(Configuration configuration, Transaction transaction) {
  super(configuration, transaction);
}
public ReuseExecutor(Configuration configuration, Transaction transaction) {
  super(configuration, transaction);
}
public BatchExecutor(Configuration configuration, Transaction transaction) {
  super(configuration, transaction);
}

我们看到了super(xxx)方法,也就是说三种类型的执行器其实都是调用的父类BaseExecutor的构造方法来创建:

代码语言:javascript
复制
protected BaseExecutor(Configuration configuration, Transaction transaction) {
  //事务
  this.transaction = transaction;
  //延迟加载
  this.deferredLoads = new ConcurrentLinkedQueue<>();
  //一级缓存
  this.localCache = new PerpetualCache("LocalCache");
  this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
  this.closed = false;
  //全局配置对象
  this.configuration = configuration;
  this.wrapper = this;
}
代码语言:javascript
复制
final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);

到这里,executor创建就完成了,同时每个SqlSession也就有了自己唯一的Executor对象,如上代码executor作为DefaultSqlSession的成员属性。至于executor是在哪里调用的,相信看过前几篇文章的小伙伴都应该很清楚了,就是在SqlSession执行query方法的时候,具体是交给executor去执行查询的,具体过程可以参照前面的文章,这里不过多赘述了。

三、SimpleExecutor

SimpleExecutor 是默认的执行器,也是最简单的执行器。我们来看看里面几个关键方法:

代码语言:javascript
复制
public class SimpleExecutor extends BaseExecutor {
 
  public SimpleExecutor(Configuration configuration, Transaction transaction) {
    //默认调用父类BaseExecutor的构造方法
    super(configuration, transaction);
  }
 
  @Override
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Statement stmt = null;
    try {
      //第一步:从MappedStatement中获取到configuration全局配置对象
      Configuration configuration = ms.getConfiguration();
      //第二步:通过configuration创建StatementHandler
      StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
      //第三步:创建Statement对象
      stmt = prepareStatement(handler, ms.getStatementLog());
      //第四步:执行SQL查询
      return handler.update(stmt);
    } finally {
      //第五步:关闭statement
      closeStatement(stmt);
    }
  }
 
  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
      stmt = prepareStatement(handler, ms.getStatementLog());
      return handler.query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }
 
  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    //获取到connection连接
    Connection connection = getConnection(statementLog);
    //创建Statement 
    stmt = handler.prepare(connection, transaction.getTimeout());
    //将sql语句中的占位符替换成最终查询参数
    //最底层实现:typeHandler.setParameter(ps, i + 1, value, jdbcType);
    handler.parameterize(stmt);
    return stmt;
  }
 
}

总结一下SimpleExecutor的操作步骤:

  1. 第一步:从MappedStatement中获取到configuration全局配置对象;
  2. 第二步:通过configuration创建StatementHandler;
  3. 第三步:创建Statement对象;
  4. 第四步:执行SQL查询;
四、ReuseExecutor

ReuseExecutor,重用类型的执行器,它的作用是重复利用statement对象,避免频繁的创建。如果在一个SqlSession中多次执行一条sql,如果每次都去生成Statement对象,会造成资源浪费。因此ReuseExecutor在SimpleExecutor的基础上,对prepareStatement()方法进行了改进,将Statement对象缓存在内存中,并且免去了关闭Statement对象这一步。

代码语言:javascript
复制
public class ReuseExecutor extends BaseExecutor {
    
  //map结构,key是执行的SQL语句,value就是对应的statement对象
  //可以看到,在一个会话中,相同的sql语句对应的statement可以重复利用  
  private final Map<String, Statement> statementMap = new HashMap<>();
 
  public ReuseExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }
 
  @Override
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    return handler.update(stmt);
  }
 
  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    //第一步:获取configuration对象
    Configuration configuration = ms.getConfiguration();
    //第二步:创建StatementHandler
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
    //第三步:创建Statement
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    //第四步:执行SQL查询
    return handler.query(stmt, resultHandler);
  }
 
  @Override
  protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    return handler.queryCursor(stmt);
  }
    
  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    BoundSql boundSql = handler.getBoundSql();
    //获取到执行的SQL语句
    String sql = boundSql.getSql();
    if (hasStatementFor(sql)) {
      //如果缓存中存在该条sql对应的statement,则直接从缓存中取,不重新创建statement
      stmt = getStatement(sql);
      applyTransactionTimeout(stmt);
    } else {
      //如果缓存中不存在该条sql对应的statement
      Connection connection = getConnection(statementLog);
      stmt = handler.prepare(connection, transaction.getTimeout());
      //将创建好的statement放入缓存中,方便下次重复利用
      putStatement(sql, stmt);
    }
    handler.parameterize(stmt);
    return stmt;
  }
  
  //判断缓存map中是否存在sql的Statement对象
  private boolean hasStatementFor(String sql) {
    try {
      return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();
    } catch (SQLException e) {
      return false;
    }
  }
 
  private Statement getStatement(String s) {
    return statementMap.get(s);
  }
   
  //以sql为key, Statement为value放到缓存中   
  private void putStatement(String sql, Statement stmt) {
    statementMap.put(sql, stmt);
  }
 
}

从上面可以看到,ReuseExecutor内存维护了一个map结构的缓存statementMap,以sql为key, Statement为value放到缓存中,保证在同一个会话中,如果重复执行两个相同的SQL,第一次创建完的statement,可以在第二次查询的时候重复利用,节省了一些资源。注意:因为每个SqlSession都有自己唯一的对应的Executor对象,因此这个statementMap缓存是SqlSession级别的,如果SqlSession销毁了,statementMap缓存也会将销毁。

五、BatchExecutor

BatchExecutor是批处理的执行器,批量的发送sql到数据库,而不是一个个的发送。

代码语言:javascript
复制
public class BatchExecutor extends BaseExecutor {
  // 批量更新处理的固定返回值,不是返回受影响的行数
  public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;
  // Statement集合 
  private final List<Statement> statementList = new ArrayList<>();
  //批量结果的集合
  private final List<BatchResult> batchResultList = new ArrayList<>();
  //当前Sql语句
  private String currentSql;
 //当前的MappedStatement对象
 private MappedStatement currentStatement;
 
  public BatchExecutor(Configuration configuration, Transaction transaction) {
    //默认调用父类BaseExecutor的构造方法
    super(configuration, transaction);
  }
 
  @Override
  public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    //第一步:获取configuration对象
    final Configuration configuration = ms.getConfiguration();
    //第二步:创建StatementHandler
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    //第三步:获取到BoundSql,再拿到具体的sql
    final BoundSql boundSql = handler.getBoundSql();
    final String sql = boundSql.getSql();
    final Statement stmt;
    //如果当前执行的sql跟拿到的sql一致,并且MappedStatement也是同一个的话
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
      int last = statementList.size() - 1;
      stmt = statementList.get(last);
      applyTransactionTimeout(stmt);
      handler.parameterize(stmt);//fix Issues 322
      BatchResult batchResult = batchResultList.get(last);
      batchResult.addParameterObject(parameterObject);
    } else {
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection, transaction.getTimeout());
      handler.parameterize(stmt);    //fix Issues 322
      //设置currentSql和currentStatement为当前sql、当前MappedStatement
      currentSql = sql;
      currentStatement = ms;
      //将statement添加到集合中
      statementList.add(stmt);
      batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
    //调用JDBC的addBatch()方法,添加到批处理中
    handler.batch(stmt);
    return BATCH_UPDATE_RETURN_VALUE;
  }
 
  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    try {
      List<BatchResult> results = new ArrayList<>();
      // 回滚则直接返回空集合
      if (isRollback) {
        return Collections.emptyList();
      }
      //遍历statementList集合中的Statement,一条条执行,并将结果加入到结果集合中
      for (int i = 0, n = statementList.size(); i < n; i++) {
        Statement stmt = statementList.get(i);
        applyTransactionTimeout(stmt);
        BatchResult batchResult = batchResultList.get(i);
        try {
          //执行sql
          batchResult.setUpdateCounts(stmt.executeBatch());
          MappedStatement ms = batchResult.getMappedStatement();
          List<Object> parameterObjects = batchResult.getParameterObjects();
          KeyGenerator keyGenerator = ms.getKeyGenerator();
          if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
            Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
            jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
          } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
            for (Object parameter : parameterObjects) {
              keyGenerator.processAfter(this, ms, stmt, parameter);
            }
          }
          // 关闭statement
          closeStatement(stmt);
        } catch (BatchUpdateException e) {
          StringBuilder message = new StringBuilder();
          message.append(batchResult.getMappedStatement().getId())
              .append(" (batch index #")
              .append(i + 1)
              .append(")")
              .append(" failed.");
          if (i > 0) {
            message.append(" ")
                .append(i)
                .append(" prior sub executor(s) completed successfully, but will be rolled back.");
          }
          throw new BatchExecutorException(message.toString(), e, results, batchResult);
        }
        //将结果加载到结果集合中
        results.add(batchResult);
      }
      return results;
    } finally {
      for (Statement stmt : statementList) {
        closeStatement(stmt);
      }
      currentSql = null;
      statementList.clear();
      batchResultList.clear();
    }
  }
 
}

BatchExecutor总结:

  • doUpdate()返回的值是固定的【Integer.MIN_VALUE + 1002】,不是影响的行数;
  • 如果连续提交相同的sql,则只会执行一次;
  • 提交sql不会立马执行,而是等到commit时候才统一执行;
  • 底层使用的是JDBC的批处理操作,addBatch()和executeBatch()操作;
六、CachingExecutor

在前面总结executor创建的时候,我们看见过CachingExecutor,它的作用其实就是,如果项目中开启了一级缓存的话,它会将executor使用装饰者模式包装成CachingExecutor,来增强executor的功能。

代码语言:javascript
复制
if (cacheEnabled) {
  executor = new CachingExecutor(executor);
}

下面来看看CachingExecutor在Mybatis中是如何实现的。

代码语言:javascript
复制
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
  BoundSql boundSql = ms.getBoundSql(parameterObject);
  //创建缓存key
  CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
  return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
 
//org.apache.ibatis.executor.CachingExecutor#query(org.apache.ibatis.mapping.MappedStatement, java.lang.Object, org.apache.ibatis.session.RowBounds, org.apache.ibatis.session.ResultHandler, org.apache.ibatis.cache.CacheKey, org.apache.ibatis.mapping.BoundSql)
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
    throws SQLException {
  //拿到二级缓存,判断二级缓存是否存在此数据
  Cache cache = ms.getCache();
  if (cache != null) {
    flushCacheIfRequired(ms);
    if (ms.isUseCache() && resultHandler == null) {
      ensureNoOutParams(ms, boundSql);
      @SuppressWarnings("unchecked")
      List<E> list = (List<E>) tcm.getObject(cache, key);
      if (list == null) {
        //如果二级缓存为空,再从一级缓存中取查找,还找不到,则查询数据库,然后再讲结果放到缓存中  
        list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
        tcm.putObject(cache, key, list); // issue #578 and #116
      }
      return list;
    }
  }
  return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
七、总结

本篇文章主要总结了Mybatis里面其中一个重要的组件--Executor执行器,并介绍了它的继承体系以及分别介绍了它的常见的几个实现类,在执行查询或者缓存方面都是如何工作的,相信大家对Executor没有那么陌生了吧。

鉴于笔者水平有限,如果文章有什么错误或者需要补充的,希望小伙伴们指出来,希望这篇文章对大家有帮助。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-01-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概述
  • 二、Executor执行器的创建过程
  • 三、SimpleExecutor
  • 四、ReuseExecutor
  • 五、BatchExecutor
  • 六、CachingExecutor
  • 七、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档