Spring 源碼學習(九) Transaction 事務

業務系統的數據,一般最後都會落入到數據庫中,例如 MySQL、Oracle 等主流數據庫,不可避免的,在數據更新時,有可能會遇到錯誤,這時需要將之前的數據更新操作撤回,避免錯誤數據。

Spring 的聲明式事務能幫我們處理回滾操作,讓我們不需要去關注數據庫底層的事務操作,可以不用在出現異常情況下,在 try / catch / finaly 中手寫回滾操作。

Spring 的事務保證程度比行業中其它技術(例如 TCC / 2PC / 3PC 等)稍弱一些,但使用 Spring 事務已經滿足大部分場景,所以它的使用和配置規則也是值得學習的。

接下來一起學習 Spring 事務是如何使用以及實現原理吧。

Table of Contents generated with DocToc

  • 使用例子
  • 註解屬性 @Transactional事務的傳播性 Propagation事務的隔離性 Isolation
  • Spring 中實現邏輯解析註冊 InfrastructureAdvisorAutoProxyCreator判斷目標方法是否適合 canApply匹配標籤 match小結運行事務增強器 TransactionInterceptor事務管理器事務開啟獲取 TransactionStatus獲取事務處理已經存在的事務PROPAGATION_NEVERPROPAGATION_NOT_SUPPORTED事務掛起PROPAGATION_REQUIRES_NEWPROPAGATION_NESTED事務創建小結事務回滾 & 提交
  • 總結
  • 參考資料

使用例子

1.創建數據庫表

<code>create table test.user(id int auto_incrementprimary key,name varchar(20) null, age int(3) null)engine=InnoDB charset=utf8;/<code>

2.創建對應數據庫表的 PO

<code>public class JdbcUser {private Integer id;private String name;private Integer age;...(使用 ctrl + N 進行代碼補全 setter 和 getter)}/<code>

3.創建表與實體間的映射

在使用 JdbcTemplate 時很糾結,在 Java 類中寫了很多硬編碼 SQL,與 MyBatis 使用方法不一樣,為了示例簡單,使用了 JdbcTemplate,不過還是建議朋友們用 MyBatis,讓代碼風格整潔。

<code>public class UserRowMapper implements RowMapper {@Overridepublic Object mapRow(ResultSet rs, int rowNum) throws SQLException {JdbcUser user = new JdbcUser();user.setId(rs.getInt("id"));user.setName(rs.getString("name"));user.setAge(rs.getInt("age"));return user;}}/<code>

4.創建數據操作接口

<code>public interface UserDao {/** * 插入 * @param user用戶信息 */void insertUser(JdbcUser user);/** * 根據 id 進行刪除 * @param id主鍵 */void deleteById(Integer id);/** * 查詢 * @return全部 */List<jdbcuser> selectAll();}/<jdbcuser>/<code>

5.創建數據操作接口實現類

跟書中例子不一樣,沒有在接口上加入事務註解,而是在公共方法上進行添加,可以在每個方法上自定義傳播事件、隔離級別。

