您好,登錄后才能下訂單哦!
這篇文章主要講解了“Python如何異步發送日志到遠程服務器”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Python如何異步發送日志到遠程服務器”吧!
首先我們先來寫一套簡單輸出到cmd和文件中的代碼:
# -*- coding: utf-8 -*- """ ------------------------------------------------- File Name: loger Description : Author : yangyanxing date: 2020/9/23 ------------------------------------------------- """ import logging import sys import os # 初始化logger logger = logging.getLogger("yyx") logger.setLevel(logging.DEBUG) # 設置日志格式 fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S') # 添加cmd handler cmd_handler = logging.StreamHandler(sys.stdout) cmd_handler.setLevel(logging.DEBUG) cmd_handler.setFormatter(fmt) # 添加文件的handler logpath = os.path.join(os.getcwd(), 'debug.log') file_handler = logging.FileHandler(logpath) file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(fmt) # 將cmd和file handler添加到logger中 logger.addHandler(cmd_handler) logger.addHandler(file_handler) logger.debug("今天天氣不錯")
先初始化一個logger, 并且設置它的日志級別是DEBUG,然后添初始化了 cmd_handler和 file_handler,最后將它們添加到logger中, 運行腳本,會在cmd中打印出
[2020-09-23 10:45:56] [DEBUG] 今天天氣不錯
且會寫入到當前目錄下的debug.log文件中
如果想要在記錄時將日志發送到遠程服務器上,可以添加一個 HTTPHandler , 在python標準庫logging.handler中,已經為我們定義好了很多handler,有些我們可以直接用,本地使用tornado寫一個接收 日志的接口,將接收到的參數全都打印出來
# 添加一個httphandler import logging.handlers http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get') http_handler.setLevel(logging.DEBUG) http_handler.setFormatter(fmt) logger.addHandler(http_handler) logger.debug("今天天氣不錯") 結果在服務端我們收到了很多信息 { 'name': [b 'yyx'], 'msg': [b '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'], 'args': [b '()'], 'levelname': [b 'DEBUG'], 'levelno': [b '10'], 'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'], 'filename': [b 'loger.py'], 'module': [b 'loger'], 'exc_info': [b 'None'], 'exc_text': [b 'None'], 'stack_info': [b 'None'], 'lineno': [b '41'], 'funcName': [b '<module>'], 'created': [b '1600831054.8881223'], 'msecs': [b '888.1223201751709'], 'relativeCreated': [b '22.99976348876953'], 'thread': [b '14876'], 'threadName': [b 'MainThread'], 'processName': [b 'MainProcess'], 'process': [b '8648'], 'message': [b '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'], 'asctime': [b '2020-09-23 11:17:34'] }
可以說是信息非常之多,但是卻并不是我們想要的樣子,我們只是想要類似于
[2020-09-23 10:45:56][DEBUG] 今天天氣不錯
這樣的日志
logging.handlers.HTTPHandler 只是簡單的將日志所有信息發送給服務端,至于服務端要怎么組織內 容是由服務端來完成. 所以我們可以有兩種方法,一種是改服務端代碼,根據傳過來的日志信息重新組織一 下日志內容, 第二種是我們重新寫一個類,讓它在發送的時候將重新格式化日志內容發送到服務端。
我們采用第二種方法,因為這種方法比較靈活, 服務端只是用于記錄,發送什么內容應該是由客戶端來決定。
我們需要重新定義一個類,我們可以參考 logging.handlers.HTTPHandler 這個類,重新寫一個httpHandler類
每個日志類都需要重寫emit方法,記錄日志時真正要執行是也就是這個emit方法:
class CustomHandler(logging.Handler): def __init__(self, host, uri, method="POST"): logging.Handler.__init__(self) self.url = "%s/%s" % (host, uri) method = method.upper() if method not in ["GET", "POST"]: raise ValueError("method must be GET or POST") self.method = method def emit(self, record): ''' 重寫emit方法,這里主要是為了把初始化時的baseParam添加進來 :param record: :return: ''' msg = self.format(record) if self.method == "GET": if (self.url.find("?") >= 0): sep = '&' else: sep = '?' url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg})) requests.get(url, timeout=1) else: headers = { "Content-type": "application/x-www-form-urlencoded", "Content-length": str(len(msg)) } requests.post(self.url, data={'log': msg}, headers=headers, timeout=1)
上面代碼中有一行定義發送的參數 msg = self.format(record)這行代碼表示,將會根據日志對象設置的格式返回對應的內容。
之后再將內容通過requests庫進行發送,無論使用get 還是post方式,服務端都可以正常的接收到日志
{'log': [b'[2020-09-23 11:39:45] [DEBUG] \xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99']}
將bytes類型轉一下就得到了:
[2020-09-23 11:43:50] [DEBUG] 今天天氣不錯
異步的發送遠程日志
現在我們考慮一個問題,當日志發送到遠程服務器過程中,如果遠程服務器處理的很慢,會耗費一定的時間, 那么這時記錄日志就會都變慢修改服務器日志處理類,讓其停頓5秒鐘,模擬長時間的處理流程
async def post(self): print(self.getParam('log')) await asyncio.sleep(5) self.write({"msg": 'ok'})
此時我們再打印上面的日志:
logger.debug("今天天氣不錯") logger.debug("是風和日麗的")
得到的輸出為:
[2020-09-23 11:47:33] [DEBUG] 今天天氣不錯
[2020-09-23 11:47:38] [DEBUG] 是風和日麗的
我們注意到,它們的時間間隔也是5秒。
那么現在問題來了,原本只是一個記錄日志,現在卻成了拖累整個腳本的累贅,所以我們需要異步的來 處理遠程寫日志。
首先想的是應該是用多線程來執行發送日志方法;
def emit(self, record): msg = self.format(record) if self.method == "GET": if (self.url.find("?") >= 0): sep = '&' else: sep = '?' url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg})) t = threading.Thread(target=requests.get, args=(url,)) t.start() else: headers = { "Content-type": "application/x-www-form-urlencoded", "Content-length": str(len(msg)) } t = threading.Thread(target=requests.post, args=(self.url,), kwargs= {"data":{'log': msg},
這種方法是可以達到不阻塞主目的,但是每打印一條日志就需要開啟一個線程,也是挺浪費資源的。我們也 可以使用線程池來處理
python 的 concurrent.futures 中有ThreadPoolExecutor, ProcessPoolExecutor類,是線程池和進程池, 就是在初始化的時候先定義幾個線程,之后讓這些線程來處理相應的函數,這樣不用每次都需要新創建線程
線程池的基本使用:
exector = ThreadPoolExecutor(max_workers=1) # 初始化一個線程池,只有一個線程 exector.submit(fn, args, kwargs) # 將函數submit到線程池中
如果線程池中有n個線程,當提交的task數量大于n時,則多余的task將放到隊列中。
再次修改上面的emit函數
exector = ThreadPoolExecutor(max_workers=1) def emit(self, record): msg = self.format(record) timeout = aiohttp.ClientTimeout(total=6) if self.method == "GET": if (self.url.find("?") >= 0): sep = '&' else: sep = '?' url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg})) exector.submit(requests.get, url, timeout=6) else: headers = { "Content-type": "application/x-www-form-urlencoded", "Content-length": str(len(msg)) } exector.submit(requests.post, self.url, data={'log': msg}, headers=headers, timeout=6)
這里為什么要只初始化一個只有一個線程的線程池? 因為這樣的話可以保證先進隊列里的日志會先被發 送,如果池子中有多個線程,則不一定保證順序了。
上面的CustomHandler類中的emit方法使用的是requests.post來發送日志,這個requests本身是阻塞運 行的,也正上由于它的存在,才使得腳本卡了很長時間,所們我們可以將阻塞運行的requests庫替換為異步 的aiohttp來執行get和post方法, 重寫一個CustomHandler中的emit方法
class CustomHandler(logging.Handler): def __init__(self, host, uri, method="POST"): logging.Handler.__init__(self) self.url = "%s/%s" % (host, uri) method = method.upper() if method not in ["GET", "POST"]: raise ValueError("method must be GET or POST") self.method = method async def emit(self, record): msg = self.format(record) timeout = aiohttp.ClientTimeout(total=6) if self.method == "GET": if (self.url.find("?") >= 0): sep = '&' else: sep = '?' url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg})) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(self.url) as resp: print(await resp.text()) else: headers = { "Content-type": "application/x-www-form-urlencoded", "Content-length": str(len(msg)) } async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session: async with session.post(self.url, data={'log': msg}) as resp: print(await resp.text())
這時代碼執行崩潰了:
C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine 'CustomHandler.emit' was never awaited self.emit(record) RuntimeWarning: Enable tracemalloc to get the object allocation traceback
服務端也沒有收到發送日志的請求。
究其原因是由于emit方法中使用 async with session.post 函數,它需要在一個使用async 修飾的函數 里執行,所以修改emit函數,使用async來修飾,這里emit函數變成了異步的函數, 返回的是一個 coroutine 對象,要想執行coroutine對象,需要使用await, 但是腳本里卻沒有在哪里調用 await emit() ,所以崩潰信息 中顯示 coroutine 'CustomHandler.emit' was never awaited。
既然emit方法返回的是一個coroutine對象,那么我們將它放一個loop中執行
async def main(): await logger.debug("今天天氣不錯") await logger.debug("是風和日麗的") loop = asyncio.get_event_loop() loop.run_until_complete(main())
執行依然報錯:
raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
意思是需要的是一個coroutine,但是傳進來的對象不是。
這似乎就沒有辦法了,想要使用異步庫來發送,但是卻沒有可以調用await的地方。
解決辦法是有的,我們使用 asyncio.get_event_loop() 獲取一個事件循環對象, 我們可以在這個對象上注冊很多協程對象,這樣當執行事件循環的時候,就是去執行注冊在該事件循環上的協程,
我們通過一個小例子來看一下:
import asyncio async def test(n): while n > 0: await asyncio.sleep(1) print("test {}".format(n)) n -= 1 return n async def test2(n): while n >0: await asyncio.sleep(1) print("test2 {}".format(n)) n -= 1 def stoploop(task): print("執行結束, task n is {}".format(task.result())) loop.stop() loop = asyncio.get_event_loop() task = loop.create_task(test(5)) task2 = loop.create_task(test2(3)) task.add_done_callback(stoploop) task2 = loop.create_task(test2(3)) loop.run_forever()
我們使用 loop = asyncio.get_event_loop() 創建了一個事件循環對象loop, 并且在loop上創建了兩個task, 并且給task1添加了一個回調函數,在task1它執行結束以后,將loop停掉。
注意看上面的代碼,我們并沒有在某處使用await來執行協程,而是通過將協程注冊到某個事件循環對象上, 然后調用該循環的 run_forever() 函數,從而使該循環上的協程對象得以正常的執行。
上面得到的輸出為:
test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
執行結束, task n is 0
可以看到,使用事件循環對象創建的task,在該循環執行run_forever() 以后就可以執行了如果不執行 loop.run_forever() 函數,則注冊在它上面的協程也不會執行
loop = asyncio.get_event_loop() task = loop.create_task(test(5)) task.add_done_callback(stoploop) task2 = loop.create_task(test2(3)) time.sleep(5) # loop.run_forever()
上面的代碼將loop.run_forever() 注釋掉,換成time.sleep(5) 停5秒, 這時腳本不會有任何輸出,在停了5秒 以后就中止了,
回到之前的日志發送遠程服務器的代碼,我們可以使用aiohttp封裝一個發送數據的函數, 然后在emit中將 這個函數注冊到全局的事件循環對象loop中,最后再執行loop.run_forever()
loop = asyncio.get_event_loop() class CustomHandler(logging.Handler): def __init__(self, host, uri, method="POST"): logging.Handler.__init__(self) self.url = "%s/%s" % (host, uri) method = method.upper() if method not in ["GET", "POST"]: raise ValueError("method must be GET or POST") self.method = method # 使用aiohttp封裝發送數據函數 async def submit(self, data): timeout = aiohttp.ClientTimeout(total=6) if self.method == "GET": if self.url.find("?") >= 0: sep = '&' else: sep = '?' url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": data})) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url) as resp: print(await resp.text()) else: headers = { "Content-type": "application/x-www-form-urlencoded", } async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session: async with session.post(self.url, data={'log': data}) as resp: print(await resp.text()) return True def emit(self, record): msg = self.format(record) loop.create_task(self.submit(msg)) # 添加一個httphandler http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get') http_handler.setLevel(logging.DEBUG) http_handler.setFormatter(fmt) logger.addHandler(http_handler) logger.debug("今天天氣不錯") logger.debug("是風和日麗的") loop.run_forever()
這時腳本就可以正常的異步執行了:
loop.create_task(self.submit(msg)) 也可以使用
asyncio.ensure_future(self.submit(msg), loop=loop) 來代替,目的都是將協程對象注冊到事件循環中。
但這種方式有一點要注意,loop.run_forever() 將會一直阻塞,所以需要有個地方調用 loop.stop() 方法. 可以注冊到某個task的回調中。
感謝各位的閱讀,以上就是“Python如何異步發送日志到遠程服務器”的內容了,經過本文的學習后,相信大家對Python如何異步發送日志到遠程服務器這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。