您好,登錄后才能下訂單哦!
首先我們來簡要看下分布式事務處理的XA規范
可知XA規范中分布式事務有AP,RM,TM組成:
其中應用程序(Application Program ,簡稱AP):AP定義事務邊界(定義事務開始和結束)并訪問事務邊界內的資源。
資源管理器(Resource Manager,簡稱RM):Rm管理計算機共享的資源,許多軟件都可以去訪問這些資源,資源包含比如數據庫、文件系統、打印機服務器等。
Xa主要規定了RM與TM之間的交互,下面來看下XA規范中定義的RM 和 TM交互的接口:
本圖來著 參考文章XA規范25頁
xa_start負責開啟或者恢復一個事務分支,并且管理XID到調用線程
xa_end 負責取消當前線程與事務分支的關聯
xa_prepare負責詢問RM 是否準備好了提交事務分支
xa_commit通知RM提交事務分支
XA協議是使用了二階段協議的,其中:
第一階段TM要求所有的RM準備提交對應的事務分支,詢問RM是否有能力保證成功的提交事務分支,RM根據自己的情況,如果判斷自己進行的工作可以被提交,那就就對工作內容進行持久化,并給TM回執OK;否者給TM的回執NO。RM在發送了否定答復并回滾了已經的工作后,就可以丟棄這個事務分支信息了。
也就是TM與RM之間是通過兩階段提交協議進行交互的。
MYSQL的數據庫存儲引擎InnoDB的事務特性能夠保證在存儲引擎級別實現ACID,而分布式事務讓存儲引擎級別的事務擴展到數據庫層面,甚至擴展到多個數據庫之間,這是通過兩階段提交協議來實現的,MySQL 5.0或者更新版本開始支持XA事務,從下圖可知MySQL中只有InnoDB引擎支持XA協議:
Mysql中存在兩種XA事務,一種是內部XA事務主要用來協調存儲引擎和二進制日志,一種是外部事務可以參與到外部分布式事務中(比如多個數據庫實現的分布式事務),本節我們主要討論外部事務。
在MySQL數據庫分布式事務中,MySQL是XA事務過程中的資源管理器(RM)存在的,TM是連接MySQL服務器的客戶端。MySQL數據庫是作為RM存在的,在分布式事務中一般會涉及到至少兩個RM,所以我們說的MySQL支持XA協議是說mysql作為RM來說的,也就是說MySQL實現了XA協議中RM應該具有的功能;需要注意的是MySQL中只有當隔離級別為Serializable時候才能使用分布式事務,所以需要使用set global tx_isolation='serializable',session tx_isolation='serializable';
設置數據庫隔離級別(具體可以參考本地事務)。
下面我們來看看在MySQL數據庫單個節點運行XA事務,首先來看下MySQL下xa事務語法:
其中xid是一個全局唯一的id標示一個分支事務,每個分支事務有自己的全局唯一的一個id,是一個字符串。
然后確認下mysql是否啟動了xa功能:
可知啟動了,下面具體看一個實例:
其中首先使用XA START ‘xid' 啟動了一個XA事務,并把它置于ACTIVE狀態
對于一個ACTIVE狀態的 XA事務,我們可以執行構成事務的多條SQL語句,也就是指定分支事務的邊界,然后執行一個XA END ‘xid'語句,XA END把事務放入IDLE狀態,也就是結束事務邊界,在xa start和xa end之間的語句就構成了本分支事務的一個事務范圍。當調用xa end 'xid1'后由于結束了事務邊界,所以這時候如何在執行sql語句會拋出ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the IDLE state錯誤,也就是當分支事務處于IDLE狀態時候不允許執行沒有包含到分支事務邊界里面的其他sql.
對于一個IDLE 狀態XA事務,可以執行一個XA PREPARE語句或一個XA COMMIT…ONE PHASE語句,其中XA PREPARE把事務放入PREPARED狀態。在此點上的XA RECOVER語句將在其輸出中包括事務的xid值,因為XA RECOVER會列出處于PREPARED狀態的所有XA事務。XA COMMIT…ONE PHASE用于預備和提交事務,也就是轉換為一階段協議,直接提交事務。
其中二階段協議中第一階段是執行 xa prepare時候,這時候MySQL客戶端(TM)向MySQL數據庫服務器(RM)發出prepare"準備提交"請求,數據庫收到請求后執行數據修改和日志記錄等處理,處理完成后只是把事務的狀態改成"可以提交",然后把結果返回給事務管理器。
如果第一階段中數據庫都prepare成功,那么mysql客戶端(TM)向數據庫服務器發出"commit"請求,數據庫服務器把事務的"可以提交"狀態改為"提交完成"狀態,然后返回應答。如果在第一階段內數據庫的操作發生了錯誤,或者mysql客戶端(RM)收不到數據庫的回應,則認為事務失敗,執行rollback回撤所有數據庫的事務。
上面例子是在一個數據庫節點上運行的一個分支事務,演示了單個數據庫上執行xa分支事務的流程,但是通常都是使用編程語言,比如Java的 JTA來完成MySQL的分布式事務的,下面一個例子用來演示:
首先添加依賴
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
</dependency>
代碼:
public class XaDemo {
public static MysqlXADataSource getDataSource(String connStr, String user, String pwd) {
try {
MysqlXADataSource ds = new MysqlXADataSource();
ds.setUrl(connStr);
ds.setUser(user);
ds.setPassword(pwd);
return ds;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] arg) {
String connStr1 = "jdbc:mysql://192.168.0.1:3306/test";
String connStr2 = "jdbc:mysql://192.168.0.2:3306/test";
try {
//從不同數據庫獲取數據庫數據源
MysqlXADataSource ds1 = getDataSource(connStr1, "root", "123456");
MysqlXADataSource ds2 = getDataSource(connStr2, "root", "123456");
//數據庫1獲取連接
XAConnection xaConnection1 = ds1.getXAConnection();
XAResource xaResource1 = xaConnection1.getXAResource();
Connection connection1 = xaConnection1.getConnection();
Statement statement1 = connection1.createStatement();
//數據庫2獲取連接
XAConnection xaConnection2 = ds2.getXAConnection();
XAResource xaResource2 = xaConnection2.getXAResource();
Connection connection2 = xaConnection2.getConnection();
Statement statement2 = connection2.createStatement();
//創建事務分支的xid
Xid xid1 = new MysqlXid(new byte[] { 0x01 }, new byte[] { 0x02 }, 100);
Xid xid2 = new MysqlXid(new byte[] { 0x011 }, new byte[] { 0x012 }, 100);
try {
//事務分支1關聯分支事務sql語句
xaResource1.start(xid1, XAResource.TMNOFLAGS);
int update1Result = statement1.executeUpdate("update account_from set money=money - 50 where id=1");
xaResource1.end(xid1, XAResource.TMSUCCESS);
//事務分支2關聯分支事務sql語句
xaResource2.start(xid2, XAResource.TMNOFLAGS);
int update2Result = statement2.executeUpdate("update account_to set money= money + 50 where id=1");
xaResource2.end(xid2, XAResource.TMSUCCESS);
// 兩階段提交協議第一階段
int ret1 = xaResource1.prepare(xid1);
int ret2 = xaResource2.prepare(xid2);
// 兩階段提交協議第二階段
if (XAResource.XA_OK == ret1 && XAResource.XA_OK == ret2) {
xaResource1.commit(xid1, false);
xaResource2.commit(xid2, false);
System.out.println("reslut1:" + update1Result + ", result2:" + update2Result);
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
如上代碼對兩個機器上的數據庫進行轉賬操作。
更多本地事務咨詢可以單擊我
更多分布式事務咨詢可以單擊我
想了解更多關于粘包半包問題單擊我
更多關于分布式系統中服務降級策略的知識可以單擊 單擊我
想系統學dubbo的單擊我
想學并發的童鞋可以 單擊我
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。