<code>public class UserJdbcTemplate implements UserDao {private DataSource dataSource;private JdbcTemplate jdbcTemplate;@Override@Transactional(propagation = Propagation.REQUIRED)public void insertUser(JdbcUser user) {String sql = "insert into user (id, name, age) values (?, ?, ?)";jdbcTemplate.update(sql, user.getId(), user.getName(), user.getAge());System.out.println("Create record : " + user.toString());}@Override@Transactionalpublic void deleteById(Integer id) {String sql = "delete from user where id = ?";jdbcTemplate.update(sql, id);System.out.println("Delete record, id = " + id);// 事務測試,拋出異常,讓上面的插入操作回滾throw new RuntimeException("aa");}@Overridepublic List<jdbcuser> selectAll() {String sql = "select * from user";List<jdbcuser> users = jdbcTemplate.query(sql, new UserRowMapper());return users;}public void setDataSource(DataSource dataSource) {// 使用 setter 注入參數時,同時初始化 jdbcTemplatethis.dataSource = dataSource;this.jdbcTemplate = new JdbcTemplate(dataSource);}}/<jdbcuser>/<jdbcuser>/<code> 

6.創建配置文件

<code><beans><bean><property><property><property><property>/<bean><bean><property>/<bean><annotation-driven><bean><property>/<bean>/<beans>/<code>

7.添加依賴

記得添加數據庫連接和 jdbc、tx 這兩個 spring 模塊的依賴

<code>optional(project(":spring-jdbc"))  // for Quartz supportoptional(project(":spring-tx"))  // for Quartz supportcompile group: 'mysql', name: 'mysql-connector-java', version: '5.1.6'/<code>

8.啟動代碼

<code>public class TransactionBootstrap {public static void main(String[] args) {ApplicationContext context = new ClassPathXmlApplicationContext("transaction/transaction.xml");UserJdbcTemplate jdbcTemplate = (UserJdbcTemplate) context.getBean("userJdbcTemplate");System.out.println("--- Records Creation start ----");JdbcUser user = new JdbcUser(4, "test", 21);jdbcTemplate.insertUser(user);}}/<code>

通過上面的代碼,我做了兩個測試:

  1. 配置文件中,沒開啟事務。 也就是 <annotation-driven> 這一行被註釋了,雖然我們執行的方法中拋出了 RuntimeExcepton,但是數據庫中依舊被插入了數據。
  2. 配置文件中,開啟事務。 將上面的註釋去掉,刪掉數據庫中的記錄,重新執行啟動代碼,發現數據沒有被插入, 在程序拋出異常情況下,Spring 成功執行了事務,回滾了插入操作。

註解屬性 @Transactional

具體位置在:org.springframework.transaction.annotation.Transactional

屬性類型作用 valueString可選的限定描述符,指定使用的事務管理器 propagation枚舉:Propagation可選的事務傳播行為 isolation枚舉:Isolation可選的事務隔離級別設置 readOnlyboolean設置讀寫或只讀事務,默認是隻讀 rollbackForClass 數組,必須繼承自 Throwable導致事務回滾的異常類數組 rollbackForClassName類名稱數組,必須繼承自 Throwable 導致事務回滾的異常類名字數組noRollbackForClass 數組,必須繼承自 Throwable不會導致事務回滾的異常類數組 noRollbackForClassName類名稱數組,必須繼承自 Throwable 不會導致事務回滾的異常類名字數組


事務的傳播性 Propagation

  • REQUIRED

這是默認的傳播屬性,如果外部調用方有事務,將會加入到事務,沒有的話新建一個。

  • PROPAGATION_SUPPORTS

如果當前存在事務,則加入到該事務;如果當前沒有事務,則以非事務的方式繼續運行。

  • PROPAGATION_NOT_SUPPORTED

以非事務方式運行,如果當前存在事務,則把當前事務掛起。

  • PROPAGATION_NEVER

以非事務方式運行,如果當前存在事務,則拋出異常。


事務的隔離性 Isolation

  • READ_UNCOMMITTED

最低級別,只能保證不讀取 物理上損害的數據,允許髒讀

  • READ_COMMITTED

只能讀到已經提交的數據

  • REPEATABLE_READ

可重複讀

  • SERIALIZABLE

串行化讀,讀寫相互阻塞

這裡只是簡單描述了一下這兩個主要屬性,因為底層與數據庫相關,可以看下我之前整理過的 MySQL鎖機制


Spring 中實現邏輯

介紹完如何使用還有關鍵屬性設定,本著知其然,知其所以然的學習精神,來了解代碼是如何實現的吧。


解析

之前在解析自定義標籤時提到,AOP 和 TX 都使用了自定義標籤,按照我們上一篇 AOP 的學習,再來一遍解析自定義標籤的套路:事務自定義標籤。

定位到 TxNamespaceHandler 類的初始化方法:

<code>@Overridepublic void init() {registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());// 使用 AnnotationDrivenBeanDefinitionParser 解析器,解析 annotation-driven 標籤registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());}/<code> 

根據上面的方法,Spring 在初始化時候,如果遇到諸如 <annotation-driven> 開頭的配置後,將會使用 AnnotationDrivenBeanDefinitionParser 解析器的 parse 方法進行解析。/<annotation-driven>

<code>public BeanDefinition parse(Element element, ParserContext parserContext) {registerTransactionalEventListenerFactory(parserContext);String mode = element.getAttribute("mode");// AspectJ 另外處理if ("aspectj".equals(mode)) {// mode="aspectj"registerTransactionAspect(element, parserContext);if (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader())) {registerJtaTransactionAspect(element, parserContext);}}else {// mode="proxy"AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);}return null;}/<code>

Spring 中的事務默認是以 AOP 為基礎,如果需要使用 AspectJ 的方式進行事務切入,需要在 mode 屬性中配置:

<code><annotation-driven>/<code>

本篇筆記主要圍繞著默認實現方式,動態 AOP 來學習,如果對於 AspectJ 實現感興趣請查閱更多資料~


註冊 InfrastructureAdvisorAutoProxyCreator

與 AOP 一樣,在解析時,會創建一個自動創建代理器,在事務 TX 模塊中,使用的是 InfrastructureAdvisorAutoProxyCreator。

首先來看,在默認配置情況下,AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext) 做了什麼操作:

<code>private static class AopAutoProxyConfigurer {public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {// 註冊 InfrastructureAdvisorAutoProxyCreator 自動創建代理器AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);// txAdvisorBeanName = org.springframework.transaction.config.internalTransactionAdvisorString txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {Object eleSource = parserContext.extractSource(element);// Create the TransactionAttributeSource definition.// 創建 TransactionAttributeSource 的 beanRootBeanDefinition sourceDef = new RootBeanDefinition("org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");// 註冊 bean,並使用 Spring 中的定義規則生成 beanNameString sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);// 創建 TransactionInterceptor 的 beanRootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);// 創建 TransactionAttributeSourceAdvisor 的 beanRootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);// 將 sourceName 的 bean 注入 advisor 的 transactionAttributeSource 屬性中advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));// 將 interceptorName 的 bean 注入到 advisor 的 adviceBeanName 屬性中advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);if (element.hasAttribute("order")) {// 如果配置了 order 屬性,則加入到 bean 中advisorDef.getPropertyValues().add("order", element.getAttribute("order"));}// 以 txAdvisorBeanName 名字註冊 advisorDefparserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);// 創建 CompositeComponentDefinitionCompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));parserContext.registerComponent(compositeDef);}}}/<code>

在這裡註冊了代理類和三個 bean,這三個關鍵 bean 支撐了整個事務功能,為了待會更好的理解這三者的關聯關係,我們先來回顧下 AOP 的核心概念:

  1. Pointcut 定義一個切點,可以在這個被攔截的方法前後進行切面邏輯。
  2. Advice 用來定義攔截行為,在這裡實現增強的邏輯,它是一個祖先接口 org.aopalliance.aop.Advice。還有其它繼承接口,例如 MethodBeforeAdvice ,特定指方法執行前的增強。
  3. Advisor 用來封裝切面的所有信息,主要是上面兩個,它用來充當 Advice 和 Pointcut 的適配器。
Spring 源碼學習(九) Transaction 事務

回顧完 AOP 的概念後,繼續來看下這三個關鍵 bean:

  • TransactionInterceptor: 實現了 Advice 接口,在這裡定義了攔截行為。
  • AnnotationTransactionAttributeSource:封裝了目標方法是否被攔截的邏輯,雖然沒有實現 Pointcut 接口,但是在後面目標方法判斷的時候,實際上還是委託給了 AnnotationTransactionAttributeSource.getTransactionAttributeSource,通過適配器模式,返回了 Pointcut 切點信息。
  • TransactionAttributeSourceAdvisor: 實現了 Advisor 接口,包裝了上面兩個信息。

這三個 bean 組成的結構與 AOP 切面環繞實現的結構一致,所以先學習 AOP 的實現,對事務的瞭解會有所幫助


接著看我們的自動創建代理器是如何創建的:

AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element)


<code>public static void registerAutoProxyCreatorIfNecessary(ParserContext parserContext, Element sourceElement) {BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary(parserContext.getRegistry(), parserContext.extractSource(sourceElement));useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement);registerComponentIfNecessary(beanDefinition, parserContext);}private static void registerComponentIfNecessary(@Nullable BeanDefinition beanDefinition, ParserContext parserContext) {if (beanDefinition != null) {    // 註冊的 beanName 是 org.springframework.aop.config.internalAutoProxyCreatorparserContext.registerComponent(new BeanComponentDefinition(beanDefinition, AopConfigUtils.AUTO_PROXY_CREATOR_BEAN_NAME));}}/<code>

在這一步中,註冊了一個 beanName 是 org.springframework.aop.config.internalAutoProxyCreator 的 bean:InfrastructureAdsivorAutoProxyCreator,下圖是它的繼承體系圖:

Spring 源碼學習(九) Transaction 事務

可以看到,它實現了 InstantiationAwareBeanPostProcessor 這個接口,也就是說在 Spring 容器中,所有 bean 實例化時,Spring 都會保證調用其 postProcessAfterInitialization 方法。

與上一篇介紹的 AOP 代理器一樣,在實例化 bean 的時候,調用了代理器父類 AbstractAutoProxyCreator 的 postProcessAfterInitialization 方法:

<code>public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {if (bean != null) {// 組裝 keyObject cacheKey = getCacheKey(bean.getClass(), beanName);if (this.earlyProxyReferences.remove(cacheKey) != bean) {// 如果適合被代理,則需要封裝指定的 beanreturn wrapIfNecessary(bean, beanName, cacheKey);}}return bean;}/<code>

其中關於 wrapIfNecessory 方法,在上一篇 AOP 中已經詳細講過,這裡講下這個方法做了什麼工作:

  1. 找出指定 bean 對應的增強器
  2. 根據找出的增強器創建代理

與創建 AOP 代理相似的過程就不再重複說,講下它們的不同點:


判斷目標方法是否適合 canApply

AopUtils#canApply(Advisor, Class>, boolean)


<code>public static boolean canApply(Advisor advisor, Class> targetClass, boolean hasIntroductions) {if (advisor instanceof IntroductionAdvisor) {return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);}else if (advisor instanceof PointcutAdvisor) {PointcutAdvisor pca = (PointcutAdvisor) advisor;return canApply(pca.getPointcut(), targetClass, hasIntroductions);}else {// It doesn't have a pointcut so we assume it applies.return true;}}/<code>

我們在前面看到,TransactionAttributeSourceAdvisor 的父類是 PointcutAdvisor,所以在目標方法判斷的時候,會取出切點信息 pca.getPointcut()。

我們之前注入的切面類型 bean 是 AnnotationTransactionAttributeSource,通過下面的方法包裝,最後返回對象類型是 TransactionAttributeSourcePointcut 的切點信息

<code>private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {@Override@Nullableprotected TransactionAttributeSource getTransactionAttributeSource() {// 實現父類的方法,在子類中進行了擴展,返回之前在標籤註冊時的AnnotationTransactionAttributeSourcereturn transactionAttributeSource;}};/<code>

匹配標籤 match

在匹配 match 操作中,區別的是 AOP 識別的是 @Before 、@After,而我們的事務 TX 識別的是 @Transactional 標籤。

判斷是否是事務方法的入口方法在這:

org.springframework.transaction.interceptor.TransactionAttributeSourcePointcut#matches


<code>@Overridepublic boolean matches(Method method, Class> targetClass) {// 事務切點匹配的方法TransactionAttributeSource tas = getTransactionAttributeSource();return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);}/<code>

那它到底到哪一步解析事務註解的呢,繼續跟蹤代碼,答案是:

AnnotationTransactionAttributeSource#determineTransactionAttribute


<code>protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {for (TransactionAnnotationParser parser : this.annotationParsers) {TransactionAttribute attr = parser.parseTransactionAnnotation(element);if (attr != null) {return attr;}}return null;}/<code>

在這一步中,遍歷註冊的註解解析器進行解析,由於我們關注的是事務解析,所以直接定位到事務註解的解析器:

SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotatedElement)


<code>public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {// 解析事務註解的屬性AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(element, Transactional.class, false, false);if (attributes != null) {return parseTransactionAnnotation(attributes);}else {return null;}}/<code>

首先判斷是否含有 @Transactional 註解,如果有的話,才繼續調用 parse 解析方法:

<code>protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();// 註釋 9.4 解析事務註解的每一個屬性Propagation propagation = attributes.getEnum("propagation");rbta.setPropagationBehavior(propagation.value());Isolation isolation = attributes.getEnum("isolation");rbta.setIsolationLevel(isolation.value());rbta.setTimeout(attributes.getNumber("timeout").intValue());rbta.setReadOnly(attributes.getBoolean("readOnly"));rbta.setQualifier(attributes.getString("value"));List<rollbackruleattribute> rollbackRules = new ArrayList<>();for (Class> rbRule : attributes.getClassArray("rollbackFor")) {rollbackRules.add(new RollbackRuleAttribute(rbRule));}for (String rbRule : attributes.getStringArray("rollbackForClassName")) {rollbackRules.add(new RollbackRuleAttribute(rbRule));}for (Class> rbRule : attributes.getClassArray("noRollbackFor")) {rollbackRules.add(new NoRollbackRuleAttribute(rbRule));}for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {rollbackRules.add(new NoRollbackRuleAttribute(rbRule));}rbta.setRollbackRules(rollbackRules);return rbta;}/<rollbackruleattribute>/<code>

小結

通過上面的步驟,完成了對應類或者方法的事務屬性解析。

主要步驟在於尋找增強器,以及判斷這些增強器是否與方法或者類匹配。

如果某個 bean 屬於可被事務增強時,也就是適用於增強器 BeanFactoryTransactionAttributeSourceAdvisor 進行增強。

之前我們注入了 TransactionInterceptor 到 BeanFactoryTransactionAttributeSourceAdvisor 中,所以在調用事務增強器增強的代理類時,會執行 TransactionInterceptor 進行增強。同時,也就是在 TransactionInterceptor 類中的 invoke 方法中完成整個事務的邏輯。


運行


事務增強器 TransactionInterceptor

TransactionInterceptor 支撐著整個事務功能的架構。跟之前 AOP 的 JDK 動態代理 分析的一樣,TransactionInterceptor 攔截器繼承於 MethodInterceptor,所以我們要從它的關鍵方法 invoke() 看起:

<code>public Object invoke(MethodInvocation invocation) throws Throwable {// 註釋 9.5 執行事務攔截器,完成整個事務的邏輯Class> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);// Adapt to TransactionAspectSupport's invokeWithinTransaction...return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);}/<code>

實際調用了父類的方法:TransactionAspectSupport#invokeWithinTransaction

<code>protected Object invokeWithinTransaction(Method method, @Nullable Class> targetClass,final InvocationCallback invocation) throws Throwable {// 如果transaction屬性為null,則該方法是非事務性的TransactionAttributeSource tas = getTransactionAttributeSource();// 獲取對應事務屬性final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);// 獲取事務管理器final PlatformTransactionManager tm = determineTransactionManager(txAttr);// 構造方法唯一標識(類.方法)final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {// 聲明式事務處理// 標準事務劃分 : 使用 getTransaction 和 commit / rollback 調用TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);Object retVal;try {//傳入的是回調函數對象: invocation.proceed。 執行被增強的方法retVal = invocation.proceedWithInvocation();}catch (Throwable ex) {// 異常回滾completeTransactionAfterThrowing(txInfo, ex);throw ex;}finally {// 清除信息cleanupTransactionInfo(txInfo);}// 提交事務commitTransactionAfterReturning(txInfo);return retVal;}else {// 編程式事務處理final ThrowableHolder throwableHolder = new ThrowableHolder();// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.try {Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);...return result;}}}/<code>

貼出的代碼有刪減,簡略了錯誤異常的 try / catch 和編程式事務處理的邏輯。因為我們更多使用到的是聲明式事務處理,就是在 XML 文件配置或者 @Transactional 註解編碼,實際通過 AOP 實現,而編程式事務處理是通過 Transaction Template 實現,比較少使用到,所以省略了這部分處理代碼。


事務管理器

通過該方法,確定要用於給定事務的特定事務管理器

TransactionAspectSupport#determineTransactionManager


<code>protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {// Do not attempt to lookup tx manager if no tx attributes are set// 註釋 9.6 尋找事務管理器if (txAttr == null || this.beanFactory == null) {// 如果沒有事務屬性或者 BeanFactory 為空時,從緩存裡面尋找return asPlatformTransactionManager(getTransactionManager());}String qualifier = txAttr.getQualifier();// 如果註解配置中指定了事務管理器,直接取出使用if (StringUtils.hasText(qualifier)) {return determineQualifiedTransactionManager(this.beanFactory, qualifier);}else if (StringUtils.hasText(this.transactionManagerBeanName)) {return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);}else {// 上面步驟都沒找到,最後才去容器中,根據 className 來尋找PlatformTransactionManager defaultTransactionManager = asPlatformTransactionManager(getTransactionManager());...return defaultTransactionManager;}}/<code>

由於最開始我們在 XML 文件中配置過 transactionManager 屬性,所以該方法在我們例子中將會返回類型是 DataSourceTransactionManager 的事務管理器,下面是 DataSourceTransactionManager 的繼承體系:


Spring 源碼學習(九) Transaction 事務


它實現了 InitializingBean 接口,不過只是在 afterPropertiesSet() 方法中,簡單校驗 dataSource 是否為空,不細說這個類。


事務開啟

TransactionAspectSupport#createTransactionIfNecessary


<code>protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {// 如果沒有名稱指定則使用方法唯一標識,並使用 DelegatingTransactionAttribute 包裝 txAttrif (txAttr != null && txAttr.getName() == null) {txAttr = new DelegatingTransactionAttribute(txAttr) {@Overridepublic String getName() {return joinpointIdentification;}};}TransactionStatus status = null;if (txAttr != null) {if (tm != null) {// 獲取 TransactionStatusstatus = tm.getTransaction(txAttr);}}// 根據指定的屬性與 status 準備一個 TransactionInforeturn prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);}/<code>

在創建事務方法中,主要完成以下三件事:

  1. 使用 DelegatingTransactionAttribute 包裝 txAttr 實例
  2. 獲取事務:tm.getTransaction(txAttr)
  3. 構建事務信息:prepareTransactionInfo(tm, txAttr, joinpointIdentification, status)

核心方法在第二點和第三點,分別摘取核心進行熟悉。


獲取 TransactionStatus

status = tm.getTransaction(txAttr);

由於代碼較長,直接來總結其中幾個關鍵點

獲取事務

創建對應的事務實例,我們使用的是 DataSourceTransactionManager 中的 doGetTransaction 方法,創建基於 JDBC 的事務實例。

<code>protected Object doGetTransaction() {DataSourceTransactionObject txObject = new DataSourceTransactionObject();txObject.setSavepointAllowed(isNestedTransactionAllowed());// 如果當前線程已經記錄數據庫鏈接則使用原有鏈接ConnectionHolder conHolder =(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());// false 表示非新創建連接txObject.setConnectionHolder(conHolder, false);return txObject;}/<code>

其中在同一個線程中,判斷是否有重複的事務,是在 TransactionSynchronizationManager.getResource(obtainDataSource()) 中完成的,關鍵判斷邏輯是下面這個:

<code>private static final ThreadLocal> resources =new NamedThreadLocal<>("Transactional resources");private static Object doGetResource(Object actualKey) {Map<object> map = resources.get();if (map == null) {return null;}Object value = map.get(actualKey);// Transparently remove ResourceHolder that was marked as void...if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {map.remove(actualKey);// Remove entire ThreadLocal if empty...if (map.isEmpty()) {resources.remove();}value = null;}return value;}/<object>/<code>

結論:resources 是一個 ThreadLocal 線程私有對象,每個線程獨立存儲,所以判斷是否存在事務,判斷的依據是當前線程、當前數據源(DataSource)中是否存在活躍的事務 - map.get(actualKey)。


處理已經存在的事務

根據前面說的,判斷當前線程是否存在事務,判斷依據為當前線程記錄的連接不為空且連接中(connectionHolder)中的 transactionActive 屬性不為空,如果當前線程存在事務,將根據不同的事務傳播特性進行處理。具體代碼邏輯如下:

<code>if (isExistingTransaction(transaction)) {// Existing transaction found -> check propagation behavior to find out how to behave.// 當前線程存在事務,分情況進行處理return handleExistingTransaction(def, transaction, debugEnabled);}/<code>

PROPAGATION_NEVER

在配置中配置設定為 PROPAGATION_NEVER,表示該方法需要在非事務的環境下運行,但處於事務處理的狀態(可能是外部帶事務的方法調用了非事務的方法),將會拋出異常:

<code>if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");}/<code>

PROPAGATION_NOT_SUPPORTED

如果有事務存在,將事務掛起,而不是拋出異常:

<code>if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {Object suspendedResources = suspend(transaction);boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);}/<code>

事務掛起

對於掛起操作,主要目的是記錄原有事務的狀態,以便於後續操作對事務的恢復:

實際上,suspend() 方法調用的是事務管理器 DataSourceTransactionManager 中的 doSuspend() 方法

<code>protected Object doSuspend(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;//  將數據庫連接設置為 nulltxObject.setConnectionHolder(null);return TransactionSynchronizationManager.unbindResource(obtainDataSource());}/<code>

最後調用的關鍵方法是 TransactionSynchronizationManager#doUnbindResource

<code>private static Object doUnbindResource(Object actualKey) {Map<object> map = resources.get();if (map == null) {return null;}Object value = map.remove(actualKey);if (map.isEmpty()) {resources.remove();}if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {value = null;}if (value != null && logger.isTraceEnabled()) {        Thread.currentThread().getName() + "]");}return value;}/<object>/<code> 

看了第七條參考資料中的文章,結合代碼理解了事務掛起的操作:移除當前線程、數據源活動事務對象的一個過程

那它是如何實現事務掛起的呢,答案是在 doSuspend() 方法中的 txObject.setConnectionHolder(null),將 connectionHolder 設置為 null。

一個 connectionHolder 表示一個數據庫連接對象,如果它為 null,表示在下次需要使用時,得從緩存池中獲取一個連接,新連接的自動提交是 true。


PROPAGATION_REQUIRES_NEW

表示當前方法必須在它自己的事務裡運行,一個新的事務將被啟動,而如果有一個事務正在運行的話,則這個方法運行期間被掛起。

<code>SuspendedResourcesHolder suspendedResources = suspend(transaction);try {boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);// 新事務的建立doBegin(transaction, definition);prepareSynchronization(status, definition);return status;}catch (RuntimeException | Error beginEx) {resumeAfterBeginException(transaction, suspendedResources, beginEx);throw beginEx;}/<code>

與前一個方法相同的是,在 PROPAGATION_REQUIRES_NEW 廣播特性下,也會使用 suspend 方法將原事務掛起。方法 doBegin(),是事務開啟的核心。


PROPAGATION_NESTED

表示如果當前正有一個事務在運行中,則該方法應該運行在一個嵌套的事務中,被嵌套的事務可以獨立於封裝事務進行提交或者回滾,如果封裝事務不存在,行為就像 PROPAGATION_REQUIRES_NEW。

在代理處理上,有兩個分支,與 PROPAGATION_REQUIRES_NEW 相似的不貼出來,講下使用 savepoint 保存點的方式事務處理:

<code>if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {// 嵌入式事務的處理if (useSavepointForNestedTransaction()) {DefaultTransactionStatus status =prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);// 創建 savepointstatus.createAndHoldSavepoint();return status;}}/<code>

學習過數據庫的朋友應該清楚 savepoint,可以利用保存點回滾部分事務,從而使事務處理更加靈活和精細。跟蹤代碼,發現創建保存點調用的方法是 org.hsqldb.jdbc.JDBCConnection#setSavepoint(java.lang.String),感興趣的可以往下繼續深入學習~


事務創建

其實在前面方法中,都出現過這個方法 doBegin(),在這個方法中創建事務,順便設置數據庫的隔離級別、timeout 屬性和設置 connectionHolder:

DataSourceTransactionManager#doBegin


<code>protected void doBegin(Object transaction, TransactionDefinition definition) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;Connection con = null;if (!txObject.hasConnectionHolder() ||txObject.getConnectionHolder().isSynchronizedWithTransaction()) {Connection newCon = obtainDataSource().getConnection();txObject.setConnectionHolder(new ConnectionHolder(newCon), true);}txObject.getConnectionHolder().setSynchronizedWithTransaction(true);con = txObject.getConnectionHolder().getConnection();// 設置隔離級別Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);txObject.setPreviousIsolationLevel(previousIsolationLevel);// configured the connection pool to set it already).// 更改自動提交設置,由 spring 進行控制if (con.getAutoCommit()) {txObject.setMustRestoreAutoCommit(true);con.setAutoCommit(false);}// 準備事務連接prepareTransactionalConnection(con, definition);// 設置判斷當前線程是否存在事務的依據txObject.getConnectionHolder().setTransactionActive(true);int timeout = determineTimeout(definition);if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {txObject.getConnectionHolder().setTimeoutInSeconds(timeout);}// Bind the connection holder to the thread.if (txObject.isNewConnectionHolder()) {// 將當前獲取到的連接綁定到當前線程TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());}}}/<code> 

結論:Spring 事務的開啟,就是將數據庫自動提交屬性設置為 false


小結

在聲明式的事務處理中,主要有以下幾個處理步驟:

  1. 獲取事務的屬性:tas.getTransactionAttribute(method, targetClass)
  2. 加載配置中配置的 TransactionManager:determineTransactionManager(txAttr);
  3. 不同的事務處理方式使用不同的邏輯:關於聲明式事務和編程式事務,可以查看這篇文章-Spring編程式和聲明式事務實例講解
  4. 在目標方法執行前獲取事務並收集事務信息:createTransactionIfNecessary(tm, txAttr, joinpointIdentification)
  5. 執行目標方法:invocation.proceed()
  6. 出現異常,嘗試異常處理:completeTransactionAfterThrowing(txInfo, ex);
  7. 提交事務前的事務信息消除
    :cleanupTransactionInfo(txInfo)
  8. 提交事務:commitTransactionAfterReturning(txInfo)

事務回滾 & 提交

這兩步操作,主要調用了底層數據庫連接的 API,所以沒有細說。


總結

本篇文章簡單記錄瞭如何使用 Spring 的事務,以及在代碼中如何實現。

在之前的使用場景中,只用到了默認配置的聲明式事務 @Transactional,不瞭解其它屬性設置的含義,也不知道在默認配置下,如果是同一個類中的方法自調用是不支持事務。

所以,經過這一次學習和總結,在下一次使用時,就能夠知道不同屬性設置能解決什麼問題,例如修改廣播特性 PROPAGATION,讓事務支持方法自調用,還有設置事務超時時間 timeout、隔離級別等屬性。


分享到:


相關文章: