91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python怎么實現將MongoDB中的數據導入到MySQL

發布時間:2023-05-04 15:19:36 來源:億速云 閱讀:103 作者:iii 欄目:開發技術

本文小編為大家詳細介紹“Python怎么實現將MongoDB中的數據導入到MySQL”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Python怎么實現將MongoDB中的數據導入到MySQL”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。

實現代碼

import pymysql
from loguru import logger

class MongoToMysql:
    def __init__(self, mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, mysql_user,
                 mysql_password, mysql_db,table_name=None,set_max_length=False,batch_size=10000,table_description=''):
        self.mongo_host = mongo_host
        self.mongo_port = mongo_port
        self.mongo_db = mongo_db
        self.mongo_collection = mongo_collection
        self.mysql_host = mysql_host
        self.mysql_port = mysql_port
        self.mysql_user = mysql_user
        self.mysql_password = mysql_password
        self.mysql_db = mysql_db
        self.table_name = table_name
        self.set_max_length = set_max_length
        self.batch_size = batch_size
        self.table_description = table_description
        self.data_types = self.get_mongo_data_types()
        self.create_mysql_table(self.data_types,set_max_length= self.set_max_length,table_description=self.table_description)
        self.push_data_to_mysql(self.batch_size)

    def get_mongo_data_types(self):
        logger.debug('正在獲取mongo中字段的類型!')
        client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
        db = client[self.mongo_db]
        collection = db[self.mongo_collection]
        data_types = {}
        for field in collection.find_one().keys():
            data_types[field] = type(collection.find_one()[field]).__name__
        return data_types

    def check_mysql_table_exists(self):
        logger.debug('檢查是否存在該表,有則刪之!')
        conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                               password=self.mysql_password, db=self.mysql_db)
        cursor = conn.cursor()
        sql = f"DROP TABLE IF EXISTS {self.mongo_collection}"
        cursor.execute(sql)
        conn.commit()
        conn.close()

    def get_max_length(self, field):
        logger.debug(f'正在獲取字段 {field} 最大長度......')
        client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
        db = client[self.mongo_db]
        collection = db[self.mongo_collection]
        max_length = 0
        for item in collection.find({},{field:1,'_id':0}):
            value = item.get(field)
            if isinstance(value, str) and len(value) > max_length:
                max_length = len(value)
        return max_length

    def create_mysql_table(self, data_types,set_max_length,table_description):
        logger.debug('正在mysql中創建表結構!')
        self.check_mysql_table_exists()
        conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                               password=self.mysql_password, db=self.mysql_db)
        cursor = conn.cursor()
        if self.table_name:
            table_name = self.table_name
        else:
            table_name = self.mongo_collection
        fields = []
        for field, data_type in data_types.items():
            if data_type == 'int':
                fields.append(f"{field} INT")
            elif data_type == 'float':
                fields.append(f"{field} FLOAT")
            elif data_type == 'bool':
                fields.append(f"{field} BOOLEAN")
            else:
                if set_max_length:
                    fields.append(f"{field} TEXT)")
                else:
                    max_length = self.get_max_length(field)
                    fields.append(f"{field} VARCHAR({max_length + 200})")
        fields_str = ','.join(fields)
        sql = f"CREATE TABLE {table_name} (id INT PRIMARY KEY AUTO_INCREMENT,{fields_str}) COMMENT='{table_description}'"
        cursor.execute(sql)
        conn.commit()
        conn.close()


    def push_data_to_mysql(self, batch_size=10000):
        logger.debug('--- 正在準備從mongo中每次推送10000條數據到mysql ----')
        client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port)
        db = client[self.mongo_db]
        collection = db[self.mongo_collection]
        conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                               password=self.mysql_password, db=self.mysql_db)
        cursor = conn.cursor()
        if self.table_name:
            table_name = self.table_name
        else:
            table_name = self.mongo_collection
        # table_name = self.mongo_collection
        data = []
        count = 0
        for item in collection.find():
            count += 1
            row = []
            for field, data_type in self.data_types.items():
                value = item.get(field)
                if value is None:
                    row.append(None)
                elif data_type == 'int':
                    row.append(str(item.get(field, 0)))
                elif data_type == 'float':
                    row.append(str(item.get(field, 0.0)))
                elif data_type == 'bool':
                    row.append(str(item.get(field, False)))
                else:
                    row.append(str(item.get(field, '')))
            data.append(row)
            if count % batch_size == 0:
                placeholders = ','.join(['%s'] * len(data[0]))
                sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
                cursor.executemany(sql, data)
                conn.commit()
                data = []
                logger.debug(f'--- 已完成推送:{count} 條數據! ----')
        if data:
            placeholders = ','.join(['%s'] * len(data[0]))
            sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})"
            cursor.executemany(sql, data)
            conn.commit()
            logger.debug(f'--- 已完成推送:{count} 條數據! ----')
        conn.close()


if __name__ == '__main__':
    """MySQL"""
    mongo_host = '127.0.0.1'
    mongo_port = 27017
    mongo_db = 'db_name'
    mongo_collection = 'collection_name'

    """MongoDB"""
    mysql_host = '127.0.0.1'
    mysql_port = 3306
    mysql_user = 'root'
    mysql_password = '123456'
    mysql_db = 'mysql_db'

    table_description = ''  # 表描述

    mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
                                  mysql_user, mysql_password, mysql_db,table_description=table_description)


    #
    # table_name = None  # 默認為None 則MySQL中的表名跟Mongo保持一致
    # set_max_length = False # 默認為False 根據mongo中字段最大長度 加200 來設置字段的VARCHART長度 , 否則定義TEXT類型
    # batch_size = 10000   # 控制每次插入數據量的大小
    # table_description = '' # 表描述
    # mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port,
    #                               mysql_user, mysql_password, mysql_db,table_name,set_max_length,batch_size,table_description)

代碼使用了 PyMongo、PyMySQL 和 Loguru 等 Python 庫,并封裝了一個 MongoToMysql 類。在類的初始化時,會自動獲取 MongoDB 中字段的類型,并根據字段類型創建 MySQL 表結構。在將數據從 MongoDB 推送到 MySQL 時,還可以控制每次插入數據量的大小,以避免一次性插入大量數據造成系統崩潰或性能下降。

需要注意的是,在創建 MySQL 表結構時,如果用戶選擇了設置最大長度,則會創建 TEXT 類型的字段,否則會根據 MongoDB 中字段的最大長度加上200來設置 VARCHAR 類型的字段長度。

讀到這里,這篇“Python怎么實現將MongoDB中的數據導入到MySQL”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

正定县| 北川| 迁西县| 柘城县| 曲靖市| 华阴市| 淳化县| 贺兰县| 黔西县| 蒙城县| 连云港市| 宿迁市| 五华县| 兰坪| 祥云县| 民勤县| 甘南县| 海晏县| 海南省| 邯郸市| 嘉祥县| 徐汇区| 蓝山县| 武隆县| 九寨沟县| 林芝县| 武功县| 永平县| 水富县| 祁阳县| 逊克县| 景谷| 韶山市| 西乌珠穆沁旗| 中牟县| 左权县| 拉萨市| 桂阳县| 泾源县| 阳信县| 会东县|