91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python讀取Hive數據庫代碼怎么寫

發布時間:2023-03-01 09:54:32 來源:億速云 閱讀:142 作者:iii 欄目:開發技術

今天小編給大家分享一下Python讀取Hive數據庫代碼怎么寫的相關知識點,內容詳細,邏輯清晰,相信大部分人都還不太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

實際業務讀取hive數據庫的代碼

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time
import os
import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict, List
import logging
import threading
import pandas as pd
import pickle
class HiveHelper(object):
    """Lazily create, cache and tear down Impala/Hive connections.

    The helper can hand out a raw DBAPI connection, a DBAPI cursor, a
    SQLAlchemy engine, a SQLAlchemy connection and an ORM session. Each of
    them is created on first use and cached on the instance until the
    matching ``close_*`` / ``dispose_engine`` method is called.
    """

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        logger:logging.Logger=None
        ):
        """Store the connection settings; nothing is connected yet.

        NOTE(review): real-looking credentials are hard-coded as defaults;
        they should come from configuration or the environment.

        :param logger: logger used by get_data; a module logger is used when
            omitted so that error handling never fails on ``None``.
        """
        from urllib.parse import quote_plus

        self.host = host
        self.port = port
        self.database = database
        self.auth_mechanism = auth_mechanism
        self.user = user
        self.password = password
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        # create_table_code() reads this attribute, which was never assigned.
        # NOTE(review): the URL layout assumes impyla's SQLAlchemy dialect
        # (credentials in the authority part, extra options as query
        # parameters) - confirm against the installed impyla version.
        self.connection_str = 'impala://%s:%s@%s:%s/%s?auth_mechanism=%s' % (
            quote_plus(str(user)),
            quote_plus(str(password)),
            host,
            port,
            database,
            auth_mechanism,
        )
        self.impala_conn = None
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None

    def create_table_code(self, file_name):
        """Run ``sqlacodegen`` against the database and write its output to file_name.

        Returns the cached SQLAlchemy connection (may be ``None``), as before.
        """
        import shlex

        # Quote both arguments: the password may contain shell metacharacters.
        os.system(
            f'sqlacodegen {shlex.quote(self.connection_str)} > {shlex.quote(str(file_name))}'
        )
        return self.conn

    def _new_impala_conn(self):
        """Open a brand-new DBAPI connection from the stored settings."""
        return connect(
            host=self.host,
            port=self.port,
            database=self.database,
            auth_mechanism=self.auth_mechanism,
            user=self.user,
            password=self.password
        )

    def get_conn(self):
        """Return the cached SQLAlchemy connection, creating it on first use."""
        if self.conn is None:
            self.conn = self.get_engine().connect()
        return self.conn

    def get_impala_conn(self):
        """Return the cached raw DBAPI connection, creating it on first use."""
        if self.impala_conn is None:
            self.impala_conn = self._new_impala_conn()
        return self.impala_conn

    def get_engine(self):
        """Return the cached SQLAlchemy engine, creating it on first use."""
        if self.engine is None:
            # The pool owns the connections it asks for, so give it a factory
            # that opens a fresh connection on every call. Handing it the
            # cached connection left a closed object cached after dispose().
            self.engine = sqlalchemy.create_engine('impala://', creator=self._new_impala_conn)
        return self.engine

    def get_cursor(self):
        """Return the cached DBAPI cursor, creating it on first use."""
        if self.cursor is None:
            # A cursor comes from the DBAPI connection; self.conn is the
            # SQLAlchemy connection (or None) and has no cursor() method.
            self.cursor = self.get_impala_conn().cursor()
        return self.cursor

    def get_session(self) -> 'sqlalchemy.orm.Session':
        """Return the cached ORM session, creating it on first use."""
        if self.session is None:
            session_factory = sessionmaker(bind=self.get_engine())
            self.session = session_factory()
        return self.session

    def close_conn(self):
        """Close the SQLAlchemy connection, the engine and the raw connection."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
        self.close_impala_conn()

    def close_impala_conn(self):
        """Close the raw DBAPI connection and the cursor opened on it."""
        # The cursor belongs to this connection; drop it first so that
        # get_cursor() never returns a cursor of a closed connection.
        self.close_cursor()
        if self.impala_conn is not None:
            self.impala_conn.close()
            self.impala_conn = None

    def close_session(self):
        """Close the ORM session and dispose of the engine."""
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()

    def dispose_engine(self):
        """Dispose of the SQLAlchemy engine and forget it."""
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None

    def close_cursor(self):
        """Close the cached DBAPI cursor."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None

    def get_data(self, sql, auto_close=True, retries=3, retry_interval=60) -> pd.DataFrame:
        """Run a query and return the result as a DataFrame.

        :param sql: query passed to ``pandas.read_sql``.
        :param auto_close: close the connection and engine afterwards,
            whether the query succeeded or not.
        :param retries: total number of attempts (at least one is made).
        :param retry_interval: seconds to wait between two attempts.
        :raises Exception: the error of the last attempt, after logging it.
        """
        conn = self.get_conn()
        attempts = max(1, int(retries))
        try:
            try:
                for attempt in range(1, attempts + 1):
                    try:
                        return pd.read_sql(sql, conn)
                    except Exception:
                        if attempt == attempts:
                            raise  # out of attempts: let the caller see it
                        self.logger.warning(
                            'query failed (attempt %d of %d), retrying in %s seconds',
                            attempt, attempts, retry_interval)
                        time.sleep(retry_interval)
            except Exception as ex:
                self.logger.exception(ex)
                raise  # bare raise keeps the original traceback
        finally:
            if auto_close:
                self.close_conn()
