使用平面文件名作为表名将同一文件夹中的多个平面文件导入 SQL Server

我一直在寻找一种通过单击按钮将同一文件夹中的 100 个文本文件上传到 SQL Server 的有效方法。我每个月都执行此操作,手动映射表。在准确提取之前手动选择每个平面文件 (.txt) 并将其映射到正确的加载表需要一些时间。

我搜索了许多不同的方式,包括 SSIS。这些太手动了,需要太多时间来执行,尤其是维护。

我有一个想法,例如以下同一文件夹中的 5 个平面文件:

1.Extract_ABC.txt

  1. Extract_YMCA.txt
  2. 提取_WSS.txt
  3. 提取_RMC.txt
  4. 提取_HBO.txt

使用各自的文件名以模式 [upload] 上传到 SQL 服务器。最终输出将如下所示。

  1. [服务器名].[上传].Extract_ABC
  2. [服务器名称].[上传].Extract_YMCA
  3. [服务器名].[上传].Extract_WSS
  4. [服务器名].[上传].Extract_RMC
  5. [服务器名称].[上传].Extract_HBO

然后可以在查找表中手动为这些表分配位置并插入到正确的表中。除了在服务器中维护位置表之外,这将消除几乎所有的手动工作。

请注意,这些表在插入时也必须创建,因为文件在提取时具有唯一的名称。另外,要注意文件是管道 ( | ) 分隔的文本文件,而不是 CSV

有谁知道可以执行此操作的方法?它必须避免使用 Visual Studio 手动包等。而是将每个平面文件批量插入到具有各自文件名的 SQL Server 中。

一个有效的查询/流程可以节省我几天的时间。如果可以在计划时间在服务器端运行以防止从实际活动的 PC (即 SQL Server 代理等)上传,那么也是一个 HUGE plus。

stack overflow Import multiple flat files in the same folder to SQL Server using flat file name as table name
原文答案

答案:

作者头像

正如我所提到的,我实际上会使用类似 Powershell 的东西来做到这一点。执行此操作的脚本实际上非常简单。

#Below is a Linux Path, as I am running SQL Server (and Powershell) on Linux
$ImportFolder = "/home/mssql/ImportSample" #The Folder your files are in
$Instance = "." #The Instance you are inserting the data into. "." means local host
$Database = "ImportSample" #The database you are inserting the data into

$Files = Get-ChildItem -Path $ImportFolder -Filter "*.txt" #Get all the txt files

#Loop the files
foreach($File in $Files){
    #Create the statement
    $SQL = "BULK INSERT dbo.[$($File.BaseName.Replace("]","]]"))] FROM '$($File.FullName.Replace("'","''"))' WITH (FORMAT = 'CSV', FIELDTERMINATOR = ',', ROWTERMINATOR = '0x0a', FIRSTROW = 2);"

    Write-Debug $SQL

    #Execute the statement
    Invoke-SqlCmd -ServerInstance $Instance -Database $Database -Query $SQL
}

这假定 Windows 身份验证,但是,如果您使用 SQL 身份验证,您可以将 -Username-Password 开关传递给 Invoke-SqlCmd


Note: This answer was written before the necessity to 创建 the tables was added. This does not meet the new requirement, however, I am leaving the answer here as it may still prove useful to others with a question similar to the original version of this question, without said requirement.

作者头像

在这个答案中,我将尝试提供一些信息并提供一个工作示例,将多个具有不同元数据的平面文件导入到单独的 SQL Server 表中。

SSIS 限制

由于 SSIS requires fixed metadata of the data source and destination ,我建议使用 C# 代码而不是自动创建 SSIS 包。如果需要 SSIS,您应该使用脚本任务来完成该任务。

在类似的情况下,我通常使用 C# 或其他脚本语言,例如 python。

为什么不应该自动生成 SSIS 包?


