您好,登錄后才能下訂單哦!
這篇文章主要講解了“怎么用Mars Remote API執行Python函數”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么用Mars Remote API執行Python函數”吧!
Mars 是一個并行和分布式 Python 框架,能輕松把單機大家耳熟能詳的的 numpy、pandas、scikit-learn 等庫,以及 Python 函數利用多核或者多機加速。這其中,并行和分布式 Python 函數主要利用 Mars Remote API。
啟動 Mars 分布式環境可以參考:
命令行方式在集群中部署。
Kubernetes 中部署。
MaxCompute 開箱即用的環境,購買了 MaxCompute 服務的可以直接使用。
使用 Mars Remote API 非常簡單,只需要對原有的代碼做少許改動,就可以分布式執行。
采用蒙特卡洛方法計算 π 為例。代碼如下,我們編寫了兩個函數,calc_chunk 用來計算每個分片內落在圓內的點的個數,calc_pi 用來把多個分片 calc_chunk 計算的結果匯總最后得出 π 值。
from typing import List import numpy as np def calc_chunk(n: int, i: int): # 計算n個隨機點(x和y軸落在-1到1之間)到原點距離小于1的點的個數 rs = np.random.RandomState(i) a = rs.uniform(-1, 1, size=(n, 2)) d = np.linalg.norm(a, axis=1) return (d < 1).sum() def calc_pi(fs: List[int], N: int): # 將若干次 calc_chunk 計算的結果匯總,計算 pi 的值 return sum(fs) * 4 / N N = 200_000_000 n = 10_000_000 fs = [calc_chunk(n, i) for i in range(N // n)] pi = calc_pi(fs, N) print(pi)
%%time 下可以看到結果:
3.1416312 CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s Wall time: 12.3 s
在單機需要 12.3 s。
要讓這個計算使用 Mars Remote API 并行起來,我們不需要對函數做任何改動,需要變動的僅僅是最后部分。
import mars.remote as mr # 函數調用改成 mars.remote.spawn fs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)] # 把 spawn 的列表傳入作為參數,再 spawn 新的函數 pi = mr.spawn(calc_pi, args=(fs, N)) # 通過 execute() 觸發執行,fetch() 獲取結果 print(pi.execute().fetch())
%%time 下看到結果:
3.1416312 CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms Wall time: 2.85 s
結果一模一樣,但是卻有數倍的性能提升。
可以看到,對已有的 Python 代碼,Mars remote API 幾乎不需要做多少改動,就能有效并行和分布式來加速執行過程。
為了讓讀者理解 Mars Remote API 的作用,我們從另一個例子開始。現在我們有一個數據集,我們希望對它們做一個分類任務。要做分類,我們有很多算法和庫可以選擇,這里我們用 RandomForest、LogisticRegression,以及 XGBoost。
困難的地方是,除了有多個模型選擇,這些模型也會包含多個超參,那哪個超參效果最好呢?對于調參不那么有經驗的同學,跑過了才知道。所以,我們希望能生成一堆可選的超參,然后把他們都跑一遍,看看效果。
準備數據
這個例子里我們使用 otto 數據集。
首先,我們準備數據。讀取數據后,我們按 2:1 的比例把數據分成訓練集和測試集。
import pandas as pd from sklearn.preprocessing import LabelEncoder from sklearn.model_selection import train_test_split def gen_data(): df = pd.read_csv('otto/train.csv') X = df.drop(['target', 'id'], axis=1) y = df['target'] label_encoder = LabelEncoder() label_encoder.fit(y) y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123) X_train, X_test, y_train, y_test = gen_data()
模型
接著,我們使用 scikit-learn 的 RandomForest 和 LogisticRegression 來處理分類。
RandomForest:
from sklearn.ensemble import RandomForestClassifier def random_forest(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = RandomForestClassifier(verbose=verbose, **kw) model.fit(X_train, y_train) return model
接著,我們生成供 RandomForest 使用的超參,我們用 yield 的方式來迭代返回。
def gen_random_forest_parameters(): for n_estimators in [50, 100, 600]: for max_depth in [None, 3, 15]: for criterion in ['gini', 'entropy']: yield { 'n_estimators': n_estimators, 'max_depth': max_depth, 'criterion': criterion }
LogisticRegression 也是這個過程。我們先定義模型。
from sklearn.linear_model import LogisticRegression def logistic_regression(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = LogisticRegression(verbose=verbose, **kw) model.fit(X_train, y_train) return model
接著生成供 LogisticRegression 使用的超參。
def gen_lr_parameters(): for penalty in ['l2', 'none']: for tol in [0.1, 0.01, 1e-4]: yield { 'penalty': penalty, 'tol': tol }
XGBoost 也是一樣,我們用 XGBClassifier 來執行分類任務。
from xgboost import XGBClassifier def xgb(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = XGBClassifier(verbosity=int(verbose), **kw) model.fit(X_train, y_train) return model
生成一系列超參。
def gen_xgb_parameters(): for n_estimators in [100, 600]: for criterion in ['gini', 'entropy']: for learning_rate in [0.001, 0.1, 0.5]: yield { 'n_estimators': n_estimators, 'criterion': criterion, 'learning_rate': learning_rate }
驗證
接著我們編寫驗證邏輯,這里我們使用 log_loss 來作為評價函數。
from sklearn.metrics import log_loss def metric_model(model, X_test: pd.DataFrame, y_test: pd.Series) -> float: if isinstance(model, bytes): model = pickle.loads(model) y_pred = model.predict_proba(X_test) return log_loss(y_test, y_pred) def train_and_metric(train_func, train_params: dict, X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, verbose: bool = False ): # 把訓練和驗證封裝到一起 model = train_func(X_train, y_train, verbose=verbose, **train_params) metric = metric_model(model, X_test, y_test) return model, metric
做好準備工作后,我們就開始來跑模型了。針對每個模型,我們把每次生成的超參們送進去訓練,除了這些超參,我們還把 n_jobs 設成 -1,這樣能更好利用單機的多核。
results = [] # ------------- # Random Forest # ------------- for params in gen_random_forest_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(random_forest, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) # ------------------- # Logistic Regression # ------------------- for params in gen_lr_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(logistic_regression, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) # ------- # XGBoost # ------- for params in gen_xgb_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(xgb, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric})
運行一下,需要相當長時間,我們省略掉一部分輸出內容。
calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'gini'} metric: 0.6964123781828575 calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'} metric: 0.6912312790832288 # 省略其他模型的輸出結果 CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s Wall time: 31min 44s
從 CPU 時間和 Wall 時間,能看出來這些訓練還是充分利用了多核的性能。但整個過程還是花費了 31 分鐘。
現在我們嘗試使用 Remote API 通過分布式方式加速整個過程。
集群方面,我們使用最開始說的第三種方式,直接在 MaxCompute 上拉起一個集群。大家可以選擇其他方式,效果是一樣的。
n_cores = 8 mem = 2 * n_cores # 16G # o 是 MaxCompute 入口,這里創建 10 個 worker 的集群,每個 worker 8核16G cluster = o.create_mars_cluster(10, n_cores, mem, image='extended')
為了方便在分布式讀取數據,我們對數據處理稍作改動,把數據上傳到 MaxCompute 資源。對于其他環境,用戶可以考慮 HDFS、Aliyun OSS 或者 Amazon S3 等存儲。
if not o.exist_resource('otto_train.csv'): with open('otto/train.csv') as f: # 上傳資源 o.create_resource('otto_train.csv', 'file', fileobj=f) def gen_data(): # 改成從資源讀取 df = pd.read_csv(o.open_resource('otto_train.csv')) X = df.drop(['target', 'id'], axis=1) y = df['target'] label_encoder = LabelEncoder() label_encoder.fit(y) y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123)
稍作改動之后,我們使用 mars.remote.spawn 方法來讓 gen_data 調度到集群上運行。
import mars.remote as mr # n_output 說明是 4 輸出 # execute() 執行后,數據會讀取到 Mars 集群內部 data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute() # remote_ 開頭的都是 Mars 對象,這時候數據在集群內,這些對象只是引用 remote_X_train, remote_X_test, remote_y_train, remote_y_test = data
目前 Mars 能正確序列化 numpy ndarray、pandas DataFrame 等,還不能序列化模型,所以,我們要對 train_and_metric 稍作改動,把模型 pickle 了之后再返回。
def distributed_train_and_metric(train_func, train_params: dict, X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, verbose: bool = False ): model, metric = train_and_metric(train_func, train_params, X_train, y_train, X_test, y_test, verbose=verbose) return pickle.dumps(model), metric
后續 Mars 支持了序列化模型后可以直接 spawn 原本的函數。
接著我們就對前面的執行過程稍作改動,把函數調用全部都用 mars.remote.spawn 來改寫。
import numpy as np tasks = [] models = [] metrics = [] # ------------- # Random Forest # ------------- for params in gen_random_forest_parameters(): # fixed random_state params['random_state'] = 123 task = mr.spawn(distributed_train_and_metric, args=(random_forest, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和評價分別存儲 models.append(task[0]) metrics.append(task[1]) # ------------------- # Logistic Regression # ------------------- for params in gen_lr_parameters(): # fixed random_state params['random_state'] = 123 task = mr.spawn(distributed_train_and_metric, args=(logistic_regression, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和評價分別存儲 models.append(task[0]) metrics.append(task[1]) # ------- # XGBoost # ------- for params in gen_xgb_parameters(): # fixed random_state params['random_state'] = 123 # 再指定并發為核的個數 params['n_jobs'] = n_cores task = mr.spawn(distributed_train_and_metric, args=(xgb, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和評價分別存儲 models.append(task[0]) metrics.append(task[1]) # 把順序打亂,目的是能分散到 worker 上平均一點 shuffled_tasks = np.random.permutation(tasks) _ = mr.ExecutableTuple(shuffled_tasks).execute()
可以看到代碼幾乎一致。
運行查看結果:
CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms Wall time: 1min 59s
時間一下子從 31 分鐘多來到了 2 分鐘,提升 15x+。但代碼修改的代價可以忽略不計。
細心的讀者可能注意到了,分布式運行的代碼中,我們把模型的 verbose 給打開了,在分布式環境下,因為這些函數遠程執行,打印的內容只會輸出到 worker 的標準輸出流,我們在客戶端不會看到打印的結果,但 Mars 提供了一個非常有用的接口來讓我們查看每個模型運行時的輸出。
以第0個模型為例,我們可以在 Mars 對象上直接調用 fetch_log 方法。
print(models[0].fetch_log())
輸出我們簡略一部分。
building tree 1 of 50 building tree 2 of 50 building tree 3 of 50 building tree 4 of 50 building tree 5 of 50 building tree 6 of 50 # 中間省略 building tree 49 of 50 building tree 50 of 50
要看哪個模型都可以通過這種方式。試想下,如果沒有 fetch_log API,你確想看中間過程的輸出有多麻煩。首先這個函數在哪個 worker 上執行,不得而知;然后,即便知道是哪個 worker,因為每個 worker 上可能有多個函數執行,這些輸出就可能混雜在一起,甚至被龐大日志淹沒了。fetch_log 接口讓用戶不需要關心在哪個 worker 上執行,也不用擔心日志混合在一起。
感謝各位的閱讀,以上就是“怎么用Mars Remote API執行Python函數”的內容了,經過本文的學習后,相信大家對怎么用Mars Remote API執行Python函數這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。