您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關PyODPS DataFrame 處理笛卡爾積的幾種方式分別是什么,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
PyODPS 提供了 DataFrame API 來用類似 pandas 的接口進行大規模數據分析以及預處理,本文主要介紹如何使用 PyODPS 執行笛卡爾積的操作。
笛卡爾積最常出現的場景是兩兩之間需要比較或者運算。以計算地理位置距離為例,假設大表 Coordinates1 存儲目標點經緯度坐標,共有 M 行數據,小表 Coordinates2 存儲出發點經緯度坐標,共有 N 行數據,現在需要計算所有離目標點最近的出發點坐標。對于一個目標點來說,我們需要計算所有的出發點到目標點的距離,然后找到最小距離,所以整個中間過程需要產生 M * N 條數據,也就是一個笛卡爾積問題。
首先簡單介紹一下背景知識,已知兩個地理位置的坐標點的經緯度,求解兩點之間的距離可以使用 haversine 公式,使用 Python 的表達如下:
def haversine(lat1, lon1, lat2, lon2): # lat1, lon1 為位置 1 的經緯度坐標 # lat2, lon2 為位置 2 的經緯度坐標 import numpy as np dlon = np.radians(lon2 - lon1) dlat = np.radians(lat2 - lat1) a = np.sin( dlat /2 ) **2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin( dlon /2 ) **2 c = 2 * np.arcsin(np.sqrt(a)) r = 6371 # 地球平均半徑,單位為公里 return c * r
目前最推薦的方法就是使用 mapjoin,PyODPS 中使用 mapjoin 的方式十分簡單,只需要兩個 dataframe join 時指定 mapjoin=True
,執行時會對右表做 mapjoin 操作。
In [3]: df1 = o.get_table('coordinates1').to_df() In [4]: df2 = o.get_table('coordinates2').to_df() In [5]: df3 = df1.join(df2, mapjoin=True) In [6]: df1.schema Out[6]: odps.Schema { latitude float64 longitude float64 id string } In [7]: df2.schema Out[7]: odps.Schema { latitude float64 longitude float64 id string } In [8]: df3.schema Out[8]: odps.Schema { latitude_x float64 longitude_x float64 id_x string latitude_y float64 longitude_y float64 id_y string }
可以看到在執行 join 時默認會將重名列加上 _x 和 _y 后綴,可通過在 suffixes
參數中傳入一個二元 tuple 來自定義后綴,當有了 join 之后的表后,通過 PyODPS 中 DataFrame 的自建函數就可以計算出距離,十分簡潔明了,并且效率很高。
In [9]: r = 6371 ...: dis1 = (df3.latitude_y - df3.latitude_x).radians() ...: dis2 = (df3.longitude_y - df3.longitude_x).radians() ...: a = (dis1 / 2).sin() ** 2 + df3.latitude_x.radians().cos() * df3.latitude_y.radians().cos() * (dis2 / 2).sin() ** 2 ...: df3['dis'] = 2 * a.sqrt().arcsin() * r In [12]: df3.head(10) Out[12]: latitude_x longitude_x id_x latitude_y longitude_y id_y dis 0 76.252432 59.628253 0 84.045210 6.517522 0 1246.864981 1 76.252432 59.628253 0 59.061796 0.794939 1 2925.953147 2 76.252432 59.628253 0 42.368304 30.119837 2 4020.604942 3 76.252432 59.628253 0 81.290936 51.682749 3 584.779748 4 76.252432 59.628253 0 34.665222 147.167070 4 6213.944942 5 76.252432 59.628253 0 58.058854 165.471565 5 4205.219179 6 76.252432 59.628253 0 79.150677 58.661890 6 323.070785 7 76.252432 59.628253 0 72.622352 123.195778 7 1839.380760 8 76.252432 59.628253 0 80.063614 138.845193 8 1703.782421 9 76.252432 59.628253 0 36.231584 90.774527 9 4717.284949 In [13]: df1.count() Out[13]: 2000 In [14]: df2.count() Out[14]: 100 In [15]: df3.count() Out[15]: 200000
df3
已經是有 M * N 條數據了,接下來如果需要知道最小距離,直接對 df3
調用 groupby 接上 min
聚合函數就可以得到每個目標點的最小距離。
In [16]: df3.groupby('id_x').dis.min().head(10) Out[16]: dis_min 0 323.070785 1 64.755493 2 1249.283169 3 309.818288 4 1790.484748 5 385.107739 6 498.816157 7 615.987467 8 437.765432 9 272.589621
如果我們需要知道對應最小距離的點的城市,也就是表中對應的 id ,可以在 mapjoin 之后調用 MapReduce,不過我們還有另一種方式是使用 DataFrame 的 apply 方法。要對一行數據使用自定義函數,可以使用 apply 方法,axis 參數必須為 1,表示在行上操作。
要注意 apply 是在服務端執行的 UDF,所以不能在函數內使用類似于df=o.get_table('table_name').to_df()
的表達式去獲得表數據,具體原理可以參考PyODPS DataFrame 的代碼在哪里跑。以本文中的情況為例,要想將表 1 與表 2 中所有的記錄計算,那么需要將表 2 作為一個資源表,然后在自定義中引用該表資源。PyODPS 中使用表資源也十分方便,只需要將一個 collection 傳入 resources
參數即可。collection 是個可迭代對象,不是一個 DataFrame 對象,不可以直接調用 DataFrame 的接口,每個迭代值是一個 namedtuple,可以通過字段名或者偏移來取對應的值。
## use dataframe udf df1 = o.get_table('coordinates1').to_df() df2 = o.get_table('coordinates2').to_df() def func(collections): import pandas as pd collection = collections[0] ids = [] latitudes = [] longitudes = [] for r in collection: ids.append(r.id) latitudes.append(r.latitude) longitudes.append(r.longitude) df = pd.DataFrame({'id': ids, 'latitude':latitudes, 'longitude':longitudes}) def h(x): df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude) return df.iloc[df['dis'].idxmin()]['id'] return h df1[df1.id, df1.apply(func, resources=[df2], axis=1, reduce=True, types='string').rename('min_id')].execute( libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])
在自定義函數中,將表資源通過循環讀成 pandas DataFrame,利用 pandas 的 loc 可以很方便的找到最小值對應的行,從而得到距離最近的出發點 id。另外,如果在自定義函數中需要使用到三方包(例如本例中的 pandas)可以參考這篇文章。
當小表的數據量十分小的時候,我們甚至可以將小表數據作為全局變量在自定義函數中使用。
df1 = o.get_table('coordinates1').to_df() df2 = o.get_table('coordinates2').to_df() df = df2.to_pandas() def func(x): df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude) return df.iloc[df['dis'].idxmin()]['id'] df1[df1.id, df1.apply(func, axis=1, reduce=True, types='string').rename('min_id')].execute( libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])
在上傳函數的時候,會將函數內使用到的全局變量(上面代碼中的 df
) pickle 到 UDF 中。但是注意這種方式使用場景很局限,因為 ODPS 的上傳的文件資源大小是有限制的,所以數據量太大會導致 UDF 生成的資源太大從而無法上傳,而且這種方式最好保證三方包的客戶端與服務端的版本一致,否則很有可能出現序列化的問題,所以建議只在數據量非常小的時候使用。
使用 PyODPS 解決笛卡爾積的問題主要分為兩種方式,一種是 mapjoin,比較直觀,性能好,一般能用 mapjoin 解決的我們都推薦使用 mapjoin,并且最好使用內建函數計算,能到達最高的效率,但是它不夠靈活。另一種是使用 DataFrame 自定義函數,比較靈活,性能相對差一點(可以使用 pandas 或者 numpy 獲得性能上的提升),通過使用表資源,將小表作為表資源傳入 DataFrame 自定義函數中,從而完成笛卡爾積的操作。
看完上述內容,你們對PyODPS DataFrame 處理笛卡爾積的幾種方式分別是什么有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。