将 100 多个平面文件导入应在运行时创建的 100 多个表(没有固定结构)需要创建 100 多个 SSIS 包。即使使用 SSIS automation tools 之一自动执行此过程,也很难管理和调试大量 SSIS 包。

统一的 SQL Server 目标表?

如果目标表结构大多具有相似的结构,最好使用架构映射的方式将所有平面文件导入到统一的 SQL Server 目标中,您可以参考以下答案:

解决方案概述

All code provided in this solution can be improved, but it was written as a prototype

读取平面文件

我创建了一个类 TextImport ,它尝试遍历给定目录中的所有平面文件,读取平面文件元数据并将每个文件数据存储在 System.Data.DataTable 对象中。 (Note that Microsoft.VisualBasic assembly should be added as a reference to use the TextFieldParser class)

TextImport txtimp = new TextImport(filename, true, 0); // TextImport(string filename, bool containsHeader, int RowsToSkip)
txtimp.BuildDataTableStructure(); //Read flat file metadata
DataTable dt = txtimp.FillDataTable(); //Convert the flat file to a DataTable object

TextImport 类尝试根据平面文件中的前 8 行 (number of columns, delimiter, text qualifiers, encodings) 检测文件元数据 (arbitrary number)

创建 SQL 目标表

然后将此 System.Data.DataTable 传递给另一个名为 SQLExport 的类,该类基于 CREATE TABLE 元数据生成并执行 DataTable 语句。

SQLExport.CreateDestinationTable(DataTable dt);

将数据插入 SQL

最后提供了两种插入方法:

  1. 生成 INSERT INTO () ... VALUES (),(),()... 语句并执行它以将数据插入 SQL 目标
  2. 使用 System.Data.SqlClient.SqlBulkCopy 类使用 BULK INSERT 方法插入数据。

主要代码应如下所示:

static void Main(string[] args)
{
    //You should set your database connection string
    string connectionstring = @"Data Source=.SQLINSTANCE;Initial Catalog=tempdb;integrated security=SSPI;";
    //This is the schema of the destination table
    string Schema = "upload";
    //You should set the text files directory
    string directory = @"E:TextFiles";
    using (SQLExport sqlExp = new SQLExport(connectionstring, Schema))
    {
        //if you don't want to traverse subfolders use System.IO.SearchOption.TopDirectoryOnly
        foreach (string filename in System.IO.Directory.GetFiles(directory,
            "*.txt",System.IO.SearchOption.AllDirectories)){

            using(TextImport txtimp = new TextImport(filename, true, 0))
            {
                txtimp.BuildDataTableStructure();
                DataTable dt = txtimp.FillDataTable();
                dt.TableName = System.IO.Path.GetFileName(filename);
                sqlExp.CreateDestinationTable(dt);
                //Insert using BULK INSERT
                sqlExp.InsertUsingSQLBulk(dt);

                //Creates and Execute an INSERT INTO statment 
                //sqlExp.InsertIntoDb(dt);
            }
        }            
    }

}

### 测试

我在 the following link (the first result of my Google search) 上存储的两个平面文件上测试了这个解决方案,如下图所示,表创建成功,数据插入。

Note that all columns are created a NVARCHAR(255) , you can change this within the SQLExport.cs file

enter image description here

enter image description here

