您好,登錄后才能下訂單哦!
這篇文章主要介紹“Python如何實時獲取任務請求對應的Nginx日志”,在日常操作中,相信很多人在Python如何實時獲取任務請求對應的Nginx日志問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Python如何實時獲取任務請求對應的Nginx日志”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
項目需求測試過程中,需要向Nginx服務器發送一些用例請求,然后查看對應的Nginx日志,判斷是否存在特征內容,來判斷任務是否執行成功。為了提升效率,需要將這一過程實現自動化。
Python 3.6.5
代碼設計與實現
#!/usr/bin/env python # -*- coding:utf-8 -*- ''' @CreateTime: 2021/06/26 9:05 @Author : shouke ''' import time import threading import subprocess from collections import deque def collect_nginx_log(): global nginx_log_queue global is_tasks_compete global task_status args = 'tail -0f /usr/local/openresty/nginx/logs/access.log' while task_status != 'req_log_got': with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc: log_for_req = '' outs, errs = '', '' try: outs, errs = proc.communicate(timeout=2) except subprocess.TimeoutExpired: print('獲取nginx日志超時,正在重試') proc.kill() try: outs, errs = proc.communicate(timeout=5) except subprocess.TimeoutExpired: print('獲取nginx日志超時,再次超時,停止重試') break finally: for line in outs.split('\n'): flag = '\"client_ip\":\"10.118.0.77\"' # 特征 if flag in line: # 查找包含特征內容的日志 log_for_req += line if task_status == 'req_finished': nginx_log_queue.append(log_for_req) task_status = 'req_log_got' def run_tasks(task_list): ''' 運行任務 :param task_list 任務列表 ''' global nginx_log_queue global is_tasks_compete global task_status for task in task_list: thread = threading.Thread(target=collect_nginx_log, name="collect_nginx_log") thread.start() time.sleep(1) # 執行任務前,讓收集日志線程先做好準備 print('正在執行任務:%s' % task.get('name')) # 執行Nginx任務請求 # ... task_status = 'req_finished' time_to_wait = 0.1 while task_status != 'req_log_got': # 請求觸發的nginx日志收集未完成 time.sleep(time_to_wait) time_to_wait += 0.01 else:# 獲取到用例請求觸發的nginx日志 if nginx_log_queue: nginx_log = nginx_log_queue.popleft() task_status = 'req_ready' # 解析日志 # do something here # ... else: print('存儲請求日志的隊列為空') # do something here # ... if __name__ == '__main__': nginx_log_queue = deque() is_tasks_compete = False # 所有任務是否執行完成 task_status = 'req_ready' # req_ready,req_finished,req_log_got # 存放執行次任務任務的一些狀態 print('###########################任務開始###########################') tast_list = [{'name':'test_task', 'other':'...'}] run_tasks(tast_list) is_tasks_compete = True current_active_thread_num = len(threading.enumerate()) while current_active_thread_num != 1: time.sleep(2) current_active_thread_num = len(threading.enumerate()) print('###########################任務完成###########################')
注意:
1、上述代碼為啥不一步到位,直接 tail -0f /usr/local/openresty/nginx/logs/access.log | grep "特征內容"
呢?這是因為這樣做無法獲取到Nginx的日志
2、實踐時發現,第一次執行proc.communicate(timeout=2)
獲取日志時,總是無法獲取,會超時,需要二次獲取,并且timeout
設置太小時(實踐時嘗試過設置為1秒
),也會導致第二次執行時無法獲取Nginx日志。
到此,關于“Python如何實時獲取任務請求對應的Nginx日志”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。