您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關利用python怎么實現一個HTTP連接池,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
首先, HTTP連接是基于TCP連接的, 與服務器之間進行HTTP通信, 本質就是與服務器之間建立了TCP連接后, 相互收發基于HTTP協議的數據包. 因此, 如果我們需要頻繁地去請求某個服務器的資源, 我們就可以一直維持與個服務器的TCP連接不斷開, 然后在需要請求資源的時候, 把連接拿出來用就行了.
一個項目可能需要與服務器之間同時保持多個連接, 比如一個爬蟲項目, 有的線程需要請求服務器的網頁資源, 有的線程需要請求服務器的圖片等資源, 而這些請求都可以建立在同一條TCP連接上.
因此, 我們使用一個管理器來對這些連接進行管理, 任何程序需要使用這些連接時, 向管理器申請就可以了, 等到用完之后再將連接返回給管理器, 以供其他程序重復使用, 這個管理器就是連接池.
基于上一章的分析, 連接池應該是一個收納連接的容器, 同時對這些連接有管理能力:
class HTTPConnectionPool: def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None: """ :param host: pass :param port: pass :param max_size: 同時存在的最大連接數, 默認None->連接數無限,沒了就創建 :param idle_timeout: 單個連接單次最長空閑時間,超時自動關閉,默認None->不限時 """ self.host = host self.port = port self.max_size = max_size self.idle_timeout = idle_timeout self._lock = threading.Condition() self._pool = [] # 這里的conn_num指的是總連接數,包括其它線程拿出去正在使用的連接 self.conn_num = 0 self.is_closed = False def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: ... def release(self, conn: WrapperHTTPConnection) -> None: ...
因此, 我們定義這樣一個HTTPConnectionPool類, 使用一個列表來保存可用的連接. 對于外部來說, 只需要調用這個連接池對象的acquire和release方法就能取得和釋放連接.
對于線程池內部來說, 至少需要三個關于連接的操作: 從連接池中取得連接, 將連接放回連接池, 以及創建一個連接:
def _get_connection(self) -> WrapperHTTPConnection: # 這個方法會把連接從_idle_conn移動到_used_conn列表中,并返回這個連接 try: return self._pool.pop() except IndexError: raise EmptyPoolError def _put_connection(self, conn: WrapperHTTPConnection) -> None: self._pool.append(conn) def _create_connection(self) -> WrapperHTTPConnection: self.conn_num += 1 return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port))
對于連接池外部來說, 主要有申請連接和釋放連接這兩個操作, 實際上這就是個簡單的生產者消費者模型. 考慮到外部可能是多線程的環境, 我們使用threading.Condition來保證線程安全. 關于Condition的資料可以看這里.
def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: if self.is_closed: raise ConnectionPoolClosed with self._lock: if self.max_size is None or not self.is_full(): # 在還能創建新連接的情況下,如果沒有空閑連接,直接創建一個就行了 if self.is_pool_empty(): self._put_connection(self._create_connection()) else: # 不能創建新連接的情況下,如果設置了blocking=False,沒連接就報錯 # 否則,就基于timeout進行阻塞,直到超時或者有可用連接為止 if not blocking: if self.is_pool_empty(): raise EmptyPoolError elif timeout is None: while self.is_pool_empty(): self._lock.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: end_time = time.time() + timeout while self.is_pool_empty(): remaining = end_time - time.time() if remaining <= 0: raise EmptyPoolError self._lock.wait(remaining) # 走到這一步了,池子里一定有空閑連接 return self._get_connection() def release(self, conn: WrapperHTTPConnection) -> None: if self.is_closed: # 如果這個連接是在連接池關閉后才釋放的,那就不用回連接池了,直接放生 conn.close() return # 實際上,python列表的append操作是線程安全的,可以不加鎖 # 這里調用鎖是為了通過notify方法通知其它正在wait的線程:現在有連接可用了 with self._lock: if not conn.is_available: # 如果這個連接不可用了,就應該創建一個新連接放進去,因為可能還有其它線程在等著連接用 conn.close() self.conn_num -= 1 conn = self._create_connection() self._put_connection(conn) self._lock.notify()
我們首先看看acquire方法, 這個方法其實就是在申請到鎖之后調用內部的_get_connection方法獲取連接, 這樣就線程安全了. 需要注意的是, 如果當前的條件無法獲取連接, 就會調用條件變量的wait方法, 及時釋放鎖并阻塞住當前線程. 然后, 當其它線程作為生產者調用release方法釋放連接時, 會觸發條件變量的notify方法, 從而喚醒一個阻塞在wait階段的線程, 即消費者. 這個消費者再從池中取出剛放回去的線程, 這樣整個生產者消費者模型就運轉起來了.
對于一個程序來說, 它使用連接池的形式是獲取連接->使用連接->釋放連接. 因此, 我們應該通過with語句來管理這個連接, 以免在程序的最后遺漏掉釋放連接這一步驟.
基于這個原因, 我們通過一個WrapperHTTPConnection類來對HTTPConnection進行封裝, 以實現上下文管理器的功能. HTTPConnection的代碼可以看《用python實現一個HTTP客戶端》這篇文章.
class WrapperHTTPConnection: def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None: self.pool = pool self.conn = conn self.response = None self.is_available = True def __enter__(self) -> 'WrapperHTTPConnection': return self def __exit__(self, *exit_info: Any) -> None: # 如果response沒讀完并且連接需要復用,就棄用這個連接 if not self.response.will_close and not self.response.is_closed(): self.close() self.pool.release(self) def request(self, *args: Any, **kwargs: Any) -> HTTPResponse: self.conn.request(*args, **kwargs) self.response = self.conn.get_response() return self.response def close(self) -> None: self.conn.close() self.is_available = False
同樣的, 連接池可能也需要關閉, 因此我們給連接池也加上上下文管理器的功能:
class HTTPConnectionPool: ... def close(self) -> None: if self.is_closed: return self.is_closed = True pool, self._pool = self._pool, None for conn in pool: conn.close() def __enter__(self) -> 'HTTPConnectionPool': return self def __exit__(self, *exit_info: Any) -> None: self.close()
這樣, 我們就可以通過with語句優雅地管理連接池了:
with HTTPConnectionPool(**kwargs) as pool: with pool.acquire() as conn: res = conn.request('GET', '/') ...
如果一個連接池的所需連接數是隨時間變化的, 那么就會出現一種情況: 在高峰期, 我們創建了非常多的連接, 然后進入低谷期之后, 連接過剩, 大量的連接處于空閑狀態, 浪費資源. 因此, 我們可以設置一個定時任務, 定期清理空閑時間過長的連接, 減少連接池的資源占用.
首先, 我們需要為連接對象添加一個last_time屬性, 每當連接釋放進入連接池后, 就修改這個屬性的值為當前時間, 這樣我們就能明確知道, 連接池內的每個空閑連接空閑了多久:
class WrapperHTTPConnection: ... def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None: ... self.last_time = None class HTTPConnectionPool: ... def _put_connection(self, conn: WrapperHTTPConnection) -> None: conn.last_time = time.time() self._pool.append(conn)
然后, 我們通過threading.Timer來實現一個定時任務:
def start_clear_conn(self) -> None: if self.idle_timeout is None: # 如果空閑連接的超時時間為無限,那么就不應該清理連接 return self.clear_idle_conn() self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn) self._clearer.start() def stop_clear_conn(self) -> None: if self._clearer is not None: self._clearer.cancel()
threading.Timer只會執行一次定時任務, 因此, 我們需要在start_clear_conn中不斷地把自己設置為定時任務. 這其實等同于新開了一個線程來執行start_clear_conn方法, 因此并不會出現遞歸過深問題. 不過需要注意的是, threading.Timer雖然不會阻塞當前線程, 但是卻會阻止當前線程結束, 就算把它設置為守護線程都不行, 唯一可行的辦法就是調用stop_clear_conn方法取消這個定時任務.
最后, 我們定義clear_idle_conn方法來清理閑置時間超時的連接:
def clear_idle_conn(self) -> None: if self.is_closed: raise ConnectionPoolClosed # 這里開一個新線程來清理空閑連接,避免了阻塞主線程導致的定時精度出錯 threading.Thread(target=self._clear_idle_conn).start() def _clear_idle_conn(self) -> None: if not self._lock.acquire(timeout=self.idle_timeout): # 因為是每隔self.idle_timeout秒檢查一次 # 如果過了self.idle_timeout秒還沒申請到鎖,下一次都開始了,本次也就不用繼續了 return current_time = time.time() if self.is_pool_empty(): pass elif current_time - self._pool[-1].last_time >= self.idle_timeout: # 這里處理下面的二分法沒法處理的邊界情況,即所有連接都閑置超時的情況 self.conn_num -= len(self._pool) self._pool.clear() else: # 通過二分法找出從左往右第一個不超時的連接的指針 left, right = 0, len(self._pool) - 1 while left < right: mid = (left + right) // 2 if current_time - self._pool[mid].last_time >= self.idle_timeout: left = mid + 1 else: right = mid self._pool = self._pool[left:] self.conn_num -= left self._lock.release()
由于我們獲取和釋放連接都是從self._pool的尾部開始操作的, 因此self._pool這個容器是一個先進后出隊列, 它里面放著的連接, 一定是越靠近頭部的閑置時間越長, 從頭到尾閑置時間依次遞減. 基于這個原因, 我們使用二分法來找出列表中第一個沒有閑置超時的連接, 然后把在它之前的連接一次性刪除, 這樣就能達到O(logN)的時間復雜度, 算是一種比較高效的方法. 需要注意的是, 如果連接池內所有的連接都是超時的, 那么這種方法是刪不干凈的, 需要對這種邊界情況單獨處理.
這個連接池的完整代碼如下:
import threading import time from typing import Any from client import HTTPConnection, HTTPResponse class WrapperHTTPConnection: def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None: self.pool = pool self.conn = conn self.response = None self.last_time = time.time() self.is_available = True def __enter__(self) -> 'WrapperHTTPConnection': return self def __exit__(self, *exit_info: Any) -> None: # 如果response沒讀完并且連接需要復用,就棄用這個連接 if not self.response.will_close and not self.response.is_closed(): self.close() self.pool.release(self) def request(self, *args: Any, **kwargs: Any) -> HTTPResponse: self.conn.request(*args, **kwargs) self.response = self.conn.get_response() return self.response def close(self) -> None: self.conn.close() self.is_available = False class HTTPConnectionPool: def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None: """ :param host: pass :param port: pass :param max_size: 同時存在的最大連接數, 默認None->連接數無限,沒了就創建 :param idle_timeout: 單個連接單次最長空閑時間,超時自動關閉,默認None->不限時 """ self.host = host self.port = port self.max_size = max_size self.idle_timeout = idle_timeout self._lock = threading.Condition() self._pool = [] # 這里的conn_num指的是總連接數,包括其它線程拿出去正在使用的連接 self.conn_num = 0 self.is_closed = False self._clearer = None self.start_clear_conn() def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection: if self.is_closed: raise ConnectionPoolClosed with self._lock: if self.max_size is None or not self.is_full(): # 在還能創建新連接的情況下,如果沒有空閑連接,直接創建一個就行了 if self.is_pool_empty(): self._put_connection(self._create_connection()) else: # 不能創建新連接的情況下,如果設置了blocking=False,沒連接就報錯 # 否則,就基于timeout進行阻塞,直到超時或者有可用連接為止 if not blocking: if self.is_pool_empty(): raise EmptyPoolError elif timeout is None: while self.is_pool_empty(): self._lock.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: end_time = time.time() + timeout while self.is_pool_empty(): remaining = end_time - time.time() if remaining <= 0: raise EmptyPoolError self._lock.wait(remaining) # 走到這一步了,池子里一定有空閑連接 return self._get_connection() def release(self, conn: WrapperHTTPConnection) -> None: if self.is_closed: # 如果這個連接是在連接池關閉后才釋放的,那就不用回連接池了,直接放生 conn.close() return # 實際上,python列表的append操作是線程安全的,可以不加鎖 # 這里調用鎖是為了通過notify方法通知其它正在wait的線程:現在有連接可用了 with self._lock: if not conn.is_available: # 如果這個連接不可用了,就應該創建一個新連接放進去,因為可能還有其它線程在等著連接用 conn.close() self.conn_num -= 1 conn = self._create_connection() self._put_connection(conn) self._lock.notify() def _get_connection(self) -> WrapperHTTPConnection: # 這個方法會把連接從_idle_conn移動到_used_conn列表中,并返回這個連接 try: return self._pool.pop() except IndexError: raise EmptyPoolError def _put_connection(self, conn: WrapperHTTPConnection) -> None: conn.last_time = time.time() self._pool.append(conn) def _create_connection(self) -> WrapperHTTPConnection: self.conn_num += 1 return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port)) def is_pool_empty(self) -> bool: # 這里指的是,空閑可用的連接是否為空 return len(self._pool) == 0 def is_full(self) -> bool: if self.max_size is None: return False return self.conn_num >= self.max_size def close(self) -> None: if self.is_closed: return self.is_closed = True self.stop_clear_conn() pool, self._pool = self._pool, None for conn in pool: conn.close() def clear_idle_conn(self) -> None: if self.is_closed: raise ConnectionPoolClosed # 這里開一個新線程來清理空閑連接,避免了阻塞主線程導致的定時精度出錯 threading.Thread(target=self._clear_idle_conn).start() def _clear_idle_conn(self) -> None: if not self._lock.acquire(timeout=self.idle_timeout): # 因為是每隔self.idle_timeout秒檢查一次 # 如果過了self.idle_timeout秒還沒申請到鎖,下一次都開始了,本次也就不用繼續了 return current_time = time.time() if self.is_pool_empty(): pass elif current_time - self._pool[-1].last_time >= self.idle_timeout: # 這里處理下面的二分法沒法處理的邊界情況,即所有連接都閑置超時的情況 self.conn_num -= len(self._pool) self._pool.clear() else: # 通過二分法找出從左往右第一個不超時的連接的指針 left, right = 0, len(self._pool) - 1 while left < right: mid = (left + right) // 2 if current_time - self._pool[mid].last_time >= self.idle_timeout: left = mid + 1 else: right = mid self._pool = self._pool[left:] self.conn_num -= left self._lock.release() def start_clear_conn(self) -> None: if self.idle_timeout is None: # 如果空閑連接的超時時間為無限,那么就不應該清理連接 return self.clear_idle_conn() self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn) self._clearer.start() def stop_clear_conn(self) -> None: if self._clearer is not None: self._clearer.cancel() def __enter__(self) -> 'HTTPConnectionPool': return self def __exit__(self, *exit_info: Any) -> None: self.close() class EmptyPoolError(Exception): pass class ConnectionPoolClosed(Exception): pass
首先, 這個連接池的核心就是對連接進行管理, 而這包含取出連接和釋放連接兩個過程. 因此這東西的本質就是一個生產者消費者模型, 取出線程時是消費者, 放入線程時是生產者, 使用threading自帶的Condition對象就能完美解決線程安全問題, 使二者協同合作.
解決獲取連接和釋放連接這個問題之后, 其實這個連接池就已經能用了. 但是如果涉及到更多細節方面的東西, 比如判斷連接是否可用, 自動釋放連接, 清理閑置連接等等, 就需要對這個連接進行封裝, 為它添加更多的屬性和方法, 這就引入了WrapperHTTPConnection這個類. 實現它的__enter___和__exit__方法之后, 就能使用上下文管理器來自動釋放連接. 至于清理閑置連接, 通過last_time屬性記錄每個連接的最后釋放時間, 然后在連接池中添加一個定時任務就行了.
關于利用python怎么實現一個HTTP連接池就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。