您可以在以下 GitHub 存储库 (.NET Framework 4.6.1 C# console application) 中查看完整代码:

  • GitHub - TextToSQL

    If you decided to use a Script Task to run this code, you should only copy the classes from the GitHub project into your Script Task project Remember to add Microsoft.VisualBasic as a reference.

如何从 SQL Server 代理作业运行 C# 控制台应用程序?


您可以简单地编辑 C# 控制台应用程序以将连接字符串和文本文件目录作为参数传递,然后使用 cmdExec 作业步骤运行它:

甚至通过使用 SSIS 包:

替代方案 - 使用任务计划程序

另一种方法是使用 Windows 任务计划程序定期运行 C# 控制台应用程序。

作者头像

注意: 我在底部添加了要在控制台应用程序 Core3.1 或更高版本中运行的异步代码。

我有点把这当作一个挑战,看看我是否能做到。这是我在c#中想出的...

基本概念:

1.循环文件

  1. 使用管道(|)作为分隔符将每个文件读入数据表

  2. 将标题行读入字符串列表

  3. 根据表头创建表,所有数据类型为 varchar(255)

  4. 使用批量复制将数据表加载到新表中。

     public static string cstr = "Enter your connection string to destination here";
    
     //if you are going to use script task inside SSIS then this is what goes into whatever the Main is in SSIS.
     static void Main(string[] args)
     {
         var dir = @"C:whereever";
    
         foreach(var file in new DirectoryInfo(dir).GetFiles().AsEnumerable().Where(f=>f.Extension.Contains("txt")))
         {
             DataTable dt = ConvertCSVtoDataTable(file.FullName, '|');
             //getColumnNames into an object
             List<string> colNames = new List<string>();
             foreach (DataColumn dc in dt.Columns)
                 colNames.Add(dc.ColumnName);
    
             //create table
             string tableName = Path.GetFileNameWithoutExtension(file.Name);
             createTable(colNames, tableName);
             //load table
             loadDtIntoDb(dt, colNames, tableName);
         }
     }
    
     public static void loadDtIntoDb(DataTable dt, List<string> cols, string tableName)
     {
         using (var conn = new SqlConnection(cstr))
         {
             using (var bcp = new SqlBulkCopy(conn))
             {
                 bcp.DestinationTableName = tableName;
                 foreach (var col in cols)
                     bcp.ColumnMappings.Add(col, col);
                 conn.Open();
                 bcp.WriteToServer(dt);
             }
         }
     }
    
     public static void createTable(List<string> dcs,string tableName)
     {
         StringBuilder sb = new StringBuilder();
             sb.AppendLine(string.Format("Create table {0}(", tableName));
         foreach (var dc in dcs)
             sb.AppendLine(string.Format("{0} varchar(255),", dc));
         //Clean sql by removing final ',' and adding a close paren 
         string sql = sb.ToString().TrimEnd(',') + ")";
         using (var conn = new SqlConnection(cstr))
         {
             using (var cmd = new SqlCommand(sql, conn))
             {
                 conn.Open();
                 cmd.ExecuteNonQuery();
             }
         }
     }
    
     public static DataTable ConvertCSVtoDataTable(string strFilePath, char delim)
     {
         DataTable dt = new DataTable();
         using (StreamReader sr = new StreamReader(strFilePath))
         {
             string[] headers = sr.ReadLine().Split(delim);
             foreach (string header in headers)
             {
                 dt.Columns.Add(header);
             }
             while (!sr.EndOfStream)
             {
                 string[] rows = sr.ReadLine().Split(delim);
                 DataRow dr = dt.NewRow();
                 for (int i = 0; i < headers.Length; i++)
                 {
                     dr[i] = rows[i];
                 }
                 dt.Rows.Add(dr);
             }
         }
         return dt;
     }
    

以下是我的 using 语句列表:

using System;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Linq;
using System.Text;
using Microsoft.Data.SqlClient;

我看到你正在寻找效率。以下是如何异步执行此操作:

public static string cstr = "Enter your connection string to destination here";

 //This cannot be done in SSIS
 static async Task Main(string[] args)
 {
     var dir = @"C:whereever";

     var files = new DirectoryInfo(dir).GetFiles().AsEnumerable().Where(f=>f.Extension.Contains("txt")))
     await Task.WhenAll(files.Select(async f=> await processFile(f)));
  }

 public statis async Task processFile(File file)
 {
     DataTable dt = await ConvertCSVtoDataTable(file.FullName, '|');
     //getColumnNames into an object
     List<string> colNames = new List<string>();
     foreach (DataColumn dc in dt.Columns)
         colNames.Add(dc.ColumnName);

     //create table
     string tableName = Path.GetFileNameWithoutExtension(file.Name);
     await createTable(colNames, tableName);
     //load table
     await loadDtIntoDb(dt, colNames, tableName);
 }

