您好,登錄后才能下訂單哦!
這篇文章主要介紹了JPA多數據源分布式事務的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
在解決mysql字段脫敏處理時,結合sharding-jdbc的脫敏組件功能,為了sql兼容和最小化應用改造,博主給出了一個多數據源融合的字段脫敏解決方案(只把包含脫敏字段表的操作走sharding-jdbc脫敏代理數據源)。這個方案解決了問題的同時,帶來了一個新的問題,數據源的事務是獨立的,正如我文中所述《JPA項目多數據源模式整合sharding-jdbc實現數據脫敏》,在spring上下文中,每個數據源對應一個獨立的事務管理器,默認的事務管理器的數據源就用業務本身的數據源,所以需要加密的業務使用時,需要指定@Transactional注解里的事務管理器名稱為脫敏對應的事務管理器名稱。簡單的業務場景這樣用也就沒有問題了,但是一般的業務場景總有一個事務覆蓋兩個數據源的操作,這個時候單指定哪個事務管理器都不行,so,這里需要一種多數據源的事務管理器。
XA協議采用2PC(兩階段提交)的方式來管理分布式事務。XA接口提供資源管理器與事務管理器之間進行通信的標準接口。在JDBC的XA事務相關api抽象里,相關接口定義如下
public interface XADataSource extends CommonDataSource { /** * 嘗試建立物理數據庫連接,使用給定的用戶名和密碼。返回的連接可以在分布式事務中使用 */ XAConnection getXAConnection() throws SQLException; //省略getLogWriter等非關鍵方法 }
public interface XAConnection extends PooledConnection { /** * 檢索一個{@code XAResource}對象,事務管理器將使用該對象管理該{@code XAConnection}對象在分布式事務中的事務行為 */ javax.transaction.xa.XAResource getXAResource() throws SQLException; }
public interface XAResource { /** * 提交xid指定的全局事務 */ void commit(Xid xid, boolean onePhase) throws XAException; /** * 結束代表事務分支執行的工作。資源管理器從指定的事務分支中分離XA資源,并讓事務完成。 */ void end(Xid xid, int flags) throws XAException; /** * 通知事務管理器忽略此xid事務分支 */ void forget(Xid xid) throws XAException; /** * 判斷是否同一個資源管理器 */ boolean isSameRM(XAResource xares) throws XAException; /** * 指定xid事務準備階段 */ int prepare(Xid xid) throws XAException; /** * 從資源管理器獲取準備好的事務分支的列表。事務管理器在恢復期間調用此方法, * 以獲取當前處于準備狀態或初步完成狀態的事務分支的列表。 */ Xid[] recover(int flag) throws XAException; /** * 通知資源管理器回滾代表事務分支完成的工作。 */ void rollback(Xid xid) throws XAException; /** * 代表xid中指定的事務分支開始工作。 */ void start(Xid xid, int flags) throws XAException; //省略非關鍵方法 }
相比較普通的事務管理,JDBC的XA協議管理多了一個XAResource資源管理器,XA事務相關的行為(開啟、準備、提交、回滾、結束)都由這個資源管理器來控制,這些都是框架內部的行為,體現在開發層面提供的數據源也變成了XADataSource。而JTA的抽象里,定義了UserTransaction、TransactionManager。想要使用JTA事務,必須先實現這兩個接口。所以,如果我們要使用JTA+XA控制多數據源的事務,在sprign boot里以Atomikos為例,
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency>
spring boot已經幫我們把XA事務管理器自動裝載類定義好了,如:
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties({ AtomikosProperties.class, JtaProperties.class }) @ConditionalOnClass({ JtaTransactionManager.class, UserTransactionManager.class }) @ConditionalOnMissingBean(PlatformTransactionManager.class) class AtomikosJtaConfiguration { @Bean(initMethod = "init", destroyMethod = "shutdownWait") @ConditionalOnMissingBean(UserTransactionService.class) UserTransactionServiceImp userTransactionService(AtomikosProperties atomikosProperties, JtaProperties jtaProperties) { Properties properties = new Properties(); if (StringUtils.hasText(jtaProperties.getTransactionManagerId())) { properties.setProperty("com.atomikos.icatch.tm_unique_name", jtaProperties.getTransactionManagerId()); } properties.setProperty("com.atomikos.icatch.log_base_dir", getLogBaseDir(jtaProperties)); properties.putAll(atomikosProperties.asProperties()); return new UserTransactionServiceImp(properties); } @Bean(initMethod = "init", destroyMethod = "close") @ConditionalOnMissingBean(TransactionManager.class) UserTransactionManager atomikosTransactionManager(UserTransactionService userTransactionService) throws Exception { UserTransactionManager manager = new UserTransactionManager(); manager.setStartupTransactionService(false); manager.setForceShutdown(true); return manager; } @Bean @ConditionalOnMissingBean(XADataSourceWrapper.class) AtomikosXADataSourceWrapper xaDataSourceWrapper() { return new AtomikosXADataSourceWrapper(); } @Bean JtaTransactionManager transactionManager(UserTransaction userTransaction, TransactionManager transactionManager, ObjectProvidertransactionManagerCustomizers) { JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, transactionManager); transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(jtaTransactionManager)); return jtaTransactionManager; } }
顯然,想要使用XA事務,除了需要提供UserTransaction、TransactionManager的實現。還必須要有一個XADataSource,而sharding-jdbc代理的數據源是DataSource的,我們需要將XADataSource包裝成普通的DataSource,spring已經提供了一個AtomikosXADataSourceWrapper的XA數據源包裝器,而且在AtomikosJtaConfiguration里已經注冊到Spring上下文中,所以我們在自定義數據源時可以直接注入包裝器實例,然后,因為是JPA環境,所以在創建EntityManagerFactory實例時,需要指定JPA的事務管理類型為JTA,綜上,普通的業務默認數據源配置如下:
/** * @author: kl @kailing.pub * @date: 2020/5/18 */ @Configuration @EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class}) public class DataSourceConfiguration{ @Primary @Bean public DataSource dataSource(AtomikosXADataSourceWrapper wrapper, DataSourceProperties dataSourceProperties) throws Exception { MysqlXADataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(MysqlXADataSource.class).build(); return wrapper.wrapDataSource(dataSource); } @Primary @Bean(initMethod = "afterPropertiesSet") public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) { return factoryBuilder.dataSource(dataSource) .packages(Constants.BASE_PACKAGES) .properties(jpaProperties.getProperties()) .persistenceUnit("default") .jta(true) .build(); } @Bean @Primary public EntityManager entityManager(EntityManagerFactory entityManagerFactory){ //必須使用SharedEntityManagerCreator創建SharedEntityManager實例,否則SimpleJpaRepository中的事務不生效 return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); } }
sharding-jdbc加密數據源和普通業務數據源其實是同一個數據源,只是走加解密邏輯的數據源需要被sharding-jdbc的加密組件代理一層,加上了加解密的處理邏輯。所以配置如下:
/** * @author: kl @kailing.pub * @date: 2020/5/18 */ @Configuration @EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class}) public class EncryptDataSourceConfiguration { @Bean public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException { return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps()); } @Bean(initMethod = "afterPropertiesSet") public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException { return factoryBuilder.dataSource(dataSource) .packages(Constants.BASE_PACKAGES) .properties(jpaProperties.getProperties()) .persistenceUnit("encryptPersistenceUnit") .jta(true) .build(); } @Bean public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){ //必須使用SharedEntityManagerCreator創建SharedEntityManager實例,否則SimpleJpaRepository中的事務不生效 return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); } }
遇到問題1、
Connection pool exhausted - try increasing 'maxPoolSize' and/or 'borrowConnectionTimeout' on the DataSourceBean.
解決問題:默認AtomikosXADataSourceWrapper包裝器初始化的數據源連接池最大為1,所以需要添加配置參數如:
spring.jta.atomikos.datasource.max-pool-size=20
遇到問題2、
XAER_INVAL: Invalid arguments (or unsupported command)
解決問題:這個是mysql實現XA的bug,僅當您在同一事務中多次訪問同一MySQL數據庫時,才會發生此問題,在mysql連接url加上如下參數即可,如:
spring.datasource.url = jdbc:mysql://127.0.0.1:3306/xxx?pinGlobalTxToPhysicalConnection=true
在這個場景中,雖然是多數據源,但是底層鏈接的是同一個mysql數據庫,所以XA事務行為為,從第一個執行的sql開始(并不是JTA事務begin階段),生成xid并XA START事務,然后XA END。第二個數據源的sql執行時會判斷是否同一個mysql資源,如果是同一個則用剛生成的xid重新XA START RESUME,然后XA END,最終雖然在應用層是兩個DataSource,其實最后只會調用XA COMMIT一次。mysql驅動實現的XAResource的start如下:
public void start(Xid xid, int flags) throws XAException { StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH); commandBuf.append("XA START "); appendXid(commandBuf, xid); switch (flags) { case TMJOIN: commandBuf.append(" JOIN"); break; case TMRESUME: commandBuf.append(" RESUME"); break; case TMNOFLAGS: // no-op break; default: throw new XAException(XAException.XAER_INVAL); } dispatchCommand(commandBuf.toString()); this.underlyingConnection.setInGlobalTx(true); }
第一次sql執行時,flags=0,走的TMNOFLAGS邏輯,第二次sql執行時,flags=134217728,走的TMRESUME,重新開啟事務的邏輯。以上是Mysql XA的真實事務邏輯,但是博主研究下來發現,msyql xa并不支持XA START RESUME這種語句,而且有很多限制《Mysql XA交易限制》,所以在mysql數據庫使用XA事務時,最好了解下mysql xa的缺陷
鏈式事務不是我首創的叫法,在spring-data-common項目的Transaction包下,已經有一個默認實現ChainedTransactionManager,前文中《深入理解spring的@Transactional工作原理》已經分析了Spring的事務抽象,由PlatformTransactionManager(事務管理器)、TransactionStatus(事務狀態)、TransactionDefinition(事務定義)等形態組成,ChainedTransactionManager也是實現了PlatformTransactionManager和TransactionStatus。實現原理也很簡單,在ChainedTransactionManager內部維護了事務管理器的集合,通過代理編排真實的事務管理器,在事務開啟、提交、回滾時,都分別操作集合里的事務。以達到對多個事務的統一管理。這個方案比較簡陋,而且有缺陷,在提交階段,如果異常不是發生在第一個數據源,那么會存在之前的提交不會回滾,所以在使用ChainedTransactionManager時,盡量把出問題可能性比較大的事務管理器放鏈的后面(開啟事務、提交事務順序相反)。這里只是拋出了一種新的多數據源事務管理的思路,能用XA盡量用XA管理。
普通的業務默認數據源配置如下:
/** * @author: kl @kailing.pub * @date: 2020/5/18 */ @Configuration @EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class}) public class DataSourceConfiguration{ @Primary @Bean public DataSource dataSource(DataSourceProperties dataSourceProperties){ return dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build(); } @Primary @Bean(initMethod = "afterPropertiesSet") public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) { return factoryBuilder.dataSource(dataSource) .packages(Constants.BASE_PACKAGES) .properties(jpaProperties.getProperties()) .persistenceUnit("default") .build(); } @Bean @Primary public EntityManager entityManager(EntityManagerFactory entityManagerFactory){ //必須使用SharedEntityManagerCreator創建SharedEntityManager實例,否則SimpleJpaRepository中的事務不生效 return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); } @Primary @Bean public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory){ JpaTransactionManager txManager = new JpaTransactionManager(); txManager.setEntityManagerFactory(entityManagerFactory); return txManager; } }
sharding-jdbc加密數據源配置如下:
/** * @author: kl @kailing.pub * @date: 2020/5/18 */ @Configuration @EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class}) public class EncryptDataSourceConfiguration { @Bean public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException { return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps()); } @Bean(initMethod = "afterPropertiesSet") public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException { return factoryBuilder.dataSource(dataSource) .packages(Constants.BASE_PACKAGES) .properties(jpaProperties.getProperties()) .persistenceUnit("encryptPersistenceUnit") .build(); } @Bean public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){ //必須使用SharedEntityManagerCreator創建SharedEntityManager實例,否則SimpleJpaRepository中的事務不生效 return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); } @Bean public PlatformTransactionManager chainedTransactionManager(PlatformTransactionManager transactionManager) throws SQLException { JpaTransactionManager encryptTransactionManager = new JpaTransactionManager(); encryptTransactionManager.setEntityManagerFactory(encryptEntityManagerFactory()); //使用鏈式事務管理器包裝真正的transactionManager、txManager事務 ChainedTransactionManager chainedTransactionManager = new ChainedTransactionManager(encryptTransactionManager,transactionManager); return chainedTransactionManager; } }
使用這種方案,在涉及到多數據源的業務時,需要指定使用哪個事務管理器,如:
@PersistenceContext(unitName = "encryptPersistenceUnit") private EntityManager entityManager; @PersistenceContext private EntityManager manager; @Transactional(transactionManager = "chainedTransactionManager") public AccountModel save(AccountDTO dto){ AccountModel accountModel = AccountMapper.INSTANCE.dtoTo(dto); entityManager.persist(accountModel); entityManager.flush(); AccountModel accountMode2 = AccountMapper.INSTANCE.dtoTo(dto); manager.persist(accountMode2); manager.flush(); return accountModel; }
感謝你能夠認真閱讀完這篇文章,希望小編分享的“JPA多數據源分布式事務的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。