pass
class VarsHelper():
    """Small key/value store persisted with pickle.

    ``save_dir`` is, despite its name, the path of the pickle file. Existing
    content is loaded on construction; with ``auto_save`` every
    ``set_value`` call rewrites the file.
    """

    def __init__(self, save_dir, auto_save=True):
        """Load the stored values, creating the parent directory if needed.

        :param save_dir: path of the pickle file.
        :param auto_save: write the file after every ``set_value``.
        """
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        parent_dir = os.path.dirname(self.save_dir)
        # A bare file name has an empty dirname and os.makedirs('') raises,
        # so only create the directory when there is one to create.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        if os.path.exists(self.save_dir):
            # NOTE(review): pickle.load can execute arbitrary code; only point
            # this helper at files written by this program.
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)

    def set_value(self, key, value):
        """Store value under key and persist it when auto_save is on."""
        self.values[key] = value
        if self.auto_save:
            self.save_file()

    def get_value(self, key):
        """Return the value stored under key; raises KeyError when absent."""
        return self.values[key]

    def has_key(self, key):
        """Return True when key has a stored value."""
        return key in self.values

    def save_file(self):
        """Write all values to the pickle file."""
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
pass
class GlobalShareArgs():
    """Process-wide settings kept in a single class-level dict.

    Every accessor is a static method, so it can be called on the class
    (``GlobalShareArgs.get_args_value('debug')``) as well as on an instance.
    """

    args = {
        "debug": False
    }

    @staticmethod
    def get_args():
        """Return the settings dict itself (not a copy)."""
        return GlobalShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the whole settings dict."""
        GlobalShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set a single setting."""
        GlobalShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return one setting, or default_value when it is not set."""
        return GlobalShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return True when the setting exists."""
        return key in GlobalShareArgs.args

    @staticmethod
    def update(args):
        """Merge the given mapping into the settings."""
        GlobalShareArgs.args.update(args)
pass
class ShareArgs():
    """Task-level settings kept in a single class-level dict.

    Every accessor is a static method, so it can be called on the class
    (``ShareArgs.get_args_value('classes')``) as well as on an instance.
    """

    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # label directory
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # directory for exported cluster labels
        "common_datas_dir":"./hjx/data", # shared data directory (common ur_bi_dw exports)
        "only_predict": False, # predict only, do not train
        "delete_model": True, # delete the model first; only used when training
        "export_excel": False, # export to excel
        "classes": 12, # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0,
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }

    @staticmethod
    def get_args():
        """Return the settings dict itself (not a copy)."""
        return ShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the whole settings dict."""
        ShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set a single setting."""
        ShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return one setting, or default_value when it is not set."""
        return ShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return True when the setting exists."""
        return key in ShareArgs.args

    @staticmethod
    def update(args):
        """Merge the given mapping into the settings."""
        ShareArgs.args.update(args)
pass
class UrBiGetDatasBase():
    """Shared plumbing for exporting query results to CSV files.

    Provides a per-path lock registry, a per-path "last refreshed" record
    used to skip recent exports, and an incremental, date-sliced export.
    The registries are class attributes and therefore shared by all
    instances and subclasses.
    """

    # Locks keyed by absolute save path; the same path always gets the same lock.
    lock_dict:Dict[str, threading.Lock] = {}
    # Time of the last refresh per absolute save path.
    time_dict:Dict[str, datetime.datetime] = {}
    # Result of the last get_last_time() check per absolute save path.
    get_data_timeout_dict:Dict[str, bool] = {}
    # Serialises insertions into lock_dict.
    _lock_dict_guard = threading.Lock()

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        """Create the database helper and the output directory.

        NOTE(review): real-looking credentials are hard-coded as defaults;
        they should come from configuration or the environment.

        :param save_dir: directory for exported files; created when missing.
        :param logger: logger for progress and errors; a module logger is
            used when omitted so that logging calls never fail on ``None``.
        """
        self.save_dir = save_dir
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=self.logger
            )
        # Create the output directory; exist_ok covers a concurrent creation.
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir, exist_ok=True)
        # In debug mode the refresh times are persisted across runs.
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas')

    def close(self):
        """Close the database connection."""
        self.db_helper.close_conn()

    def get_last_time(self, key_name) -> bool:
        """Return True when the data behind key_name must be queried again.

        That is the case when no refresh time is recorded for the path or
        the recorded time is older than 12 hours (24 hours in debug mode).
        The answer is remembered for the following save_last_time() call.
        """
        # Absolute path so that the same target always maps to the same key.
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')
        timeout = 24 if GlobalShareArgs.get_args_value('debug') else 12  # hours
        last_time = UrBiGetDatasBase.time_dict.get(key_name)
        get_data_timeout = (
            last_time is None
            or (datetime.datetime.today() - last_time).total_seconds() > timeout * 60 * 60
        )
        if get_data_timeout:
            self.logger.info('超時%d小時,重新查數據:%s', timeout, key_name)
        else:
            self.logger.info('未超時%d小時,跳過查數據:%s', timeout, key_name)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout

    def save_last_time(self, key_name):
        """Record "now" as the refresh time of key_name.

        The time is recorded when the preceding get_last_time() check said
        the data was stale, and always (and persisted) in debug mode.
        """
        key_name = os.path.abspath(key_name)
        # Without a preceding get_last_time() there is no recorded answer;
        # the data was just refreshed, so record the time instead of failing.
        if UrBiGetDatasBase.get_data_timeout_dict.get(key_name, True):
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)

    def get_lock(self, key_name) -> threading.Lock:
        """Return the lock shared by everybody working on key_name."""
        key_name = os.path.abspath(key_name)
        # Look up and insert under one guard: two threads asking for a new
        # key at the same time must end up with the same lock object.
        with UrBiGetDatasBase._lock_dict_guard:
            lock = UrBiGetDatasBase.lock_dict.get(key_name)
            if lock is None:
                lock = threading.Lock()
                UrBiGetDatasBase.lock_dict[key_name] = lock
        return lock

    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=(-1,), # positions (in sorted order) of files to re-query
        start_date = datetime.datetime(2017, 1, 1), # first slice starts here
        offset = relativedelta(months=3), # length of one slice
        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # date value substituted into sql
        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # file name of one slice
        stop_date = '20700101', # give up once a slice starts after this
        data_format_fun = None, # optional DataFrame post-processing
        ):
        """Export a query slice by slice, one CSV file per date range.

        ``sql`` must contain two ``%s`` placeholders, filled with the
        formatted start (inclusive) and end (exclusive) of each slice.
        Slices whose file already exists are skipped. The files at the
        positions in ``del_index_list`` are deleted first, so that those
        slices are queried again.

        The loop ends at the first empty slice after at least one non-empty
        one, or when the database helper returns ``None``.

        :raises Exception: when no slice has produced data and the slice
            start has passed ``stop_date``.
        """
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            file_list = sorted(os.listdir(save_dir))
            # Resolve the positions to names first: skip positions the
            # directory does not have and never remove the same file twice.
            stale_files = []
            for del_index in del_index_list:
                if -len(file_list) <= del_index < len(file_list):
                    file_name = file_list[del_index]
                    if file_name not in stale_files:
                        stale_files.append(file_name)
            for file_name in stale_files:
                os.remove(os.path.join(save_dir, file_name))
                print('刪除最后一個文件:', file_name)
        found_data = False
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                if len(data) > 0:
                    found_data = True
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif found_data:
                    # first empty slice after real data: nothing newer exists
                    break
                elif stop_date < start_date_str:
                    raise Exception("讀取數據異常,時間超出最大值!")
            start_date = end_date
