MySqlBulkLoader一种高效的数据保存方案

最近在处理大批量数据入库的问题,认识了MySqlBulkLoader,这是一种可以高效地实现MySql数据保存入库的方案,分享给各位爱技术懂技术的小伙伴。

MySqlBulkLoader可以高效地将数据从CSV(或TSV)文件,Stream(流)加载到MySql数据表中。经不严格测试,在每次1万条数据的情况下,使用MySqlBulkLoader相比批量Insert语句的方式,效率提升1倍以上。.

01 引入依赖包
根据项目的.net版本,引入相应版本的MySqlConnector依赖包

02 示例代码:从CSV文件加载数据

using var connection = new MySqlConnection("...;AllowLoadLocalInfile=True");
await connection.OpenAsync();
var bulkLoader = new MySqlBulkLoader(connection)
{
FileName = @"C:\Path\To\file.csv",
TableName = "destination",
CharacterSet = "UTF8",
NumberOfLinesToSkip = 1,
FieldTerminator = ",",
FieldQuotationCharacter = '"',
FieldQuotationOptional = true,
Local = true,
}
var rowCount = await bulkLoader.LoadAsync();

以上示例代码是从官网中复制出来的,大概做个解释。

FileName:csv文件本地路径

TableName:目标表表名

注意:数据库连接需要设置:AllowLoadLocalInfile=true

03 示例代码:从Stream加载数据

实际开发中,通过csv文件加载数据的场景较少,更多的是从程序中提取数据,直接入库保存,这可以通过Stream方式实现。
/// <summary>
/// 
/// </summary>
/// <returns></returns>
private async Task<int> BulkInnerSave(System.Data.Common.DbConnection dbConnection, List<AssessmentDetailSummary> assessmentDetail)
{
int rowCount = 0;
if (assessmentDetail.Any())
{
using var memoryStream = new MemoryStream();
foreach (var item in assessmentDetail)
{
var bytes =
Encoding.UTF8.GetBytes($"{item.NumericalOrder},{item.ProductID}\r\n");
memoryStream.Write(bytes, 0, bytes.Length);
}

var columns = new string[] { "NumericalOrder", "ProductID"};
rowCount = await InnerBulkSave(dbConnection, memoryStream, "assessment_detail", columns);
}

return rowCount;
}

//
private async Task<int> InnerBulkSave(System.Data.Common.DbConnection dbConnection, MemoryStream memoryStream, string tableName, string[] columns)
{
memoryStream.Position = 0;

var connection = dbConnection as MySqlConnection;
if (connection.State != ConnectionState.Open)
{
await connection.OpenAsync();
}

MySqlBulkLoader bl = new MySqlBulkLoader(connection)
{
SourceStream = memoryStream,
TableName = tableName,
NumberOfLinesToSkip = 0,
FieldTerminator = ",",
FieldQuotationCharacter = '"',
FieldQuotationOptional = true,
Local = true,
};
bl.Columns.AddRange(columns);
return await bl.LoadAsync();
}

与加载csv文件的差别在于,FileName属性改成了SourceStream,而SourceStream的内容格式和csv文件是一样的。

另外要说明一下属性Columns,这是一个可选项,相当于Insert语句中指定的列名,目的是指定csv文件或Stream中的数据对应的列。

04 在EF中使用Bulk操作

由于本次项目中并没有使用EF,所以采用了直接调用MySqlBulkLoader这种原始的方式,如果你的项目中使用了EF框架,可以引用包:EFCore.BulkExtensions或者EFCore.BulkExtensions.MySql,调用DbContext的扩展方法,更简便地实现批量保存。

context.BulkInsert(entities);    
context.BulkSaveChanges();

各位热爱技术的小伙伴,你是否希望在下一个项目中,尝试一下这种方式。