public static async Task loadDtIntoDb(DataTable dt, List<string> cols, string tableName)
 {
     using (var conn = new SqlConnection(cstr))
     {
         using (var bcp = new SqlBulkCopy(conn))
         {
             bcp.DestinationTableName = tableName;
             foreach (var col in cols)
                 bcp.ColumnMappings.Add(col, col);
             conn.Open();
             await bcp.WriteToServerAsync(dt);
         }
     }
 }

 public static async Task createTable(List<string> dcs,string tableName)
 {
     StringBuilder sb = new StringBuilder();
         sb.AppendLine(string.Format("Create table {0}(", tableName));
     foreach (var dc in dcs)
         sb.AppendLine(string.Format("{0} varchar(255),", dc));
     //Clean sql by removing final ',' and adding a close paren 
     string sql = sb.ToString().TrimEnd(',') + ")";
     using (var conn = new SqlConnection(cstr))
     {
         using (var cmd = new SqlCommand(sql, conn))
         {
             conn.Open();
             await cmd.ExecuteNonQueryAsync();
         }
     }

 public static async Task<DataTable> ConvertCSVtoDataTable(string strFilePath, char delim)
 {
     DataTable dt = new DataTable();
     using (StreamReader sr = new StreamReader(strFilePath))
     {
         string[] headers = await sr.ReadLineAsync().Split(delim);
         foreach (string header in headers)
         {
             dt.Columns.Add(header);
         }
         while (!sr.EndOfStream)
         {
             string[] rows = await sr.ReadLineAsync().Split(delim);
             DataRow dr = dt.NewRow();
             for (int i = 0; i < headers.Length; i++)
             {
                 dr[i] = rows[i];
             }
             dt.Rows.Add(dr);
         }
     }
     return dt;
 }
作者头像

我已将此作为新答案发布,而不是编辑我现有的答案,因为我的原始答案回答了原始问题。

话虽如此,这个答案只是扩展了现有答案。不幸的是,没有详细说明表中数据的数据类型应该是什么,所以我有 **assumed** 所有数据都将是 nvarchar(200) 。您可以在动态语句中更改它,但是请注意,此解决方案假定 **all** 数据具有相同的数据类型。如果这不是真的,并且您需要强类型数据,那么是时候提出一个新问题( don't be a chameleon )了。


无论如何,正如我所说,这扩展了我现有的答案。您会注意到另外一段代码,它动态地处理表的创建。

我创建了一个临时表(将限制在 Invoke-Sqlcmd 的范围内创建它),以 INSERT 文件中的第一行,我假设具有标题。对于字段终止符,我使用了一个字符 assume 不会出现在您的数据中,即管道 ( | )。如果可以,请使用不同的字符。然后我 only INSERT 第一行(因此 FIRSTROWLASTROW 但具有值 1 )。此 INSERT 是文件中的第一行,作为分隔值。

接下来,我创建一个动态语句并将该值拆分为行,然后重新聚合,但引用字段名称,并定义数据类型 ( nvarchar(200) )。您将需要在此处使用尊重序数位置的字符串拆分器,NOT STRING_SPLIT 。我使用 DelimitedSplitN4K_LEAD (位于我的 Utility 数据库中),它的定义可以在 here 中找到。

最后执行该动态语句,以正确的顺序创建具有列名的表,所有数据类型均为 nvarchar(200)Invoke-Sqlcmd 然后执行该语句。

然后我们回到原来的解决方案, BULK INSERT 数据。

$ImportFolder = "/home/mssql/ImportSample" #The Folder your files are in
$Instance = "." #The Instance you are inserting the data into. "." means local host
$Database = "ImportSample" #The database you are inserting the data into

$Files = Get-ChildItem -Path $ImportFolder -Filter "*.txt" #Get all the txt files

#Loop the files
foreach($File in $Files){
    #Create the creation statement
    Write-Verbose "Defining table creation script for $($File.Name)."
    $CreateSQL = "CREATE TABLE #Rows (DelimitedData nvarchar(4000));`n" +
                 "BULK INSERT #Rows FROM '$($File.FullName.Replace("'","''"))' WITH(FORMAT = 'CSV', FIELDTERMINATOR = '|', ROWTERMINATOR = '0x0a', FIRSTROW = 1, LASTROW = 1);`n" + 
                 "DECLARE @SQL nvarchar(MAX);`n" +
                 "SELECT @SQL = N'CREATE TABLE dbo.[$($File.BaseName.Replace("]","]]"))] (' + STRING_AGG(QUOTENAME(DSL.Item) + N' nvarchar(200)',N',') WITHIN GROUP (ORDER BY DSL.ItemNumber) + N');' FROM #Rows R CROSS APPLY Utility.fn.DelimitedSplitN4K_LEAD(R.DelimitedData,N',') DSL;`n" + 
                 "EXEC sys.sp_executesql @SQL;"

    #Execute the statement
    Write-Verbose "Table creation script for $($File.Name) generated:"
    Write-Verbose $CreateSQL
    Invoke-SqlCmd -ServerInstance $Instance -Database $Database -Query $CreateSQL -Username $User -Password $Password

    #Create the Insert statement
    Write-Verbose "Defining BULK INSERT script for $($File.Name)."
    $InsertSQL = "BULK INSERT dbo.[$($File.BaseName.Replace("]","]]"))] FROM '$($File.FullName.Replace("'","''"))' WITH (FORMAT = 'CSV', FIELDTERMINATOR = ',', ROWTERMINATOR = '0x0a', FIRSTROW = 2);"

    Write-Verbose "BULK INSERT script for $($File.Name) generated:"
    Write-Verbose $InsertSQL

    #Execute the statement
    Invoke-SqlCmd -ServerInstance $Instance -Database $Database -Query $InsertSQL -Username $User -Password $Password
}

