您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“Python怎么實現將MongoDB中的數據導入到MySQL”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Python怎么實現將MongoDB中的數據導入到MySQL”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
實現代碼
import pymysql from loguru import logger class MongoToMysql: def __init__(self, mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, mysql_user, mysql_password, mysql_db,table_name=None,set_max_length=False,batch_size=10000,table_description=''): self.mongo_host = mongo_host self.mongo_port = mongo_port self.mongo_db = mongo_db self.mongo_collection = mongo_collection self.mysql_host = mysql_host self.mysql_port = mysql_port self.mysql_user = mysql_user self.mysql_password = mysql_password self.mysql_db = mysql_db self.table_name = table_name self.set_max_length = set_max_length self.batch_size = batch_size self.table_description = table_description self.data_types = self.get_mongo_data_types() self.create_mysql_table(self.data_types,set_max_length= self.set_max_length,table_description=self.table_description) self.push_data_to_mysql(self.batch_size) def get_mongo_data_types(self): logger.debug('正在獲取mongo中字段的類型!') client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port) db = client[self.mongo_db] collection = db[self.mongo_collection] data_types = {} for field in collection.find_one().keys(): data_types[field] = type(collection.find_one()[field]).__name__ return data_types def check_mysql_table_exists(self): logger.debug('檢查是否存在該表,有則刪之!') conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user, password=self.mysql_password, db=self.mysql_db) cursor = conn.cursor() sql = f"DROP TABLE IF EXISTS {self.mongo_collection}" cursor.execute(sql) conn.commit() conn.close() def get_max_length(self, field): logger.debug(f'正在獲取字段 {field} 最大長度......') client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port) db = client[self.mongo_db] collection = db[self.mongo_collection] max_length = 0 for item in collection.find({},{field:1,'_id':0}): value = item.get(field) if isinstance(value, str) and len(value) > max_length: max_length = len(value) return max_length def create_mysql_table(self, data_types,set_max_length,table_description): logger.debug('正在mysql中創建表結構!') self.check_mysql_table_exists() conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user, password=self.mysql_password, db=self.mysql_db) cursor = conn.cursor() if self.table_name: table_name = self.table_name else: table_name = self.mongo_collection fields = [] for field, data_type in data_types.items(): if data_type == 'int': fields.append(f"{field} INT") elif data_type == 'float': fields.append(f"{field} FLOAT") elif data_type == 'bool': fields.append(f"{field} BOOLEAN") else: if set_max_length: fields.append(f"{field} TEXT)") else: max_length = self.get_max_length(field) fields.append(f"{field} VARCHAR({max_length + 200})") fields_str = ','.join(fields) sql = f"CREATE TABLE {table_name} (id INT PRIMARY KEY AUTO_INCREMENT,{fields_str}) COMMENT='{table_description}'" cursor.execute(sql) conn.commit() conn.close() def push_data_to_mysql(self, batch_size=10000): logger.debug('--- 正在準備從mongo中每次推送10000條數據到mysql ----') client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port) db = client[self.mongo_db] collection = db[self.mongo_collection] conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user, password=self.mysql_password, db=self.mysql_db) cursor = conn.cursor() if self.table_name: table_name = self.table_name else: table_name = self.mongo_collection # table_name = self.mongo_collection data = [] count = 0 for item in collection.find(): count += 1 row = [] for field, data_type in self.data_types.items(): value = item.get(field) if value is None: row.append(None) elif data_type == 'int': row.append(str(item.get(field, 0))) elif data_type == 'float': row.append(str(item.get(field, 0.0))) elif data_type == 'bool': row.append(str(item.get(field, False))) else: row.append(str(item.get(field, ''))) data.append(row) if count % batch_size == 0: placeholders = ','.join(['%s'] * len(data[0])) sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})" cursor.executemany(sql, data) conn.commit() data = [] logger.debug(f'--- 已完成推送:{count} 條數據! ----') if data: placeholders = ','.join(['%s'] * len(data[0])) sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})" cursor.executemany(sql, data) conn.commit() logger.debug(f'--- 已完成推送:{count} 條數據! ----') conn.close() if __name__ == '__main__': """MySQL""" mongo_host = '127.0.0.1' mongo_port = 27017 mongo_db = 'db_name' mongo_collection = 'collection_name' """MongoDB""" mysql_host = '127.0.0.1' mysql_port = 3306 mysql_user = 'root' mysql_password = '123456' mysql_db = 'mysql_db' table_description = '' # 表描述 mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, mysql_user, mysql_password, mysql_db,table_description=table_description) # # table_name = None # 默認為None 則MySQL中的表名跟Mongo保持一致 # set_max_length = False # 默認為False 根據mongo中字段最大長度 加200 來設置字段的VARCHART長度 , 否則定義TEXT類型 # batch_size = 10000 # 控制每次插入數據量的大小 # table_description = '' # 表描述 # mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, # mysql_user, mysql_password, mysql_db,table_name,set_max_length,batch_size,table_description)
代碼使用了 PyMongo、PyMySQL 和 Loguru 等 Python 庫,并封裝了一個 MongoToMysql 類。在類的初始化時,會自動獲取 MongoDB 中字段的類型,并根據字段類型創建 MySQL 表結構。在將數據從 MongoDB 推送到 MySQL 時,還可以控制每次插入數據量的大小,以避免一次性插入大量數據造成系統崩潰或性能下降。
需要注意的是,在創建 MySQL 表結構時,如果用戶選擇了設置最大長度,則會創建 TEXT 類型的字段,否則會根據 MongoDB 中字段的最大長度加上200來設置 VARCHAR 類型的字段長度。
讀到這里,這篇“Python怎么實現將MongoDB中的數據導入到MySQL”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。