您好,登錄后才能下訂單哦!
本篇內容介紹了“怎么在Python中異步操作數據庫”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
異步操作 MySQL 的話,需要使用一個 aiomysql,直接 pip install aiomysql 即可。
aiomysql 底層依賴于 pymysql,所以 aiomysql 并沒有單獨實現相應的連接驅動,而是在 pymysql 之上進行了封裝。
下面先來看看如何查詢記錄。
import asyncio import aiomysql.sa as aio_sa async def main(): # 創建一個異步引擎 engine = await aio_sa.create_engine(host="xx.xxx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) # 通過 engine.acquire() 獲取一個連接 async with engine.acquire() as conn: # 異步執行, 返回一個對象 result = await conn.execute("SELECT * FROM girl") # 通過 await result.fetchone() 可以獲取滿足條件的第一條記錄, 一個對象 data = await result.fetchone() # 可以將對象想象成一個字典 print(data.keys())# KeysView((1, '古明地覺', 16, '地靈殿')) print(list(data.keys()))# ['id', 'name', 'age', 'place'] print(data.values())# ValuesView((1, '古明地覺', 16, '地靈殿')) print(list(data.values()))# [1, '古明地覺', 16, '地靈殿'] print(data.items())# ItemsView((1, '古明地覺', 16, '地靈殿')) print(list(data.items()))# [('id', 1), ('name', '古明地覺'), ('age', 16), ('place', '地靈殿')] # 直接轉成字典也是可以的 print(dict(data))# {'id': 1, 'name': '古明地覺', 'age': 16, 'place': '地靈殿'} # 最后別忘記關閉引擎, 當然你在創建引擎的時候也可以通過 async with aio_sa.create_engine 的方式創建 # async with 語句結束后會自動執行下面兩行代碼 engine.close() await engine.wait_closed() loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
怎么樣,是不是很簡單呢,和同步庫的操作方式其實是類似的。但是很明顯,我們在獲取記錄的時候不會只獲取一條,而是會獲取多條,獲取多條的話使用 await result.fetchall() 即可。
import asyncio from pprint import pprint import aiomysql.sa as aio_sa async def main(): # 通過異步上下文管理器的方式創建, 會自動幫我們關閉引擎 async with aio_sa.create_engine(host="xx.xxx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: result = await conn.execute("SELECT * FROM girl") # 此時的 data 是一個列表, 列表里面是對象 data = await result.fetchall() # 將里面的元素轉成字典 pprint(list(map(dict, data))) """ [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '地靈殿'}, {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'}, {'age': 400, 'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'}] """ loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
除了 fetchone、fetchall 之外,還有一個 fetchmany,可以獲取指定記錄的條數。
import asyncio from pprint import pprint import aiomysql.sa as aio_sa async def main(): # 通過異步上下文管理器的方式創建, 會自動幫我們關閉引擎 async with aio_sa.create_engine(host="xx.xxx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: result = await conn.execute("SELECT * FROM girl") # 默認是獲取一條, 得到的仍然是一個列表 data = await result.fetchmany(2) pprint(list(map(dict, data))) """ [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '地靈殿'}, {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'}] """ loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
以上就是通過 aiomysql 查詢數據庫中的記錄,沒什么難度。但是值得一提的是,await conn.execute 里面除了可以傳遞一個原生的 SQL 語句之外,我們還可以借助 SQLAlchemy。
import asyncio from pprint import pprint import aiomysql.sa as aio_sa from sqlalchemy.sql.selectable import Select from sqlalchemy import text async def main(): async with aio_sa.create_engine(host="xx.xxx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl")) result = await conn.execute(sql) data = await result.fetchall() pprint(list(map(dict, data))) """ [{'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'}, {'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'}] """ loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
然后是添加記錄,我們同樣可以借助 SQLAlchemy 幫助我們拼接 SQL 語句。
import asyncio from pprint import pprint import aiomysql.sa as aio_sa from sqlalchemy import Table, MetaData, create_engine async def main(): async with aio_sa.create_engine(host="xx.xx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: # 我們還需要創建一個 SQLAlchemy 中的引擎, 然后將表反射出來 s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser") tbl = Table("girl", MetaData(bind=s_engine), autoload=True insert_sql = tbl.insert().values( [{"name": "十六夜咲夜", "age": 17, "place": "紅魔館"}, {"name": "琪露諾", "age": 60, "place": "霧之湖"}]) # 注意: 執行的執行必須開啟一個事務, 否則數據是不會進入到數據庫中的 async with conn.begin(): # 同樣會返回一個對象 # 盡管我們插入了多條, 但只會返回最后一條的插入信息 result = await conn.execute(insert_sql) # 返回最后一條記錄的自增 id print(result.lastrowid) # 影響的行數 print(result.rowcount) # 重新查詢, 看看記錄是否進入到數據庫中 async with engine.acquire() as conn: data = await (await conn.execute("select * from girl")).fetchall() data = list(map(dict, data)) pprint(data) """ [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '地靈殿'}, {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'}, {'age': 400, 'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'}, {'age': 17, 'id': 16, 'name': '十六夜咲夜', 'place': '紅魔館'}, {'age': 60, 'id': 17, 'name': '琪露諾', 'place': '霧之湖'}] """ loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
還是很方便的,但是插入多條記錄的話只會返回插入的最后一條記錄的信息,所以如果你希望獲取每一條的信息,那么就一條一條插入。
修改記錄和添加記錄是類似的,我們來看一下。
import asyncio from pprint import pprint import aiomysql.sa as aio_sa from sqlalchemy import Table, MetaData, create_engine, text async def main(): async with aio_sa.create_engine(host="xx.xx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser") tbl = Table("girl", MetaData(bind=s_engine), autoload=True) update_sql = tbl.update().where(text("name = '古明地覺'")).values({"place": "東方地靈殿"}) # 同樣需要開啟一個事務 async with conn.begin(): result = await conn.execute(update_sql) print(result.lastrowid)# 0 print(result.rowcount) # 1 # 查詢結果 async with engine.acquire() as conn: data = await (await conn.execute("select * from girl where name = '古明地覺'")).fetchall() data = list(map(dict, data)) pprint(data) """ [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '東方地靈殿'}] """ loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
可以看到,記錄被成功的修改了。
刪除記錄就更簡單了,直接看代碼。
import asyncio import aiomysql.sa as aio_sa from sqlalchemy import Table, MetaData, create_engine, text async def main(): async with aio_sa.create_engine(host="xx.xx.xx.xxx", port=3306, user="root", password="root", db="_hanser", connect_timeout=10) as engine: async with engine.acquire() as conn: s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser") tbl = Table("girl", MetaData(bind=s_engine), autoload=True) update_sql = tbl.delete()# 全部刪除 # 同樣需要開啟一個事務 async with conn.begin(): result = await conn.execute(update_sql) # 返回最后一條記錄的自增 id, 我們之前修改了 id = 0 記錄, 所以它跑到最后了 print(result.lastrowid)# 0 # 受影響的行數 print(result.rowcount) # 6 loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
此時數據庫中的記錄已經全部被刪除了。
整體來看還是比較簡單的,并且支持的功能也比較全面。
異步操作 PostgreSQL 的話,我們有兩個選擇,一個是 asyncpg 庫,另一個是 aiopg 庫。
asyncpg 是自己實現了一套連接驅動,而 aiopg 則是對 psycopg2 進行了封裝,個人更推薦 asyncpg,性能和活躍度都比 aiopg 要好。
下面來看看如何使用 asyncpg,首先是安裝,直接 pip install asyncpg 即可。
首先是查詢記錄。
import asyncio from pprint import pprint import asyncpg async def main(): # 創建連接數據庫的驅動 conn = await asyncpg.connect(host="localhost", port=5432, user="postgres", password="zgghyys123", database="postgres", timeout=10) # 除了上面的方式,還可以使用類似于 SQLAlchemy 的方式創建 # await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") # 調用 await conn.fetchrow 執行 select 語句,獲取滿足條件的單條記錄 # 調用 await conn.fetch 執行 select 語句,獲取滿足條件的全部記錄 row1 = await conn.fetchrow("select * from girl") row2 = await conn.fetch("select * from girl") # 返回的是一個 Record 對象,這個 Record 對象等于將返回的記錄進行了一個封裝 # 至于怎么用后面會說 print(row1)#pprint(row2) """ [,,] """ # 關閉連接 await conn.close() loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
以上我們演示了如何使用 asyncpg 來獲取數據庫中的記錄,我們看到執行 select 語句的話,我們可以使用 conn.fetchrow(query) 來獲取滿足條件的單條記錄,conn.fetch(query) 來獲取滿足條件的所有記錄。
Record 對象
我們說使用 conn.fetchone 查詢得到的是一個 Record 對象,使用 conn.fetch 查詢得到的是多個 Record 對象組成的列表,那么這個 Rcord 對象怎么用呢?
import asyncio import asyncpg async def main(): conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") row = await conn.fetchrow("select * from girl") print(type(row))#print(row)## 這個 Record 對象可以想象成一個字典 # 我們可以將返回的字段名作為 key, 通過字典的方式進行獲取 print(row["id"], row["name"])# 1 古明地覺 # 除此之外,還可以通過 get 獲取,獲取不到的時候會返回默認值 print(row.get("id"), row.get("name"))# 1 古明地覺 print(row.get("xxx"), row.get("xxx", "不存在的字段"))# None 不存在的字段 # 除此之外還可以調用 keys、values、items,這個不用我說,都應該知道意味著什么 # 只不過返回的是一個迭代器 print(row.keys())#print(row.values())#print(row.items())## 我們需要轉成列表或者元組 print(list(row.keys()))# ['id', 'name', 'age', 'place'] print(list(row.values()))# [1, '古明地覺', 16, '地靈殿'] print(dict(row.items()))# {'id': 1, 'name': '古明地覺', 'age': 16, 'place': '地靈殿'} print(dict(row))# {'id': 1, 'name': '古明地覺', 'age': 16, 'place': '地靈殿'} # 關閉連接 await conn.close() if __name__ == '__main__': asyncio.run(main())
當然我們也可以借助 SQLAlchemy 幫我們拼接 SQL 語句。
import asyncio from pprint import pprint import asyncpg from sqlalchemy.sql.selectable import Select from sqlalchemy import text async def main(): conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl")) # 我們不能直接傳遞一個 Select 對象, 而是需要將其轉成原生的字符串才可以 rows = await conn.fetch(str(sql)) pprint(list(map(dict, rows))) """ [{'id': 2, 'name': '椎名真白', 'place': '櫻花莊'}, {'id': 3, 'name': '古明地戀', 'place': '地靈殿'}] """ # 關閉連接 await conn.close() if __name__ == '__main__': asyncio.run(main())
此外,conn.fetch 里面還支持占位符,使用百分號加數字的方式,舉個例子:
import asyncio from pprint import pprint import asyncpg async def main(): conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") rows = await conn.fetch("select * from girl where id != $1", 1) pprint(list(map(dict, rows))) """ [{'age': 16, 'id': 2, 'name': '椎名真白', 'place': '櫻花莊'}, {'age': 15, 'id': 3, 'name': '古明地戀', 'place': '地靈殿'}] """ # 關閉連接 await conn.close() if __name__ == '__main__': asyncio.run(main())
還是推薦使用 SQLAlchemy 的方式,這樣更加方便一些,就像 aiomysql 一樣。但是對于 asyncpg 而言,實際上接收的是一個原生的 SQL 語句,是一個字符串,因此它不能像 aiomysql 一樣自動識別 Select 對象,我們還需要手動將其轉成字符串。而且這樣還存在一個問題,至于是什么我們下面介紹添加記錄的時候說。
然后是添加記錄,我們看看如何往庫里面添加數據。
import asyncio from pprint import pprint import asyncpg from sqlalchemy.sql.selectable import Select from sqlalchemy import text async def main(): conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") # 執行 insert 語句我們可以使用 execute row = await conn.execute("insert into girl(name, age, place) values ($1, $2, $3)", '十六夜咲夜', 17, '紅魔館') pprint(row)# INSERT 0 1 pprint(type(row))#await conn.close() if __name__ == '__main__': asyncio.run(main())
通過 execute 可以插入單條記錄,同時返回相關信息,但是說實話這個信息沒什么太大用。除了 execute 之外,還有 executemany,用來執行多條插入語句。
import asyncio import asyncpg async def main(): conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres") # executemany:第一條參數是一個模板,第二條命令是包含多個元組的列表 # 執行多條記錄的話,返回的結果為 None rows = await conn.executemany("insert into girl(name, age, place) values ($1, $2, $3)", [('十六夜咲夜', 17, '紅魔館'), ('琪露諾', 60, '霧之湖')]) print(rows)# None # 關閉連接 await conn.close() if __name__ == '__main__': asyncio.run(main())
注意:如果是執行大量 insert 語句的話,那么 executemany 要比 execute 快很多,但是 executemany 不具備事務功。
“怎么在Python中異步操作數據庫”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。