您好,登錄后才能下訂單哦!
進程的創建
進程 VS 程序
編寫完畢的代碼,在沒有運行的時候,稱之為程序
正在運行著的代碼,就成為進程
注意: 進程,除了包含代碼以外,還有需要運行的環境等,所以和程序是有區別的
進程的創建
創建子進程:
Python的os模塊封裝了常?的系統調用,其中就包括fork,可以在Python程 序中
輕松創建子進程:
import os
import time
#定義一個全局變量money
money = 100
print("當前進程的pid:", os.getpid())
print("當前進程的父進程pid:", os.getppid())
#time.sleep(115)
p = os.fork()
#子進程返回的是0
if p == 0:
money = 200
print("子進程返回的信息, money=%d" %(money))
#父進程返回的是子進程的pid
else:
print("創建子進程%s, 父進程是%d" %(p, os.getppid()))
print(money)
Process([group [, target [, name [, args [, kwargs]]]]])
target:表示這個進程實例所調?對象;
args:表示調?對象的位置參數元組;
kwargs:表示調?對象的關鍵字參數字典;
name:為當前進程實例的別名;
group:?多數情況下?不到;
Process類常??法:
is_alive(): 判斷進程實例是否還在執?;
join([timeout]): 是否等待進程實例執?結束,或等待多少秒;
start(): 啟動進程實例(創建?進程);
run(): 如果沒有給定target參數,對這個對象調?start()?法時,
就將執 ?對象中的run()?法;
terminate(): 不管任務是否完成,?即終?;
from multiprocessing import Process
import time
def task1():
print("正在聽音樂")
time.sleep(1)
def task2():
print("正在編程......")
time.sleep(0.5)
def no_multi():
task1()
task2()
def use_multi():
p1 = Process(target=task1)
p2 = Process(target=task2)
p1.start()
p2.start()
p1.join()
p2.join()
#p.join() 阻塞當前進程, 當p1.start()之后, p1就提示主進程, 需要等待p1進程執行結束才能向下執行, 那么主進程就乖乖等著, 自然不會執行p2.start()
#[process.join() for process in processes]
if __name__ == '__main__':
# 主進程
start_time= time.time()
#no_multi()
use_multi()
end_time = time.time()
print(end_time-start_time
進程池
為什么需要進程池Pool?
? 當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,
十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時
可以發揮進程池的功效。
? Pool可以提供指定數量的進程供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,
那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,
那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。
def is_prime(num):
"""判斷素數"""
if num == 1:
return False
for i in range(2, num):
if num % i == 0:
return False
else:
return True
def task(num):
if is_prime(num):
print("%d是素數" % (num))
from multiprocessing import Process
#判斷1000-1200之間所有的素數
def use_mutli():
ps = []
#不要開啟太多進程, 創建子進程會耗費時間和空間(內存);
for num in range(1, 10000):
# 實例化子進程對象
p = Process(target=task, args=(num,))
#開啟子進程
p.start()
#存儲所有的子進程對象
ps.append(p)
#阻塞子進程, 等待所有的子進程執行結束, 再執行主進程;
[p.join() for p in ps]
#判斷1000-1200之間所有的素數
def no_mutli():
for num in range(1, 100000):
task(num)
def use_pool():
"""使用進程池"""
from multiprocessing import Pool
from multiprocessing import cpu_count # 4個
p = Pool(cpu_count())
p.map(task, list(range(1, 100000)))
p.close() # 關閉進程池
p.join() # 阻塞, 等待所有的子進程執行結束, 再執行主進程;
if __name__ == '__main__':
import time
start_time = time.time()
#數據量大小 # 1000-1200 1-10000 # 1-100000
#no_mutli() # 0.0077722072601 # 1.7887046337127686 # 90.75180315971375
use_mutli() # 1.806459665298462
use_pool() # 0.15455389022827148 # 1.2682361602783203 # 35.63375639915466
end_time = time.time()
print(end_time - start_time)
進程間通信
How?
可以使用multiprocessing模塊的Queue實現多進程之間的數據傳遞,Queue 本身是
一個消息列隊程序。
Queue.qsize(): 返回當前隊列包含的消息數量;
Queue.empty(): 如果隊列為空,返回True,反之False ;
Queue.full():
如果隊列滿了,返回True,反之False;
Queue.get([block[, timeout]]):
獲取隊列中的一條消息,然后將其從列隊中移除,block默認值為True;
Queue.get_nowait():
相當Queue.get(False);
Queue.put(item,[block[, timeout]]):
將item消息寫入隊列,block默認值 為True;
Queue.put_nowait(item):
相當Queue.put(item, False)
多線程編程:
線程(英語:thread)是操作系統能夠進行運算調度的最小單位。它被包含在
進程之中,是進程中的實際運作單位。
每個進程至少有一個線程,即進程本身。進程可以啟動多個線程。操作系統像并
行“進程”一樣執行這些線程
線程和進程各自有什么區別和優劣呢?
? 進程是資源分配的最小單位,線程是程序執行的最小單位。
? 進程有自己的獨立地址空間。線程是共享進程中的數據的,使用相同的地址空間.
? 進程之間的通信需要以通信的方式(IPC)進行。線程之間的通信更方便,同一進程下的線程
共享全局變量、靜態變量等數據,難點:處理好同步與互斥。
線程分類
有兩種不同的線程:
? 內核線程
? 用戶空間線程或用戶線程
內核線程是操作系統的一部分,而內核中沒有實現用戶空間線程。
方法一分析
? 多線程程序的執行順序是不確定的。
? 當執行到sleep語句時,線程將被阻塞(Blocked),到sleep結束后,線程進入就緒
(Runnable)狀態,等待調度。而線程調度將自行選擇一個線程執行。
? 代碼中只能保證每個線程都運行完整個run函數,但是線程的啟動順序、 run函數中
每次循環的執行順序都不能確定。
"""
創建子類, 繼承的方式
"""
from multiprocessing import Process
import time
class MyProcess(Process):
"""
創建自己的進程, 父類是Process
"""
def __init__(self, music_name):
super(MyProcess, self).__init__()
self.music_name = music_name
def run(self):
"""重寫run方法, 內容是你要執行的任務"""
print("聽音樂%s" %(self.music_name))
time.sleep(1)
#開啟進程: p.start() ====== p.run()
if __name__ == '__main__':
for i in range(10):
p = MyProcess("音樂%d" %(i))
p.start()
"""
通過實例化對象的方式實現多線程
"""
import time
import threading
def task():
"""當前要執行的任務"""
print("聽音樂........")
time.sleep(1)
if __name__ == '__main__':
start_time = time.time()
threads = []
for count in range(5):
t = threading.Thread(target=task)
#讓線程開始執行任務
t.start()
threads.append(t)
#等待所有的子線程執行結束, 再執行主線程;
[thread.join() for thread in threads]
end_time = time.time()
print(end_time-start_time)
項目案例: 基于多線程的批量主機存活探測
項目描述: 如果要在本地網絡中確定哪些地址處于活動狀態或哪些計算機處于活動狀態,
則可以使用此腳本。我們將依次ping地址, 每次都要等幾秒鐘才能返回值。這可以在Python
中編程,在IP地址的地址范圍內有一個for循環和一個os.popen(“ping -q -c2”+ ip)。
項目瓶頸: 沒有線程的解決方案效率非常低,因為腳本必須等待每次ping。
優點: 在一個進程內的所有線程共享全局變量,能夠在不使用其他方式的前提 下完成多線
程之間的數據共享(這點要比多進程要好)
缺點: 線程是對全局變量隨意遂改可能造成多線程之間對全局變量 的混亂(即線程非安全)
import requests
import json
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from threading import Thread
def task(ip):
"""獲取指定IP的所在城市和國家并存儲到數據庫中"""
#獲取網址的返回內容
url = 'http://ip-api.com/json/%s' % (ip)
try:
response = requests.get(url)
except Exception as e:
print("網頁獲取錯誤:", e)
else:
# 默認返回的是字符串
"""
{"as":"AS174 Cogent Communications","city":"Beijing","country":"China","countryCode":"CN","isp":"China Unicom Shandong Province network","lat":39.9042,"lon":116.407,"org":"NanJing XinFeng Information Technologies, Inc.","query":"114.114.114.114","region":"BJ","regionName":"Beijing","status":"success","timezone":"Asia/Shanghai","zip":""}
"""
contentPage = response.text
#將頁面的json字符串轉換成便于處理的字典;
data_dict = json.loads(contentPage)
#獲取對應的城市和國家
city = data_dict.get('city', 'null') # None
country = data_dict.get('country', 'null')
print(ip, city, country)
#存儲到數據庫表中ips
ipObj = IP(ip=ip, city=city, country=country)
session.add(ipObj)
session.commit()
if __name__ == '__main__':
engine = create_engine("mysql+pymysql://root:westos@172.25.254.123/pymysql",
encoding='utf8',
# echo=True
)
#創建緩存對象
Session = sessionmaker(bind=engine)
session = Session()
#聲明基類
Base = declarative_base()
class IP(Base):
__tablename__ = 'ips'
id = Column(Integer, primary_key=True, autoincrement=True)
ip = Column(String(20), nullable=False)
city = Column(String(30))
country = Column(String(30))
def __repr__(self):
return self.ip
#創建數據表
Base.metadata.create_all(engine)
#1.1.1.1 -- 1.1.1.10
threads = []
for item in range(10):
ip = '1.1.1.' + str(item + 1) # 1.1.1.1 -1.1.1.10
#task(ip)
#多線程執行任務
thread = Thread(target=task, args=(ip,))
#啟動線程并執行任務
thread.start()
#存儲創建的所有線程對象;
threads.append(thread)
[thread.join() for thread in threads]
print("任務執行結束.........")
print(session.query(IP).all())
"""
創建子類
"""
from threading import Thread
class GetHostAliveThread(Thread):
"""
創建子線程, 執行的任務:判斷指定的IP是否存活
"""
def __init__(self, ip):
super(GetHostAliveThread, self).__init__()
self.ip = ip
def run(self):
# # 重寫run方法: 判斷指定的IP是否存活
#"""
#>>> # os.system() 返回值如果為0, 代表命令正確執行,沒有報錯; 如果不為0, 執行報錯;
#...
#>>> os.system('ping -c1 -w1 172.25.254.49 &> /dev/null')
#0
#>>> os.system('ping -c1 -w1 172.25.254.1 &> /dev/null')
#256
#"""
import os
#需要執行的shell命令
cmd = 'ping -c1 -w1 %s &> /dev/null' %(self.ip)
result = os.system(cmd)
#返回值如果為0, 代表命令正確執行,沒有報錯; 如果不為0, 執行報錯;
if result != 0:
print("%s主機沒有ping通" %(self.ip))
if __name__ == '__main__':
print("打印172.25.254.0網段沒有使用的IP地址".center(50, '*'))
for i in range(1, 255):
ip = '172.25.254.' + str(i)
thread = GetHostAliveThread(ip)
thread.start()
共享全局變量:如何解決線程不安全問題?
GIL(global interpreter lock): python解釋器中任意時刻都只有一個線程在執行;
Python代碼的執行由Python 虛擬機(也叫解釋器主循環,CPython版本)來控
制,Python 在設計之初就考慮到要在解釋器的主循環中,同時只有一個線程
在執行,即在任意時刻,只有一個線程在解釋器中運行。對Python 虛擬機的
訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個
線程在運行。
money = 0
def add():
for i in range(1000000):
global money
lock.acquire()
money += 1
lock.release()
def reduce():
for i in range(1000000):
global money
lock.acquire()
money -= 1
lock.release()
if __name__ == '__main__':
from threading import Thread, Lock
# 創建線程鎖
lock = Lock()
t1 = Thread(target=add)
t2 = Thread(target=reduce)
t1.start()
t2.start()
t1.join()
t2.join()
print(money)
線程同步
線程同步:即當有一個線程在對內存進行操作時,其他線程都不可以對這個內
存地址進行操作,直到該線程完成操作, 其他線程才能對該內存地址進行操作.
同步就是協同步調,按預定的先后次序進行運行。如:你說完,我再說。
"同"字從字面上容易理解為一起動作 其實不是,
"同"字應是指協同、協助、互相配合。
死鎖
在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源并且同時 等待對方的資
源,就會造成死鎖。
"""
在線程間共享多個資源的時候,如果兩個線程分別占有?部分資源并且同時 等待對?的資源,就會造成死鎖。
"""
import time
import threading
class Account(object):
def __init__(self, id, money, lock):
self.id = id
self.money = money
self.lock = lock
def reduce(self, money):
self.money -= money
def add(self, money):
self.money += money
def transfer(_from, to, money):
if _from.lock.acquire():
_from.reduce(money)
time.sleep(1)
if to.lock.acquire():
to.add(money)
to.lock.release()
_from.lock.release()
if __name__ == '__main__':
a = Account('a', 1000, threading.Lock()) # 900
b = Account('b', 1000, threading.Lock()) # 1100
t1 = threading.Thread(target=transfer, args=(a, b, 200))
t2 = threading.Thread(target=transfer, args=(b, a, 100))
t1.start()
t2.start()
print(a.money)
print(b.money)
協程,又稱微線程,纖程。英文名Coroutine。協程看上去也是子程序,但執行過程中,
在子程序內部可中斷,然后轉而執行別的子程序,在適當的時候再返回來接著執行。
協程優勢
? 執行效率極高,因為子程序切換(函數)不是線程切換,由程序自身控制,
? 沒有切換線程的開銷。所以與多線程相比,線程的數量越多,協程性能的優
勢越明顯。
?
不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在控
制共享資源時也不需要加鎖,因此執行效率高很多
gevent實現協程
基本思想:
當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到
IO操作完成,再在適當的時候切換回來繼續執行。由于IO操作非常耗時,經常使程
序處于等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而
不是等待IO。
import gevent
import requests
import json
from gevent import monkey
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from threading import Thread
from gevent import monkey
#打補丁
monkey.patch_all()
def task(ip):
"""獲取指定IP的所在城市和國家并存儲到數據庫中"""
#獲取網址的返回內容
url = 'http://ip-api.com/json/%s' % (ip)
try:
response = requests.get(url)
except Exception as e:
print("網頁獲取錯誤:", e)
else:
# 默認返回的是字符串
"""
{"as":"AS174 Cogent Communications","city":"Beijing","country":"China","countryCode":"CN","isp":"China Unicom Shandong Province network","lat":39.9042,"lon":116.407,"org":"NanJing XinFeng Information Technologies, Inc.","query":"114.114.114.114","region":"BJ","regionName":"Beijing","status":"success","timezone":"Asia/Shanghai","zip":""}
"""
contentPage = response.text
#將頁面的json字符串轉換成便于處理的字典;
data_dict = json.loads(contentPage)
#獲取對應的城市和國家
city = data_dict.get('city', 'null') # None
country = data_dict.get('country', 'null')
print(ip, city, country)
#存儲到數據庫表中ips
ipObj = IP(ip=ip, city=city, country=country)
session.add(ipObj)
session.commit()
if __name__ == '__main__':
engine = create_engine("mysql+pymysql://root:westos@172.25.254.123/pymysql",
encoding='utf8',
# echo=True
)
#創建緩存對象
Session = sessionmaker(bind=engine)
session = Session()
#聲明基類
Base = declarative_base()
class IP(Base):
__tablename__ = 'ips'
id = Column(Integer, primary_key=True, autoincrement=True)
ip = Column(String(20), nullable=False)
city = Column(String(30))
country = Column(String(30))
def __repr__(self):
return self.ip
#創建數據表
Base.metadata.create_all(engine)
#使用協程
gevents = [gevent.spawn(task, '1.1.1.' + str(ip + 1)) for ip in range(10)]
gevent.joinall(gevents)
print("執行結束....")
總結
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。