您好,登錄後才能下訂單哦!
項目需求:從 Kafka 中的解析日誌獲取數據庫的變更記錄,按照訂單級別和訂單明細級別寫入數據庫;一條訂單的所有信息(包括各種維度信息)均保存在一條 JSON 中,寫入 MySQL 5.7。
配置信息:
[Global]
kafka_server=xxxxxxxxxxx:9092
kafka_topic=mes
consumer_group=test100
passwd = tracking
port = 3306
host = xxxxxxxxxx
user = track
schema = track
dd_socket =
dd_host = xxxxxxxxxxxx
dd_port = 3306
dd_user = xxxxxxxxx
dd_passwd = xxxxxxxx
代碼又長又醜,半吊子,只完成了面向過程的編程,沒做到面向對象,將就看,有問題可以聯繫我。
代碼:
#encoding=utf-8 import datetime import configparser import re import pymysql from vertica_python import connect import vertica_python import json from confluent_kafka import Consumer, KafkaError import csv import logging import os import time import signal import sys #寫日志 logging.basicConfig(filename=os.path.join(os.getcwd(), 'log_tracking.txt'), level=logging.WARN, filemode='a',format='%(asctime)s - %(levelname)s: %(message)s') def writeErrorLog(errSrc, errType, errMsg): try: v_log_file = 'err_tracking.log'; v_file = open(v_log_file, 'a') v_file.write(datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S") + " - " + errSrc + " - " + errType +" : " + errMsg + '\n') v_file.flush() except Exception as data: v_err_file = open('err_tracking.log', 'a') v_err_file.write(str(data) + '\n') v_err_file.write(datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S") + " - " + errSrc + " - " + errType + " : " + errMsg + '\n') v_err_file.flush() v_err_file.close() finally: v_file.close() class RH_Consumer: #讀取配置文件的配置信息,并初始化一些類需要的變量 def __init__(self): self.config = configparser.ConfigParser() self.config.read('config.ini') self.host = self.config.get('Global', 'host') self.user = self.config.get('Global', 'user') self.passwd = self.config.get('Global', 'passwd') self.schema = self.config.get('Global', 'schema') self.port = int(self.config.get('Global', 'port')) self.kafka_server = self.config.get('Global', 'kafka_server') self.kafka_topic = self.config.get('Global', 'kafka_topic') self.consumer_group = self.config.get('Global', 'consumer_group') self.dd_host = self.config.get('Global', 'dd_host') self.dd_user = self.config.get('Global', 'dd_user') self.dd_passwd = self.config.get('Global', 'dd_passwd') self.dd_port = int(self.config.get('Global', 'dd_port')) self.dd_socket = self.config.get('Global', 'dd_socket') self.operation_time = datetime.datetime.now() self.stop_flag = 0 self.src_table_name = [] self.__init_db() self.__init_mes_db() 
self._get_all_src_table() #連接寫入目標數據庫 def __init_db(self): try: self.conn_info = {'host': self.host,'port': self.port,'user': self.user,'password': self.passwd,'db': 'tracking'} self.mysql_db = pymysql.connect(**self.conn_info, charset="utf8" ) self.mysql_cur = self.mysql_db.cursor() except Exception as data: writeErrorLog('__init_db', 'Error', str(data)) #連接生產數據庫,用于獲取相關維度信息 def __init_mes_db(self): try: self.mes_mysql_db = pymysql.connect(host=self.dd_host, user=self.dd_user, passwd=self.dd_passwd,port=self.dd_port, unix_socket=self.dd_socket, charset="utf8") self.mes_mysql_cur = self.mes_mysql_db.cursor() except Exception as data: writeErrorLog('__init_db', 'Error', str(data)) #關閉數據庫 def _release_db(self): self.mysql_cur.close() self.mysql_db.close() self.mes_mysql_cur.close() self.mes_mysql_db.close() #獲取所有的配置表信息(需要獲取的表) def _get_all_src_table(self): try: # 獲取table的信息 select_src_table_names = "select distinct src_table_name from tracking.tracking_table_mapping_rule" self.mysql_cur.execute(select_src_table_names) rows = self.mysql_cur.fetchall() for item in rows: self.src_table_name.append(item[0]) return self.src_table_name except Exception as data: writeErrorLog('_get_all_src_table', 'Error', str(data)) logging.error('_get_all_src_table: ' + str(data)) #獲取src表的目標表信息 def _get_tgt_table_name(self,table_name,table_schema): try: # 獲取table的信息(table_name是schema|tablename) select_tgt_table_names = "select distinct tgt_table_name from tracking.tracking_table_mapping_rule where src_table_name = '%s' and src_table_schema = '%s'" %(table_name,table_schema) self.mysql_cur.execute(select_tgt_table_names) rows = self.mysql_cur.fetchall() tgt_table_names=[] for item in rows: tgt_table_names.append(item[0]) return tgt_table_names except Exception as data: writeErrorLog('_get_tgt_table_name', 'Error', str(data)) logging.error('_get_tgt_table_name: ' + str(data)) # 根據獲取到輸入的table_name,讀取表的配置信息 會以json格式返回獲取到的數據 def _get_config(self,table_name,tgt_table_name,table_schema): try: # 
獲取table的信息(table_name是schema|tablename) select_table_config = "select coalesce( src_system, '' ) as src_system,coalesce ( src_table_schema, '' ) as src_table_schema,coalesce ( src_table_name, '' ) as src_table_name,coalesce ( tgt_operation, '{}' ) as tgt_operation,active_flag,coalesce ( tgt_system, '' ) as tgt_system,coalesce ( tgt_table_schema, '' ) as tgt_table_schema,coalesce ( tgt_table_name, '' ) as tgt_table_name from tracking.tracking_table_mapping_rule where src_table_name = '%s' and tgt_table_name='%s' and src_table_schema = '%s' " %(table_name,tgt_table_name,table_schema) self.mysql_cur.execute(select_table_config) rows = self.mysql_cur.fetchall() for item in rows: self.src_system = item[0] self.src_table_schema = item[1] self.src_table_name = item[2] self.tgt_operation = item[3] self.active_flag = item[4] self.tgt_system = item[5] self.tgt_table_schema = item[6] self.tgt_table_name = item[7] #解析出self.tgt_operation 中以后所需要的數據 self.tgt_operation = eval(self.tgt_operation) result_data = {'src_system':self.src_system, 'src_table_schema':self.src_table_schema, 'src_table_name':self.src_table_name, 'tgt_operation':self.tgt_operation, 'active_flag':self.active_flag, 'tgt_system': self.tgt_system, 'tgt_table_schema': self.tgt_table_schema, 'tgt_table_name': self.tgt_table_name, #解析出來的self.tgt_operation里的信息 'source_primary_key': self.tgt_operation['source_primary_key'], 'source_all_column': self.tgt_operation['source_all_column'], 'target_primary_key': self.tgt_operation['target_primary_key'], 'target_column': self.tgt_operation['target_column'], 'source_level': self.tgt_operation['source_level'] } return result_data except Exception as data: writeErrorLog('_get_config', 'Error', str(data)+':table is not available') logging.error('_get_config: ' + str(data)) #主方法的入口 def _do(self): try: #配置consumer的信息,可以配置很多其他信息 c = Consumer({ 'bootstrap.servers': self.kafka_server, 'group.id': self.consumer_group, 'default.topic.config': { 'auto.offset.reset': 'smallest', 
'enable.auto.commit': False} }) #定義消費kafka中的主題 c.subscribe([self.kafka_topic]) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print(msg.error()) break text = msg.value().decode(encoding="utf-8") # kfk_text = eval(text) kfk_text = json.loads(text) #此處判斷kfk數據是否在配置表中,如果在則進行下一步,如果不在則忽略 #添加異常處理目的是為了如果這條數據寫入有問題,就不commit,方便下次處理還可以繼續消費 try: kfk_table = kfk_text['table'] if kfk_table in ['order_mails'] : print(type(text),text) logging.warning('-------------- start exec table time : ' + str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))+'---------------------') kfk_text = str(kfk_text) kfk_text = kfk_text.replace(": None",": ''") kfk_text = eval(kfk_text) kfk_datas = kfk_text['data'] kfk_type = kfk_text['type'] kfk_old = kfk_text['old'] logging.warning(' table_name: '+ str(kfk_table)+ ' table_type : ' + kfk_type) if kfk_type == 'UPDATE': continue print('update') for i,data in enumerate(kfk_datas): kfk_text['data'] = eval("["+str(data)+"]") kfk_text['old'] = eval("[" + str(kfk_old[i]) + "]") self._get_rh_from_kafka(kfk_text) else: print('insert') for data in kfk_datas: kfk_text['data'] = eval("["+str(data)+"]") print(type(kfk_text), kfk_text) self._get_rh_from_kafka(kfk_text) logging.warning('----------------end exec table time : ' + str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))+'---------------') c.commit() except Exception as data: writeErrorLog('_do', 'exce data Error', str(data)) logging.error('_do: ' + str(data)) #如果停止程序 if self.stop_flag == 1: self._exit_consumer() c.close() except Exception as data: print(data) writeErrorLog('_do', 'Error', str(data)) logging.error('_do: ' + str(data)) def _trans_path(self,tgt_path): new_tgt_path=tgt_path.replace('.','\".\"').replace('$\".','$.')+'\"' return new_tgt_path #此方法用來獲取kafka中的數據, def _get_rh_from_kafka(self,kfk_text): try: # 解析獲取到的kfk中的數據流 self.kfk_tb_schema = kfk_text["database"]#schema self.kfk_tb_name = 
kfk_text["table"]#table_name self.kfk_data = kfk_text['data'][0]#data self.kfk_type = kfk_text['type']#數據類型type self.kfk_es = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(float(kfk_text['es'] / 1000)))#數據表更的時間 # 獲取kfk傳遞過來src表的配置信息,讀取配置表信息-----可能為空 需要添加判斷 tgt_table_names=self._get_tgt_table_name(self.kfk_tb_name,self.kfk_tb_schema) if len(tgt_table_names) != 0: for tgt_table_name_for_config in tgt_table_names: tb_config = self._get_config(self.kfk_tb_name,tgt_table_name_for_config,self.kfk_tb_schema) tgt_pk_key = tb_config['target_primary_key']#目標表的主鍵(order_no/order_item_id) tgt_schema = tb_config['tgt_table_schema']#目標表的schema tgt_table_name = tb_config['tgt_table_name']#目標表的名稱(目前只有兩個目標表tracking_order,tracking_order_item) src_table_name = tb_config['src_table_name']#源表的名稱(schema|table_name) src_table_schema = tb_config['src_table_schema'] tgt_columns = tb_config['target_column']#獲取插入到目標表中字段的配置信息(例如該表在order_info的插入路徑等配置信息) src_level = tb_config['source_level']#源表的level,目前有三種root,leaf,father src_pk_key = tb_config['source_primary_key']#源表的主鍵 src_pk_value = self.kfk_data[src_pk_key]#源表的主鍵值(從kfk中獲取到) tgt_operation=tb_config['tgt_operation']#源表的其他配置,在下面處理時候再進行解析 #處理的邏輯是,將表類型分為三類,root,leaf,father分開處理,分別處理其insert,update和delete的操作 if self.kfk_type == 'INSERT': # 判斷kfk的操作類型是INSERT,UPDATE,DELETE if src_level == 'root': # 判斷該數據是否是root表 tgt_pk_value = self.kfk_data[tgt_pk_key]#如果是root表,則獲取目標表表的主鍵的值(和src_pk_value的值相同) for item in tgt_columns: # item取值范圍:order_info、order_progress等循環插入列,按照配置分別寫入,因為可能一張表在不同列中插入位置不同 tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" #拼成如下形式,目的為了_get_data_from_kfk傳入參數,例如{"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # 將字符串轉換成dict類型 if str(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item)+" is null,please check") else: tgt_path = list(tgt_columns[item]['target_path'].values())[0]#表在配置中,寫入目標表的路徑 (table_insert_data,table_insert_data_for_leaf, insert_data,catalog_type) = 
self._get_data_from_kfk(kfk_text, tgt_column,src_table_name,tgt_pk_value) #調用方法,返回三種格式的json,為了不同的寫入方式傳參 #調用將kfk中數據入庫的方法 self._insert_data_from_kfk_for_root(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, item, table_insert_data,tgt_path)#將kfk中主數據寫入數據庫 self._insert_father_data(src_table_schema,src_table_name, insert_data, tgt_path, tgt_pk_value, item,catalog_type,tgt_table_name_for_config)#將主數據涉及到父表寫入 #子表insert思路:通過配置表找到上層關聯表的鍵值,通過鍵值到數據庫中查找到子表屬于的記錄(order_no/order_item_id)的值,從而可以確認子表的寫入的絕對路徑(拼上表名稱或者是拼上鍵對應值),然后按照路徑寫入,補全父表 elif src_level == 'leaf': # 判斷kfk的操作類型是INSERT,UPDATE,DELETE parent_pk_info=tgt_operation['parent_pk_key'] for item in tgt_columns: # item取值范圍:order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # #拼成如下形式,目的為了_get_data_from_kfk傳入參數,例如{"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # 將字符串轉換成dict類型 if str(tgt_columns[item]['target_path'])=='{}':#因為子節點可能不會每一列都會配置寫入信息(這個是不是不判斷也可以,只要不配置即可,如果判斷,root中也需要判斷嗎?) 
logging.warning(str(item) + " is null,please check") else: tgt_path = list(tgt_columns[item]['target_path'].keys())[0]#獲取寫入的路徑 (table_insert_data,table_insert_data_for_leaf, insert_data,catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column,src_table_name,src_pk_value) # #調用方法,返回三種格式的json,為了不同的寫入方式傳參 (parent_tgt_path, tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,self.kfk_data)#獲取子節點表的需要寫入的目標表的主鍵的值和上一層的寫入真實絕對路徑 tgt_path_true=parent_tgt_path+"."+src_table_name#獲取子表寫入的絕對路徑(一直到子表的表名的路徑) self._insert_data_from_kfk_for_leaf(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value_new, item, table_insert_data_for_leaf,tgt_path_true,src_pk_value,insert_data) # 將從kafka獲取的數據入庫 tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"'#獲取子表寫入的絕對路徑(一直到子表的主鍵值的路徑) self._insert_father_data(src_table_name, insert_data, tgt_path_new, tgt_pk_value_new, item,catalog_type,tgt_table_name_for_config)#遞歸,寫入子表的父表信息 elif src_level == 'father':#針對父表數據在主表和子表數據之后產生的情況 for item in tgt_columns: # item取值范圍:order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column例如{"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # 拼接目標列和目標列的值的信息 if str(tgt_columns[item]['target_path']) == '{}': logging.warning(str(item) + " is null,please check") else: tgt_paths = list(tgt_columns[item]['target_path'].values()) (table_insert_data, table_insert_data_for_leaf,insert_data,catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value) # 從kafka獲取的需要插入的json串 if 'product' in src_table_name.lower(): catalog_type='PRODUCT' elif 'service' in src_table_name.lower(): catalog_type='SERVICE' else: catalog_type='0' for tgt_path in tgt_paths: tgt_info_for_father = self._get_tgt_info_for_father(tgt_path, src_pk_key, src_pk_value, tgt_pk_key, tgt_schema,tgt_table_name, item,catalog_type) if 
len(tgt_info_for_father)==0: logging.warning('can not available the data of the root and leaf table ') else: for i in range(len(tgt_info_for_father)): tgt_pk_value_new = tgt_info_for_father[i][0] tgt_path_new = ('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:] self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value_new,item, tgt_path_new, insert_data) self._insert_father_data(src_table_name, insert_data, tgt_path_new,tgt_pk_value_new, item, catalog_type,tgt_table_name_for_config) elif self.kfk_type == 'UPDATE':#update處理方式 #主表update思路 :找到更新的記錄,將需要更新的字段按照配置的路徑更新(主表的路徑不存在多層),再補全父表,寫入歷史紀錄 if src_level == 'root': # 判斷是否是root表 tgt_pk_value = self.kfk_data[tgt_pk_key]##如果是root表,則獲取目標表表的主鍵的值(和src_pk_value的值相同) for item in tgt_columns: # item取值范圍:order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column例如{"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # 拼接目標列和目標列的值的信息 if str(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item) + " is null,please check") else: update_columns = kfk_text['old'][0]#獲取kfk中變更信息 tgt_path = list(tgt_columns[item]['target_path'].values())[0] (table_insert_data,table_insert_data_for_leaf, insert_data,catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name,tgt_pk_value) self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name,update_columns, insert_data, tgt_path, tgt_pk_value, item,catalog_type,tgt_table_name_for_config,src_table_schema)#更新數據 #將變更歷史寫入 if 'alter_column' in list(tgt_columns[item].keys()): record_history_column = tgt_columns[item]['alter_column'] self._insert_history_data(update_columns,insert_data,tgt_path,record_history_column,self.kfk_es,item,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value) else: logging.warning(str(item) + " alter_column is not available") 
#子表update思路:通過配置表找到上層關聯表的鍵值,通過鍵值到數據庫中查找到子表屬于的記錄(order_no/order_item_id)的值,從而可以確認子表的寫入的絕對路徑(拼上表名稱或者是拼上鍵對應值),然后按照路徑更新對飲的字段,補全父表 elif src_level == 'leaf': ## 判斷是否是root表 parent_pk_info=tgt_operation['parent_pk_key'] for item in tgt_columns: # item取值范圍:order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column例如{"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # 拼接目標列和目標列的值的信息 if str(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item) + " is null,please check") else: update_columns = kfk_text['old'][0] # 獲取到變更信息 tgt_path = list(tgt_columns[item]['target_path'].keys())[0] (table_insert_data,table_insert_data_for_leaf, insert_data,catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column,src_table_name,src_pk_value) # 從kafka獲取的需要插入的json串 (parent_tgt_path, tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,self.kfk_data) # 獲取子表上一層主鍵路徑 tgt_path_true=parent_tgt_path+"."+src_table_name##獲取子表寫入的絕對路徑(一直到子表的表名的路徑) tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"'#獲取子表寫入的絕對路徑(一直到子表的主鍵值) self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name,update_columns, insert_data, tgt_path_new, tgt_pk_value_new, item,catalog_type,tgt_table_name_for_config,src_table_schema) if 'alter_column' in list(tgt_columns[item].keys()): record_history_column = tgt_columns[item]['alter_column'] self._insert_history_data(update_columns, insert_data, tgt_path_new,record_history_column, self.kfk_es, item, tgt_schema,tgt_table_name, tgt_pk_key, tgt_pk_value_new) else: logging.warning(str(item) + " alter_column is not available") #父表更新的思路:從配置表獲取所有目標路徑,循環每一個路徑,通過模糊匹配找到所有的目標主鍵值及準確路徑,然后一條條更新,并將涉及的下一級信息補全 elif src_level == 'father': # 判斷該數據是否是kfk入庫信息如果不是就pass for item in tgt_columns: # item取值范圍:order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + 
str(tgt_columns[item]['source_column']) + "}" # tgt_column例如{"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # 拼接目標列和目標列的值的信息 if str(tgt_columns[item]['target_path']) == '{}': logging.warning(str(item) + " is null,please check") else: update_columns = kfk_text['old'][0] # 獲取到變更信息 tgt_paths = list(tgt_columns[item]['target_path'].values()) (table_insert_data, table_insert_data_for_leaf,insert_data,catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value) # 從kafka獲取的需要插入的json串 if 'product' in src_table_name.lower(): catalog_type='PRODUCT' elif 'service' in src_table_name.lower(): catalog_type='SERVICE' else: catalog_type='0' for tgt_path in tgt_paths: tgt_info_for_father = self._get_tgt_info_for_father(tgt_path, src_pk_key, src_pk_value, tgt_pk_key, tgt_schema,tgt_table_name, item,catalog_type) for i in range(len(tgt_info_for_father)): tgt_pk_value_new = tgt_info_for_father[i][0] tgt_path_new = ('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:] self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name,update_columns, insert_data, tgt_path_new,tgt_pk_value_new, item,catalog_type,tgt_table_name_for_config,src_table_schema) #刪除操作思路:root表直接刪除所有的記錄,leaf刪除按照路徑刪除目標,再加上判斷如果子節點中沒有數據,將對應的表名的字段刪除 elif self.kfk_type == 'DELETE': if src_level == 'root': tgt_pk_value = self.kfk_data[tgt_pk_key] self._delete_data_for_root(tgt_pk_key,tgt_pk_value,tgt_schema,tgt_table_name) elif src_level == 'leaf': # parent_pk_info = tgt_operation['parent_pk_key'] for item in tgt_columns: # item取值范圍:order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column例如{"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # 拼接目標列和目標列的值的信息 if str(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item) + " is null,please check") else: tgt_path = list(tgt_columns[item]['target_path'].keys())[0] (parent_tgt_path, 
tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,self.kfk_data)#獲取子表上一層主鍵路徑 tgt_path_true=parent_tgt_path+"."+src_table_name#獲取子表上一層表的路徑 tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"' self._delete_data_for_leaf(tgt_schema, tgt_table_name, item, tgt_path_new, tgt_pk_key,tgt_pk_value_new,tgt_path_true) except Exception as data: writeErrorLog('_get_rh_from_kafka', 'Error', str(data)) logging.error('_get_rh_from_kafka: ' + str(data)) def _get_tgt_info_for_father(self,tgt_path,src_pk_key,src_pk_value,tgt_pk_key,tgt_schema,tgt_table_name,tgt_column,catalog_type): try: tgt_path_true = tgt_path + "." + src_pk_key if catalog_type=='0': select_sql_for_father="select "+tgt_pk_key+",json_search("+tgt_column+",\'all\',\'"+src_pk_value+"\',null,\'"+tgt_path_true+"\') from "+tgt_schema+"."+tgt_table_name+" where json_extract(json_extract("+tgt_column+",\'"+tgt_path_true+"\'),\'$[0]\')=\'"+src_pk_value+"\';" else: select_sql_for_father = "select " + tgt_pk_key + ",json_search(" + tgt_column + ",\'all\',\'" + src_pk_value + "\',null,\'" + tgt_path_true + "\') from " + tgt_schema + "." + tgt_table_name + " where json_extract(json_extract(" + tgt_column + ",\'" + tgt_path_true + "\'),\'$[0]\')=\'" + src_pk_value + "\' and json_extract(" + tgt_column + ",\'$." 
+tgt_table_name+".type=\'"+catalog_type+"\';" self.mysql_cur.execute(select_sql_for_father) tgt_info_for_father=self.mysql_cur.fetchall() return tgt_info_for_father except Exception as data: writeErrorLog('_get_tgt_info_for_father', 'Error', str(data)) logging.error('_get_tgt_info_for_father: ' + str(data)) def _delete_data_for_root(self,tgt_pk_key,tgt_pk_value,tgt_schema,tgt_table_name): try: delete_sql="delete from "+tgt_schema+"."+tgt_table_name+" where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" self.mysql_cur.execute(delete_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_delete_data_for_root', 'Error', str(data)) logging.error('_delete_data_for_root: ' + str(data)) def _delete_data_for_leaf(self,tgt_schema,tgt_table_name,tgt_column,tgt_path,tgt_pk_key,tgt_pk_value,tgt_path_true): try: delete_sql="update "+tgt_schema+"."+tgt_table_name+" set "+tgt_column+"=json_remove("+tgt_column+",\'"+tgt_path+"\') where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" self.mysql_cur.execute(delete_sql) self.mysql_db.commit() select_sql="select json_extract("+tgt_column+",\'"+tgt_path_true+"\') from "+tgt_schema+"."+tgt_table_name+" where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" self.mysql_cur.execute(select_sql) tgt_column_value=self.mysql_cur.fetchall()[0][0] if tgt_column_value==r'{}': table_delete_sql = "update " + tgt_schema + "." 
+ tgt_table_name + " set " + tgt_column + "=json_remove(" + tgt_column + ",\'" + tgt_path_true + "\') where " + tgt_pk_key + "=\'" + str(tgt_pk_value) + "\';" self.mysql_cur.execute(table_delete_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_delete_data_for_leaf', 'Error', str(data)) logging.error('_delete_data_for_leaf: ' + str(data)) def _insert_history_data(self,update_columns,insert_data,tgt_path,record_history_column,data_time,tgt_column,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value): try: update_columns_key=list(update_columns.keys()) for item in record_history_column: if item in update_columns_key: tgt_path_for_column = tgt_path + '.alter_data.' + item tgt_path_for_alter = tgt_path + '.alter_data' select_sql_for_alter_column_path = 'select json_extract(' + tgt_column + ',\'' + tgt_path_for_column + '\')' + ' from ' + tgt_schema + '.' + tgt_table_name + ' where ' + tgt_pk_key + '=\'' + str(tgt_pk_value) + '\';' select_sql_for_alter_path = 'select json_extract(' + tgt_column + ',\'' + tgt_path_for_alter + '\')' + ' from ' + tgt_schema + '.' + tgt_table_name + ' where ' + tgt_pk_key + '=\'' + str(tgt_pk_value) + '\';' self.mysql_cur.execute(select_sql_for_alter_column_path) tgt_path_vlaue_for_column = self.mysql_cur.fetchall() self.mysql_cur.execute(select_sql_for_alter_path) tgt_path_vlaue_for_alter = self.mysql_cur.fetchall() old_data = update_columns[item] new_data = eval(insert_data)[item] if tgt_path_vlaue_for_alter[0][0]==None: history_data = '{\"' + item + '\":[{\"old_data\":\"' + str(old_data) + '\",\"new_data\":\"' + str(new_data) + '\",\"time\":\"' + data_time + '\"}]}' insert_sql = "update "+tgt_schema + "." 
+ tgt_table_name + " set " + tgt_column +"=json_insert("+tgt_column+",\'"+tgt_path_for_alter+"\',cast(\'"+history_data+"\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';" else: if tgt_path_vlaue_for_column[0][0]==None: history_data='[{\"old_data\":\"'+str(old_data)+'\",\"new_data\":\"'+str(new_data)+'\",\"time\":\"'+data_time+'\"}]' insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path_for_column + "\',cast(\'" + history_data + "\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';" else: history_data='{\"old_data\":\"'+str(old_data)+'\",\"new_data\":\"'+str(new_data)+'\",\"time\":\"'+data_time+'\"}' insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_array_append(" + tgt_column + ",\'" + tgt_path_for_column + "\',cast(\'" + history_data + "\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';" self.mysql_cur.execute(insert_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_insert_history_data', 'Error', str(data)) logging.error('_insert_history_data: ' + str(data)) #將kfk中的數據,進行轉換,轉換成不同的寫入方式需要的json格式 def _get_data_from_kfk(self, text, tgt_column,src_table_name,src_table_pk_value): try: tgt_column_json = tgt_column #傳入的目標表的列名稱 tgt_column_key = '' for key in tgt_column_json:#循環tgt_column中的key值 json_column_key = '{' for item in tgt_column_json[key]: json_column_key += '"' + item + '":"' + text['data'][0][item].replace('"',r'\\"') + '",' tgt_column_item = json_column_key[:-1] tgt_column_key += tgt_column_item + '},' if 'type' in text['data'][0]: catalog_type=text['data'][0]['type'] else: catalog_type='0' table_insert_data = '{\"' + src_table_name + '\":' + tgt_column_key[:-1] + '}'#拼接成如下帶有表名和主鍵值格式{"order":{"order_no":"100"}} insert_data = tgt_column_key[:-1]#拼接成如下不帶表名和不帶主鍵值的格式{"order_no":"100"} table_insert_data_for_leaf = '{\"' + src_table_pk_value + 
'\":'+insert_data+'}'#拼接成如下帶有主鍵值格式的{"100":{"order_no":"100"}} print(insert_data) return (table_insert_data, table_insert_data_for_leaf,insert_data,catalog_type)#返回數據 except Exception as data: writeErrorLog('_get_data_from_kfk', 'Error', str(data)) logging.error('_get_data_from_kfk: ' + str(data)) def _insert_data_from_kfk_for_root(self,tgt_schema,tgt_table_name,tgt_table_pk,tgt_table_value,tgt_column,table_insert_data,tgt_path): try: #先判斷主鍵是否存在,如果存在則插入其他數據,如果不存在,則先插入主鍵信息 select_tb_count = 'select count(*) from ' + tgt_schema +"."+tgt_table_name + ' where ' + tgt_table_pk + '=\'' + tgt_table_value + '\';' #判斷列中是否存在數據 select_tb_column_count ='select case when coalesce(' + tgt_column + ', \'\') = \'\' then 1 else 0 end from ' + tgt_schema +"."+tgt_table_name + ' where ' + tgt_table_pk + '=\'' + tgt_table_value + '\';' self.mysql_cur.execute(select_tb_count) tb_count = self.mysql_cur.fetchall() self.mysql_cur.execute(select_tb_column_count) tb_column_count = self.mysql_cur.fetchall() #判斷是否存在數據,如果不存在,則先插入主鍵(order_no/order_item_id)再將數據寫入到列中 if tb_count[0][0] == 0: insert_pk_sql = "insert into " + tgt_schema+"."+tgt_table_name + "(" + tgt_table_pk + ") values ('" + tgt_table_value + "')" self.mysql_cur.execute(insert_pk_sql) self.mysql_db.commit() update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "= cast('" + table_insert_data +"' as json) where " + tgt_table_pk + "= '"+ tgt_table_value + "';" else: #如果主鍵存在,列為空,則需要 直接 寫入帶有table_name格式的json if tb_column_count[0][0]==1:#當目標字段為空 update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "= cast('" + table_insert_data + "' as json) where " + tgt_table_pk + "= '" + tgt_table_value + "';" else: #如果主鍵存在,列不為空,則需要使用json_insert方法寫入帶有table_name格式的json update_sql = "update " + tgt_schema + "." 
+ tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path + "\',cast(\'" + table_insert_data + "\' as json)) where " + tgt_table_pk + "=\'" + tgt_table_value + "\';" self.mysql_cur.execute(update_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_insert_data_from_kfk_for_root', 'Error', str(data)) logging.error('_insert_data_from_kfk_for_root: ' + str(data)) def _get_tgt_pk_value_for_leaf(self,tgt_table_pk,tgt_schema,tgt_table_name,tgt_column,tgt_path,parent_pk_value): try: select_tgt_pk_sql = "select " + tgt_table_pk + " from " + tgt_schema + "." + tgt_table_name + " where json_extract(" + tgt_column + ",\'" + tgt_path + "\')=\'" + parent_pk_value + "\';" self.mysql_cur.execute(select_tgt_pk_sql) tgt_pk_value = self.mysql_cur.fetchall()[0][0] return tgt_pk_value except Exception as data: writeErrorLog('_get_tgt_pk_value_for_leaf', 'Error', str(data)) logging.error('_get_tgt_pk_value_for_leaf: ' + str(data)) #獲取子節點表的需要寫入的目標表的主鍵的值和上一層的寫入真實絕對路徑 def _get_tgt_info_for_leaf(self,tgt_column,tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,kafka_data): try: if_tgt_path='.'.join(tgt_path.split('.')[:-1]) i=0 json_search_sql='' where_sql='' if if_tgt_path=='$': for parent_pk_key in list(parent_pk_info.keys()): parent_pk_value = kafka_data[parent_pk_info[parent_pk_key]] json_search_sql += ",json_search(" + tgt_column + ", 'one','" + str(parent_pk_value) + "', null, '" + tgt_path + "." + parent_pk_key + "') as tgt_path" + str(i) where_sql += " tgt_path" + str(i) + " is not null and" i = i + 1 else: for parent_pk_key in list(parent_pk_info.keys()): parent_pk_value = kafka_data[parent_pk_info[parent_pk_key]] json_search_sql += ",json_search(" + tgt_column + ", 'one','" + str(parent_pk_value) + "', null, '" + tgt_path + ".*." 
+ parent_pk_key + "') as tgt_path" + str(i) where_sql += " tgt_path" + str(i) + " is not null and" i = i + 1 select_sql = "select "+tgt_pk_key+",tgt_path0 from (select "+tgt_pk_key+json_search_sql+" from " + tgt_schema + "." + tgt_table_name +") t where "+where_sql[:-4]+";" self.mysql_cur.execute(select_sql) rows=self.mysql_cur.fetchall()[0] tgt_path_new = ('.'.join(rows[1].split('.')[:-1]))[1:] tgt_pk_value_new=rows[0] return (tgt_path_new,tgt_pk_value_new) except Exception as data: writeErrorLog('_get_tgt_info_for_leaf', 'Error', str(data)) logging.error('_get_tgt_info_for_leaf: ' + str(data)) def _insert_data_from_kfk_for_leaf(self,tgt_schema,tgt_table_name,tgt_table_pk,tgt_table_value,tgt_column,table_insert_data_for_leaf,tgt_path,src_pk_value,insert_data): try: select_tb_column_key = 'select case when coalesce(json_extract(' + tgt_column + ',\'' + tgt_path + '\') , \'\') = \'\' then 1 else 0 end from ' + tgt_schema + "." + tgt_table_name + ' where ' + tgt_table_pk + '=\'' + str(tgt_table_value) + '\';' self.mysql_cur.execute(select_tb_column_key) column_key_data = self.mysql_cur.fetchall() if column_key_data[0][0] == 1:# 當主鍵存在并且目標字段不為空路徑不存在, tgt_path_new = tgt_path tgt_insert_data=table_insert_data_for_leaf else: tgt_path_new=tgt_path+r'.\"'+str(src_pk_value)+r'\"' tgt_insert_data=insert_data update_sql = "update " + tgt_schema + "." 
+ tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path_new + "\',cast(\'" + tgt_insert_data + "\' as json)) where " + tgt_table_pk + "=\'" + str(tgt_table_value) + "\';" self.mysql_cur.execute(update_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_insert_data_from_kfk_for_leaf', 'Error', str(data)) logging.error('_insert_data_from_kfk_for_leaf: ' + str(data)) #將父表數據寫入(父表數據從生產庫中獲取,按照對應的配置路徑寫入數據庫中) def _insert_father_data(self,src_table_schema,scr_table_name,insert_data,src_path,root_pk_value,tgt_column,catalog_type,tgt_table_name_for_config): try: src_config_data=self._get_config(scr_table_name,tgt_table_name_for_config,src_table_schema)#獲取初始表的配置信息(此處獲取是為了遞歸時候傳入下一層的表名,獲取對應的配置信息) src_foreign_info=src_config_data['target_column'][tgt_column]['source_foreign_info']#從數據庫配置表中獲取source_foreign_info的信息,也就是外鍵的信息,包括外鍵,外鍵的表,以及外鍵表中的主鍵名稱 if len(json.dumps(src_foreign_info))==2:#當沒有外鍵的時候,配置表只存在‘{}'長度為2,就不需要向下遞歸執行,對應的source_foreign_info=[],長度為2 logging.warning(scr_table_name+" :Recursive over") else: for src_pk_key in src_foreign_info:#獲取當前表與下層父表的關聯鍵(例如customer表的配置獲取到org_id,"source_foreign_info": {"org_id": {"customer.organization": "org_id"}}) foreign_table_name_tmp=list(src_foreign_info[src_pk_key].keys())[0] #獲取外鍵對應的表名foreign_table_name(organization),(每次傳入的key對應一個外鍵表,只存在一個列,order_info,所以取第一個元素即可) foreign_table_schema=foreign_table_name_tmp.split('.')[0] foreign_table_name_tmp=foreign_table_name_tmp.split('.')[1] if '#' in foreign_table_name_tmp: foreign_table_name = foreign_table_name_tmp.replace('#', catalog_type).lower() else: foreign_table_name = foreign_table_name_tmp foreign_table_pk_key=list(src_foreign_info[src_pk_key].values())[0]#獲取外鍵對應的表的關聯鍵foreign_table_key,即org_id foreign_datas = self._get_config(foreign_table_name,tgt_table_name_for_config,foreign_table_schema)#獲取外鍵表的配置信息,以便下面獲取配置表的信息 foreign_column = foreign_datas['target_column'][tgt_column]#獲取要插入的目標表列是order_info/order_progress)(organization寫入目標表的列的配置信息) 
foreign_schema = foreign_datas['src_table_schema']#獲取表的schema(organization的原始src schema) foreign_table_pk_value = eval(str(insert_data))[src_pk_key] # 獲取外鍵對應的value(即organization在kfk數據中對應的值) #獲取外鍵對應表的配置信息(寫入數據庫需要用) tgt_schema=foreign_datas['tgt_table_schema'] tgt_table_name=foreign_datas['tgt_table_name'] tgt_pk_key=foreign_datas['target_primary_key'] tgt_pk_value=root_pk_value#目標表主鍵的值 #獲取數據,并在其中獲取后,寫入數據庫(此處部分參數是為了給insert服務) for foreign_path in foreign_column['target_path']: src_tgt_path=list(foreign_path.keys())[0] foreign_tgt_path = list(foreign_path.values())[0] if re.sub('.\"\S*?\"',r'*',src_path) ==src_tgt_path and re.sub('.\"\S*?\"',r'*',src_path)+'.'+src_pk_key==foreign_tgt_path: next_src_path=src_path+'.'+src_pk_key next_insert_data=self._get_data_from_db(foreign_column,foreign_table_name,foreign_schema,foreign_table_pk_key,foreign_table_pk_value,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value,src_path,tgt_column,next_src_path) self._insert_father_data(foreign_table_schema,foreign_table_name,next_insert_data,next_src_path,root_pk_value,tgt_column,catalog_type,tgt_table_name_for_config) else: logging.warning(foreign_table_name + ' :have no next level') except Exception as data: writeErrorLog('_insert_father_data', 'Error', str(data)) logging.error('_insert_father_data: ' + str(data)) #從數據庫中獲取數據,并將獲取到的數據,直接插入數據庫中,返回遞歸需要使用的數據 def _get_data_from_db(self,src_tgt_column,src_table_name,src_table_schema,src_table_pk_key,src_table_pk_value,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value,src_path,tgt_column,tgt_path): try: result_data = '{' src_column=src_tgt_column['source_column']#讀取需要獲取的字段 if len(src_column)==0: logging(str(src_column)+ ' length equal 0 error ') else: for item in src_column:#拼接好sql語句,獲取數據 select_sql1 = 'concat(\'' select_sql1 += u'"' + item + '":"\',coalesce(' + item + ',\'\'),\'",' select_sql1 = select_sql1[:-1] + '\')' select_sql = "select " + select_sql1 + " from " + src_table_schema + "." 
+ src_table_name + " where " + src_table_pk_key + "=\'" + src_table_pk_value + "\';" #使用execute方法執行SQL語句 self.mes_mysql_cur.execute(select_sql) # 使用 fetchone() 方法獲取一條數據 data = self.mes_mysql_cur.fetchall() if len(data) == 0: result_data += '' else: result_data+=data[0][0]+',' if result_data != '{': tgt_value=result_data[:-1] + '}' else: tgt_value = result_data+'\"'+src_table_pk_key+'\":\"'+src_table_pk_value+'\"}' self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, tgt_column,tgt_path, tgt_value) # 將獲取到的父表數據寫入數據庫 return tgt_value#返回寫入的數據,和真是的寫入路徑(因為路徑在配置表中層數多的是用*代替的,不是真正的絕對路徑,這里返回的是絕對路徑) except Exception as data: writeErrorLog('_get_data_from_db', 'Error', str(data)) logging.error('_get_data_from_db: ' + str(data)) #將父表寫入數據庫 def _insert_data_from_db(self,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value,tgt_column,tgt_path,tgt_value): try: insert_sql="update "+ tgt_schema+"."+tgt_table_name +" set "+ tgt_column+"=json_replace("+tgt_column+",\'"+tgt_path+"\',cast(\'"+tgt_value+"\' as json)) where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" # self.mysql_cur.execute(insert_sql.encode("utf-8").decode("latin1")) self.mysql_cur.execute(insert_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_insert_data_from_db', 'Error', str(data)) logging.error('_insert_data_from_db: ' + str(data)) # 當變更數據為外鍵時,補全外鍵對應的信息 def _update_data(self, tgt_schema, tgt_table_name, tgt_pk_key, src_table_name, update_columns,insert_data, src_path, root_pk_value, tgt_column,catalog_type,tgt_table_name_for_config,src_table_schema): try: # 判斷是否涉及外鍵信息,判斷變更的字段是否在外鍵信息里,將在的組成新的外鍵json,在調用_get_data_from_db進行更新數據 insert_data = json.loads(insert_data) for update_column in update_columns:# if update_column in list(insert_data.keys()): update_column_data = '\"' + insert_data[update_column] + '\"' tgt_path = src_path + '.' 
+ update_column self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, root_pk_value,tgt_column, tgt_path, update_column_data) self._insert_father_data(src_table_schema,src_table_name, insert_data, src_path, root_pk_value, tgt_column,catalog_type,tgt_table_name_for_config) except Exception as data: writeErrorLog('_update_data', 'Error', str(data)) logging.error('_update_data: ' + str(data)) #退出消費消息 def _exit_consumer(self): self._release_db() sys.exit() def exit_program(signum, frame): logging.info("Received Signal: %s at frame: %s" % (signum, frame)) p.stop_flag = 1 def main(): #實例化對象 p = RH_Consumer() signal.signal(signal.SIGTERM, exit_program) # while True: p._do() main()
以上這篇 Python3 實現從 Kafka 獲取數據,並解析為 JSON 格式,寫入到 MySQL 中就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。
免責聲明:本站發佈的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場。如果涉及侵權,請聯繫站長郵箱 is@yisu.com 進行舉報,並提供相關證據;一經查實,將立刻刪除涉嫌侵權內容。