您好,登錄后才能下訂單哦!
Spring怎么管理控制mybatis事務?針對這個問題,今天小編總結這篇有關mybatis事務管理的文章,希望能幫助更多想解決這個問題的朋友找到更加簡單易行的辦法。
mapper 文件是怎么解析的, SqlSessionFactory
這個重要的對象,是的就是我們經常需要配置的:
@Bean @ConditionalOnMissingBean public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception { // 略 }
這里面做了很多自動化的配置,當然我們可以通過重寫它來自定義我們自己的 sqlSessionFactory
,借用一下上篇文章的圖片:
spring 借助 SqlSessionFactoryBean
來創建 sqlSessionFactory
,這可以視作是一個典型的建造者模式,來創建 SqlSessionFactory
。
spring 拿到我們配置的 mapper 路徑去掃描我們 mapper.xml 然后進行一個循環進行解析(上篇文章第二章節:二、SqlSessionFactory 的初始化與 XMLMapperBuilder):
-- 代碼位于 org.mybatis.spring.SqlSessionFactoryBean#buildSqlSessionFactory -- if (this.mapperLocations != null) { if (this.mapperLocations.length == 0) { LOGGER.warn(() -> "Property 'mapperLocations' was specified but matching resources are not found."); } else { for (Resource mapperLocation : this.mapperLocations) { if (mapperLocation == null) { continue; } try { XMLMapperBuilder xmlMapperBuilder = new XMLMapperBuilder(mapperLocation.getInputStream(), targetConfiguration, mapperLocation.toString(), targetConfiguration.getSqlFragments()); xmlMapperBuilder.parse(); } catch (Exception e) { throw new NestedIOException("Failed to parse mapping resource: '" + mapperLocation + "'", e); } finally { ErrorContext.instance().reset(); } LOGGER.debug(() -> "Parsed mapper file: '" + mapperLocation + "'"); } } } else { LOGGER.debug(() -> "Property 'mapperLocations' was not specified."); } -- 代碼位于 org.apache.ibatis.builder.xml.XMLMapperBuilder#parse -- public void parse() { if (!configuration.isResourceLoaded(resource)) { configurationElement(parser.evalNode("/mapper")); // 上篇文章主要說的 configuration.addLoadedResource(resource); bindMapperForNamespace();// 創建mapperProxy的工廠對象 } parsePendingResultMaps(); parsePendingCacheRefs(); parsePendingStatements(); }
configurationElement(parser.evalNode("/mapper"));
里面發生的故事,實際上還有后續的步驟,如果對 mybatis 有所了解的,應該知道,mybatis 會為我們的接口創建一個叫做 mapperProxy
的代理對象(劃重點),其實就是在這后續的步驟 bindMapperForNamespace();
做的(不盡然,實際上是創建并綁定了 mapperProxyFactory
)。
不貼太多代碼,bindMapperForNamespace()
方法里核心做的主要就是調用 configuration.addMapper()
方法
if (boundType != null) { if (!configuration.hasMapper(boundType)) { // Spring may not know the real resource name so we set a flag // to prevent loading again this resource from the mapper interface // look at MapperAnnotationBuilder#loadXmlResource configuration.addLoadedResource("namespace:" + namespace); configuration.addMapper(boundType); } }
這個 boundType
就是我們在 mapper 文件里面指定的 namespace
,比如:
<mapper namespace="com.anur.mybatisdemo.test.TrackerConfigMapper"> XXXXXXXXXXXXXXXXXX 里面寫的sql語句,resultMap 等等,略</mapper>
在 configuration.addMapper()
中調用了 mapperRegistry.addMapper()
,看到 knowMappers
,這個就是存儲我們生產 MapperProxy
的工廠映射 map,我們稍微再講,先繼續往下看。
public <T> void addMapper(Class<T> type) { if (type.isInterface()) { if (hasMapper(type)) { throw new BindingException("Type " + type + " is already known to the MapperRegistry."); } boolean loadCompleted = false; try { knownMappers.put(type, new MapperProxyFactory<>(type)); // It's important that the type is added before the parser is run // otherwise the binding may automatically be attempted by the // mapper parser. If the type is already known, it won't try. MapperAnnotationBuilder parser = new MapperAnnotationBuilder(config, type); parser.parse(); loadCompleted = true; } finally { if (!loadCompleted) { knownMappers.remove(type); } } } }
看到 MapperAnnotationBuilder#parse()
,parse()
中主要是對這個接口里面定義的方法做了 parseStatement
這件事
for (Method method : methods) { try { // issue #237 if (!method.isBridge()) { parseStatement(method); } } catch (IncompleteElementException e) { configuration.addIncompleteMethod(new MethodResolver(this, method)); } }
parseStatement()
就是解析注解語句的地方, 如果說我們沒有寫 xml,將語句以注解的形式寫在方法上,則會在這里進行語句解析。它和我們上篇文章講到的解析xml很像,就是拿到一大堆屬性,比如 resultMap
,keyGenerator
等等,生成一個 MappedStatement
對象,這里就不贅述了。
void parseStatement(Method method) { Class<?> parameterTypeClass = getParameterType(method); LanguageDriver languageDriver = getLanguageDriver(method); SqlSource sqlSource = getSqlSourceFromAnnotations(method, parameterTypeClass, languageDriver); if (sqlSource != null) { // 解析注解式的 sql 語句,略 } }
我們知道承載 mapperStatement
的是一個 map 映射,通過我們在上篇文章中反復強調的 id
來作為 key,那么重復添加會出現什么呢?
答案在這里,mybatis
的這個 map 被重寫了,同時寫這兩者的話,會拋出 ...already contains value for...
的異常
-- 代碼位置 org.apache.ibatis.session.Configuration.StrictMap#put -- @Override @SuppressWarnings("unchecked") public V put(String key, V value) { if (containsKey(key)) { throw new IllegalArgumentException(name + " already contains value for " + key + (conflictMessageProducer == null ? "" : conflictMessageProducer.apply(super.get(key), value))); } if (key.contains(".")) { final String shortKey = getShortName(key); if (super.get(shortKey) == null) { super.put(shortKey, value); } else { super.put(shortKey, (V) new Ambiguity(shortKey)); } } return super.put(key, value); }
剛才在1.1中我們提到了,mapperProxy
,也就是剛才 org.apache.ibatis.binding.MapperRegistry#addMapper
的代碼:knownMappers.put(type, new MapperProxyFactory<>(type));
看到 MapperProxyFactory
的內部:
-- 有刪減 --public class MapperProxyFactory<T> { private final Class<T> mapperInterface; private final Map<Method, MapperMethod> methodCache = new ConcurrentHashMap<>(); public MapperProxyFactory(Class<T> mapperInterface) { this.mapperInterface = mapperInterface; } @SuppressWarnings("unchecked") protected T newInstance(MapperProxy<T> mapperProxy) { return (T) Proxy.newProxyInstance(mapperInterface.getClassLoader(), new Class[] { mapperInterface }, mapperProxy); } public T newInstance(SqlSession sqlSession) { final MapperProxy<T> mapperProxy = new MapperProxy<>(sqlSession, mapperInterface, methodCache); return newInstance(mapperProxy); } }
了解JDK動態代理的小伙伴應該很清楚了, newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h)
意為,為接口創建一個實現了 InvocationHandler
的代理對象。我們在調用接口方法的時候,實際上要看代理類是如何實現的。
那么看看 mapperProxy 的內部的 invoke
是如何實現的,這里有三類方法,
一種是一些 Object
對象帶來的方法,這里不進行代理,直接 invoke
,
一種是default方法,一種比較蛋疼的寫法,把接口當抽象類寫,里面可以放一個default方法寫實現,這種代理了也沒太大意義
最后一種也就是我們準備代理的方法, 它會為每個非上面兩者的方法,懶加載一個 MapperMethod
對象,并調用 MapperMethod#execute
來執行真正的 mybatis 邏輯。
-- 有刪減 --public class MapperProxy<T> implements InvocationHandler, Serializable { public MapperProxy(SqlSession sqlSession, Class<T> mapperInterface, Map<Method, MapperMethod> methodCache) { this.sqlSession = sqlSession; this.mapperInterface = mapperInterface; this.methodCache = methodCache; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { try { if (Object.class.equals(method.getDeclaringClass())) {// 來自 Object 的方法,比如 toString() return method.invoke(this, args); } else if (method.isDefault()) {// 靜態方法,我們可以直接忽略 if (privateLookupInMethod == null) { return invokeDefaultMethodJava8(proxy, method, args); } else { return invokeDefaultMethodJava9(proxy, method, args); } } } catch (Throwable t) { throw ExceptionUtil.unwrapThrowable(t); } final MapperMethod mapperMethod = cachedMapperMethod(method); return mapperMethod.execute(sqlSession, args); } private MapperMethod cachedMapperMethod(Method method) { return methodCache.computeIfAbsent(method, k -> new MapperMethod(mapperInterface, method, sqlSession.getConfiguration())); } }
MapperMethod
的邏輯是怎么樣的,也很好猜到,它的構造函數中創建了兩個對象,
public class MapperMethod { private final SqlCommand command; private final MethodSignature method; public MapperMethod(Class<?> mapperInterface, Method method, Configuration config) { this.command = new SqlCommand(config, mapperInterface, method); this.method = new MethodSignature(config, mapperInterface, method); }
sqlCommand
sqlCommand
實際上就是從 configuration
里面把它對應的 MappedStatement
取出來,持有它的唯一 id
和執行類型。
public static class SqlCommand { private final String name; private final SqlCommandType type; public SqlCommand(Configuration configuration, Class<?> mapperInterface, Method method) { final String methodName = method.getName(); final Class<?> declaringClass = method.getDeclaringClass(); MappedStatement ms = resolveMappedStatement(mapperInterface, methodName, declaringClass, configuration); if (ms == null) { if (method.getAnnotation(Flush.class) != null) { name = null; type = SqlCommandType.FLUSH; } else { throw new BindingException("Invalid bound statement (not found): " + mapperInterface.getName() + "." + methodName); } } else { name = ms.getId(); type = ms.getSqlCommandType(); if (type == SqlCommandType.UNKNOWN) { throw new BindingException("Unknown execution method for: " + name); } } }
MethodSignature MethodSignature
是針對接口返回值、參數等值的解析,比如我們的 @Param
注解,就是在 new ParamNameResolver(configuration, method);
里面解析的,比較簡單,在之前的文章 簡單概括的mybatis sqlSession 源碼解析 里也提到過,這里就不多說了。
public MethodSignature(Configuration configuration, Class<?> mapperInterface, Method method) { Type resolvedReturnType = TypeParameterResolver.resolveReturnType(method, mapperInterface); if (resolvedReturnType instanceof Class<?>) { this.returnType = (Class<?>) resolvedReturnType; } else if (resolvedReturnType instanceof ParameterizedType) { this.returnType = (Class<?>) ((ParameterizedType) resolvedReturnType).getRawType(); } else { this.returnType = method.getReturnType(); } this.returnsVoid = void.class.equals(this.returnType); this.returnsMany = configuration.getObjectFactory().isCollection(this.returnType) || this.returnType.isArray(); this.returnsCursor = Cursor.class.equals(this.returnType); this.returnsOptional = Optional.class.equals(this.returnType); this.mapKey = getMapKey(method); this.returnsMap = this.mapKey != null; this.rowBoundsIndex = getUniqueParamIndex(method, RowBounds.class); this.resultHandlerIndex = getUniqueParamIndex(method, ResultHandler.class); this.paramNameResolver = new ParamNameResolver(configuration, method); }
mapperMethod
就是 sqlSession
與 mappedStatement
的一個整合。它的執行是一個策略模式:
public Object execute(SqlSession sqlSession, Object[] args) { Object result; switch (command.getType()) { case INSERT: { Object param = method.convertArgsToSqlCommandParam(args); result = rowCountResult(sqlSession.insert(command.getName(), param)); break; } case UPDATE: { Object param = method.convertArgsToSqlCommandParam(args); result = rowCountResult(sqlSession.update(command.getName(), param)); break; } case DELETE: { Object param = method.convertArgsToSqlCommandParam(args); result = rowCountResult(sqlSession.delete(command.getName(), param)); break; } case SELECT: // 略.. }
具體是怎么執行的在文章 簡單概括的mybatis sqlSession 源碼解析 提到過,這里也不過多贅述。
這里對 MapperProxy
在初始化與調用過程中的關系做一下羅列:
為了避免有小伙伴對 sqlSession
完全沒有概念,這里將接口代碼貼出,可以看出 sqlSession
是執行語句的一個入口,同時也提供了事務的一些操作,實際上就是如此:
public interface SqlSession extends Closeable { <T> T selectOne(String statement); <T> T selectOne(String statement, Object parameter); <E> List<E> selectList(String statement); <E> List<E> selectList(String statement, Object parameter); <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds); <K, V> Map<K, V> selectMap(String statement, String mapKey); <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey); <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds); <T> Cursor<T> selectCursor(String statement); <T> Cursor<T> selectCursor(String statement, Object parameter); <T> Cursor<T> selectCursor(String statement, Object parameter, RowBounds rowBounds); void select(String statement, Object parameter, ResultHandler handler); void select(String statement, ResultHandler handler); void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler); int insert(String statement); int insert(String statement, Object parameter); int update(String statement); int update(String statement, Object parameter); int delete(String statement); int delete(String statement, Object parameter); void commit(); void commit(boolean force); void rollback(); void rollback(boolean force); List<BatchResult> flushStatements(); void close(); void clearCache(); Configuration getConfiguration(); <T> T getMapper(Class<T> type); Connection getConnection(); }
首先忘掉 spring 為我們提供的便利,看一下基礎的,脫離了 spring 托管的 mybatis 是怎么進行 sql 操作的:
SqlSession sqlSession = sqlSessionFactory.openSession(); TrackerConfigMapper mapper = sqlSession.getMapper(TrackerConfigMapper.class); TrackerConfigDO one = mapper.getOne(1);
SqlSessionFactory
有兩個子類實現:DefaultSqlSessionFactory
和 SqlSessionManager
,SqlSessionManager
使用動態代理 + 靜態代理對 DefaultSqlSessionFactory
進行了代理,不過不用太在意這個 SqlSessionManager
,后面會說明原因。
上面不管怎么代理,實際邏輯的執行者都是 DefaultSqlSessionFactory
,我們看看它的創建方法,也就是 openSession()
實際執行的方法:
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) { Transaction tx = null; try { final Environment environment = configuration.getEnvironment(); final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment); tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit); final Executor executor = configuration.newExecutor(tx, execType); return new DefaultSqlSession(configuration, executor, autoCommit); } catch (Exception e) { closeTransaction(tx); // may have fetched a connection so lets call close() throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e); } finally { ErrorContext.instance().reset(); } }
environment
可用于數據源切換,那么提到數據源切換,就很容易想到了,連接的相關信息是這貨維持的。 所以看到我們的代碼: tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
, TransactionFactory
有三個實現,它們分別是 JdbcTransactionFactory
、ManagedTransactionFactory
和 SpringManagedTransactionFactory
。
JdbcTransactionFactory
和 ManagedTransactionFactory
最大的區別就在于 ManagedTransactionFactory
實現了空的 commit 與 rollback,源碼中這樣說道:付與容器來管理 transaction
的生命周期,這個博主不是特別熟悉,因為沒這么用過,tomcat、jetty 等容器實現了對 jdbc 的代理。要注意,不管如何都是使用的 jdbc 這套接口規范進行數據庫操作的。
/** * {@link Transaction} that lets the container manage the full lifecycle of the transaction. * Delays connection retrieval until getConnection() is called. * Ignores all commit or rollback requests. * By default, it closes the connection but can be configured not to do it. * * @author Clinton Begin * * @see ManagedTransactionFactory */
Transaction
是 mybatis 創建的一個對象,它實際上是對 jdbc
connection
對象的一個封裝:
-- 代碼位于 org.apache.ibatis.transaction.jdbc.JdbcTransaction -- @Override public Connection getConnection() throws SQLException { if (connection == null) { openConnection(); } return connection; } @Override public void commit() throws SQLException { if (connection != null && !connection.getAutoCommit()) { if (log.isDebugEnabled()) { log.debug("Committing JDBC Connection [" + connection + "]"); } connection.commit(); } } @Override public void rollback() throws SQLException { if (connection != null && !connection.getAutoCommit()) { if (log.isDebugEnabled()) { log.debug("Rolling back JDBC Connection [" + connection + "]"); } connection.rollback(); } }
我們知道 sqlSession 的 四大對象之一,Executor,負責統領全局,從語句獲取(從 mappedStatement),到參數拼裝(parameterHandler),再到執行語句(statementHandler),最后結果集封裝(resultHandler),都是它負責“指揮”的。
我們看到它使用 Transaction
進行初始化,另外的一個參數是它的類型,這里不多說,REUSE 是帶語句緩存的,和普通的 SimpleExecutor 沒有特別大的區別,BATCH 類型則是通過 jdbc 提供的批量提交來對網絡請求進行優化。
public enum ExecutorType { SIMPLE, REUSE, BATCH}
最后將持有 Transaction
的 Executor 置入 SqlSession
,完成一個 SqlSession
對象的創建。
可以看到,我們的確是一個SqlSession
對應一個連接(Transaction
),MapperProxy
這個業務接口的動態代理對象又持有一個 SqlSession
對象,那么總不可能一直用同一個連接吧?
當然有疑問是好的,而且通過對 SqlSession 初始化過程的剖析,我們已經完善了我們對 mybatis 的認知:
接下來就是來打消這個疑問,MapperProxy
持有的 sqlSession
和 SqlSessionFactory
創建的這個有什么關系?
實際上答案就在 SqlSessionTemplate
,SqlSessionTemplate
是 spring 對 mybatis SqlSessionFactory
的封裝,同時,它還是 SqlSession
的代理。
SqlSessionTemplate
和 mybatis 提供的 SqlSessionManager
( SqlSessionFactory
的另一個實現類,也是DefaultSqlSessionFactory
的代理類,可以細想一下,業務是否共用同一個 sqlSession
還要在業務里面去傳遞,去控制是不是很麻煩) 是一樣的思路,不過 spring 直接代理了 sqlSession
:
-- 代碼位于 org.mybatis.spring.SqlSessionTemplate -- private final SqlSessionFactory sqlSessionFactory; private final ExecutorType executorType; private final SqlSession sqlSessionProxy; private final PersistenceExceptionTranslator exceptionTranslator; /** * Constructs a Spring managed {@code SqlSession} with the given * {@code SqlSessionFactory} and {@code ExecutorType}. * A custom {@code SQLExceptionTranslator} can be provided as an * argument so any {@code PersistenceException} thrown by MyBatis * can be custom translated to a {@code RuntimeException} * The {@code SQLExceptionTranslator} can also be null and thus no * exception translation will be done and MyBatis exceptions will be * thrown * * @param sqlSessionFactory a factory of SqlSession * @param executorType an executor type on session * @param exceptionTranslator a translator of exception */ public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) { notNull(sqlSessionFactory, "Property 'sqlSessionFactory' is required"); notNull(executorType, "Property 'executorType' is required"); this.sqlSessionFactory = sqlSessionFactory; this.executorType = executorType; this.exceptionTranslator = exceptionTranslator; this.sqlSessionProxy = (SqlSession) newProxyInstance( SqlSessionFactory.class.getClassLoader(), new Class[] { SqlSession.class }, new SqlSessionInterceptor()); }
還是熟悉的配方,就是 jdk 的動態代理,SqlSessionTemplate
在初始化時創建了一個 SqlSession
代理,也內置了 ExecutorType
,SqlSessionFactory
等 defaultSqlSession
初始化的必要組件。
想必看到這里,已經有很多小伙伴知道這里是怎么回事了,是的,我們對 SqlSession
的操作都是經由這個代理來完成,代理的內部,實現了真正 SqlSession
的創建與銷毀,回滾與提交等,我們先縱覽以下它的代理實現。
對于這種jdk動態代理,我們看到 SqlSessionInterceptor#invoke
方法就明了了。我們先過一遍常規的流程,也就是沒有使用 spring 事務功能支持,執行完 sql 就直接提交事務的常規操作:
1、getSqlSession()
創建 sqlSession
2、執行 MapperProxy
,也就是前面講了一大堆的,MapperProxy
中,通過 MapperMethod
來調用 sqlSession
和我們生成好的 mappedStatement
操作 sql 語句。
3、提交事務
4、關閉事務
注:代碼有很大刪減
private class SqlSessionInterceptor implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { SqlSession sqlSession = getSqlSession( SqlSessionTemplate.this.sqlSessionFactory, SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator); // 創建或者獲取真正需要的 SqlSession try { Object result = method.invoke(sqlSession, args); // 執行原本想對 SqlSession 做的事情 if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) { // force commit even on non-dirty sessions because some databases require // a commit/rollback before calling close() sqlSession.commit(true);// 如非 spring 管理事務,則直接提交 } finally { if (sqlSession != null) { closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory); } } } }
注意:注釋掉的代碼在此類型的操作中沒有什么意義,getSqlSession()
在這里只是簡單通過 sessionFactory
創建了一個 sqlSession
:
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) { // SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory); // SqlSession session = sessionHolder(executorType, holder); // if (session != null) { // return session; // } LOGGER.debug(() -> "Creating a new SqlSession"); session = sessionFactory.openSession(executorType); // registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session); return session; }
看完前面的實現,有小伙伴會好奇,我的 @Transactional 注解呢?我的事務傳播等級呢?
實際上,除去上述常規流程,更多的是要借助 TransactionSynchronizationManager
這個對象來完成,比如剛才步驟一,getSqlSession()
我暫時注釋掉的代碼里面,有一個很重要的操作:
我們把剛才 getSqlSession()
中注釋掉的代碼再拿回來看看:
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory); SqlSession session = sessionHolder(executorType, holder); if (session != null) { return session; } session = sessionFactory.openSession(executorType); registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session); return session;
我們可以看到首先獲取一個叫做 SqlSessionHolder
的東西,如果里面沒有 sqlSession
則調用 sessionFactory.openSession(executorType);
創建一個,并把它注冊到 TransactionSynchronizationManager。
sqlSessionHolder 沒什么可說的,它就只是個純粹的容器,里面主要就是裝著一個 SqlSession
:
public SqlSessionHolder(SqlSession sqlSession, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) { notNull(sqlSession, "SqlSession must not be null"); notNull(executorType, "ExecutorType must not be null"); this.sqlSession = sqlSession; this.executorType = executorType; this.exceptionTranslator = exceptionTranslator; }
所以說我們只需要把目光焦距在 TransactionSynchronizationManager
就可以了,它的內部持有了很多個元素為 Map<Object, Object>
的 ThreadLocal
(代碼示例中只貼出了 resources
這一個 ThreadLocal
):
public abstract class TransactionSynchronizationManager { private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class); private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources"); @Nullable public static Object getResource(Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; } @Nullable private static Object doGetResource(Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null) { return null; } Object value = map.get(actualKey); // Transparently remove ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } value = null; } return value; }
也就是說,spring 的事務,是借助 TransactionSynchronizationManager
+ SqlSessionHolder
對 sqlSession
的控制來實現的。
那么這樣就很清晰了,如下總結,也如下圖:
MapperProxy
內置的 sqlSession
是 sqlSessiontemplate
sqlSessiontemplate
通過持有 SqlSessionFactory
來創建真正的 SqlSession
TransactionSynchronizationManager
+ SqlSessionHolder
則扮演著 SqlSession
管理的角色
上一個小節只是講了是什么,沒有講為什么,到了這里如果有好奇寶寶一定會好奇諸如 spring 的一系列事務控制是怎么實現的,當然本文不會講太多 spring 事務管理相關的太多東西,以后會有后續文章專門剖析事務管理。
我們可以簡單看下 TransactionInterceptor
,這是 @Transactional
注解的代理類。
/** * AOP Alliance MethodInterceptor for declarative transaction * management using the common Spring transaction infrastructure * ({@link org.springframework.transaction.PlatformTransactionManager}/ * {@link org.springframework.transaction.ReactiveTransactionManager}). * * <p>Derives from the {@link TransactionAspectSupport} class which * contains the integration with Spring's underlying transaction API. * TransactionInterceptor simply calls the relevant superclass methods * such as {@link #invokeWithinTransaction} in the correct order. * * <p>TransactionInterceptors are thread-safe. * * @author Rod Johnson * @author Juergen Hoeller * @see TransactionProxyFactoryBean * @see org.springframework.aop.framework.ProxyFactoryBean * @see org.springframework.aop.framework.ProxyFactory */@SuppressWarnings("serial")public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable { /** * Create a new TransactionInterceptor. * <p>Transaction manager and transaction attributes still need to be set. * @see #setTransactionManager * @see #setTransactionAttributes(java.util.Properties) * @see #setTransactionAttributeSource(TransactionAttributeSource) */ public TransactionInterceptor() { } @Override @Nullable public Object invoke(MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }
可以看到它的代理方法 invoke()
的執行邏輯在 invokeWithinTransaction()
里:
--代碼位于 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction -- @Nullable protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final TransactionManager tm = determineTransactionManager(txAttr); if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) { // 響應式事務相關 } PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } if (vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... TransactionStatus status = txInfo.getTransactionStatus(); if (status != null && txAttr != null) { retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } commitTransactionAfterReturning(txInfo); return retVal; } else { // CallbackPreferringPlatformTransactionManager 的處理邏輯 } }
invokeWithinTransaction()
的代碼雖然長,我們還是把它分段來看:
第一部分,準備階段
也就是這部分代碼:
// If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final TransactionManager tm = determineTransactionManager(txAttr); PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
獲取 TransactionAttribute
(TransactionDefinition
(底層接口),這里面裝載了事務傳播等級,隔離級別等屬性。TransactionAttribute
的創建依據配置,或者我們的事務傳播等級注解,對什么異常進行回滾等,后續會繼續對它的應用做說明, PlatformTransactionManager
則是進行事務管理的主要操作者。
第二部分,事務開啟或者獲取與準備,也就是我們執行邏輯的第一行代碼 createTransactionIfNecessary()
(是不是和前面說到的 SqlSession的創建或者獲取很像?)
我們可以看到 createTransactionIfNecessary()
的實現就做了兩件事,其一是獲取一個叫做 TransactionStatus
的東西,另外則是調用 prepareTransactionInfo()
,獲取一個 TransactionInfo
:
// Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); --代碼位于 org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary -- protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { TransactionStatus status = tm.getTransaction(txAttr); return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
先看看第一件事,也就是獲取 TransactionStatus
,它保存了事務的 savePoint
,是否新事物等。刪減掉一些判斷方法,代碼如下:
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // Use defaults if no transaction definition given. TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(def, transaction, debugEnabled); } if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( def, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, def); prepareSynchronization(status, def); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } }
代碼很長,但是不急,我們可以簡單看出它分為兩個部分:
第一部分是獲取事務 doGetTransaction()
第二部分則是判斷是否新事物,
則 TransactionDefinition.PROPAGATION_REQUIRED
、TransactionDefinition.PROPAGATION_REQUIRES_NEW
、TransactionDefinition.PROPAGATION_NESTED
是一種邏輯
其余是另一種邏輯,信息量有點大,但是慢慢來:
如果不是新事物,則執行 handleExistingTransaction
,
如果是新事物
protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false); return txObject; }
doGetTransaction
獲取我們的事務對象,這里也使用了 TransactionSynchronizationManager
(前面說到的 SqlSession
的管理類),事務對象會嘗試獲取本事務所使用的連接對象,這個和事務傳播等級有關,先立個 flag。
我們可以看到這里面主要邏輯就是去獲取 ConnectionHolder
,實際上很簡單,只要能獲取到,就是已經存在的事務,獲取不到(或者事務已經關閉)就是新事物。
如果說前面無法從 TransactionSynchronizationManager
獲取到 conHolder
,或者說,我們的線程中并沒有 ConnectionHolder
那么將會進入此分支,此分支的支持的三個事務傳播等級 TransactionDefinition.PROPAGATION_REQUIRED
、TransactionDefinition.PROPAGATION_REQUIRES_NEW
、TransactionDefinition.PROPAGATION_NESTED
都是需要創建新事務的,所以它們在同一個分支里面:
SuspendedResourcesHolder suspendedResources = suspend(null); boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( def, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, def); prepareSynchronization(status, def); return status;
SuspendedResourcesHolder
與事務的掛起相關,doBegin()
則是對連接對象 connection
的獲取和配置,prepareSynchronization()
則是對新事物的一些初始化操作。我們一點點看:
/** * This implementation sets the isolation level but ignores the timeout. */ @Override protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); txObject.setReadOnly(definition.isReadOnly()); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); } prepareTransactionalConnection(con, definition); txObject.getConnectionHolder().setTransactionActive(true); // Bind the connection holder to the thread. if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } }
可以看到,ConnectionHolder
的創建和連接的打開就是在這里進行的,創建后,設置其隔離級別,取消 connection
的自動提交,將提交操作納入到 spring 管理,并且將其存到 TransactionSynchronizationManager
使得 4.2.1 提到的 doGetTransaction()
可以拿到此 ConnectionHolder
。
做完連接的獲取與配置后,下一步就是對事物的一些初始化:
/** * Initialize transaction synchronization as appropriate. */ protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
這個代碼都是代碼字面意義的簡單設置,就不贅述了。
剛才講的是 “無法從 TransactionSynchronizationManager
獲取到 conHolder
”,并且屬于一些需要創建新事物的傳播等級的情況。
如果說方才沒有事務,也不需要創建新的事務,則會進入此分支,創建一個空的 TransactionStatus
,內部的事務對象為空,代碼很簡單就不貼了,有興趣可以去看看 org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
的最后一個分支。
剛才說的都是無法獲取到 conHolder
的情況,如果獲取到了,則又是另一套代碼了,handleExistingTransaction
很長,它的第一個部分是對傳播等級的控制,有興趣的小伙伴可以去看看源碼,我這里只挑一個簡單的傳播等級 PROPAGATION_NESTED_NEW
做說明(其他的會在專門的事務一期做講解):
-- 代碼位于 org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction --private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } ... 略 }
我們可以發現和 4.2.2 新事物的處理
代碼是一樣的,唯一的區別就是此 TransactionStatus
對象會真正內嵌一個事務掛起對象 SuspendedResourcesHolder
。
拿到 TransactionStatus
之后, prepareTransactionInfo()
里簡單的將剛才那些 PlatformTransactionManager
、TransactionAttribute
、TransactionStatus
包裝成一個 TransactionInfo
對象,并將其保存在 ThreadLocal
中,這個 bindToThread()
還會將當前已經持有的 TransactionInfo
對象暫存。
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) { TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { // The transaction manager will flag an error if an incompatible tx already exists. txInfo.newTransactionStatus(status); } // We always bind the TransactionInfo to the thread, even if we didn't create // a new transaction here. This guarantees that the TransactionInfo stack // will be managed correctly even if no transaction was created by this aspect. txInfo.bindToThread(); return txInfo; }
到這里思路就很清晰了,代理為我們做的事情就是生成了一個叫做 TransactionInfo
的東西,里面的 TransactionManager
可以使得 spring 去對最底層的 connection
對象做一些回滾,提交操作。TransactionStatus
則保存掛起的事務的信息,以及當前事務的一些狀態,如下圖:
讓我們回到第四節開頭的那段很長的代碼,到這里是不是很明了了:
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final TransactionManager tm = determineTransactionManager(txAttr); PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } if (vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... TransactionStatus status = txInfo.getTransactionStatus(); if (status != null && txAttr != null) { retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } commitTransactionAfterReturning(txInfo); return retVal; } }
1、獲取 TransactionInfo
2、執行切面
3、將之前掛起的 TransactionInfo
找回:
private void bindToThread() { // Expose current TransactionStatus, preserving any existing TransactionStatus // for restoration after this transaction is complete. this.oldTransactionInfo = transactionInfoHolder.get(); transactionInfoHolder.set(this); } private void restoreThreadLocalStatus() { // Use stack to restore old transaction TransactionInfo. // Will be null if none was set. transactionInfoHolder.set(this.oldTransactionInfo); }
4、如果需要,則提交當前事務
5、返回切面值
我們在第三章講到,mybatis有一個叫做 defualtSqlSessionFactory
的類,負責創建 sqlSession
,但是它和 spring 又是怎么產生關聯的呢?
答案就在于,spring 實現了自己的 TransactionFactory
,以及自己的 Transaction
對象 SpringManagedTransaction
。回顧一下 SqlSession
的創建過程:
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) { Transaction tx = null; try { final Environment environment = configuration.getEnvironment(); final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment); tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit); final Executor executor = configuration.newExecutor(tx, execType); return new DefaultSqlSession(configuration, executor, autoCommit); } catch (Exception e) { closeTransaction(tx); // may have fetched a connection so lets call close() throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e); } finally { ErrorContext.instance().reset(); } }
看一下 SpringManagedTransaction
是如何管理 connection
的:
private void openConnection() throws SQLException { this.connection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.connection.getAutoCommit(); this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource); LOGGER.debug(() -> "JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring"); }
DataSourceUtils.getConnection(this.dataSource);
劃重點,里面的實現不用我多說了,我們可以看到熟悉的身影,也就是 ConnectionHolder
,連接是從這里(優先)拿的:
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) { conHolder.requested(); if (!conHolder.hasConnection()) { logger.debug("Fetching resumed JDBC Connection from DataSource"); conHolder.setConnection(fetchConnection(dataSource)); } return conHolder.getConnection(); }
更新整套體系圖:
我們整體簡單過一次:
mybatis 啟動時根據xml、注解創建了 mapperedStatement
,用于sql執行,創建了 SqlSessionFactory
用于創建 SqlSession
對象。
mybatis 啟動時創建了 MapperProxyFactory
用于創建接口的代理對象 MapperProxy
在創建 MapperProxy
時,spring 為其注入了一個 sqlSession
用于 sql執行,但是這個 sqlSession
是一個代理對象,叫做 sqlSessionTemplate
,它會自動選擇我們該使用哪個 sqlSession
去執行
在執行時,spring 切面在執行事務之前,會創建一個叫做 TransactionInfo
的對象,此對象會根據事務傳播等級來控制是否創建新連接,是否掛起上一個連接,將信息保存在 TransactionSynchronizationManager
到了真正需要創建或者獲取 sqlSession
時,spring 重寫的 TransactionFactory
會優先去 TransactionSynchronizationManager
中拿連接對象。
看完上述內容,你們對mybatis事務的管理控制有進一步的了解嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。