pass
class UrBiGetDatas(UrBiGetDatasBase):
    """Exports of the shared warehouse tables to CSV files under save_dir.

    Every public method refreshes one export. A refresh takes the lock of
    its target path, is skipped while the previous refresh is still recent
    (see get_last_time of the base class), and logs and re-raises any error.
    """

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_dw_data',
        logger:logging.Logger=None
        ):
        """Forward all settings to the base class, which stores them."""
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )

    @staticmethod
    def _add_prefix(prefix):
        """Return a column renamer that prepends prefix."""
        return lambda column: prefix + column

    @staticmethod
    def _remove_text(text):
        """Return a column renamer that removes every occurrence of text."""
        return lambda column: column.replace(text, '')

    def _export_table(self, file_name, sql, rename_fun=None, sort_columns=None, index=True):
        """Refresh one single-file export: query, rename, sort, write CSV."""
        file_path = os.path.join(self.save_dir, file_name)
        with self.get_lock(file_path):
            try:
                # Skip while the previous export is still recent.
                if not self.get_last_time(file_path):
                    return
                data:pd.DataFrame = self.db_helper.get_data(sql)
                if rename_fun is not None:
                    data = data.rename(columns={c: rename_fun(c) for c in data.columns})
                if sort_columns:
                    data = data.sort_values(sort_columns)
                data.to_csv(file_path, index=index)
                self.save_last_time(file_path)
            except Exception as ex:
                self.logger.exception(ex)
                raise

    def _export_by_date(self, dir_name, sql, sort_columns, **kwargs):
        """Refresh one date-sliced export; kwargs go to get_data_of_date."""
        sub_dir = os.path.join(self.save_dir, dir_name)
        with self.get_lock(sub_dir):
            try:
                # Skip while the previous export is still recent.
                if not self.get_last_time(sub_dir):
                    return
                self.get_data_of_date(
                    save_dir=sub_dir,
                    sql=sql,
                    sort_columns=sort_columns,
                    **kwargs
                )
                self.save_last_time(sub_dir)
            except Exception as ex:
                self.logger.exception(ex)
                raise

    def get_dim_date(self):
        """Date dimension."""
        self._export_table(
            'ur_bi_dw.dim_date.csv',
            'SELECT * FROM ur_bi_dw.dim_date',
            rename_fun=self._add_prefix('dim_date.'),
            sort_columns=['dim_date.date_key'],
        )

    def get_dim_shop(self):
        """Shop dimension."""
        self._export_table(
            'ur_bi_dw.dim_shop.csv',
            'SELECT * FROM ur_bi_dw.dim_shop',
            rename_fun=self._add_prefix('dim_shop.'),
            sort_columns=['dim_shop.shop_no'],
        )

    def get_dim_vip(self):
        """Member data, one file per year of card creation."""
        sql = '''SELECT dv.*, dd.date_key, dd.date_name2 
            FROM ur_bi_dw.dim_vip as dv
            INNER JOIN ur_bi_dw.dim_date as dd
            ON dv.card_create_date=dd.date_name2 
            where dd.date_key >= %s
            and dd.date_key < %s'''
        # TODO(review): the sort key carries the 'dv.' alias; confirm that the
        # driver really returns the column under that name.
        self._export_by_date(
            'vip_no',
            sql,
            ['dv.vip_no'],
            start_date=datetime.datetime(2017, 1, 1),
            offset=relativedelta(years=1),
        )

    def get_weather(self):
        """Weather data; the two newest files are always re-queried."""
        sql = """
            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
            where weather.date_key>=%s and weather.date_key<%s
            """
        add_prefix = self._add_prefix('weather.')

        def data_format_fun(data):
            # prefix every column before sorting on the prefixed names
            return data.rename(columns={c: add_prefix(c) for c in data.columns})

        self._export_by_date(
            'weather',
            sql,
            ['weather.date_key','weather.areaid'],
            del_index_list=[-2, -1],
            data_format_fun=data_format_fun,
        )

    def get_weather_city(self):
        """Weather city dimension."""
        self._export_table(
            'ur_bi_dw.weather_city.csv',
            'SELECT * FROM ur_bi_dw.dim_weather_city as weather_city',
            rename_fun=self._add_prefix('weather_city.'),
        )

    def get_dim_goods(self):
        """Goods dimension."""
        self._export_table(
            'ur_bi_dw.dim_goods.csv',
            'SELECT * FROM ur_bi_dw.dim_goods',
            rename_fun=self._add_prefix('dim_goods.'),
        )

    def get_dim_goods_market_shop_date(self):
        """Per-shop goods life cycle."""
        sql = '''
            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
            FROM ur_bi_dw.dim_goods_market_shop_date
            where lifecycle_end_date is not null
            '''
        # NOTE(review): the removed text is 'lifecycle_end_date.', not the
        # table name as in the sibling exports - possibly a copy/paste slip;
        # kept unchanged.
        self._export_table(
            'ur_bi_dw.dim_goods_market_shop_date.csv',
            sql,
            rename_fun=self._remove_text('lifecycle_end_date.'),
            sort_columns=['shop_market_date'],
            index=False,
        )

    def get_dim_goods_market_date(self):
        """Nationwide goods life cycle."""
        sql = '''
            select * FROM ur_bi_dw.dim_goods_market_date
            '''
        self._export_table(
            'ur_bi_dw.dim_goods_market_date.csv',
            sql,
            rename_fun=self._add_prefix('dim_goods_market_date.'),
            sort_columns=['dim_goods_market_date.sku_no'],
            index=False,
        )

    def get_dim_goods_color_dev_sizes(self):
        """Sizes developed per goods colour."""
        self._export_table(
            'dim_goods_color_dev_sizes.csv',
            'SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes',
            rename_fun=self._remove_text('dim_goods_color_dev_sizes.'),
            index=False,
        )

    def get_dwd_daily_sales_size(self):
        """Actual sales amounts, aggregated per shop, sku, day and size."""
        sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(tag_price) as `tag_price`,
                sum(sales_qty) as `sales_qty`,
                sum(sales_tag_amt) as `sales_tag_amt`,
                sum(sales_amt) as `sales_amt`,
                count(0) as `sales_count`
            from ur_bi_dw.dwd_daily_sales_size as sales
            where sales.date_key>=%s and sales.date_key<%s
                and sales.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
        self._export_by_date(
            'dwd_daily_sales_size_all',
            sql,
            ['date_key','shop_no','sku_no'],
            start_date=datetime.datetime(2017, 1, 1),
        )

    def get_dwd_daily_delivery_size(self):
        """Actual delivery amounts, aggregated per shop, sku, day and size."""
        sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
                sum(delivery.pr_received_qty) as `pr_received_qty`,
                count(0) as `delivery_count`
            from ur_bi_dw.dwd_daily_delivery_size as delivery
            where delivery.date_key>=%s and delivery.date_key<%s
                and delivery.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
        self._export_by_date(
            'dwd_daily_delivery_size_all',
            sql,
            ['date_key','shop_no','sku_no'],
            start_date=datetime.datetime(2017, 1, 1),
        )

    def get_v_last_nation_sales_status(self):
        """Best-selling / slow-selling status of goods."""
        self._export_table(
            'v_last_nation_sales_status.csv',
            'SELECT * FROM ur_bi_dw.v_last_nation_sales_status',
            rename_fun=self._remove_text('v_last_nation_sales_status.'),
            index=False,
        )

    def get_dwd_daily_finacial_goods(self):
        """Latest cost price per sku and size."""
        sql = """
            select t1.sku_no,t1.`size`,t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
            inner join (
                select sku_no,`size`,max(date_key) as date_key
                from ur_bi_dw.dwd_daily_finacial_goods
                where currency_code='CNY' and country_code='CN'
                group by sku_no,`size`
            ) as t2
            on t2.sku_no=t1.sku_no
                and t2.`size`=t1.`size`
                and t2.date_key=t1.date_key
            where t1.currency_code='CNY' and t1.country_code='CN'
            """
        self._export_table(
            'dwd_daily_finacial_goods.csv',
            sql,
            rename_fun=self._remove_text('t1.'),
            index=False,
        )

    def get_dim_size_group(self):
        """Size mapping."""
        self._export_table(
            'dim_size_group.csv',
            """select * from ur_bi_dw.dim_size_group""",
            rename_fun=self._remove_text('dim_size_group.'),
            index=False,
        )
