91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python怎么異步發送日志到遠程服務器

發布時間:2022-07-06 10:18:03 來源:億速云 閱讀:171 作者:iii 欄目:開發技術

本文小編為大家詳細介紹“Python怎么異步發送日志到遠程服務器”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Python怎么異步發送日志到遠程服務器”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。

StreamHandler和FileHandler

首先我們先來寫一套簡單輸出到cmd和文件中的代碼:

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
 File Name:   loger
 Description :
 Author :    yangyanxing
 date:     2020/9/23
-------------------------------------------------
"""
import logging
import sys
import os
# 初始化logger
logger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
# 設置日志格式
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d
%H:%M:%S')
# 添加cmd handler
cmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
# 添加文件的handler
logpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# 將cmd和file handler添加到logger中
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("今天天氣不錯")

先初始化一個logger, 并且設置它的日志級別是DEBUG,然后添初始化了 cmd_handler和 file_handler,最后將它們添加到logger中, 運行腳本,會在cmd中打印出

[2020-09-23 10:45:56] [DEBUG] 今天天氣不錯且會寫入到當前目錄下的debug.log文件中

添加HTTPHandler

如果想要在記錄時將日志發送到遠程服務器上,可以添加一個 HTTPHandler , 在python標準庫logging.handler中,已經為我們定義好了很多handler,有些我們可以直接用,本地使用tornado寫一個接收 日志的接口,將接收到的參數全都打印出來

# 添加一個httphandler
import logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天氣不錯")
結果在服務端我們收到了很多信息

{
'name': [b 'yyx'],
'msg': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'args': [b '()'],
'levelname': [b 'DEBUG'],
'levelno': [b '10'],
'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
'filename': [b 'loger.py'],
'module': [b 'loger'],
'exc_info': [b 'None'],
'exc_text': [b 'None'],
'stack_info': [b 'None'],
'lineno': [b '41'],
'funcName': [b '<module>'],
'created': [b '1600831054.8881223'],
'msecs': [b '888.1223201751709'],
'relativeCreated': [b '22.99976348876953'],
'thread': [b '14876'],
'threadName': [b 'MainThread'],
'processName': [b 'MainProcess'],
'process': [b '8648'],
'message': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'asctime': [b '2020-09-23 11:17:34']
}

可以說是信息非常之多,但是卻并不是我們想要的樣子,我們只是想要類似于

[2020-09-23 10:45:56][DEBUG] 今天天氣不錯這樣的日志
logging.handlers.HTTPHandler 只是簡單的將日志所有信息發送給服務端,至于服務端要怎么組織內 容是由服務端來完成. 所以我們可以有兩種方法,一種是改服務端代碼,根據傳過來的日志信息重新組織一 下日志內容, 第二種是我們重新寫一個類,讓它在發送的時候將重新格式化日志內容發送到服務端。

我們采用第二種方法,因為這種方法比較靈活, 服務端只是用于記錄,發送什么內容應該是由客戶端來決定。

我們需要重新定義一個類,我們可以參考 logging.handlers.HTTPHandler 這個類,重新寫一個httpHandler類

每個日志類都需要重寫emit方法,記錄日志時真正要執行是也就是這個emit方法:

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  def emit(self, record):
    '''
   重寫emit方法,這里主要是為了把初始化時的baseParam添加進來
   :param record:
   :return:
   '''
    msg = self.format(record)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      requests.get(url, timeout=1)
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      requests.post(self.url, data={'log': msg}, headers=headers,
timeout=1)

上面代碼中有一行定義發送的參數 msg = self.format(record)這行代碼表示,將會根據日志對象設置的格式返回對應的內容。

之后再將內容通過requests庫進行發送,無論使用get 還是post方式,服務端都可以正常的接收到日志

{'log': [b'[2020-09-23 11:39:45] [DEBUG]
\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99']}

將bytes類型轉一下就得到了:

[2020-09-23 11:43:50] [DEBUG] 今天天氣不錯

異步的發送遠程日志

現在我們考慮一個問題,當日志發送到遠程服務器過程中,如果遠程服務器處理的很慢,會耗費一定的時間, 那么這時記錄日志就會都變慢修改服務器日志處理類,讓其停頓5秒鐘,模擬長時間的處理流程

async def post(self):
  print(self.getParam('log'))
  await asyncio.sleep(5)
  self.write({"msg": 'ok'})

此時我們再打印上面的日志:

logger.debug("今天天氣不錯")
logger.debug("是風和日麗的")

得到的輸出為:

[2020-09-23 11:47:33] [DEBUG] 今天天氣不錯
[2020-09-23 11:47:38] [DEBUG] 是風和日麗的

我們注意到,它們的時間間隔也是5秒。
那么現在問題來了,原本只是一個記錄日志,現在卻成了拖累整個腳本的累贅,所以我們需要異步的來 處理遠程寫日志。

1使用多線程處理

首先想的是應該是用多線程來執行發送日志方法;

def emit(self, record):
  msg = self.format(record)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    t = threading.Thread(target=requests.get, args=(url,))
    t.start()
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    t = threading.Thread(target=requests.post, args=(self.url,), kwargs=
{"data":{'log': msg},

這種方法是可以達到不阻塞主目的,但是每打印一條日志就需要開啟一個線程,也是挺浪費資源的。我們也 可以使用線程池來處理

2使用線程池處理

python 的 concurrent.futures 中有ThreadPoolExecutor, ProcessPoolExecutor類,是線程池和進程池, 就是在初始化的時候先定義幾個線程,之后讓這些線程來處理相應的函數,這樣不用每次都需要新創建線程

線程池的基本使用:

exector = ThreadPoolExecutor(max_workers=1) # 初始化一個線程池,只有一個線程
exector.submit(fn, args, kwargs) # 將函數submit到線程池中

如果線程池中有n個線程,當提交的task數量大于n時,則多余的task將放到隊列中。
再次修改上面的emit函數

exector = ThreadPoolExecutor(max_workers=1)
def emit(self, record):
  msg = self.format(record)
  timeout = aiohttp.ClientTimeout(total=6)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    exector.submit(requests.get, url, timeout=6)
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    exector.submit(requests.post, self.url, data={'log': msg},
headers=headers, timeout=6)

這里為什么要只初始化一個只有一個線程的線程池? 因為這樣的話可以保證先進隊列里的日志會先被發 送,如果池子中有多個線程,則不一定保證順序了。

3使用異步aiohttp庫來發送請求

上面的CustomHandler類中的emit方法使用的是requests.post來發送日志,這個requests本身是阻塞運 行的,也正上由于它的存在,才使得腳本卡了很長時間,所們我們可以將阻塞運行的requests庫替換為異步 的aiohttp來執行get和post方法, 重寫一個CustomHandler中的emit方法

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  async def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
      async with session.get(self.url) as resp:
          print(await resp.text())
      else:
        headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
      async with session.post(self.url, data={'log': msg}) as resp:
          print(await resp.text())

這時代碼執行崩潰了:

C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine
'CustomHandler.emit' was never awaited
self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

服務端也沒有收到發送日志的請求。
究其原因是由于emit方法中使用 async with session.post 函數,它需要在一個使用async 修飾的函數 里執行,所以修改emit函數,使用async來修飾,這里emit函數變成了異步的函數, 返回的是一個 coroutine 對象,要想執行coroutine對象,需要使用await, 但是腳本里卻沒有在哪里調用 await emit() ,所以崩潰信息 中顯示 coroutine 'CustomHandler.emit' was never awaited。

既然emit方法返回的是一個coroutine對象,那么我們將它放一個loop中執行

async def main():
  await logger.debug("今天天氣不錯")
  await logger.debug("是風和日麗的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

執行依然報錯:

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

意思是需要的是一個coroutine,但是傳進來的對象不是。
這似乎就沒有辦法了,想要使用異步庫來發送,但是卻沒有可以調用await的地方。

解決辦法是有的,我們使用 asyncio.get_event_loop() 獲取一個事件循環對象, 我們可以在這個對象上注冊很多協程對象,這樣當執行事件循環的時候,就是去執行注冊在該事件循環上的協程,

我們通過一個小例子來看一下:

import asyncio
async def test(n):
 while n > 0:
   await asyncio.sleep(1)
   print("test {}".format(n))
   n -= 1
 return n

async def test2(n):
 while n >0:
   await asyncio.sleep(1)
   print("test2 {}".format(n))
   n -= 1
def stoploop(task):
 print("執行結束, task n is {}".format(task.result()))
 loop.stop()
loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
loop.run_forever()

我們使用 loop = asyncio.get_event_loop() 創建了一個事件循環對象loop, 并且在loop上創建了兩個task, 并且給task1添加了一個回調函數,在task1它執行結束以后,將loop停掉。
注意看上面的代碼,我們并沒有在某處使用await來執行協程,而是通過將協程注冊到某個事件循環對象上, 然后調用該循環的 run_forever() 函數,從而使該循環上的協程對象得以正常的執行。

上面得到的輸出為:

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
執行結束, task n is 0

可以看到,使用事件循環對象創建的task,在該循環執行run_forever() 以后就可以執行了如果不執行 loop.run_forever() 函數,則注冊在它上面的協程也不會執行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)
# loop.run_forever()

上面的代碼將loop.run_forever() 注釋掉,換成time.sleep(5) 停5秒, 這時腳本不會有任何輸出,在停了5秒 以后就中止了,
回到之前的日志發送遠程服務器的代碼,我們可以使用aiohttp封裝一個發送數據的函數, 然后在emit中將 這個函數注冊到全局的事件循環對象loop中,最后再執行loop.run_forever()

loop = asyncio.get_event_loop()
class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  # 使用aiohttp封裝發送數據函數
  async def submit(self, data):
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if self.url.find("?") >= 0:
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
data}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
          print(await resp.text())
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
        async with session.post(self.url, data={'log': data}) as resp:
          print(await resp.text())
    return True
  def emit(self, record):
    msg = self.format(record)
    loop.create_task(self.submit(msg))
# 添加一個httphandler
http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天氣不錯")
logger.debug("是風和日麗的")
loop.run_forever()

這時腳本就可以正常的異步執行了:

loop.create_task(self.submit(msg)) 也可以使用
asyncio.ensure_future(self.submit(msg), loop=loop) 來代替,目的都是將協程對象注冊到事件循環中。

但這種方式有一點要注意,loop.run_forever() 將會一直阻塞,所以需要有個地方調用 loop.stop() 方法. 可以注冊到某個task的回調中。

讀到這里,這篇“Python怎么異步發送日志到遠程服務器”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

黄陵县| 鄂州市| 巴彦县| 安吉县| 桃江县| 琼结县| 彝良县| 金川县| 诸城市| 北宁市| 县级市| 仙桃市| 叶城县| 日照市| 泸州市| 鄂尔多斯市| 长岛县| 易门县| 沙河市| 长汀县| 泌阳县| 眉山市| 上思县| 新津县| 五指山市| 密云县| 安达市| 吴江市| 咸丰县| 浠水县| 准格尔旗| 巴林右旗| 兰西县| 镇安县| 湖南省| 曲周县| 岚皋县| 许昌市| 和田县| 金门县| 商洛市|