演示此工作的几个屏幕截图(无法用小提琴演示): enter image description here

enter image description here

作者头像

这是一个使用动态脚本的存储过程:

  1. BULK INSERT 将文件放入表 [My_bulk_insert_data] 中,假设文件的第一行包含列名:
    2.然后使用考虑列名的动态脚本批量插入

create proc usp_bulkinsert_file(@fullfilename as nvarchar(max))

as
begin

declare @filename as nvarchar(max)
set @filename = substring(@fullfilename,1 + LEN(@fullfilename) - CHARINDEX('', REVERSE(@fullfilename)) + 1, len(@fullfilename))

declare @script as nvarchar(max)
if exists(select 1 from sysobjects where name = 'My_bulk_insert_data') drop table My_bulk_insert_data
set @script = 'if exists(select 1 from sysobjects where name =''' + @filename + ''') drop table [' + @filename + ']'
exec(@script)

create table My_bulk_insert_data(data nvarchar(max))

set @script='
BULK INSERT dbo.My_bulk_insert_data
FROM ''' + @fullfilename + '''
WITH
(
FIELDTERMINATOR = ''''
)
'
print @script

exec(@script)

declare @data as nvarchar(max)
select top 1 @data = data from My_bulk_insert_data

set @script = 'Create Table [' + @filename + ']('
select @script = @script + '[' + value + ']' + ' nvarchar(max) ,' from string_split(@data, '|') My_bulk_insert_data

set @script = SUBSTRING(@script, 1, len(@script) - 1) + ')'

print @script

exec(@script)

set @script='BULK INSERT [' + @filename + ']' + ' from ''' + @fullfilename + '''
with
(
FIELDTERMINATOR = ''|''
)
'
print @script
exec(@script)

end



 `Results Screenshot` 

 [![enter image description here](https://i.stack.imgur.com/V2BCG.png) ](https://i.stack.imgur.com/V2BCG.png)