91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Golang中怎么利用Redis實現TCC分布式事務

發布時間:2021-08-09 14:25:27 來源:億速云 閱讀:154 作者:Leah 欄目:云計算

這期內容當中小編將會給大家帶來有關Golang中怎么利用Redis實現TCC分布式事務,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

  對于使用者而言這種部分成功部分失敗的情況非常難以處理,所以我們需要保證 MSET 操作要么全部成功要么全部失敗。

  MSET 命令在集群模式下的問題#

  于是問題來了 DEL、MSET 等命令所涉及的 key 可能分布在不同的節點中,在集群模式下實現這類涉及多個 key 的命令最簡單的方式當然是 For-Each 遍歷 key 并向它們所在的節點發送相應的操作指令。 以 MGET 命令的實現為例:

  func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

  if len(args) < 2 {

  return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command")

  }

  // 從參數列表中取出要讀取的 key

  keys := make([]string, len(args)-1)

  for i := 1; i < len(args); i++ {

  keys[i-1] = string(args[i])

  }

  resultMap := make(map[string][]byte)

  // 計算每個 key 所在的節點,并按照節點分組

  groupMap := cluster.groupBy(keys)

  // groupMap 的類型為 map[string][]string,key 是節點的地址,value 是 keys 中屬于該節點的 key 列表

  for peer, group := range groupMap {

  // 向每個節點發送 mget 指令,讀取分布在它上面的 key

  resp := cluster.Relay(peer, c, makeArgs("MGET", group...))

  if reply.IsErrorReply(resp) {

  errReply := resp.(reply.ErrorReply)

  return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error()))

  }

  arrReply, _ := resp.(*reply.MultiBulkReply)

  // 將每個節點上的結果 merge 到 map 中

  for i, v := range arrReply.Args {

  key := group[i]

  resultMap[key] = v

  }

  }

  result := make([][]byte, len(keys))

  for i, k := range keys {

  result[i] = resultMap[k]

  }

  return reply.MakeMultiBulkReply(result)

  }

  // 計算 key 所屬的節點,并按節點分組

  func (cluster *Cluster) groupBy(keys []string) map[string][]string {

  result := make(map[string][]string)

  for _, key := range keys {

  // 使用一致性 hash 計算所屬節點

  peer := cluster.peerPicker.Get(key)

  // 將 key 加入到相應節點的分組中

  group, ok := result[peer]

  if !ok {

  group = make([]string, 0)

  }

  group = append(group, key)

  result[peer] = group

  }

  return result

  }

  那么 MSET 命令的實現能否如法炮制呢?答案是否定的。在上面的代碼中我們注意到,在向各個節點發送指令時若某個節點讀取失敗則會直接退出整個 MGET 執行過程。

  若在執行 MSET 指令時遇到部分節點失敗或超時,則會出現部分 key 設置成功而另一份設置失敗的情況。對于緩存使用者而言這種部分成功部分失敗的情況非常難以處理,所以我們需要保證 MSET 操作要么全部成功要么全部失敗。

  兩階段提交#

  兩階段提交(2-Phase Commit, 2PC)算法是解決我們遇到的一致性問題最簡單的算法。在 2PC 算法中寫操作被分為兩個階段來執行:

  Prepare 階段

  協調者向所有參與者發送事務內容,詢問是否可以執行事務操作。在 Godis 中收到客戶端 MSET 命令的節點是事務的協調者,所有持有相關 key 的節點都要參與事務。

  各參與者鎖定事務相關 key 防止被其它操作修改。各參與者寫 undo log 準備在事務失敗后進行回滾。

  參與者回復協調者可以提交。若協調者收到所有參與者的YES回復,則準備進行事務提交。若有參與者回復NO或者超時,則準備回滾事務

  Commit 階段

  協調者向所有參與者發送提交請求

  參與者正式提交事務,并在完成后釋放相關 key 的鎖。

  參與者協調者回復ACK,協調者收到所有參與者的ACK后認為事務提交成功。

  Rollback 階段

  在事務請求階段若有參與者回復NO或者超時,協調者向所有參與者發出回滾請求

  各參與者執行事務回滾,并在完成后釋放相關資源。

  參與者協調者回復ACK,協調者收到所有參與者的ACK后認為事務回滾成功。

  2PC是一種簡單的一致性協議,它存在一些問題:

  單點服務: 若協調者突然崩潰則事務流程無法繼續進行或者造成狀態不一致

  無法保證一致性: 若協調者第二階段發送提交請求時崩潰,可能部分參與者受到COMMIT請求提交了事務,而另一部分參與者未受到請求而放棄事務造成不一致現象。

  阻塞: 為了保證事務完成提交,各參與者在完成第一階段事務執行后必須鎖定相關資源直到正式提交,影響系統的吞吐量。

  首先我們定義事務的描述結構:

  type Transaction struct {

  id string // 事務 ID, 由 snowflake 算法生成

  args [][]byte // 命令參數

  cluster *Cluster

  conn redis.Connection

  keys []string // 事務中涉及的 key

  undoLog map[string][]byte // 每個 key 在事務執行前的值,用于回滾事務

  }

  Prepare 階段#

  先看事務參與者 prepare 階段的操作:

  // prepare 命令的格式是: PrepareMSet TxID key1, key2 ...

  // TxID 是事務 ID,由協調者決定

  func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

  if len(args) < 3 {

  return reply.MakeErrReply("ERR wrong number of arguments for 'preparemset' command")

  }

  txId := string(args[1])

  size := (len(args) - 2) / 2

  keys := make([]string, size)

  for i := 0; i < size; i++ {

  keys[i] = string(args[2*i+2])

  }

  txArgs := [][]byte{

  []byte("MSet"),

  } // actual args for cluster.db

  txArgs = append(txArgs, args[2:]...)

  tx := NewTransaction(cluster, c, txId, txArgs, keys) // 創建新事務

  cluster.transactions.Put(txId, tx) // 存儲到節點的事務列表中

  err := tx.prepare() // 準備事務

  if err != nil {

  return reply.MakeErrReply(err.Error())

  }

  return &reply.OkReply{}

  }

  實際的準備操作在 tx.prepare() 中:

  func (tx *Transaction) prepare() error {

  // 鎖定相關 key

  tx.cluster.db.Locks(tx.keys...)

  // 準備 undo log

  tx.undoLog = make(map[string][]byte)

  for _, key := range tx.keys {

  entity, ok := tx.cluster.db.Get(key)

  if ok {

  blob, err := gob.Marshal(entity) // 將修改之前的狀態序列化之后存儲作為 undo log

  if err != nil {

  return err

  }

  tx.undoLog[key] = blob

  } else {

  // 若事務執行前 key 是空的,在回滾時應刪除它

  tx.undoLog[key] = []byte{}

  }

  }

  tx.status = PreparedStatus

  return nil

  }

  看看協調者在做什么:

  func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

  // 解析參數

  argCount := len(args) - 1

  if argCount%2 != 0 || argCount < 1 {

  return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command")

  }

  size := argCount / 2

  keys := make([]string, size)

  valueMap := make(map[string]string)

  for i := 0; i < size; i++ {

  keys[i] = string(args[2*i+1])

  valueMap[keys[i]] = string(args[2*i+2])

  }

  // 找到所屬的節點

  groupMap := cluster.groupBy(keys)

  if len(groupMap) == 1 { // do fast

  // 若所有的 key 都在同一個節點直接執行,不使用較慢的 2pc 算法

  for peer := range groupMap {

  return cluster.Relay(peer, c, args)

  }

  }

  // 開始準備階段

  var errReply redis.Reply

  txId := cluster.idGenerator.NextId() // 使用 snowflake 算法決定事務 ID

  txIdStr := strconv.FormatInt(txId, 10)

  rollback := false

  // 向所有參與者發送 prepare 請求

  for peer, group := range groupMap {

  peerArgs := []string{txIdStr}

  for _, k := range group {

  peerArgs = append(peerArgs, k, valueMap[k])

  }

  var resp redis.Reply

  if peer == cluster.self {

  resp = PrepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...))

  } else {

  resp = cluster.Relay(peer, c, makeArgs("PrepareMSet", peerArgs...))

  }

  if reply.IsErrorReply(resp) {

  errReply = resp

  rollback = true

  break

  }

  }

  if rollback {

  // 若 prepare 過程出錯則執行回滾

  RequestRollback(cluster, c, txId, groupMap)

  } else {

  _, errReply = RequestCommit(cluster, c, txId, groupMap)

  rollback = errReply != nil

  }

  if !rollback {

  return &reply.OkReply{}

  }

  return errReply

  }

  Commit 階段#

  事務參與者提交本地事務:

  func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

  if len(args) != 2 {

  return reply.MakeErrReply("ERR wrong number of arguments for 'commit' command")

  }

  // 讀取事務信息

  txId := string(args[1])

  raw, ok := cluster.transactions.Get(txId)

  if !ok {

  return reply.MakeIntReply(0)

  }

  tx, _ := raw.(*Transaction)

  // 在提交成功后解鎖 key

  defer func() {

  cluster.db.UnLocks(tx.keys...)

  tx.status = CommitedStatus

  //cluster.transactions.Remove(tx.id) // cannot remove, may rollback after commit

  }()

  cmd := strings.ToLower(string(tx.args[0]))

  var result redis.Reply

  if cmd == "del" {

  result = CommitDel(cluster, c, tx)

  } else if cmd == "mset" {

  result = CommitMSet(cluster, c, tx)

  }

  // 提交失敗

  if reply.IsErrorReply(result) {  陽痿早泄前列腺炎醫院哪家好http://www.zztjxb.com/

  err2 := tx.rollback()

  return reply.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result))

  }

  return result

  }

  // 執行操作

  func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {

  size := len(tx.args) / 2

  keys := make([]string, size)

  values := make([][]byte, size)

  for i := 0; i < size; i++ {

  keys[i] = string(tx.args[2*i+1])

  values[i] = tx.args[2*i+2]鄭州無痛人流醫院哪家好http://www.hnzzxb.com/

  }

  for i, key := range keys {

  value := values[i]

  cluster.db.Put(key, &db.DataEntity{Data: value})

  }

  cluster.db.AddAof(reply.MakeMultiBulkReply(tx.args))

  return &reply.OkReply{}

  }

  協調者的邏輯也很簡單:

  func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) {

  var errReply reply.ErrorReply

  txIdStr := strconv.FormatInt(txId, 10)

  respList := make([]redis.Reply, 0, len(peers))

  for peer := range peers {

  var resp redis.Reply

  if peer == cluster.self {

  resp = Commit(cluster, c, makeArgs("commit", txIdStr))

  } else {

  resp = cluster.Relay(peer, c, makeArgs("commit", txIdStr))

  }

  if reply.IsErrorReply(resp) {

  errReply = resp.(reply.ErrorReply)

  break

  }

  respList = append(respList, resp)

  }

  if errReply != nil {

  RequestRollback(cluster, c, txId, peers)

  return nil, errReply

  }

  return respList, nil

  }

  Rollback#

  回滾本地事務:

  func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

  if len(args) != 2 {

  return reply.MakeErrReply("ERR wrong number of arguments for 'rollback' command")

  }

  txId := string(args[1])

  raw, ok := cluster.transactions.Get(txId)

  if !ok {

  return reply.MakeIntReply(0)

  }

  tx, _ := raw.(*Transaction)

  err := tx.rollback()

  if err != nil {

  return reply.MakeErrReply(err.Error())

  }

  return reply.MakeIntReply(1)

  }

  func (tx *Transaction) rollback() error {

  for key, blob := range tx.undoLog {

  if len(blob) > 0 {

  entity := &db.DataEntity{}

  err := gob.UnMarshal(blob, entity) // 反序列化事務前的快照

  if err != nil {

  return err

  }

  tx.cluster.db.Put(key, entity) // 寫入事務前的數據

  } else {

  tx.cluster.db.Remove(key) // 若事務開始之前 key 不存在則將其刪除

  }

  }

  if tx.status != CommitedStatus {

  tx.cluster.db.UnLocks(tx.keys...)

  }

  tx.status = RollbackedStatus

  return nil

  }

  協調者的邏輯與 commit 類似:

  func RequestRollback(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) {

  txIdStr := strconv.FormatInt(txId, 10)

  for peer := range peers {

  if peer == cluster.self {

  Rollback(cluster, c, makeArgs("rollback", txIdStr))

  } else {

  cluster.Relay(peer, c, makeArgs("rollback", txIdStr))

  }

  }

  }

上述就是小編為大家分享的Golang中怎么利用Redis實現TCC分布式事務了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

正定县| 梅州市| 惠州市| 专栏| 天峨县| 习水县| 营山县| 平罗县| 丹凤县| 贵州省| 文昌市| 威海市| 南平市| 方山县| 织金县| 利津县| 景宁| 凤台县| 泾川县| 黄山市| 皮山县| 宝应县| 洛川县| 肃宁县| 苏尼特右旗| 博野县| 塔河县| 万山特区| 祥云县| 民勤县| 游戏| 新河县| 辽宁省| 健康| 霍城县| 泸西县| 咸阳市| 合江县| 汝阳县| 依兰县| 西安市|