您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關Golang中怎么利用Redis實現TCC分布式事務,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
對于使用者而言這種部分成功部分失敗的情況非常難以處理,所以我們需要保證 MSET 操作要么全部成功要么全部失敗。
MSET 命令在集群模式下的問題#
于是問題來了 DEL、MSET 等命令所涉及的 key 可能分布在不同的節點中,在集群模式下實現這類涉及多個 key 的命令最簡單的方式當然是 For-Each 遍歷 key 并向它們所在的節點發送相應的操作指令。 以 MGET 命令的實現為例:
func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) < 2 {
return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command")
}
// 從參數列表中取出要讀取的 key
keys := make([]string, len(args)-1)
for i := 1; i < len(args); i++ {
keys[i-1] = string(args[i])
}
resultMap := make(map[string][]byte)
// 計算每個 key 所在的節點,并按照節點分組
groupMap := cluster.groupBy(keys)
// groupMap 的類型為 map[string][]string,key 是節點的地址,value 是 keys 中屬于該節點的 key 列表
for peer, group := range groupMap {
// 向每個節點發送 mget 指令,讀取分布在它上面的 key
resp := cluster.Relay(peer, c, makeArgs("MGET", group...))
if reply.IsErrorReply(resp) {
errReply := resp.(reply.ErrorReply)
return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error()))
}
arrReply, _ := resp.(*reply.MultiBulkReply)
// 將每個節點上的結果 merge 到 map 中
for i, v := range arrReply.Args {
key := group[i]
resultMap[key] = v
}
}
result := make([][]byte, len(keys))
for i, k := range keys {
result[i] = resultMap[k]
}
return reply.MakeMultiBulkReply(result)
}
// 計算 key 所屬的節點,并按節點分組
func (cluster *Cluster) groupBy(keys []string) map[string][]string {
result := make(map[string][]string)
for _, key := range keys {
// 使用一致性 hash 計算所屬節點
peer := cluster.peerPicker.Get(key)
// 將 key 加入到相應節點的分組中
group, ok := result[peer]
if !ok {
group = make([]string, 0)
}
group = append(group, key)
result[peer] = group
}
return result
}
那么 MSET 命令的實現能否如法炮制呢?答案是否定的。在上面的代碼中我們注意到,在向各個節點發送指令時若某個節點讀取失敗則會直接退出整個 MGET 執行過程。
若在執行 MSET 指令時遇到部分節點失敗或超時,則會出現部分 key 設置成功而另一份設置失敗的情況。對于緩存使用者而言這種部分成功部分失敗的情況非常難以處理,所以我們需要保證 MSET 操作要么全部成功要么全部失敗。
兩階段提交#
兩階段提交(2-Phase Commit, 2PC)算法是解決我們遇到的一致性問題最簡單的算法。在 2PC 算法中寫操作被分為兩個階段來執行:
Prepare 階段
協調者向所有參與者發送事務內容,詢問是否可以執行事務操作。在 Godis 中收到客戶端 MSET 命令的節點是事務的協調者,所有持有相關 key 的節點都要參與事務。
各參與者鎖定事務相關 key 防止被其它操作修改。各參與者寫 undo log 準備在事務失敗后進行回滾。
參與者回復協調者可以提交。若協調者收到所有參與者的YES回復,則準備進行事務提交。若有參與者回復NO或者超時,則準備回滾事務
Commit 階段
協調者向所有參與者發送提交請求
參與者正式提交事務,并在完成后釋放相關 key 的鎖。
參與者協調者回復ACK,協調者收到所有參與者的ACK后認為事務提交成功。
Rollback 階段
在事務請求階段若有參與者回復NO或者超時,協調者向所有參與者發出回滾請求
各參與者執行事務回滾,并在完成后釋放相關資源。
參與者協調者回復ACK,協調者收到所有參與者的ACK后認為事務回滾成功。
2PC是一種簡單的一致性協議,它存在一些問題:
單點服務: 若協調者突然崩潰則事務流程無法繼續進行或者造成狀態不一致
無法保證一致性: 若協調者第二階段發送提交請求時崩潰,可能部分參與者受到COMMIT請求提交了事務,而另一部分參與者未受到請求而放棄事務造成不一致現象。
阻塞: 為了保證事務完成提交,各參與者在完成第一階段事務執行后必須鎖定相關資源直到正式提交,影響系統的吞吐量。
首先我們定義事務的描述結構:
type Transaction struct {
id string // 事務 ID, 由 snowflake 算法生成
args [][]byte // 命令參數
cluster *Cluster
conn redis.Connection
keys []string // 事務中涉及的 key
undoLog map[string][]byte // 每個 key 在事務執行前的值,用于回滾事務
}
Prepare 階段#
先看事務參與者 prepare 階段的操作:
// prepare 命令的格式是: PrepareMSet TxID key1, key2 ...
// TxID 是事務 ID,由協調者決定
func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) < 3 {
return reply.MakeErrReply("ERR wrong number of arguments for 'preparemset' command")
}
txId := string(args[1])
size := (len(args) - 2) / 2
keys := make([]string, size)
for i := 0; i < size; i++ {
keys[i] = string(args[2*i+2])
}
txArgs := [][]byte{
[]byte("MSet"),
} // actual args for cluster.db
txArgs = append(txArgs, args[2:]...)
tx := NewTransaction(cluster, c, txId, txArgs, keys) // 創建新事務
cluster.transactions.Put(txId, tx) // 存儲到節點的事務列表中
err := tx.prepare() // 準備事務
if err != nil {
return reply.MakeErrReply(err.Error())
}
return &reply.OkReply{}
}
實際的準備操作在 tx.prepare() 中:
func (tx *Transaction) prepare() error {
// 鎖定相關 key
tx.cluster.db.Locks(tx.keys...)
// 準備 undo log
tx.undoLog = make(map[string][]byte)
for _, key := range tx.keys {
entity, ok := tx.cluster.db.Get(key)
if ok {
blob, err := gob.Marshal(entity) // 將修改之前的狀態序列化之后存儲作為 undo log
if err != nil {
return err
}
tx.undoLog[key] = blob
} else {
// 若事務執行前 key 是空的,在回滾時應刪除它
tx.undoLog[key] = []byte{}
}
}
tx.status = PreparedStatus
return nil
}
看看協調者在做什么:
func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
// 解析參數
argCount := len(args) - 1
if argCount%2 != 0 || argCount < 1 {
return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command")
}
size := argCount / 2
keys := make([]string, size)
valueMap := make(map[string]string)
for i := 0; i < size; i++ {
keys[i] = string(args[2*i+1])
valueMap[keys[i]] = string(args[2*i+2])
}
// 找到所屬的節點
groupMap := cluster.groupBy(keys)
if len(groupMap) == 1 { // do fast
// 若所有的 key 都在同一個節點直接執行,不使用較慢的 2pc 算法
for peer := range groupMap {
return cluster.Relay(peer, c, args)
}
}
// 開始準備階段
var errReply redis.Reply
txId := cluster.idGenerator.NextId() // 使用 snowflake 算法決定事務 ID
txIdStr := strconv.FormatInt(txId, 10)
rollback := false
// 向所有參與者發送 prepare 請求
for peer, group := range groupMap {
peerArgs := []string{txIdStr}
for _, k := range group {
peerArgs = append(peerArgs, k, valueMap[k])
}
var resp redis.Reply
if peer == cluster.self {
resp = PrepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...))
} else {
resp = cluster.Relay(peer, c, makeArgs("PrepareMSet", peerArgs...))
}
if reply.IsErrorReply(resp) {
errReply = resp
rollback = true
break
}
}
if rollback {
// 若 prepare 過程出錯則執行回滾
RequestRollback(cluster, c, txId, groupMap)
} else {
_, errReply = RequestCommit(cluster, c, txId, groupMap)
rollback = errReply != nil
}
if !rollback {
return &reply.OkReply{}
}
return errReply
}
Commit 階段#
事務參與者提交本地事務:
func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) != 2 {
return reply.MakeErrReply("ERR wrong number of arguments for 'commit' command")
}
// 讀取事務信息
txId := string(args[1])
raw, ok := cluster.transactions.Get(txId)
if !ok {
return reply.MakeIntReply(0)
}
tx, _ := raw.(*Transaction)
// 在提交成功后解鎖 key
defer func() {
cluster.db.UnLocks(tx.keys...)
tx.status = CommitedStatus
//cluster.transactions.Remove(tx.id) // cannot remove, may rollback after commit
}()
cmd := strings.ToLower(string(tx.args[0]))
var result redis.Reply
if cmd == "del" {
result = CommitDel(cluster, c, tx)
} else if cmd == "mset" {
result = CommitMSet(cluster, c, tx)
}
// 提交失敗
if reply.IsErrorReply(result) { 陽痿早泄前列腺炎醫院哪家好http://www.zztjxb.com/
err2 := tx.rollback()
return reply.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result))
}
return result
}
// 執行操作
func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
size := len(tx.args) / 2
keys := make([]string, size)
values := make([][]byte, size)
for i := 0; i < size; i++ {
keys[i] = string(tx.args[2*i+1])
values[i] = tx.args[2*i+2]鄭州無痛人流醫院哪家好http://www.hnzzxb.com/
}
for i, key := range keys {
value := values[i]
cluster.db.Put(key, &db.DataEntity{Data: value})
}
cluster.db.AddAof(reply.MakeMultiBulkReply(tx.args))
return &reply.OkReply{}
}
協調者的邏輯也很簡單:
func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) {
var errReply reply.ErrorReply
txIdStr := strconv.FormatInt(txId, 10)
respList := make([]redis.Reply, 0, len(peers))
for peer := range peers {
var resp redis.Reply
if peer == cluster.self {
resp = Commit(cluster, c, makeArgs("commit", txIdStr))
} else {
resp = cluster.Relay(peer, c, makeArgs("commit", txIdStr))
}
if reply.IsErrorReply(resp) {
errReply = resp.(reply.ErrorReply)
break
}
respList = append(respList, resp)
}
if errReply != nil {
RequestRollback(cluster, c, txId, peers)
return nil, errReply
}
return respList, nil
}
Rollback#
回滾本地事務:
func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) != 2 {
return reply.MakeErrReply("ERR wrong number of arguments for 'rollback' command")
}
txId := string(args[1])
raw, ok := cluster.transactions.Get(txId)
if !ok {
return reply.MakeIntReply(0)
}
tx, _ := raw.(*Transaction)
err := tx.rollback()
if err != nil {
return reply.MakeErrReply(err.Error())
}
return reply.MakeIntReply(1)
}
func (tx *Transaction) rollback() error {
for key, blob := range tx.undoLog {
if len(blob) > 0 {
entity := &db.DataEntity{}
err := gob.UnMarshal(blob, entity) // 反序列化事務前的快照
if err != nil {
return err
}
tx.cluster.db.Put(key, entity) // 寫入事務前的數據
} else {
tx.cluster.db.Remove(key) // 若事務開始之前 key 不存在則將其刪除
}
}
if tx.status != CommitedStatus {
tx.cluster.db.UnLocks(tx.keys...)
}
tx.status = RollbackedStatus
return nil
}
協調者的邏輯與 commit 類似:
func RequestRollback(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) {
txIdStr := strconv.FormatInt(txId, 10)
for peer := range peers {
if peer == cluster.self {
Rollback(cluster, c, makeArgs("rollback", txIdStr))
} else {
cluster.Relay(peer, c, makeArgs("rollback", txIdStr))
}
}
}
上述就是小編為大家分享的Golang中怎么利用Redis實現TCC分布式事務了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。