您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關基于環狀隊列和迭代器如何實現分布式任務RR分配策略,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
# 背景
## 分布式任務分配
在很多運維場景下,我們都會執行一些長時間的任務,比如裝機、部署環境、打包鏡像等長時間任務, 而通常我們的任務節點數量通常是有限的(排除基于k8s的hpa、或者knative等自動伸縮場景)。
那么當我們有一個任務如何根據當前的worker和corrdinator和任務來進行合理的分配,分配其實也比較復雜,往復雜里面做,可以根據當前系統的負載、每個任務的執行資源消耗、當前集群的任務數量等, 這里我們就搞一個最簡單的,基于任務和當前worker的RR算法
## 系統架構
在worker和任務隊列之間,添加一層協調調度層Coordinator, 由它來根據當前集群任務的狀態來進行任務的分配,同時感知當前集群worker和task的狀態,協調整個集群任務的執行、終止等操作
# 單機實現
## 整體設計
members: 表示當前集群中所有的worker
tasks: 就是當前的任務
Coordinator: 就是我們的協調者, 負責根據members和tasks進行任務的分配
result: 就是分配的結果
## CircularIterator
CircularIterator就是我們的環狀對立迭代器, 擁有兩個方法, 一個是add添加member, 一個Next返回基于rr的下一個member
```go
// CircularIterator 環狀迭代器
type CircularIterator struct {
list []interface{} // 保存所有的成員變量
next int
}
// Next 返回下一個元素
func (c *CircularIterator) Next() interface{} {
item := c.list[c.next]
c.next = (c.next + 1) % len(c.list)
return item
}
// Add 添加任務
func (c *CircularIterator) Add(v interface{}) bool {
for _, item := range c.list {
if v == item {
return false
}
}
c.list = append(c.list, v)
return true
}
```
## Member&Task
Member就是負責執行任務的worker, 有一個AddTask方法和Execute方法負責任務的執行和添加任務
Task標識一個任務
```go
// Member 任務組成員
type Member struct {
id int
tasks []*Task
}
// ID 返回當前memberID
func (m *Member) ID() int {
return m.id
}
// AddTask 為member添加任務
func (m *Member) AddTask(t *Task) bool {
for _, task := range m.tasks {
if task == t {
return false
}
}
m.tasks = append(m.tasks, t)
return true
}
// Execute 執行任務
func (m *Member) Execute() {
for _, task := range m.tasks {
fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute())
}
}
// Task 任務
type Task struct {
name string
}
// Execute 執行task返回結果
func (t *Task) Execute() string {
return "Task " + t.name + " run success"
}
```
## Coordinator
Coordinator是協調器,負責根據 Member和task進行集群任務的協調調度
```go
// Task 任務
type Task struct {
name string
}
// Execute 執行task返回結果
func (t *Task) Execute() string {
return "Task " + t.name + " run success"
}
// Coordinator 協調者
type Coordinator struct {
members []*Member
tasks []*Task
}
// TaskAssignments 為member分配任務
func (c *Coordinator) TaskAssignments() map[int]*Member {
taskAssignments := make(map[int]*Member)
// 構建迭代器
memberIt := c.getMemberIterator()
for _, task := range c.tasks {
member := memberIt.Next().(*Member)
_, err := taskAssignments[member.ID()]
if err == false {
taskAssignments[member.ID()] = member
}
member.AddTask(task)
}
return taskAssignments
}
func (c *Coordinator) getMemberIterator() *CircularIterator {
// 通過當前成員, 構造成員隊列
members := make([]interface{}, len(c.members))
for index, member := range c.members {
members[index] = member
}
return NewCircularIterftor(members)
}
// AddMember 添加member組成員
func (c *Coordinator) AddMember(m *Member) bool {
for _, member := range c.members {
if member == m {
return false
}
}
c.members = append(c.members, m)
return true
}
// AddTask 添加任務
func (c *Coordinator) AddTask(t *Task) bool {
for _, task := range c.tasks {
if task == t {
return false
}
}
c.tasks = append(c.tasks, t)
return true
}
```
## 測試
我們首先創建一堆member和task, 然后調用coordinator進行任務分配,執行任務結果
```go
coordinator := NewCoordinator()
for i := 0; i < 10; i++ {
m := &Member{id: i}
coordinator.AddMember(m)
}
for i := 0; i < 30; i++ {
t := &Task{name: fmt.Sprintf("task %d", i)}
coordinator.AddTask(t)
}
result := coordinator.TaskAssignments()
for _, member := range result {
member.Execute()
}
```
## 結果
可以看到每個worker均勻的得到任務分配
```bash
Member 6 run task Task task 6 run success
Member 6 run task Task task 16 run success
Member 6 run task Task task 26 run success
Member 8 run task Task task 8 run success
Member 8 run task Task task 18 run success
Member 8 run task Task task 28 run success
Member 0 run task Task task 0 run success
Member 0 run task Task task 10 run success
Member 0 run task Task task 20 run success
Member 3 run task Task task 3 run success
Member 3 run task Task task 13 run success
Member 3 run task Task task 23 run success
Member 4 run task Task task 4 run success
Member 4 run task Task task 14 run success
Member 4 run task Task task 24 run success
Member 7 run task Task task 7 run success
Member 7 run task Task task 17 run success
Member 7 run task Task task 27 run success
Member 9 run task Task task 9 run success
Member 9 run task Task task 19 run success
Member 9 run task Task task 29 run success
Member 1 run task Task task 1 run success
Member 1 run task Task task 11 run success
Member 1 run task Task task 21 run success
Member 2 run task Task task 2 run success
Member 2 run task Task task 12 run success
Member 2 run task Task task 22 run success
Member 5 run task Task task 5 run success
Member 5 run task Task task 15 run success
Member 5 run task Task task 25 run success
```
## 完整代碼
```go
package main
import "fmt"
// CircularIterator 環狀迭代器
type CircularIterator struct {
list []interface{}
next int
}
// Next 返回下一個元素
func (c *CircularIterator) Next() interface{} {
item := c.list[c.next]
c.next = (c.next + 1) % len(c.list)
return item
}
// Add 添加任務
func (c *CircularIterator) Add(v interface{}) bool {
for _, item := range c.list {
if v == item {
return false
}
}
c.list = append(c.list, v)
return true
}
// Member 任務組成員
type Member struct {
id int
tasks []*Task
}
// ID 返回當前memberID
func (m *Member) ID() int {
return m.id
}
// AddTask 為member添加任務
func (m *Member) AddTask(t *Task) bool {
for _, task := range m.tasks {
if task == t {
return false
}
}
m.tasks = append(m.tasks, t)
return true
}
// Execute 執行任務
func (m *Member) Execute() {
for _, task := range m.tasks {
fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute())
}
}
// Task 任務
type Task struct {
name string
}
// Execute 執行task返回結果
func (t *Task) Execute() string {
return "Task " + t.name + " run success"
}
// Coordinator 協調者
type Coordinator struct {
members []*Member
tasks []*Task
}
// TaskAssignments 為member分配任務
func (c *Coordinator) TaskAssignments() map[int]*Member {
taskAssignments := make(map[int]*Member)
// 構建迭代器
memberIt := c.getMemberIterator()
for _, task := range c.tasks {
member := memberIt.Next().(*Member)
_, err := taskAssignments[member.ID()]
if err == false {
taskAssignments[member.ID()] = member
}
member.AddTask(task)
}
return taskAssignments
}
func (c *Coordinator) getMemberIterator() *CircularIterator {
// 通過當前成員, 構造成員隊列
members := make([]interface{}, len(c.members))
for index, member := range c.members {
members[index] = member
}
return NewCircularIterftor(members)
}
// AddMember 添加member組成員
func (c *Coordinator) AddMember(m *Member) bool {
for _, member := range c.members {
if member == m {
return false
}
}
c.members = append(c.members, m)
return true
}
// AddTask 添加任務
func (c *Coordinator) AddTask(t *Task) bool {
for _, task := range c.tasks {
if task == t {
return false
}
}
c.tasks = append(c.tasks, t)
return true
}
// NewCircularIterftor 返回迭代器
func NewCircularIterftor(list []interface{}) *CircularIterator {
iterator := CircularIterator{}
for _, item := range list {
iterator.Add(item)
}
return &iterator
}
// NewCoordinator 返回協調器
func NewCoordinator() *Coordinator {
c := Coordinator{}
return &c
}
func main() {
coordinator := NewCoordinator()
for i := 0; i < 10; i++ {
m := &Member{id: i}
coordinator.AddMember(m)
}
for i := 0; i < 30; i++ {
t := &Task{name: fmt.Sprintf("task %d", i)}
coordinator.AddTask(t)
}
result := coordinator.TaskAssignments()
for _, member := range result {
member.Execute()
}
}
```
任務協調是一個非常復雜的事情, 內部的任務平臺,雖然實現了基于任務的組合和app化,但是任務調度分配著一塊,仍然沒有去做,只是簡單的根據樹形任務去簡單的做一些分支任務的執行,未來有時間再做吧,要繼續研究下一個模塊了。
上述就是小編為大家分享的基于環狀隊列和迭代器如何實現分布式任務RR分配策略了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。