pass
def get_common_datas(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    logger:logging.Logger=None):
    """Refresh the shared exports used by several tasks.

    The files go to ``<common_datas_dir>/ur_bi_data``, where
    ``common_datas_dir`` is read from ShareArgs. Refreshed in order: dates,
    shops, weather, weather cities, goods and actual sales. The database
    connection is closed afterwards, also on failure.

    :param logger: logger for progress and errors; a module logger is used
        when omitted, so the default no longer fails on the first log call.
    :raises Exception: whatever a refresh raised, after logging it.
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    common_datas_dir = ShareArgs.get_args_value('common_datas_dir')
    common_ur_bi_dir = os.path.join(common_datas_dir, 'ur_bi_data')
    ur_bi_get_datas = UrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=common_ur_bi_dir,
        logger=logger
    )
    # (label used in the log messages, refresh to run), in execution order
    steps = (
        ('日期', ur_bi_get_datas.get_dim_date),
        ('店鋪', ur_bi_get_datas.get_dim_shop),
        ('天氣', ur_bi_get_datas.get_weather),
        ('天氣城市', ur_bi_get_datas.get_weather_city),
        ('貨品', ur_bi_get_datas.get_dim_goods),
        ('實際銷量', ur_bi_get_datas.get_dwd_daily_sales_size),
    )
    try:
        for label, refresh in steps:
            logger.info('正在查詢%s數據...', label)
            refresh()
            logger.info('查詢%s數據完成!', label)
    except Exception as ex:
        logger.exception(ex)
        raise
    finally:
        ur_bi_get_datas.close()
pass
class CustomUrBiGetDatas(UrBiGetDatasBase):
    """Task-specific aggregate exports written as CSV files under save_dir.

    Every public method refreshes one export. A refresh takes the lock of
    its target file, is skipped while the previous refresh is still recent
    (see get_last_time of the base class), and logs and re-raises any error.
    """

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_data',
        logger:logging.Logger=None
        ):
        """Forward all settings to the base class, which stores them."""
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )

    def _export_query(self, file_name, sql, rename_map, sort_columns):
        """Refresh one export: query, rename columns, sort, write CSV."""
        file_path = os.path.join(self.save_dir, file_name)
        with self.get_lock(file_path):
            try:
                # Skip while the previous export is still recent.
                if not self.get_last_time(file_path):
                    return
                data:pd.DataFrame = self.db_helper.get_data(sql)
                data = data.rename(columns=rename_map)
                data = data.sort_values(sort_columns)
                data.to_csv(file_path)
                self.save_last_time(file_path)
            except Exception as ex:
                self.logger.exception(ex)
                raise

    def get_sales_goal_amt(self):
        """Sales goal amount per shop, series and month of year."""
        sql = '''
            select sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,
                dates.month_of_year,
                sum(sales_goal.sales_goal_amt) as sales_goal_amt
            from ur_bi_dw.dwd_sales_goal_west as sales_goal
            inner join ur_bi_dw.dim_date as dates
                on sales_goal.date_key = dates.date_key
            group by sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial),
                dates.month_of_year
            '''
        self._export_query(
            'month_of_year_sales_goal_amt.csv',
            sql,
            rename_map={
                'shop_no':'sales_goal.shop_no',
                'serial':'sales_goal.serial',
                'month_of_year':'dates.month_of_year',
            },
            sort_columns=['sales_goal.shop_no','sales_goal.serial','dates.month_of_year'],
        )

    def get_shop_serial_area(self):
        """Floor area per shop, series and month of year."""
        sql = '''
            select shop_serial_area.shop_no,
                if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,
                shop_serial_area.month_of_year,
                sum(shop_serial_area.area) as `shop_serial_area.area`
            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
            where shop_serial_area.area is not null
            group by shop_serial_area.shop_no,if(shop_serial_area.serial='Y','W',shop_serial_area.serial),shop_serial_area.month_of_year
            '''
        self._export_query(
            'shop_serial_area.csv',
            sql,
            rename_map={
                'shop_no':'shop_serial_area.shop_no',
                'serial':'shop_serial_area.serial',
                'month_of_year':'shop_serial_area.month_of_year',
                'area':'shop_serial_area.area',
            },
            sort_columns=['shop_serial_area.shop_no','shop_serial_area.serial','shop_serial_area.month_of_year'],
        )
pass
def get_datas(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    save_dir='./data/sales_forecast/ur_bi_dw_data',
    logger:logging.Logger=None):
    '''Fetch the monthly sales-goal amounts and cache them under ``save_dir``.

    Builds a ``CustomUrBiGetDatas`` with the given connection settings, runs
    its ``get_sales_goal_amt`` export and always closes it afterwards.

    NOTE(review): the defaults embed what look like real credentials; prefer
    passing them in from configuration or the environment.

    Args:
        host, port, database, auth_mechanism, user, password: connection
            settings forwarded to ``CustomUrBiGetDatas``.
        save_dir: directory the CSV output is written to.
        logger: logger for progress and errors. When ``None`` a module-level
            logger is used, so the default call no longer fails on
            ``logger.info``.

    Raises:
        Exception: whatever the export raises, after it has been logged.
    '''
    if logger is None:
        logger = logging.getLogger(__name__)
    ur_bi_get_datas = CustomUrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # Shop, series, category, year-month, sales-goal amount.
        logger.info('正在查詢年月銷售目標金額數據...')
        ur_bi_get_datas.get_sales_goal_amt()
        logger.info('查詢年月銷售目標金額數據完成!')
    except Exception as ex:
        logger.exception(ex)
        raise # propagate with the original traceback
    finally:
        ur_bi_get_datas.close()
pass
def getdata_ur_bi_dw(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    save_dir='./data/sales_forecast/ur_bi_dw_data',
    logger=None
):
    '''Top-level entry point: run the common export, then the custom export.

    ``get_common_datas`` receives the connection settings and the logger;
    ``get_datas`` receives the same settings plus ``save_dir``.
    '''
    # Settings shared by both calls; save_dir is only passed to get_datas.
    shared_kwargs = {
        'host': host,
        'port': port,
        'database': database,
        'auth_mechanism': auth_mechanism,
        'user': user,
        'password': password,
        'logger': logger,
    }
    get_common_datas(**shared_kwargs)
    get_datas(save_dir=save_dir, **shared_kwargs)
pass
# 代碼入口
# getdata_ur_bi_dw(
#     host=ur_bi_dw_host,
#     port=ur_bi_dw_port,
#     database=ur_bi_dw_database,
#     auth_mechanism=ur_bi_dw_auth_mechanism,
#     user=ur_bi_dw_user,
#     password=ur_bi_dw_password,
#     save_dir=ur_bi_dw_save_dir,
#     logger=logger
#     )

代碼說明和領悟

每個類的具體作用說明,代碼需要根據下面的文字說明進行“食用”:

(第一層)HiveHelper完成了連接數據庫、關閉數據庫連接、生成事務、執行、引擎、連接等功能

VarsHelper提供了一個簡單的持久化功能,可以將對象以文件的形式存放在磁盤上。并提供設置值、獲取值、判斷值是否存在的方法

GlobalShareArgs提供了一個字典,并且提供了獲取字典、設置字典、設置字典鍵值對、設置字典鍵的值、判斷鍵是否在字典中、更新字典等方法

ShareArgs跟GlobalShareArgs類似,只是一開始字典的初始化的鍵值對比較多

(第二層)UrBiGetDataBase類,提供了線程鎖字典、時間字典、超時判斷字典,都是類變量;使用了HiveHelper類,但注意,不是繼承。在具體的sql讀數時,提供了線程固定和時間判斷

(第三層)UrBiGetDatas類,獲取hive數據庫那邊的日期數據、店鋪數據、會員數據、天氣數據、天氣城市數據、商品數據、店鋪生命周期數據、全國商品生命周期數據、商品開發碼數數據、實際銷售金額、實際配貨金額、商品暢滯銷數據、商品成本價數據、尺碼映射數據等。

(第四層)get_common_datas函數,使用UrBiGetDatas類讀取日期、店鋪、天氣、天氣城市、貨品、實際銷量數據,并緩存到文件夾./yongjian/data/ur_bi_data下面

CustomUrBiGetDatas類,繼承了UrBiGetDatasBase類,讀取銷售目標金額、店-系列面積數據。

(這個也是第四層)get_datas函數,通過CustomUrBiGetDatas類,讀取年月銷售目標金額。

總的函數:(這個是總的調用入口函數)getdata_ur_bi_dw函數,調用了get_common_datas和get_datas函數進行讀取數據,然后將數據保存到某個文件夾目錄下面。

舉一反三,如果你不是hive數據庫,你可以將第一層這個底層更換成mysql。主頁有解釋如何進行更換。第二層不需要改變,第三層就是你想要進行讀取的數據表,不同的數據庫你想要讀取的數據表也不同,所以sql需要你在這里寫,套用里面的方法即可,基本上就是修改sql就好了。

這種方法的好處在于,數據不會重復讀取,并且讀取的數據都可以得到高效的使用。

后續附上修改成mysql的一個例子代碼

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time
import os
import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict, List
import logging
import threading
import pandas as pd
import pickle
class MySqlHelper(object):
    '''Helper around a SQLAlchemy engine/connection for a MySQL database.

    Engine, connection, cursor and session are created lazily on first use
    and cached on the instance. The ``close_*`` / ``dispose_engine`` methods
    release them and reset the cached attribute to ``None``.
    '''
    def __init__(
        self,
        host='192.168.15.144',
        port=3306,
        database='test_ims',
        user='spkjz_writer',
        password='7cmoP3QDtueVJQj2q4Az',
        logger:logging.Logger=None
        ):
        '''Store the connection settings; no connection is opened here.

        NOTE(review): the defaults embed what look like real credentials;
        prefer passing them in from configuration or the environment.

        Args:
            host, port, database, user, password: MySQL connection settings.
            logger: logger used for error reporting; a module-level logger
                is used when ``None``.
        '''
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        # Fall back to a module logger so error reporting never fails on None.
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(
            self.user, self.password, self.host, self.port, self.database
        )
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None
    def create_table_code(self, file_name):
        '''Run ``sqlacodegen`` for this database and redirect its output to ``file_name``.

        Returns the cached connection, which is ``None`` if none was opened.

        NOTE(review): the command line is built by string interpolation and
        run through the shell, so shell metacharacters in the password or in
        ``file_name`` are interpreted by the shell.
        '''
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn
    def get_conn(self):
        '''Return the cached SQLAlchemy connection, opening it on first use.'''
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn
    def get_engine(self):
        '''Return the cached SQLAlchemy engine, creating it on first use.'''
        if self.engine is None:
            self.engine = sqlalchemy.create_engine(self.connection_str)
        return self.engine
    def get_cursor(self):
        '''Return the cached DBAPI cursor, creating it on first use.

        The cursor is taken from the DBAPI connection underlying the
        SQLAlchemy connection (``Connection.connection``); the connection
        itself is opened first if necessary.
        '''
        if self.cursor is None:
            self.cursor = self.get_conn().connection.cursor()
        return self.cursor
    def get_session(self) -> 'sqlalchemy.orm.Session':
        '''Return the cached ORM session, creating it on first use.'''
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session
    def close_conn(self):
        '''Close the cursor and the connection (if any), then dispose the engine.'''
        # A cursor must not outlive the connection it was created from.
        self.close_cursor()
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
    def close_session(self):
        '''Close the ORM session (if any), then dispose the engine.'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()
    def dispose_engine(self):
        '''Dispose the engine (if any) and forget it.'''
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None
    def close_cursor(self):
        '''Close the cached cursor (if any) and forget it.'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
    def get_data(self, sql, auto_close=True, retries=3, retry_wait=60) -> pd.DataFrame:
        '''Run ``sql`` and return the result as a DataFrame.

        Args:
            sql: query passed to ``pandas.read_sql``.
            auto_close: close the connection (and engine) afterwards.
            retries: total number of attempts (at least one is made).
            retry_wait: seconds to sleep between attempts.

        Raises:
            Exception: the error of the last attempt, after it was logged.
        '''
        conn = self.get_conn()
        data = None
        attempts = max(1, retries)
        try:
            for attempt in range(1, attempts + 1):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception:
                    if attempt >= attempts:
                        raise # out of attempts: let the outer handler log it
                    time.sleep(retry_wait)
        except Exception as ex:
            self.logger.exception(ex)
            raise # propagate with the original traceback
        finally:
            if auto_close:
                self.close_conn()
        return data
pass
class VarsHelper():
    '''Small key/value store persisted to a single pickle file.

    ``save_dir`` is, despite its name, the path of that file. Existing
    content is loaded on construction; with ``auto_save`` every
    ``set_value`` rewrites the file.
    '''
    def __init__(self, save_dir, auto_save=True):
        '''Load the store from ``save_dir`` if it exists.

        Args:
            save_dir: path of the pickle file; its parent directory is
                created when missing.
            auto_save: write the file after every ``set_value``.
        '''
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        parent_dir = os.path.dirname(self.save_dir)
        # A bare file name has no directory part; os.makedirs('') would raise.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        if os.path.exists(self.save_dir):
            # NOTE: unpickling can execute arbitrary code; only load trusted files.
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)
    def set_value(self, key, value):
        '''Store ``value`` under ``key`` and persist it when ``auto_save`` is on.'''
        self.values[key] = value
        if self.auto_save:
            self.save_file()
    def get_value(self, key):
        '''Return the value stored under ``key``; raises ``KeyError`` if absent.'''
        return self.values[key]
    def has_key(self, key):
        '''Return whether ``key`` is present in the store.'''
        return key in self.values
    def save_file(self):
        '''Write the whole store to the pickle file.'''
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
pass
class GlobalShareArgs():
    '''Process-wide settings dictionary shared through the class attribute ``args``.

    All accessors are static methods, so they behave the same whether they
    are called on the class or on an instance.
    '''
    args = {
        "debug": False
    }
    @staticmethod
    def get_args():
        '''Return the shared dictionary itself (not a copy).'''
        return GlobalShareArgs.args
    @staticmethod
    def set_args(args):
        '''Replace the shared dictionary with ``args``.'''
        GlobalShareArgs.args = args
    @staticmethod
    def set_args_value(key, value):
        '''Set a single entry of the shared dictionary.'''
        GlobalShareArgs.args[key] = value
    @staticmethod
    def get_args_value(key, default_value=None):
        '''Return the entry for ``key``, or ``default_value`` when absent.'''
        return GlobalShareArgs.args.get(key, default_value)
    @staticmethod
    def contain_key(key):
        '''Return whether ``key`` is present in the shared dictionary.'''
        return key in GlobalShareArgs.args
    @staticmethod
    def update(args):
        '''Merge ``args`` into the shared dictionary.'''
        GlobalShareArgs.args.update(args)
pass
class ShareArgs():
    '''Shared settings dictionary, pre-populated with the defaults in ``args``.

    All accessors are static methods, so they behave the same whether they
    are called on the class or on an instance.
    '''
    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # label directory
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # directory for labels exported by clustering
        "common_datas_dir":"./hjx/data", # shared data directory (common ur_bi_dw data)
        "only_predict": False, # predict only, do not train
        "delete_model": True, # delete the model first; only used when training
        "export_excel": False, # export to Excel
        "classes": 12, # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0, # 4.0
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }
    @staticmethod
    def get_args():
        '''Return the shared dictionary itself (not a copy).'''
        return ShareArgs.args
    @staticmethod
    def set_args(args):
        '''Replace the shared dictionary with ``args``.'''
        ShareArgs.args = args
    @staticmethod
    def set_args_value(key, value):
        '''Set a single entry of the shared dictionary.'''
        ShareArgs.args[key] = value
    @staticmethod
    def get_args_value(key, default_value=None):
        '''Return the entry for ``key``, or ``default_value`` when absent.'''
        return ShareArgs.args.get(key, default_value)
    @staticmethod
    def contain_key(key):
        '''Return whether ``key`` is present in the shared dictionary.'''
        return key in ShareArgs.args
    @staticmethod
    def update(args):
        '''Merge ``args`` into the shared dictionary.'''
        ShareArgs.args.update(args)
pass
class IMSGetDatasBase():
    '''Base class for lock-protected, time-cached exports from the IMS MySQL database.

    The three dictionaries below are class attributes and therefore shared by
    all instances and subclasses; keys are absolute paths.
    '''
    # Lock per key; callers that share an output path share the lock.
    lock_dict:Dict[str, threading.Lock] = {}
    # Time of the last recorded refresh per key, used for the timeout check.
    time_dict:Dict[str, datetime.datetime] = {}
    # Result of the most recent get_last_time() call per key.
    get_data_timeout_dict:Dict[str, bool] = {}
    # Age in hours after which data is queried again. This is the threshold
    # the check has always applied; the log messages now report it as well.
    TIMEOUT_HOURS = 4
    def __init__(
        self,
        host='192.168.15.144',
        port=3306,
        database='test_ims',
        user='spkjz_writer',
        password='Ur#7cmoP3QDtueVJQj2q4Az',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        '''Create the database helper and the output directory.

        NOTE(review): the defaults embed what look like real credentials;
        prefer passing them in from configuration or the environment.

        Args:
            host, port, database, user, password: forwarded to ``MySqlHelper``.
            save_dir: output directory; created when it does not exist.
            logger: logger for progress and errors; a module-level logger is
                used when ``None``.
        '''
        self.save_dir = save_dir
        # Fall back to a module logger so logging calls never fail on None.
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.db_helper = MySqlHelper(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            logger=self.logger
            )
        # Create the output directory.
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            # Debug only: persist the refresh times to a file.
            self.vars_helper = VarsHelper('./hjx/data/vars/IMSGetDatas')
    def close(self):
        '''Close the database connection.'''
        self.db_helper.close_conn()
    def get_last_time(self, key_name) -> bool:
        '''Return whether the data for ``key_name`` must be queried again.

        That is the case when no refresh time is recorded for the key or the
        recorded time is more than ``TIMEOUT_HOURS`` hours old. The result is
        also remembered for ``save_last_time``.
        '''
        # Use the absolute path so the key is unique.
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('IMSGetDatasBase.time_list'):
            IMSGetDatasBase.time_dict = self.vars_helper.get_value('IMSGetDatasBase.time_list')
        timeout = self.TIMEOUT_HOURS
        last_time = IMSGetDatasBase.time_dict.get(key_name)
        get_data_timeout = (
            last_time is None
            or (datetime.datetime.today() - last_time).total_seconds() > timeout*60*60
        )
        if get_data_timeout:
            self.logger.info('超時%d小時,重新查數據:%s', timeout, key_name)
        else:
            self.logger.info('未超時%d小時,跳過查數據:%s', timeout, key_name)
        IMSGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout
    def save_last_time(self, key_name):
        '''Record "now" as the refresh time of ``key_name``.

        The time is updated when the last ``get_last_time`` for the key
        reported a timeout, when no check was recorded for the key at all,
        or when a ``vars_helper`` is active (in which case the times are
        persisted too).
        '''
        # Use the absolute path so the key is unique.
        key_name = os.path.abspath(key_name)
        # A key that was never checked is treated as freshly fetched instead
        # of raising KeyError.
        timed_out = IMSGetDatasBase.get_data_timeout_dict.get(key_name, True)
        if timed_out or self.vars_helper is not None:
            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None:
            self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_dict)
    def get_lock(self, key_name) -> threading.Lock:
        '''Return the lock registered for ``key_name``, creating it on first use.'''
        # Use the absolute path so the key is unique.
        key_name = os.path.abspath(key_name)
        # setdefault inserts and returns in one step, so two threads asking
        # for the same new key cannot end up with different locks.
        return IMSGetDatasBase.lock_dict.setdefault(key_name, threading.Lock())
    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=(-1,), # indices (in sorted file order) to delete first; default: last file
        start_date = datetime.datetime(2017, 1, 1), # first window start
        offset = None, # window length; None means relativedelta(months=3)
        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # formats the dates substituted into sql
        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # formats the output file name
        stop_date = '20700101', # give up once the formatted start date exceeds this
        ):
        '''Read data incrementally, one time window per CSV file.

        Starting at ``start_date`` the method walks forward in steps of
        ``offset``. For every window whose file does not exist yet in
        ``save_dir`` it runs ``sql % (start, end)``, sorts the result by
        ``sort_columns`` and writes it to the window's file. It stops at the
        first empty window after at least one non-empty one, or when the
        query helper returns ``None``.

        If ``save_dir`` already exists, the files selected by
        ``del_index_list`` are deleted first so that they are fetched again.

        Raises:
            Exception: when no window has produced data and the formatted
                start date has passed ``stop_date``.
        '''
        if offset is None:
            offset = relativedelta(months=3)
        # Create the directory, or remove the files that must be re-fetched.
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            file_list = sorted(os.listdir(save_dir))
            if file_list:
                # Resolve every index before deleting and drop duplicates, so
                # the same file is never removed twice.
                doomed = dict.fromkeys(file_list[i] for i in del_index_list)
                for file_name in doomed:
                    os.remove(os.path.join(save_dir, file_name))
                    print('刪除最后一個文件:', file_name)
        select_index = -1 # stays -1 until a window has returned rows
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                if len(data)>0:
                    select_index+=1
                    # Sort before writing.
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index!=-1:
                    # First empty window after data was found: done.
                    break
                elif stop_date < start_date_str:
                    raise Exception("讀取數據異常,時間超出最大值!")
            start_date = end_date
pass
class CustomIMSGetDatas(IMSGetDatasBase):
    '''IMS exporter that dumps the ``ims_w_amt_pro`` table to a CSV file.'''
    def __init__(
        self,
        host='192.168.13.134',
        port=4000,
        database='test_ims',
        user='root',
        password='rootimmsadmin',
        save_dir='./hjx/data/export_ims_data',
        logger:logging.Logger=None
        ):
        '''Remember ``save_dir`` and ``logger``, then delegate to the base initialiser.'''
        self.save_dir, self.logger = save_dir, logger
        base_kwargs = dict(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger,
        )
        super().__init__(**base_kwargs)
    def get_ims_w_amt_pro(self):
        '''Write the year-month series proportion data to ``ims_w_amt_pro.csv``.

        Under the lock of the output file, reads the whole ``ims_w_amt_pro``
        table, renames ``serial_forecast_proportion`` to
        ``forecast_proportion`` and writes the CSV into ``self.save_dir``.
        Exceptions are logged and re-raised; the lock is always released.
        '''
        csv_path = os.path.join(self.save_dir,'ims_w_amt_pro.csv')
        with self.get_lock(csv_path):
            try:
                # The timeout check (get_last_time / save_last_time) is
                # disabled for this export, so every call queries again.
                table:pd.DataFrame = self.db_helper.get_data('SELECT * FROM ims_w_amt_pro')
                renamed = table.rename(columns={
                    'serial_forecast_proportion': 'forecast_proportion',
                })
                renamed.to_csv(csv_path)
            except Exception as ex:
                self.logger.exception(ex)
                raise ex # propagate to the caller
pass
def get_datas(
    host='192.168.13.134',
    port=4000,
    database='test_ims',
    user='root',
    password='rootimmsadmin',
    save_dir='./hjx/data/export_ims_data',
    logger:logging.Logger=None
    ):
    '''Fetch the year-month series proportion data and cache it under ``save_dir``.

    Builds a ``CustomIMSGetDatas`` with the given connection settings, runs
    its ``get_ims_w_amt_pro`` export and always closes it afterwards.

    NOTE(review): the defaults embed what look like real credentials; prefer
    passing them in from configuration or the environment.

    Args:
        host, port, database, user, password: connection settings forwarded
            to ``CustomIMSGetDatas``.
        save_dir: directory the CSV output is written to.
        logger: logger for progress and errors. When ``None`` a module-level
            logger is used, so the default call no longer fails on
            ``logger.info``.

    Raises:
        Exception: whatever the export raises, after it has been logged.
    '''
    if logger is None:
        logger = logging.getLogger(__name__)
    ur_bi_get_datas = CustomIMSGetDatas(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # Year-month series proportion data.
        logger.info('正在查詢年月系列占比數據...')
        ur_bi_get_datas.get_ims_w_amt_pro()
        logger.info('查詢年月系列占比數據完成!')
    except Exception as ex:
        logger.exception(ex)
        raise # propagate with the original traceback
    finally:
        ur_bi_get_datas.close()
pass
def getdata_export_ims(
    host='192.168.13.134',
    port=4000,
    database='test_ims',
    user='root',
    password='rootimmsadmin',
    save_dir='./hjx/data/export_ims_data',
    logger:logging.Logger=None
    ):
    '''Top-level entry point of the IMS export: forwards every argument to ``get_datas``.'''
    forwarded = {
        'host': host,
        'port': port,
        'database': database,
        'user': user,
        'password': password,
        'save_dir': save_dir,
        'logger': logger,
    }
    get_datas(**forwarded)
pass

以上就是“Python讀取Hive數據庫代碼怎么寫”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

通河县| 木里| 石台县| 花莲市| 柯坪县| 镇坪县| 离岛区| 廉江市| 西林县| 九台市| 大理市| 讷河市| 南平市| 武穴市| 图木舒克市| 高唐县| 弥勒县| 香格里拉县| 苏尼特右旗| 中方县| 凌源市| 林口县| 怀集县| 龙山县| 光泽县| 商都县| 收藏| 乌拉特前旗| 济南市| 盘山县| 五大连池市| 讷河市| 金坛市| 时尚| 秭归县| 宁安市| 丰都县| 调兵山市| 彭阳县| 霍林郭勒市| 犍为县|