您好,登錄后才能下訂單哦!
人工盯著服務器,出了問題,到機器前面,翻日志,查狀態,手動操作
開始寫一些自動化腳本,啟動計劃任務,自動啟動服務,監控服務等
腳本功能太弱,開發了大量工具,某種工具解決某個特定領域的問題,常用的有ansible,puppet等
將工具整合,自主研發,實現標準化,實現自動化流程控制,而今,平臺已經開始邁向智能化的發展方向。
https://gitee.com/ChangPaoZhe/mschedule
1 分發任務
分發腳本到目前節點上去執行
2 控制
控制并發,控制多少個節點同時執行
對錯誤做出響應,由用戶設定,最多允許失敗的比例或者數量,當超過范圍時,需要終止任務執行
3 能跨機房部署
4 能對作業做版本控制,這是輔助功能,可過后實現
本項目的出發點,是只需要會使用shell腳本就可以了,可以通過使用shell腳本的方式來完成遠程任務的下發和處理流程。
ansible,salt等需要學習特定的內部語言,如果覺得ansible這樣的工具不能滿足需求,二次開發難度過高,代碼量不小,本身它們開發接口不完善,而且熟悉它的叫也比較難,就算開發出來維護也難。
從這些項目上二次開發,等于拉一個分支,如果主分支有了新的特性,想要合并也是比較困難的。
自己開發,滿足自己需求,完全適合自己需求,代碼規模可控,便于他人接收維護。
自己開發就是造輪子,造輪子不是不好,其起初要實現的功能應該是比較簡單的。后面可以逐步進行完善操作。
瀏覽器端和webSERVER端交互是通過HTTP實現的,而WEB server和master server 是通過TCP鏈接來實現的,master server 和agent之間也是通過TCP 鏈接來實現的
有agent類,被控節點需要安裝或運行特殊的軟件,用于和服務器端進行通信,服務器端把腳本,命令傳遞給agent端,由agent端控制來執行
被控節點不需要安裝或者運行特殊軟件,如通過SSH來實現,這其實也是有agent的,不過不是自己寫的程序
優缺點
1 通用,簡單,易實現,但管理不善,容易出現安全問題
2 并行效率不高,有agent的并行執行可以不和管理服務器通信,可以并發很高,ssh執行要和master之間通信
3 ssh鏈接是有狀態的,任務執行的時候,master不能掛了,否則任務將執行失敗。
python 中有很多運行進程的方式,不過都過時了。
建議使用標準庫subprocess模塊,啟動一個子進程。
def __init__(self, args, bufsize=-1, executable=None,
stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS,
shell=False, cwd=None, env=None, universal_newlines=False,
startupinfo=None, creationflags=0,
restore_signals=True, start_new_session=False,
pass_fds=()):
第一個是參數,后面是可選,但shell默認為False,可將其置為True, stdout 后面跟文件或管道
def wait(self, timeout=None, endtime=None):
"""Wait for child process to terminate. Returns returncode
attribute."""
if endtime is not None:
timeout = self._remaining_time(endtime)
if timeout is None:
timeout_millis = _winapi.INFINITE
else:
timeout_millis = int(timeout * 1000)
if self.returncode is None:
result = _winapi.WaitForSingleObject(self._handle,
timeout_millis)
if result == _winapi.WAIT_TIMEOUT:
raise TimeoutExpired(self.args, timeout)
self.returncode = _winapi.GetExitCodeProcess(self._handle)
return self.returncode
此處返回是狀態,0為成功,其他為失敗
stdout 方法調用的是一個文件,因此可使用文件的形式進行處理
if c2pread != -1:
self.stdout = io.open(c2pread, 'rb', bufsize)
if universal_newlines:
self.stdout = io.TextIOWrapper(self.stdout)
#!/usr/bin/poython3.6
#conding:utf-8
import subprocess
from subprocess import Popen,PIPE
out=Popen("echo 'hello'",shell=True,stdout=PIPE)
code=out.wait(10)
txt=out.stdout.read()
print ("code={} txt={}".format(code,txt.decode()))
結果如下
mkdir mschedule -p
cd mschedule/
pyenv virtualenv 3.5.3 msch
pyenv local msch
#!/usr/bin/poython3.6
#conding:utf-8
from subprocess import PIPE,Popen
class Executor:
def run(self,script,timeout):
p=Popen(script,shell=True,stdout=PIPE)
code=p.wait(timeout=timeout)
txt=p.stdout.read()
return (code,txt)
if __name__ == "__main__":
exec=Executor()
print (exec.run("echo 'hello'",3))
結果如下
用戶和master server 通信,提交任務,此處是通過HTTP的方式提交任務
master 按照用戶要求將任務分發到指定的節點上,這些節點上需要有agent用于和master通信,接受master發布的任務,并執行這些任務
設計agent,越簡單越好,越簡單bug越少,越穩定。
從本質上來說,master,agent設計是典型的CS編程模式
master作為CS中的server,agent作為CS中的client
agent啟動后,需要主動連接server,并注冊自己
信息包括
hostname:報告自己的主機名稱,此主機名稱可能會重復UUID,用于唯一標識這臺主機
IP: 用于更加方便的管理主機
其它相關信息視情況而定
{
"type": "register", # 此處用于定義消息類型
"payload":{
"id" : uuid, #用于唯一標識一臺主機
"hostname": "xxxx", # 對應agent名稱
"IP": [], # agent IP地址,其可能包含多個IP地址,因此此處使用列表進行存儲
}
}
agent定時向master發送心跳包,包含UUID這個唯一標識,附帶hostname和ip地址,hostname和ip都可能變動,但agent不變,其UUID便不會發生變化,其他相關信息科一附加, 如更加flag,用于標識agent是否有正在執行的任務。
{
"type": "heartbeat", # 此處用于定義消息類型
"payload":{
"id" : uuid, #用于唯一標識一臺主機
"hostname": "xxxx", # 對應agent名稱
"IP": [], # agent IP地址,其可能包含多個IP地址,因此此處使用列表進行存儲
}
}
master分派任務給agent,發送任務描述信息到agent。
注意腳本字符串使用base64編碼
{
"type" :"task",
"payload" :{
"id" :"task-uuid", # 定義任務的唯一標識
"script" : "base64code", #定義執行任務的內容
"timeout" :0, # 定義超時時長
"parallel" :1, # 定義并行執行數
"fail_rate" :0, # 定義失敗率,及百分比為多少代表失敗
"fail_count" :-1 # 定義失敗的次數為多少次表示失敗,-1表示不關心
}
}
當agent任務執行完成后,返回給master該任務執行的狀態碼和輸出結果。
{
"type" :"result",
"payload" :{
"id": "task-uuid", # 定義任務唯一標識
"agent_id": "agent-uuid", #定義任務執行者
"code" : 0, #定義任務執行結果返回值。0 表示成功,其他表示失敗
"output" :"base64encode" # 定義任務執行結果,及輸出到控制臺的結果
}
}
以上的master,agent之間需要傳遞消息,消息采用json格式。
具體代碼如下
#!/usr/bin/poython3.6
#conding:utf-8
import logging
def getlogger(mod_name:str,filepath:str='/var/log/mschedule'):
logger=logging.getLogger(mod_name) # 獲取名字
logger.setLevel(logging.INFO) # 添加日志級別
logger.propagate=False # 配置不想上傳遞
handler=logging.FileHandler("{}/{}.log".format(filepath,mod_name))
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s (%(filename)s:L%(lineno)d)",
datefmt='%Y-%m-%d %H:%M:%S')
handler.setFormatter(fmt)
logger.addHandler(handler)
return logger
if __name__ == "__main__":
log = getlogger('test')
log.info('13234545654')
結果如下
原生的socket編程過于底層,很少使用,任何一門語言都要避開直接使用socket庫開發,太過底層,難寫難維護。
zeroprc 是基于 ZeroMQ和MessagePack 來實現的通信工具。
官網地址
http://www.zerorpc.io
安裝
pip install zerorpc
根目錄創建app.py和appserver.py
server 端配置
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
class HelloRPC(object): #定義方法
def hello(self, name):
return "Hello, %s" % name
s = zerorpc.Server(HelloRPC()) # 方法注入
s.bind("tcp://0.0.0.0:8080") # 綁定方法
s.run() # 運行方法
client端配置
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
c = zerorpc.Client()
c.connect("tcp://127.0.0.1:8080")
print (c.hello("RPC"))
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
import threading
c = zerorpc.Client()
c.connect("tcp://127.0.0.1:8080")
e=threading.Event()
while not e.wait(3):
print(c.hello('test client'))
print ('```````````````')
結果如下
使用uuid.uuid4().hex 獲取一個uuid,一個節點起始運行的時候是沒有uuid的,一旦運行會生成一個uuid,并持久化到一個文件中,下次運行先找這個文件,如果文件中有uuid,就直接讀取,沒有uuid就重新生成并寫入到該文件中。
#!/usr/bin/poython3.6
#conding:utf-8
#!/usr/bin/poython3.6
#conding:utf-8
import uuid
print (uuid.uuid4().hex)
print (uuid.uuid4().hex)
print (uuid.uuid4().hex)
結果如下
windows 和Linux 獲取主機名稱的方式是不同的
可以在所有平臺上是使用socket.gethostname()獲取主機名。
#!/usr/bin/poython3.6
#conding:utf-8
import socket
print (socket.gethostname())
pip install netifaces
netifaces.interfaces() 返回接口列表
netifaces.ifaddresss(interface) 獲取指定接口的IP地址,返回相關信息
ip地址判斷
#!/usr/bin/poython3.6
#conding:utf-8
import ipaddress
ips=['127.0.0.1','192.168.0.1','169.254.123.1','0.0.0.0','239.168.0.255','224.0.0.1','8.8.8.8']
for ip in ips:
print (ip)
ip=ipaddress.ip_address(ip)
print ('Linklocal {}'.format(ip.is_link_local)) # 169.254地址
print ('回環 {}'.format(ip.is_loopback)) # 回環
print ('多播 {}'.format(ip.is_multicast)) # 多播
print ('公網 {}'.format(ip.is_global)) # 公網,全球范圍地址
print ('私有 {}'.format(ip.is_private)) # 私有地址
print ('保留 {}'.format(ip.is_reserved)) # 保留地址
print ('版本 {}'.format(ip.version)) #ipv4地址
print ('----------------------------')
結果如下
#!/usr/bin/poython3.6
#conding:utf-8
import netifaces
print (netifaces.interfaces()) # 獲取所有的網卡接口
for i in netifaces.interfaces():
print ('i....',netifaces.ifaddresses(i)) # 使用ifaddress獲取端口對應的IP地址
print ()
print ('------------------------------')
print ()
print ('[2]',netifaces.ifaddresses(i)[2]) # 獲取字典key為2的對應的值
結果如下
其是一個字典,key為2就是ipv4地址
每一個接口返回的ipv4地址是一個列表,也就是說可以有多個,ipv4地址描述是在addr上
#!/usr/bin/poython3.6
#conding:utf-8
import netifaces
print (netifaces.interfaces()) # 獲取所有的網卡接口
for i in netifaces.interfaces():
for p in netifaces.ifaddresses(i)[2]:
if p['addr']:
print ('ip',p['addr']) # 獲取ip地址
結果如下
#!/usr/bin/poython3.6
#conding:utf-8
import netifaces
import ipaddress
print (netifaces.interfaces()) # 獲取所有的網卡接口
for i in netifaces.interfaces():
for p in netifaces.ifaddresses(i)[2]:
if p['addr']:
ip=ipaddress.ip_address(p['addr']) #獲取ip地址
if ip.is_loopback or ip.is_multicast or ip.is_link_local or ip.is_reserved: # 判斷IP地址
continue
print (ip)
結果如下
在agent文件包中創建msg.py文件,用于存儲相關主從信息和配置信息
#!/usr/bin/poython3.6
#conding:utf-8
import socket
import uuid
import netifaces
import ipaddress
import os
class Messgae:
def __init__(self,myidpath):
if os.path.exists(myidpath): # 如果存在
with open(myidpath) as f:
self.id=f.readline().strip()
else:
self.id=uuid.uuid4().hex
with open(myidpath,'w') as f:
f.write(self.id)
def get_ipaddress(self):
address=[]
for p in netifaces.interfaces(): # 獲取網口列表
n=netifaces.ifaddresses(p) # 獲取字典
if n.get(2): # 查看是否存在ipv4地址
for ip in n[2]: # 此處獲取對應列表的值
if ip['addr']: # 查看ip地址是否存在
ip=ipaddress.ip_address(ip['addr'])
if ip.is_reserved or ip.is_multicast or ip.is_link_local or ip.is_loopback:
continue
address.append(str(ip))
return address
def hearbeat(self):
return {
"type" :"hearbeat",
"payload" :{
"ip" : self.get_ipaddress(),
"hostname" : socket.gethostname(),
"id" : self.id
}
}
def reg(self):
return {
"type" :"register",
"payload" :{
"ip" : self.get_ipaddress(),
"hostname" : socket.gethostname(),
"id" : self.id
}
}
if __name__ == "__main__":
msg=Messgae('/var/log/mschedule/uuid')
print (msg.reg())
測試結果如下
agent中創建config模塊用于添加相關鏈接服務端IP地址
agent中創建cm 模塊用于處理鏈接相關配置
config.py 配置如下
#!/usr/bin/poython3.6
#conding:utf-8
CONN_URL="tcp://127.0.0.1:9000"
cm.py 模塊配置如下
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc #添加模塊
import threading # 用于處理中斷相關
from .msg import Messgae # 獲取消息
from .config import CONN_URL
from utils import getlogger
class Conn_Manager:
def __init__(self,timeout=3):
self.timeout=timeout
self.client=zerorpc.Client()
self.event=threading.Event()
self.message=Messgae('/var/log/mschedule/uuid') # 此處用于初始化消息
self.log=getlogger('agent') # 此處填寫相關的log日志名稱
def start(self):
self.client.connect(CONN_URL) # 鏈接處理
self.log.info('注冊消息發送 {}'.format(self.client.send(self.message.reg()))) # 發送心跳信息
self.client.send(self.message.reg()) #處理注冊消息
while not self.event.wait(self.timeout): # 等待的時間
self.log.info('心跳消息發送 {}'.format(self.client.send(self.message.hearbeat()))) # 發送心跳信息
def shutdown(self):
self.log.info("關閉操作")
self.client.close()
self.event.set()
agent 中 _init_.py 端配置
#!/usr/bin/poython3.6
#conding:utf-8
from .cm import Conn_Manager
class app:
def __init__(self,timeout):
self.conn=Conn_Manager(timeout)
def start(self):
self.conn.start()
def shutdown(self):
self.conn.shutdown()
全局根目錄下 app.py 端配置如下
#!/usr/bin/poython3.6
#conding:utf-8
from agent import app
if __name__ == "__main__":
agent=app(3)
try:
agent.start()
except KeyboardInterrupt:
agent.shutdown()
服務端測試文件appserver 配置如下
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
class HelloRPC(object): #定義方法
def send(self, name):
return "Hello, %s" % name
s = zerorpc.Server(HelloRPC()) # 方法注入
s.bind("tcp://0.0.0.0:9000") # 綁定方法
s.run() # 運行方法
啟動結果如下
日志結果如下
處理客戶端重連機制
默認的,服務端關閉后,客戶端結果如下
處理結果如下
cm.py如下
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc #添加模塊
import threading # 用于處理中斷相關
from .msg import Messgae # 獲取消息
from .config import CONN_URL
from utils import getlogger
class Conn_Manager:
def __init__(self,timeout=3):
self.timeout=timeout
self.client=zerorpc.Client()
self.event=threading.Event()
self.message=Messgae('/var/log/mschedule/uuid') # 此處用于初始化消息
self.log=getlogger('agent') # 此處填寫相關的log日志名稱
def start(self):
try:
self.client.connect(CONN_URL) # 鏈接處理
self.log.info('注冊消息發送 {}'.format(self.client.send(self.message.reg()))) # 發送心跳信息
self.client.send(self.message.reg()) #處理注冊消息
while not self.event.wait(self.timeout): # 等待的時間
self.log.info('心跳消息發送 {}'.format(self.client.send(self.message.hreadbeat()))) # 發送心跳信息
except Exception as e:
print ('--------------------')
self.event.set()
raise e # 此處是拋出異常到上一級
def shutdown(self):
self.log.info("關閉操作")
self.client.close()
self.event.set()
agent._init_.py 結果如下
#!/usr/bin/poython3.6
#conding:utf-8
from .cm import Conn_Manager
import threading
class app:
def __init__(self,timeout):
self.conn=Conn_Manager(timeout)
self.event=threading.Event()
def start(self):
while not self.event.is_set():
try:
self.conn.start()
except Exception as e:
print('重連')
self.conn.shutdown()
self.event.wait(3)
def shutdown(self):
self.event.set()
self.conn.shutdown()
app.py 如下
#!/usr/bin/poython3.6
#conding:utf-8
from agent import app
if __name__ == "__main__":
agent=app(3)
try:
agent.start()
except KeyboardInterrupt:
agent.shutdown()
結果如下
綁定端口,啟動監聽,等待agent鏈接。
存儲agent列表
存儲用戶提交的Task列表,用戶通過WEB提交的任務信息存儲下來。
將注冊信息寫入agent列表
接受心跳信息
接受agent端發送的心跳信息
將用戶提交的任務分配到agent端
用于指定服務端綁定IP地址和端口號
#!/usr/bin/poython3.6
#conding:utf-8
MASTER_URL="tcp://0.0.0.0:9000"
if __name__ == "__main__":
pass
主要負責客戶端數據的調度
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
log=getlogger('handler')
class Handler(object):
def send(self,msg): # 定義一個可調用的基礎函數
log.info(" ack ok {}".format(msg))
return " ack ok {}".format(msg)
用于tcp 鏈接建立和關閉
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
from .config import MASTER_URL
import zerorpc
from .handler import Handler
log=getlogger('server')
class Master_Listen:
def __init__(self):
self.server=zerorpc.Server(Handler())
def start(self):
self.server.bind(MASTER_URL)
log.info('Master 啟動配置')
self.server.run()
def shutdown(self):
self.server.close()
#!/usr/bin/poython3.6
#conding:utf-8
from .cm import Master_Listen
class appserver:
def __init__(self):
self.appserver=Master_Listen()
def start(self):
self.appserver.start()
def shutdown(self):
self.appserver.shutdown()
#!/usr/bin/poython3.6
#conding:utf-8
from master import appserver
if __name__ == "__main__":
appserver=appserver()
try:
appserver.start()
except KeyboardInterrupt:
appserver.shutdown()
啟動服務測試如下
結果如下
上述代碼實現了基本的注冊,心跳部分的功能
經觀察可知,目前注冊和心跳除了類型不同外,其可以認為第一次心跳成功就是注冊。
master端核心需要存儲2中數據:agent端數據,用戶客戶端瀏覽器提交的任務Task,構造出一個數據結構,存儲相關信息.具體數據結構如下
{
"agents" :{
"agent_id" :{
"heartbeat" :"timestamp",
"busy" :False,
"info" :{
"hostname" :"",
"ip" :[]
}
}
}
}
數據結構解釋如下
1 agents里面記錄了所有注冊的agent
agent_id,字典的key,每一個agent 都有一個不同uuid,所以這個字典的鍵就是uuid,
heartbeat 由于設計中并沒有讓agent端發送心跳時間,所以就在master端記錄了收到的時間
busy 如果agent 上有任務在執行。則此值表現為True
info 記錄agent上發過來的hostname和ip列表
{
"tasks" :{
"task_id" :{
"script" :"base64encode",
"targets" :{
"agent_id" :{
"state":"WAITING",
"output" :""
}
},
"state" :"WAITING"
}
}
}
task 記錄所有任務及target(agent)的狀態
task_id ,字典的key對應一個一個task,item 也是taskid:{} 結構
task 任務,task.json 的payload信息
targets目標,用于指定agent的節點,記錄agent上的state和輸出output
state狀態,單個agent上的執行狀態state 這是一個task的狀態,整個任務的狀態,比如統計達到了agent失敗上限了,這個task的state 就置為失敗
狀態常量
"WAITING" "RUNNING" "SUCCEED" "FAILED"
創建 storage.py 模塊
構建Storage 類,用于存儲用戶信息
#!/usr/bin/poython3.6
#conding:utf-8
import datetime
class Storage:
def __init__(self):
self.agents={} # 此處用于存儲用戶信息
self.tasks={} # 此處用于存儲作業信息
def reg_hb(self,agent_id,info): # id 及就是客戶端的id ,info 及就是host和ip地址
self.agents[agent_id] = {
'heaerbeat' : datetime.datetime.now(),
'info' :info,
'busy':self.agents.get(agent_id,{}).get('busy',False)
}
# busy 讀不到置False,讀到了不變
handler.py端配置如下
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
from .storage import Storage
log=getlogger('handler')
class Handler(object):
def __init__(self):
self.store=Storage()
def send(self,msg): # 定義一個可調用的基礎函數,此處的msg及就是對應的函數
log.info('客戶端agent發送消息為:{}'.format(msg))
try:
if msg['type'] in {'hearbeat','register'}:
payload=msg['payload']
info={'hostname' :payload['hostname'],'ip' :payload['ip']}
self.store.reg_hb(payload['id'],info)
log.info("客戶端數據列表為:{}".format(self.store.agents)) # 客戶端的列表
return "agent信息為: {}".format(msg)
except Exception as e:
log.error("注冊客戶端信息錯誤為:{}".format(e))
return "Bad Request...."
運行結果如下
用戶通過WEB(HTTP)提交新的任務,任務json信息有:
1 任務腳本script,base64編碼
2 超時時間timeout
3 并行度 parallel
4 失敗率 fail_rate
5 失敗次數fail_count
6 targets 是跑任務的Agent的agent_id列表,這個目前也是在用戶端選好的,如yoghurt需要在主機名為webserver-xxxx的幾臺設備上運行腳本,為了用戶方便,可以使用類似ansible的分組。在Master端受到信息后,需要添加2個信息
task_id 是Mater 端新建任務時生成的uuid
state 默認狀態是WAITING
在WEB server 中最后將用戶端發送來的數據組成下面的字典
task={
"task_id" :t.id,
"script" :t.script,
"timeout":t.timeout,
"parallel" :t.parallelm,
"fail_rate":t.fail_rate,
"fail_count":t.fail_count,
"state":t.state,
"targets":t.targets
}
用于處理相關消息的類型
#!/usr/bin/poython3.6
#conding:utf-8
WAITING='WAITING'
RUNNING='RUNNING'
SUCCEED='SUCCEED'
FAILED='FAILED'
創建master/task.py 類處理webserver端數據
\
#!/usr/bin/poython3.6
#conding:utf-8
import uuid # 獲取唯一的task_id
from .state import *
class Task:
def __init__(self,task_id,script,targets,timeout=0,parallel=1,fail_rate=0,fail_count=-1):
self.id=task_id # task唯一標識,用于確定任務
self.script=script # 對應的腳本內容,客戶端輸入的腳本
self.timeout=timeout # 超時時間
self.parallel=parallel # 并行執行數量
self.fail_rate=fail_rate #失敗率
self.fail_count=fail_count #失敗數
self.state=WAITING # 對應的消息的狀態
self.targets={agent_id:{'state' : WAITING,'output':''} for agent_id in targets} # 此處對應客戶端列表
self.target_count=len(self.targets) # 此處對應客戶端的數量
在master.storage.py模塊中進行相關方法調用,并將其存儲進入task中
#!/usr/bin/poython3.6
#conding:utf-8
import datetime
from .task import Task
class Storage:
def __init__(self):
self.agents={} # 此處用于存儲用戶信息
self.tasks={} # 此處用于存儲作業信息
def reg_hb(self,agent_id,info): # id 及就是客戶端的id ,info 及就是host和ip地址
self.agents[agent_id] = {
'heaerbeat' : datetime.datetime.now(),
'info' :info,
'busy':self.agents.get(agent_id,{}).get('busy',False)
}
# busy 讀不到置False,讀到了不變
def add_task(self,task:dict): # 此處用于從客戶端獲取相關的數據
t=Task(**task) # 此處進行參數解構
self.tasks[t.id]=t
return t.id # 此處用于獲取處理id
在master/handler.py 中處理用于webservr調用相關配置
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
from .storage import Storage
import uuid
log=getlogger('handler')
class Handler(object):
def __init__(self):
self.store=Storage()
def send(self,msg): # 定義一個可調用的基礎函數,此處的msg及就是對應的函數
log.info('客戶端agent發送消息為:{}'.format(msg))
try:
if msg['type'] in {'hearbeat','register'}:
payload=msg['payload']
info={'hostname' :payload['hostname'],'ip' :payload['ip']}
self.store.reg_hb(payload['id'],info)
log.info("客戶端數據列表為:{}".format(self.store.agents)) # 客戶端的列表
return "agent信息為: {}".format(msg)
except Exception as e:
log.error("注冊客戶端信息錯誤為:{}".format(e))
return "Bad Request...."
def add_task(self,task): # 此處用于在webserver 端創建的agent調用方法返回結果
task['task_id']=uuid.uuid4().hex # 用于生成相關的任務id
return self.store.add_task(task) # 此處用于調用相關配置
def get_agents(self):
return self.store.get_agents()
任務在Storage中存儲,一旦有了任務,需要將任務分派到指定節點執行,交給這些節點上的agent
不過,目前使用zerorpc,master是被動的接受agent端的數據并進行相關的響應操作,所以可以考慮使用一種agent端主動拉取數據的機制,提供一個接口,讓agent訪問,如果agent處于空閑狀態,則就主動拉取任務,有任務就領走。
當agent少的時候,master推送任務到agent端,或者agent端主動拉取任務都是可以的,但是如果考慮到agent多的時候,或許使用agent拉模式是一個更好的選擇。本次采用agent拉取模式實現,所以master就不需要設計調度器了
agent/state.py
#!/usr/bin/poython3.6
#conding:utf-8
WAITING='WAITING'
RUNNING='RUNNING'
SUCCEED='SUCCEED'
FAILED='FAILED'
用于返回至server端,用于最后返回至web瀏覽器端
#!/usr/bin/poython3.6
#conding:utf-8
import socket
import uuid
import netifaces
import ipaddress
import os
class Messgae:
def __init__(self,myidpath):
if os.path.exists(myidpath): # 如果存在
with open(myidpath) as f:
self.id=f.readline().strip()
else:
self.id=uuid.uuid4().hex
with open(myidpath,'w') as f:
f.write(self.id)
def get_ipaddress(self):
address=[]
for p in netifaces.interfaces(): # 獲取網口列表
n=netifaces.ifaddresses(p) # 獲取字典
if n.get(2): # 查看是否存在ipv4地址
for ip in n[2]: # 此處獲取對應列表的值
if ip['addr']: # 查看ip地址是否存在
ip=ipaddress.ip_address(ip['addr'])
if ip.is_reserved or ip.is_multicast or ip.is_link_local or ip.is_loopback:
continue
address.append(str(ip))
return address
def hearbeat(self):
return {
"type" :"hearbeat",
"payload" :{
"ip" : self.get_ipaddress(),
"hostname" : socket.gethostname(),
"id" : self.id
}
}
def reg(self):
return {
"type" :"register",
"payload" :{
"ip" : self.get_ipaddress(),
"hostname" : socket.gethostname(),
"id" : self.id
}
}
def result(self,task_id,code,output): # 返回數據至web端,處理相關數據執行結果的返回
return {
"type" :"result",
"payload" :{
"id" : task_id, # 此處用于定義task_id 及任務id
"agent_id" :self.id, # 此處用于獲取客戶端id
"code" : code, # 此處用于對執行結果狀態進行保存
"output" : output #此處用于對執行結果的輸出信息進行保存,并進行相關配置
}
}
用于處理配置拉取相關事宜
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc #添加模塊
import threading # 用于處理中斷相關
from .msg import Messgae # 獲取消息
from .state import *
from .config import CONN_URL
from .executor import Executor
from utils import getlogger
class Conn_Manager:
def __init__(self,timeout=3):
self.timeout=timeout
self.client=zerorpc.Client()
self.event=threading.Event()
self.message=Messgae('/var/log/mschedule/uuid') # 此處用于初始化消息
self.log=getlogger('agent') # 此處填寫相關的log日志名稱
self.state=WAITING
self.exec=Executor()
def start(self):
try:
self.event.clear()
self.client.connect(CONN_URL) # 鏈接處理
self.log.info('注冊消息發送 {}'.format(self.client.send(self.message.reg()))) # 發送心跳信息
self.client.send(self.message.reg()) #處理注冊消息
while not self.event.wait(self.timeout): # 等待的時間
self.log.info('心跳消息發送 {}'.format(self.client.send(self.message.hearbeat()))) # 發送心跳信息
task=self.client.get_task(self.message.id) # 此處返回三個參數,1 為taskid,二是script ,三是timeout
if task:
code,output=self.exec.run(task[1],task[2])
self.client.send(self.message.result(task[0],code,output))
else:
return "目前無消息"
except Exception as e:
self.event.set()
raise e # 此處是拋出異常到上一級
def shutdown(self):
self.log.info("關閉操作")
self.client.close()
self.event.set()
master/storage.py 用于配置獲取agent_id和task相關信息
#!/usr/bin/poython3.6
#conding:utf-8
import datetime
from .task import Task
from .state import *
class Storage:
def __init__(self):
self.agents={} # 此處用于存儲用戶信息
self.tasks={} # 此處用于存儲作業信息
def reg_hb(self,agent_id,info): # id 及就是客戶端的id ,info 及就是host和ip地址
self.agents[agent_id] = {
'heaerbeat' : datetime.datetime.now(),
'info' :info,
'busy':self.agents.get(agent_id,{}).get('busy',False)
}
# busy 讀不到置False,讀到了不變
def get_agents(self):
return self.agents
def add_task(self,task:dict): # 此處用于從客戶端獲取相關的數據
t=Task(**task) # 此處進行參數解構
self.tasks[t.id]=t
return t.id # 此處用于獲取處理id
@property
def itme_task(self):
yield from (task for task in self.tasks.values()) # 此處返回task
def get_task(self,agent_id):
return [task.id,task.script,task.timeout]
master/handler.py 配置如下
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
from .storage import Storage
import uuid
log=getlogger('handler')
class Handler(object):
def __init__(self):
self.store=Storage()
def send(self,msg): # 定義一個可調用的基礎函數,此處的msg及就是對應的函數
log.info('客戶端agent發送消息為:{}'.format(msg))
try:
if msg['type'] in {'hearbeat','register'}:
payload=msg['payload']
info={'hostname' :payload['hostname'],'ip' :payload['ip']}
self.store.reg_hb(payload['id'],info)
log.info("客戶端數據列表為:{}".format(self.store.agents)) # 客戶端的列表
return "agent信息為: {}".format(msg)
except Exception as e:
log.error("注冊客戶端信息錯誤為:{}".format(e))
return "Bad Request...."
def add_task(self,task): # 此處用于在webserver 端創建的agent調用方法返回結果
task['task_id']=uuid.uuid4().hex # 用于生成相關的任務id
return self.store.add_task(task) # 此處用于調用相關配置
def get_agents(self):
return self.store.get_agents()
def get_task(self,agent_id):
return self.store.get_task(agent_id)
master/handler.py中配置
#!/usr/bin/poython3.6
#conding:utf-8
from utils import getlogger
from .storage import Storage
import uuid
log=getlogger('handler')
class Handler(object):
def __init__(self):
self.store=Storage()
def send(self,msg): # 定義一個可調用的基礎函數,此處的msg及就是對應的函數
log.info('客戶端agent發送消息為:{}'.format(msg))
try:
if msg['type'] in {'hearbeat','register'}:
payload=msg['payload']
info={'hostname' :payload['hostname'],'ip' :payload['ip']}
self.store.reg_hb(payload['id'],info)
log.info("客戶端數據列表為:{}".format(self.store.agents)) # 客戶端的列表
return "agent信息為: {}".format(msg)
elif msg['type']=="result": # 此處用于處理相關返回信息
self.store.result(msg['payload']) # 調用對應方法
except Exception as e:
log.error("注冊客戶端信息錯誤為:{}".format(e))
return "Bad Request...."
def add_task(self,task): # 此處用于在webserver 端創建的agent調用方法返回結果
task['task_id']=uuid.uuid4().hex # 用于生成相關的任務id
return self.store.add_task(task) # 此處用于調用相關配置
def get_agents(self):
return self.store.get_agents()
def get_task(self,agent_id):
return self.store.get_task(agent_id)
def get_result(self,task_id): # 此處返回對應的值
return self.store.get_result(task_id)
master/stroage.py端配置
#!/usr/bin/poython3.6
#conding:utf-8
import datetime
from .task import Task
from .state import *
class Storage:
def __init__(self):
self.agents={} # 此處用于存儲用戶信息
self.tasks={} # 此處用于存儲作業信息
self.result={} # 用于存儲agent端返回的結果
def reg_hb(self,agent_id,info): # id 及就是客戶端的id ,info 及就是host和ip地址
self.agents[agent_id] = {
'heaerbeat' : datetime.datetime.now().timestamp(),
'info' :info,
'busy':self.agents.get(agent_id,{}).get('busy',False)
}
# busy 讀不到置False,讀到了不變
def get_agents(self):
return self.agents
def add_task(self,task:dict): # 此處用于從客戶端獲取相關的數據
t=Task(**task) # 此處進行參數解構
self.tasks[t.id]=t
return t.id # 此處用于獲取處理id
@property
def itme_task(self):
yield from (task for task in self.tasks.values()) # 此處返回task
def get_task(self,agent_id):
for task in self.itme_task:
if agent_id in task.targets: # 此處用于判斷當前節點接入任務情況
return [task.id,task.script,task.timeout]
def add_result(self,payload:dict):
self.result[payload['id']]=payload # 此處以task_id 為鍵,以payload為值進行處理
def get_result(self,task_id:dict):
return self.result.get(task_id['task_id']) # task_id,獲取對應的payload值
用戶通過WEB(HTTP)提交新的任務,任務json信息有:
1 任務腳本script,base64編碼
2 超時時間timeout
3 并行度 parallel
4 失敗率 fail_rate
5 失敗次數 fail_count
6 targets 是跑在agent上的agent_id 列表,可以讓用戶看到一個列表,通過列表的勾選來完成相關的操作
根目錄創建appwebserver.py配置
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
from aiohttp import request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"
client=zerorpc.Client()
client.connect(CONN_URL)
async def targetshandler(request:web.Request):
txt=client.get_agents() #通過zerorpc調用master端接口
return web.json_response(txt) # 返回json端數據
app=web.Application()
app.router.add_get('/task/targets',targetshandler) # 使用get方法進行處理
{
"script" : "echo hello",
"timeout" :20,
"targets" :[]
}
async def taskhandler(request:web.Request):
j = await request.json() # 獲取post 提交的數據,用于task任務數據生成
txt=client.add_task(j)
return web.Response(text=txt,status=201)
app.router.add_post('/task',taskhandler)
async def taskresult(request:web.Request):
j = await request.json()
txt =client.get_result(j)
return web.json_response(txt)
app.router.add_post('/result',taskresult)
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
from aiohttp import request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"
client=zerorpc.Client()
client.connect(CONN_URL)
async def targetshandler(request:web.Request):
txt=client.get_agents() #通過zerorpc調用master端接口
return web.json_response(txt) # 返回json端數據
app=web.Application()
app.router.add_get('/task/targets',targetshandler) # 使用get方法進行處理
async def taskhandler(request:web.Request):
j = await request.json()
txt=client.add_task(j)
return web.Response(text=txt,status=201)
app.router.add_post('/task',taskhandler)
async def taskresult(request:web.Request):
j = await request.json()
txt =client.get_result(j)
return web.json_response(txt)
app.router.add_post('/result',taskresult)
if __name__ == "__main__":
web.run_app(app,host='0.0.0.0',port=80)
當節點在進行相關事件調度處理時,其狀態應該是RUNNING狀態,當處理完成后,其狀態應該恢復稱為WAITING狀態。
當當前agent下的所有該任務都執行完成時的狀態,此處設計較為簡單,只是全部執行就將其狀態置位成功,否則為RUNNING狀態或者WAITING,當有一個agent領取任務時,其狀態將被置為RUNNING。
及就是當前節點執行當前任務的狀態,此狀態保存在task中的targets字典中,用于對其客戶端執行結果進行判斷而獲取其對應狀態。
主要是cm.py調整如下
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc #添加模塊
import threading # 用于處理中斷相關
from .msg import Messgae # 獲取消息
from .state import *
from .config import CONN_URL
from .executor import Executor
from utils import getlogger
class Conn_Manager:
def __init__(self,timeout=3):
self.timeout=timeout
self.client=zerorpc.Client()
self.event=threading.Event()
self.message=Messgae('/var/log/mschedule/uuid') # 此處用于初始化消息
self.log=getlogger('agent') # 此處填寫相關的log日志名稱
self.state=WAITING
self.exec=Executor()
def start(self):
try:
self.event.clear()
self.client.connect(CONN_URL) # 鏈接處理
self.log.info('注冊消息發送 {}'.format(self.client.send(self.message.reg()))) # 發送心跳信息
self.client.send(self.message.reg()) #處理注冊消息
while not self.event.wait(self.timeout): # 等待的時間
self.log.info('心跳消息發送 {}'.format(self.client.send(self.message.hearbeat()))) # 發送心跳信息
if self.state == WAITING: # 如果此處是空閑狀態,則進行領任務處理
print('獲取任務task')
task = self.client.get_task(self.message.id) # 此處返回三個參數,1 為taskid,二是script ,三是timeout
if task: # 領取成功,則進行執行相關任務.并上傳至服務器端其狀態
self.state = RUNNING # 此處任務成功的情況
code,output=self.exec.run(task[1],task[2])
self.client.send(self.message.result(task[0], code, output))
self.state=WAITING #狀態更新為當前正常狀態
else:
return "目前無消息"
except Exception as e:
self.event.set()
raise e # 此處是拋出異常到上一級
def shutdown(self):
self.log.info("關閉操作")
self.client.close()
self.event.set()
master/storage.py
#!/usr/bin/poython3.6
#conding:utf-8
import datetime
from .task import Task
from .state import *
from utils import getlogger
log=getlogger('storage')
class Storage:
def __init__(self):
self.agents={} # 此處用于存儲用戶信息
self.tasks={} # 此處用于存儲作業信息
self.result={} # 用于存儲agent端返回的結果
self.task_state=0 # 用于處理當所有agent狀態都修改為成功或失敗時將task的狀態也進行相關的修改
def reg_hb(self,agent_id,info): # id 及就是客戶端的id ,info 及就是host和ip地址
self.agents[agent_id] = {
'heaerbeat' : datetime.datetime.now().timestamp(),
'info' :info,
'busy':self.agents.get(agent_id,{}).get('busy',False)
}
# busy 讀不到置False,讀到了不變
def get_agents(self):
return self.agents
def add_task(self,task:dict): # 此處用于從客戶端獲取相關的數據
t=Task(**task) # 此處進行參數解構
self.tasks[t.id]=t
return t.id # 此處用于獲取處理id
@property
def itme_task(self):
yield from (task for task in self.tasks.values() if task.state in {WAITING,RUNNING}) # 此處返回task,當其中有成功或者失敗時,則不用進行相關的操作處理
#當為WAITING或者RUNNING 時,則進行相關的操作,其他情況則不進行相關操作
def get_task(self,agent_id):
for task in self.itme_task:
if agent_id in task.targets: # 此處用于判斷當前節點接入任務情況
if task.state==WAITING:
task.state=RUNNING #當前消息的狀態
task.targets[agent_id]['state']=RUNNING # 此處是指此消息中的agent是否執行的狀態的處理,若獲取了,則此處的狀態為RUNNING
return [task.id,task.script,task.timeout]
def add_result(self,payload:dict):
for task in self.itme_task:
if payload['code']==0:
task.targets[payload['agent_id']]['state']=SUCCEED # 此處是指對此消息進行處理,若code=0,則表示客戶端執行成功,若為1,則表示失敗
self.task_state+=1
else:
task.targets[payload['agent_id']]['state']= FAILED#
self.task_state+=1
if self.task_state==task.target_count:
task.state=SUCCEED
self.task_state=0
payload['agent_state']=task.targets[payload['agent_id']]['state']
log.info("當前消息內容為:{}".format(self.result))
self.result[payload['id']]=payload # 此處以task_id 為鍵,以payload為值進行處理
def get_result(self,task_id:dict):
task_id=task_id['task_id']
return self.result.get(task_id) # task_id,獲取對應的payload值
webappserver.py
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
from aiohttp import request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"
client=zerorpc.Client()
client.connect(CONN_URL)
async def targetshandler(request:web.Request):
txt=client.get_agents() #通過zerorpc調用master端接口
return web.json_response(txt) # 返回json端數據
app=web.Application()
app.router.add_get('/task/targets',targetshandler) # 使用get方法進行處理
async def taskhandler(request:web.Request):
j = await request.json()
txt=client.add_task(j)
return web.Response(text=txt,status=201)
app.router.add_post('/task',taskhandler)
async def taskresult(request:web.Request):
j = await request.json()
txt =client.get_result(j)
if txt['code'] !=0:
txt['output']='參數不正確,請重新輸入'
return web.json_response(txt)
app.router.add_post('/result',taskresult)
if __name__ == "__main__":
web.run_app(app,host='0.0.0.0',port=80)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。