您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關python中Paramiko的SSH怎么用,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
說明
1、將所有設備信息寫入文本文檔。
簡單地使用txt,將登錄信息構建成字典。
2、初始化SSH連接和執行命令。
3、分析此需求指定的命令和輸出結果。
將結果存儲在文件中。
4、增加多線程執行。
提高效率。
5、添加Linux的crontab。
每小時收集一次信息(服務器配置)
實例
#!/usr/bin/env python # -*- coding:utf-8 -*- import re import time from concurrent.futures import ThreadPoolExecutor import paramiko def get_device_list(filename): """從文本文件讀取設備列表,返回由字典組成的列表。 文本內容格式為:ip,用戶名,密碼,別名,例如: 1.1.1.1 admin admin sw1 1.1.1.2 admin admin sw2 ...... Args: filename ([str]): 文件名稱 """ with open(filename, 'r') as f: device_list = [] for line in f.readlines(): ip, username, password, name = line.strip().split() device_list.append( { "ip": ip, "username": username, "password": password, "name": name, } ) return device_list class NetworkDevice(object): def __init__(self, ip="", username="", password="'", name="", port=22,): self.conn = None if ip: self.ip = ip.strip() elif name: self.name = name.strip() else: raise ValueError("需要設備連接地址(ip 或 別名)") self.port = int(port) self.username = username self.password = password self._open_ssh() def _open_ssh(self): """初始化 SSH 連接,調起一個模擬終端,會話結束前可以一直執行命令。 Raises: e: 拋出 paramiko 連接失敗的任何異常 """ ssh_connect_params = { "hostname": self.ip, "port": self.port, "username": self.username, "password": self.password, "look_for_keys": False, "allow_agent": False, "timeout": 5, # TCP 連接超時時間 } conn = paramiko.SSHClient() conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: conn.connect(**ssh_connect_params) except Exception as e: raise e self.conn = conn.invoke_shell(term="vt100", width=500, height=1000) return "" def exec_cmd(self, cmd, recv_time=3): """登錄設備,執行命令 Args: cmd ([type]): 命令字符串 recv_time (int, optional): 讀取回顯信息的超時時間. Defaults to 3. Raises: EOFError: 沒有任何信息輸出,說明連接失敗。 Returns: output: """ cmd = cmd.strip() + "\n" self.conn.sendall("screen disable\n") self.conn.sendall(cmd) time.sleep(int(recv_time)) output = self.conn.recv(1024*1024) if len(output) == 0: raise EOFError("連接可能被關閉,沒有任何信息輸出") return output.decode('utf-8', 'ignore') dev = { "ip":"192.168.56.21", "username":"netdevops", "password":"Admin@h4c.com", "name": "sw1" } # sw1 = NetworkDevice(**dev) # ret = sw1.exec_cmd("dis version") # print(ret) def parse_interface_drop(output): """把設備的輸出隊列丟包信息解析成累加值 命令及輸出示例如下: # [H3C]dis qos queue-statistics interface outbound | in "^ Drop" # Dropped: 0 packets, 0 bytes """ ptn = re.compile(r"\s(\S+):\s+(\d+)\s+(\S+),\s+(\d+)\s+(\S+)") count = 0 for i in ptn.findall(output): count += int(i[1]) return count def run(cmd, **conn_parms): """登錄單臺設備,執行指定命令,解析丟包統計 """ sw = NetworkDevice(**conn_parms) output = sw.exec_cmd(cmd) drop_count = parse_interface_drop(output) return "%s %s %s"%( conn_parms.get("name"), conn_parms.get("ip"), drop_count) # cmd = r'dis qos queue-statistics interface outbound | in "^ Drop"' # ret = run(cmd,**dev) # print(ret) if __name__== "__main__": """獲取設備列表,使用多線程登錄設備獲取信息并返回 """ with ThreadPoolExecutor(10) as pool: futures = [] cmd = r'dis qos queue-statistics interface outbound | in "^ Drop"' dev_info = get_device_list("./iplist.txt") for d in dev_info: future = pool.submit(run, cmd, **d) futures.append(future) # for f in futures: # print(f.result()) # 根據執行時間把結果寫入文件,精確到小時 with open("./drops/%s.log"%time.strftime("%Y%m%d_%H"),'w') as f: for line in futures: f.write(line.result() + "\n")
關于“python中Paramiko的SSH怎么用”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。