您好,登錄后才能下訂單哦!
這篇文章主要介紹“如何利用MySqlBulkLoader實現批量插入數據”,在日常操作中,相信很多人在如何利用MySqlBulkLoader實現批量插入數據問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何利用MySqlBulkLoader實現批量插入數據”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
MySqlBulkLoader主要的實現方式:將需要插入的數據轉成DataTable,DataTable轉成一個CSV文件,將CSV文件使用批量導入的形式導入到數據庫里面去。
注意:
1).數據庫連接地址需要添加配置AllowLoadLocalInfile=true,允許本地文件導入;
Data Source = 數據庫地址; Port = 端口; Initial Catalog = 數據庫名; User Id = 用戶名; Password = 密碼;AllowLoadLocalInfile=true;
2).插入的時候會返回插入行數,但是檢查所有的數據都正確,也沒有報異常,卻返回了插入數量為0,可以檢查表是否有唯一索引,插入的數據是否違反了唯一索引。
/// <summary> /// 將List轉化為DataTable /// </summary> /// <returns></returns> public DataTable ListToDataTable<T>(List<T> data) { #region 創建一個DataTable,以實體名稱作為DataTable名稱 var tableName = typeof(T).Name; tableName = tableName.ToSnakeCase(); /*實體名稱與表名進行轉化,主要根據各項目的規定進行轉化,不一定就是我這些寫的這種轉換方式*/ DataTable dt = new DataTable { TableName = tableName }; #endregion #region 拿取列名,以實體的屬性名作為列名 var properties = typeof(T).GetProperties(); foreach (var item in properties) { var curFileName = item.Name; curFileName = curFileName.ToSnakeCase();/*列名與字段名進行轉化,主要根據各項目的規定進行轉化,不一定就是我這些寫的這種轉換方式*/ dt.Columns.Add(curFileName); } #endregion #region 列賦值 foreach (var item in data) { DataRow dr = dt.NewRow(); var columns = dt.Columns; var curPropertyList = item.GetType().GetProperties(); foreach (var p in curPropertyList) { var name = p.Name; name = name.ToSnakeCase();/*列名與字段名進行轉化,主要根據各項目的規定進行轉化,不一定就是我這些寫的這種轉換方式*/ var curValue = p.GetValue(item); int i = columns.IndexOf(name); dr[i] = curValue; } dt.Rows.Add(dr); } #endregion return dt; }
/// <summary> /// csv擴展 /// </summary> public static class CSVEx { /// <summary> ///將DataTable轉換為標準的CSV文件 /// </summary> /// <param name="table">數據表</param> /// <param name="tmpPath">文件地址</param> /// <returns>返回標準的CSV</returns> public static void ToCsv(this DataTable table, string tmpPath) { //以半角逗號(即,)作分隔符,列為空也要表達其存在。 //列內容如存在半角逗號(即,)則用半角引號(即"")將該字段值包含起來。 //列內容如存在半角引號(即")則應替換成半角雙引號("")轉義,并用半角引號(即"")將該字段值包含起來。 StringBuilder sb = new StringBuilder(); DataColumn colum; foreach (DataRow row in table.Rows) { for (int i = 0; i < table.Columns.Count; i++) { Type _datatype = typeof(DateTime); colum = table.Columns[i]; if (i != 0) sb.Append("\t"); //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(",")) //{ // sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\""); //} if (colum.DataType == _datatype) { sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss")); } else sb.Append(row[colum].ToString()); } sb.Append("\r\n"); } StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8); sw.Write(sb.ToString()); sw.Close(); } }
/// <summary> /// 批量導入mysql幫助類 /// </summary> public static class MySqlHelper { /// <summary> /// MySqlBulkLoader批量導入 /// </summary> /// <param name="_mySqlConnection">數據庫連接地址</param> /// <param name="table"></param> /// <param name="csvName"></param> /// <returns></returns> public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName) { var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList(); MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection) { FieldTerminator = "\t", FieldQuotationCharacter = '"', EscapeCharacter = '"', LineTerminator = "\r\n", FileName = csvName, NumberOfLinesToSkip = 0, TableName = table.TableName, }; bulk.Columns.AddRange(columns); return bulk.Load(); } }
/// <summary> /// 使用MySqlBulkLoader批量插入數據 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="data"></param> /// <returns></returns> /// <exception cref="Exception"></exception> public int BulkLoaderData<T>(List<T> data) { if (data.Count <= 0) return 0; var connectString = "數據庫連接地址"; using (MySqlConnection connection = new MySqlConnection(connectString)) { MySqlTransaction sqlTransaction = null; try { if (connection.State == ConnectionState.Closed) { connection.Open(); } sqlTransaction = connection.BeginTransaction(); var dt = ListToDataTable<T>(data); //將List轉成dataTable string tmpPath = Path.GetTempFileName(); dt.ToCsv(tmpPath); //將DataTable轉成CSV文件 var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入數據 sqlTransaction.Commit(); try { if (File.Exists(tmpPath)) File.Delete(tmpPath); } catch (Exception) { //刪除文件失敗 } return insertCount; //返回執行成功的條數 } catch (Exception e) { if (sqlTransaction != null) { sqlTransaction.Rollback(); } //執行異常 throw e; } } }
namespace WebApplication1.BrantchInsert { /// <summary> /// 批量插入 /// </summary> public class BulkLoader { /// <summary> /// 測試批量插入入口 /// </summary> /// <returns></returns> public int BrantchDataTest() { #region 模擬數據 var data = new List<CrmCouponTestDto>() { new CrmCouponTestDto { Id=1, CouponCode="test001", CouponId = 1, MemberId=100, IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"), UsageShopId=0, UsageBillNo="", EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), Status=0 }, new CrmCouponTestDto { Id=2, CouponCode="test002", CouponId = 1, MemberId=101, IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), UsageTime=Convert.ToDateTime("2022-06-27 14:30:00"), UsageShopId=2, UsageBillNo="CS202206271430001", EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), Status=1 }, new CrmCouponTestDto { Id=3, CouponCode="test003", CouponId = 1, MemberId=102, IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"), UsageShopId=0, UsageBillNo="", EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), Status=0 }, new CrmCouponTestDto { Id=4, CouponCode="test004", CouponId = 1, MemberId=103, IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"), UsageShopId=0, UsageBillNo="", EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), Status=0 } }; #endregion var result = BulkLoaderData<CrmCouponTestDto>(data); return result; } /// <summary> /// 使用MySqlBulkLoader批量插入數據 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="data"></param> /// <returns></returns> /// <exception cref="Exception"></exception> public int BulkLoaderData<T>(List<T> data) { if (data.Count <= 0) return 0; var connectString = "數據庫連接地址"; using (MySqlConnection connection = new MySqlConnection(connectString)) { MySqlTransaction sqlTransaction = null; try { if (connection.State == ConnectionState.Closed) { connection.Open(); } sqlTransaction = connection.BeginTransaction(); var dt = ListToDataTable<T>(data); //將List轉成dataTable string tmpPath = Path.GetTempFileName(); dt.ToCsv(tmpPath); //將DataTable轉成CSV文件 var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入數據 sqlTransaction.Commit(); try { if (File.Exists(tmpPath)) File.Delete(tmpPath); } catch (Exception) { //刪除文件失敗 } return insertCount; //返回執行成功的條數 } catch (Exception e) { if (sqlTransaction != null) { sqlTransaction.Rollback(); } //執行異常 throw e; } } } /// <summary> /// 將List轉化為DataTable核心方法 /// </summary> /// <returns></returns> public DataTable ListToDataTable<T>(List<T> data) { #region 創建一個DataTable,以實體名稱作為DataTable名稱 var tableName = typeof(T).Name; tableName = tableName.ToSnakeCase(); /*實體名稱與表名進行轉化,主要根據各項目的規定進行轉化,不一定就是我這些寫的這種轉換方式*/ DataTable dt = new DataTable { TableName = tableName }; #endregion #region 拿取列名,以實體的屬性名作為列名 var properties = typeof(T).GetProperties(); foreach (var item in properties) { var curFileName = item.Name; curFileName = curFileName.ToSnakeCase();/*列名與字段名進行轉化,主要根據各項目的規定進行轉化,不一定就是我這些寫的這種轉換方式*/ dt.Columns.Add(curFileName); } #endregion #region 列賦值 foreach (var item in data) { DataRow dr = dt.NewRow(); var columns = dt.Columns; var curPropertyList = item.GetType().GetProperties(); foreach (var p in curPropertyList) { var name = p.Name; name = name.ToSnakeCase();/*列名與字段名進行轉化,主要根據各項目的規定進行轉化,不一定就是我這些寫的這種轉換方式*/ var curValue = p.GetValue(item); int i = columns.IndexOf(name); dr[i] = curValue; } dt.Rows.Add(dr); } #endregion return dt; } } /// <summary> /// 批量導入mysql幫助類 /// </summary> public static class MySqlHelper { /// <summary> /// MySqlBulkLoader批量導入 /// </summary> /// <param name="_mySqlConnection">數據庫連接地址</param> /// <param name="table"></param> /// <param name="csvName"></param> /// <returns></returns> public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName) { var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList(); MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection) { FieldTerminator = "\t", FieldQuotationCharacter = '"', EscapeCharacter = '"', LineTerminator = "\r\n", FileName = csvName, NumberOfLinesToSkip = 0, TableName = table.TableName, }; bulk.Columns.AddRange(columns); return bulk.Load(); } } /// <summary> /// csv擴展 /// </summary> public static class CSVEx { /// <summary> ///將DataTable轉換為標準的CSV文件 /// </summary> /// <param name="table">數據表</param> /// <param name="tmpPath">文件地址</param> /// <returns>返回標準的CSV</returns> public static void ToCsv(this DataTable table, string tmpPath) { //以半角逗號(即,)作分隔符,列為空也要表達其存在。 //列內容如存在半角逗號(即,)則用半角引號(即"")將該字段值包含起來。 //列內容如存在半角引號(即")則應替換成半角雙引號("")轉義,并用半角引號(即"")將該字段值包含起來。 StringBuilder sb = new StringBuilder(); DataColumn colum; foreach (DataRow row in table.Rows) { for (int i = 0; i < table.Columns.Count; i++) { Type _datatype = typeof(DateTime); colum = table.Columns[i]; if (i != 0) sb.Append("\t"); //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(",")) //{ // sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\""); //} if (colum.DataType == _datatype) { sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss")); } else sb.Append(row[colum].ToString()); } sb.Append("\r\n"); } StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8); sw.Write(sb.ToString()); sw.Close(); } } /// <summary> /// 字符串轉化 /// </summary> public static class StringExtensions { /// <summary> /// 轉換為 main_keys_id 這種形式的字符串方式 /// </summary> public static string ToSnakeCase(this string input) { if (string.IsNullOrEmpty(input)) { return input; } var startUnderscores = Regex.Match(input, @"^_+"); return startUnderscores + Regex.Replace(input, @"([a-z0-9])([A-Z])", "$1_$2").ToLower(); } } /// <summary> /// 實體 /// </summary> public class CrmCouponTestDto { /// <summary> /// ID /// </summary> public long Id { get; set; } /// <summary> /// 卡券號 /// </summary> public string CouponCode { get; set; } /// <summary> /// 卡券ID /// </summary> public int CouponId { get; set; } /// <summary> /// 會員ID /// </summary> public int MemberId { get; set; } /// <summary> /// 發放時間 /// </summary> public DateTime IssueTime { get; set; } /// <summary> /// 使用時間 /// </summary> public DateTime UsageTime { get; set; } /// <summary> /// 使用店鋪ID /// </summary> public int UsageShopId { get; set; } /// <summary> /// 使用單號 /// </summary> public string UsageBillNo { get; set; } /// <summary> /// 有效開始時間 /// </summary> public DateTime EffectiveStart { get; set; } /// <summary> /// 有效結束時間 /// </summary> public DateTime EffectiveEnd { get; set; } /// <summary> /// 狀態 /// CouponStatus 卡券狀態: /// -1:未領用 /// 0:未使用 /// 1:已使用 /// 2:已過期 ///3:已作廢 ///4:轉贈中 /// </summary> public Int16 Status { get; set; } } }
到此,關于“如何利用MySqlBulkLoader實現批量插入數據”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。