By: Jeff Levy
Multithreading an SSIS Package – Introduction
Have you ever had a single task in SSIS that needs to be applied to dozens of tables in your database? The task could be as simple as extracting each table to a flat file and moving the file to a network drive. You could create dozens of packages, one for each table, each with its own Control Flow / Data Flow tasks. But that approach is hard to maintain: if the column structure changes for a group of tables, or if the number of tables in scope grows rapidly, you have to edit or create package after package.
I’m going to show you a solution consisting of two SSIS packages (a template package and a controller package) that dynamically extract dozens of tables to flat files using multithreading in C#. The template package, ‘ExtractTable.dtsx’, performs the extraction of data (via bcp queryout) to a flat file. The controller package, ‘ThreadCreator.dtsx’, replicates the template package several times at runtime and executes the copies in parallel. The copies exist only in memory; no additional package files are created on disk. The diagram below depicts the process at a high level.
As far as deployment goes, I will show you how to execute these packages from your local directory and also from the Integration Services Catalog. When executing from the Integration Services Catalog, I will follow an approach similar to the one my colleague and fellow Key2 Consulting employee Matt Wollner described in his blog post, How to Execute an SSIS Package within an SSIS Script Task.
Step 1: Set Up the ThreadQueue Table
The first step in the process is to create and populate the ThreadQueue configuration table. This table is the list of all database tables that you want extracted to flat files. For this article, I will be sourcing the list of tables from the AdventureWorks database.
Please see the column structure in the picture below. Note that the ‘ExtractStatus’ and ‘ThreadInstanceNumber’ columns must initially be NULL; the packages populate and update them as they run.
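The exact column definitions appear only in the picture, but the SQL in Step 2 references five columns: TableID, FullTableName, BaseTableName, ExtractStatus, and ThreadInstanceNumber. As a rough illustration, here is a C# model of one row plus a check for the required starting state. The C# types, the class names `ThreadQueueRow` and `ThreadQueueChecks`, and the sample table names are assumptions, not part of the solution.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of one ThreadQueue row. Column names come from the
// SQL in Step 2; the C# types are assumptions (the real types are in the screenshot).
public class ThreadQueueRow
{
    public int TableID { get; set; }
    public string FullTableName { get; set; }      // e.g. "Sales.Customer" (hypothetical)
    public string BaseTableName { get; set; }      // e.g. "Customer" (hypothetical)
    public string ExtractStatus { get; set; }      // NULL -> 'Pending' -> 'Success'
    public int? ThreadInstanceNumber { get; set; } // NULL until a thread claims the row
}

public static class ThreadQueueChecks
{
    // True when every row is still unclaimed: ExtractStatus and ThreadInstanceNumber
    // are both NULL, which is the state the packages expect before a run.
    public static bool IsReadyToRun(IEnumerable<ThreadQueueRow> rows)
    {
        return rows.All(r => r.ExtractStatus == null && r.ThreadInstanceNumber == null);
    }
}
```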
Step 2: Create the Template Package (‘ExtractTable.dtsx’)
2.1 – Create the package
After the ThreadQueue configuration table has been created and populated, create a new SSIS project in Visual Studio called ‘Multithreading’. Within the project, create a new package called ‘ExtractTable.dtsx’. This package will be used as the ‘Template’ package and will be replicated n times at execution time, where n is the number of threads you configure. Each instance of the ‘ExtractTable.dtsx’ package will be responsible for extracting a different table listed in the ThreadQueue configuration table. This package consists of 4 Control Flow Tasks and is illustrated in the picture below.
2.2 – Create the Package Parameters
Create a new Package Parameter in the ‘ExtractTable.dtsx’ package called ‘ThreadInstanceNumber’. For now, give this parameter a default value of ‘1’. This value will be set by the ‘ThreadCreator’ Package at runtime. I will discuss how this package works later on in this article.
2.3 – Create the Package Variables
Below is a list of Package Variables that this package will need.
• The ‘BaseTableName’, ‘FullTableName’, and ‘TableID’ variables will be populated by Task 1. Task 1 grabs the values for these variables from the ThreadQueue configuration table. If the values are set to ‘-1’, then there are no more database tables left to be extracted in the ThreadQueue configuration table.
• The ‘EvalExpressionNull’ variable is used exclusively for the For Loop Container. This variable is set by an SSIS Expression. Please note the syntax in the picture above.
• The ‘bcpArguments’ and ‘FileName’ variables are used as input for Task 2: BCP Extract Process. This task is the heart of the package, as it actually extracts the database table out to a flat file.
2.4 – Create Task 1: Get First Extract Table
This task is responsible for claiming the first available database table in your ThreadQueue configuration table. It selects one row whose ExtractStatus column is NULL (because the query has no ORDER BY, no particular order is guaranteed), then marks that row as ‘Pending’ in the ExtractStatus column and records the ThreadInstanceNumber of the thread that claimed it.
Here is the SQL code. Keep the UPDLOCK hint and the transaction block: together they prevent two threads from claiming the same table (a race condition).
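```sql
BEGIN TRAN

DECLARE @TableID INT

-- Claim one unprocessed table; UPDLOCK makes a second thread reading the same row
-- wait until this transaction commits, so two threads cannot claim the same table
SELECT TOP 1 @TableID = TableID
FROM ThreadConfig.dbo.ThreadQueue WITH (UPDLOCK)
WHERE ExtractStatus IS NULL

-- Mark the row as claimed by this thread
UPDATE ThreadConfig.dbo.ThreadQueue
SET ExtractStatus = 'Pending',
    ThreadInstanceNumber = @ThreadInstanceNumber
WHERE TableID = @TableID

COMMIT TRAN

IF (@TableID IS NOT NULL)
    SELECT TableID, FullTableName, BaseTableName
    FROM ThreadConfig.dbo.ThreadQueue
    WHERE TableID = @TableID
ELSE
    -- Sentinel row: no tables are left to extract
    SELECT -1 AS TableID,
           '-1' AS FullTableName,
           '-1' AS BaseTableName
```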
The parameters are set up as shown below.
Parameter Mapping / Input:
Result Set / Output:
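To see why the claim has to be atomic, consider a small in-memory analogy (not part of the package). In the hypothetical `InMemoryThreadQueue` class below, a C# `lock` plays the role that BEGIN TRAN, the UPDLOCK hint, and COMMIT TRAN play in the SQL above: finding an unclaimed row and marking it happen as one step, so two threads can never claim the same TableID. Without the lock, two threads could both see the same unclaimed row before either one marks it.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for the ThreadQueue table, used only to illustrate the
// claim pattern of Task 1; the lock replaces the SQL transaction + UPDLOCK.
public class InMemoryThreadQueue
{
    private readonly object sync = new object();
    private readonly List<int> tableIds;
    private readonly Dictionary<int, int> claimedBy = new Dictionary<int, int>();

    public InMemoryThreadQueue(IEnumerable<int> ids)
    {
        tableIds = new List<int>(ids);
    }

    // Returns the claimed TableID, or -1 (the same sentinel Task 1 returns) when
    // every table has already been claimed.
    public int Claim(int threadInstanceNumber)
    {
        lock (sync)
        {
            foreach (int id in tableIds)
            {
                if (!claimedBy.ContainsKey(id))          // "WHERE ExtractStatus IS NULL"
                {
                    claimedBy[id] = threadInstanceNumber; // "SET ExtractStatus = 'Pending', ThreadInstanceNumber = ..."
                    return id;
                }
            }
            return -1;
        }
    }

    // Snapshot of TableID -> ThreadInstanceNumber for every claimed row.
    public IReadOnlyDictionary<int, int> ClaimedBy
    {
        get { lock (sync) { return new Dictionary<int, int>(claimedBy); } }
    }
}
```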
2.5 – Create the For Loop Container
This container iterates through all items in the ThreadQueue configuration table. You will need to configure the loop as indicated in the screenshot below.
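The overall control flow of ‘ExtractTable.dtsx’ can be sketched in C# as below. Since the loop settings are only shown in the screenshot, this assumes the For Loop keeps running until Task 1 or Task 4 returns the -1 sentinel. The delegates `claimNext`, `extract`, and `markSuccess` are hypothetical stand-ins for Tasks 1/4, Task 2, and Task 3.

```csharp
using System;

public static class ExtractLoop
{
    // Sketch of one template-package instance. Returns how many tables it processed.
    public static int Run(Func<int> claimNext, Action<int> extract, Action<int> markSuccess)
    {
        int processed = 0;
        int tableId = claimNext();      // Task 1: Get First Extract Table
        while (tableId != -1)           // For Loop: stop at the -1 sentinel
        {
            extract(tableId);           // Task 2: BCP Extract Process
            markSuccess(tableId);       // Task 3: Update Record to Success
            processed++;
            tableId = claimNext();      // Task 4: Get Next Extract Table
        }
        return processed;
    }
}
```

If `extract` throws, `markSuccess` never runs, so in this sketch a table whose extract failed keeps its ‘Pending’ status and is easy to spot afterwards.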
2.6 – Create Task 2: BCP Extract Process
This task performs the primary function of the package: extracting each table listed in the ThreadQueue configuration table to a flat file. It uses an Execute Process task from the SSIS Toolbox to run a bcp queryout command. If you are unfamiliar with the bcp utility, please check out the following link: https://docs.microsoft.com/en-us/sql/tools/bcp-utility.
You will need to perform two steps for this task.
1. Set the ‘Executable’ value – Locate the bcp.exe file on your machine and set this property to its full path, as shown in the picture below.
2. Set the ‘Arguments’ value – Use an SSIS Expression that maps this property to the ‘bcpArguments’ variable. Please refer to the screenshot below for guidance.
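The actual expression behind ‘bcpArguments’ is only shown in the screenshot, so the following is a hypothetical C# equivalent that shows the general shape of a bcp queryout argument string. It assumes character format (-c), a comma field terminator (-t,), Windows authentication (-T), a server passed with -S, and an output file named after BaseTableName with a .txt extension; `BcpArguments.Build`, the server name, and the directory are illustrative only.

```csharp
public static class BcpArguments
{
    // Builds the Arguments string for bcp.exe in queryout mode:
    //   "<query>" queryout "<file>" -S <server> -T -c -t,
    public static string Build(string fullTableName, string baseTableName,
                               string outputDirectory, string serverName)
    {
        // Equivalent of the FileName variable: <directory>\<BaseTableName>.txt
        // (Windows separator on purpose, since bcp.exe runs on Windows)
        string fileName = outputDirectory.TrimEnd('\\') + "\\" + baseTableName + ".txt";

        return "\"SELECT * FROM " + fullTableName + "\" queryout \"" + fileName + "\""
             + " -S " + serverName + " -T -c -t,";
    }
}
```

For example, `BcpArguments.Build("AdventureWorks2016.Sales.Customer", "Customer", @"C:\Extracts", "MYSERVER")` produces `"SELECT * FROM AdventureWorks2016.Sales.Customer" queryout "C:\Extracts\Customer.txt" -S MYSERVER -T -c -t,`.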
2.7 – Create Task 3: Update Record to Success
After the table has been successfully extracted to a flat file in the specified directory, this task updates the table’s ExtractStatus in the ThreadQueue configuration table to ‘Success’. The ThreadInstanceNumber column does not need to be updated here; Task 1 already recorded which thread is processing the table when it claimed the row.
The code:
```sql
BEGIN TRAN

UPDATE ThreadConfig.dbo.ThreadQueue
SET ExtractStatus = 'Success'
WHERE TableID = @TableID

COMMIT TRAN
```
Note: Each thread only updates the row it claimed in Task 1, so threads do not collide here. A single UPDATE statement is already atomic; the transaction block simply mirrors the pattern used in Task 1.
Parameter Mapping / Input:
2.8 – Create Task 4: Get Next Extract Table
This task is essentially the same as Task 1, except that it occurs later in the process: its purpose is to fetch the next table to be extracted to a flat file. The SQL code, parameter inputs, and result set configuration are identical to those of Task 1.
Step 3: Set Up the ThreadCreator Package
The ThreadCreator package has a single Script Task, written entirely in C#. Before we review the C# code, please add the following parameters to this package.
3.1 – Create Package Parameters
• ISC_ExecutionServerName – the SQL Server instance that hosts the Integration Services Catalog the packages will run from
• ISC_PackageFolderName – the name of the catalog folder that your Integration Services project will be deployed to
• ISC_PackageProjectName – this is the name of the project that your packages belong to in Visual Studio
• IsExecutedFromSSISCatalog – this will tell the controller package to run from the Integration Services Catalog (if true) or to execute from a local directory (if false)
• NumThreads – the number of template packages to be running in parallel
• PackageDirectory – the directory on disk that contains the template package. This parameter is not used when executing from the Integration Services Catalog.
• PackageName – this is the name of the Template Package to be replicated and executed in parallel.
Be sure to add these parameters to the ReadOnlyVariables list in the SSIS Script Task editor.
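Because the two execution modes use different subsets of these parameters, it can help to validate them before any threads start. The hypothetical `ThreadCreatorSettings` class below mirrors the parameter names above and returns a list of problems; it is a sketch of one possible check, not part of the original Script Task.

```csharp
using System.Collections.Generic;

// Hypothetical mirror of the ThreadCreator package parameters (Step 3.1).
public class ThreadCreatorSettings
{
    public string ISC_ExecutionServerName { get; set; }
    public string ISC_PackageFolderName { get; set; }
    public string ISC_PackageProjectName { get; set; }
    public bool IsExecutedFromSSISCatalog { get; set; }
    public int NumThreads { get; set; }
    public string PackageDirectory { get; set; }
    public string PackageName { get; set; }

    // Returns one message per problem; an empty list means the settings look usable.
    public List<string> Validate()
    {
        var problems = new List<string>();
        if (NumThreads < 1) problems.Add("NumThreads must be at least 1.");
        if (string.IsNullOrWhiteSpace(PackageName)) problems.Add("PackageName is required.");

        if (IsExecutedFromSSISCatalog)
        {
            // Catalog execution needs server, folder, and project; PackageDirectory is ignored.
            if (string.IsNullOrWhiteSpace(ISC_ExecutionServerName)) problems.Add("ISC_ExecutionServerName is required.");
            if (string.IsNullOrWhiteSpace(ISC_PackageFolderName)) problems.Add("ISC_PackageFolderName is required.");
            if (string.IsNullOrWhiteSpace(ISC_PackageProjectName)) problems.Add("ISC_PackageProjectName is required.");
        }
        else if (string.IsNullOrWhiteSpace(PackageDirectory))
        {
            // Local execution combines PackageDirectory with PackageName; ISC_ values are ignored.
            problems.Add("PackageDirectory is required for local execution.");
        }
        return problems;
    }
}
```

Inside the Script Task you could fill this object from the same `Dts.Variables` values that `Main` reads and set `Dts.TaskResult` to Failure whenever `Validate()` returns any messages.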
3.2 – Add Assembly references
Open the Script Task editor and add the reference assemblies listed in the picture below. You should be able to find most of them in the ‘C:\Windows\assembly\GAC_MSIL’ directory of your local machine. Make sure to choose the proper version; in my case, they were located in the 13.0.0.0 folder of each assembly.
3.3 – Add C# Code
Within the Script Task editor, add the following class files in Solution Explorer (ScriptMain.cs already exists, so replace its contents). Note: do not copy my namespace name; SSIS generates a unique one for your Script Task, and all four files should use that namespace.
• ScriptMain.cs
```csharp
#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Collections.Generic;
using System.Threading;
using System.Linq;
using System.IO;
using Microsoft.SqlServer.Management.IntegrationServices;
using System.Data.SqlClient;
#endregion

namespace ST_b53c9804e3af4c9b9ad265d06980eac2
{
    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
        private DWPackage pkg;

        public void Main()
        {
            try
            {
                String serverName = Dts.Variables["$Package::ISC_ExecutionServerName"].Value.ToString();
                String folderName = Dts.Variables["$Package::ISC_PackageFolderName"].Value.ToString();
                String projectName = Dts.Variables["$Package::ISC_PackageProjectName"].Value.ToString();
                String packageName = Dts.Variables["$Package::PackageName"].Value.ToString();
                int numThreads = Convert.ToInt32(Dts.Variables["$Package::NumThreads"].Value);

                // Decide whether to execute from the Integration Services Catalog or from a local directory
                if (Convert.ToBoolean(Dts.Variables["$Package::IsExecutedFromSSISCatalog"].Value.ToString()))
                {
                    String ssisConnectionString = String.Format(
                        "Data Source={0}; Initial Catalog = msdb; Integrated Security = SSPI;Connection Timeout=0;MultipleActiveResultSets=true",
                        serverName);
                    pkg = new ISCPackage(ssisConnectionString, folderName, projectName, packageName);
                }
                else
                {
                    String packagePath = Path.Combine(Dts.Variables["$Package::PackageDirectory"].Value.ToString(), packageName);
                    pkg = new LocalPackage(packagePath);
                }

                // Collection of worker threads
                var threads = new List<Thread>();

                for (int i = 1; i <= numThreads; i++)
                {
                    // Copy the loop counter: C# shares one 'i' across all iterations of a for loop,
                    // so a lambda that captured 'i' directly could run with a later value
                    int threadInstanceNumber = i;

                    // Create a thread that executes one copy of the template package
                    var packageThread = new Thread(() => pkg.ExecutePackage(threadInstanceNumber));

                    // Start the thread
                    packageThread.Start();

                    // Stagger the thread start-ups by one second
                    Thread.Sleep(1000);

                    // Keep track of the thread so the main thread can wait for it below
                    threads.Add(packageThread);
                }

                // Block the main thread until every worker thread has finished
                foreach (var t in threads)
                {
                    t.Join();
                }

                Dts.TaskResult = (int)ScriptResults.Success;
            }
            catch (Exception e)
            {
                System.Windows.Forms.MessageBox.Show("The Package Failed from the Main thread. Exception: " + e.ToString());
                Dts.TaskResult = (int)ScriptResults.Failure;
            }
        }

        #region ScriptResults declaration
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion
    }
}
```
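Two details of the thread-start loop in `Main` are easy to get wrong. First, a C# `for` loop has one loop variable shared by all iterations, so a thread lambda should capture a per-iteration copy rather than the counter itself; otherwise a thread that starts late can see a later value, and only a delay such as a `Thread.Sleep` between starts makes that unlikely. Second, `Thread.Join` waits for a thread to finish, which is simpler than polling `ThreadState`. The sketch below, a hypothetical `WorkerThreads.RunAll` helper that uses only the .NET base class library, shows both; `work` stands in for `pkg.ExecutePackage`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class WorkerThreads
{
    // Starts numThreads threads, giving each its own 1-based instance number,
    // then blocks until all of them have finished.
    public static void RunAll(int numThreads, Action<int> work)
    {
        var threads = new List<Thread>();
        for (int i = 1; i <= numThreads; i++)
        {
            // Per-iteration copy: each lambda captures its own variable, not the shared 'i'.
            int threadInstanceNumber = i;
            var thread = new Thread(() => work(threadInstanceNumber));
            thread.Start();
            threads.Add(thread);
        }

        // Join returns once the thread has ended; no ThreadState polling needed.
        foreach (var thread in threads)
        {
            thread.Join();
        }
    }
}
```

In ScriptMain the equivalent call would be `WorkerThreads.RunAll(numThreads, n => pkg.ExecutePackage(n));`.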
• DWPackage.cs
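```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ST_b53c9804e3af4c9b9ad265d06980eac2
{
    // Common contract for LocalPackage and ISCPackage: execute one copy of the
    // template package with the given thread instance number
    public interface DWPackage
    {
        void ExecutePackage(int threadInstanceNumber);
    }
}
```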
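Because ScriptMain only depends on the DWPackage interface, the choice between LocalPackage and ISCPackage is made once and the threading code is identical for both (a strategy-style design). The same seam makes it possible to exercise the threading logic without SSIS: the hypothetical `RecordingPackage` below implements a standalone copy of the interface and simply records the thread instance numbers it receives.

```csharp
using System.Collections.Concurrent;

// Standalone copy of the interface from DWPackage.cs
public interface DWPackage
{
    void ExecutePackage(int threadInstanceNumber);
}

// Hypothetical test double: records which thread instance numbers were asked
// to execute instead of running ExtractTable.dtsx.
public class RecordingPackage : DWPackage
{
    // Thread-safe, because several worker threads may call ExecutePackage at once
    public ConcurrentQueue<int> Calls { get; } = new ConcurrentQueue<int>();

    public void ExecutePackage(int threadInstanceNumber)
    {
        Calls.Enqueue(threadInstanceNumber);
    }
}
```

Passing a `RecordingPackage` in place of LocalPackage or ISCPackage would let you run the thread-start loop and confirm that each number from 1 to NumThreads arrives exactly once.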
• LocalPackage.cs
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.SqlServer.Dts.Runtime;

namespace ST_b53c9804e3af4c9b9ad265d06980eac2
{
    /* This object is used for execution when the 'IsExecutedFromSSISCatalog'
     * package parameter is set to false */
    class LocalPackage : DWPackage
    {
        private String packageLocation;

        /* Constructor: stores the full path of the template package (.dtsx) */
        public LocalPackage(String packageLocation)
        {
            this.packageLocation = packageLocation;
        }

        public void ExecutePackage(int threadInstanceNumber)
        {
            try
            {
                // Each thread loads its own in-memory copy of the template package
                Application app = new Application();
                Package pkg = app.LoadPackage(this.packageLocation, null);

                // Set the thread instance number of this replicated template package
                pkg.Parameters["ThreadInstanceNumber"].Value = threadInstanceNumber;

                // Execute the package
                DTSExecResult results = pkg.Execute();

                if (results == Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure)
                {
                    // Collect every error description, one per line
                    String errormessage = "";
                    foreach (Microsoft.SqlServer.Dts.Runtime.DtsError errorList in pkg.Errors)
                    {
                        String error = errorList.Description.ToString();
                        errormessage = errormessage + error + Environment.NewLine;
                    }
                    System.Windows.Forms.MessageBox.Show(errormessage);
                }
            }
            catch (Exception e)
            {
                System.Windows.Forms.MessageBox.Show("The Execution from your directory failed on Thread " + threadInstanceNumber + ". With error: " + e.ToString());
            }
        }
    }
}
```
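As written, LocalPackage and ISCPackage catch every exception on the worker thread and report it with MessageBox.Show, so ScriptMain still sets Dts.TaskResult to Success when an extract fails, and a modal dialog is of little help when the package runs unattended. One alternative, sketched below with a hypothetical `WorkerErrors` class, is to record failures in a thread-safe ConcurrentQueue and let the main thread decide the outcome after all threads have joined.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical error collector shared by all worker threads.
public class WorkerErrors
{
    private readonly ConcurrentQueue<string> errors = new ConcurrentQueue<string>();

    // Runs one unit of work; any exception is recorded with its thread number
    // instead of being shown in a MessageBox.
    public void RunSafely(int threadInstanceNumber, Action work)
    {
        try
        {
            work();
        }
        catch (Exception e)
        {
            errors.Enqueue("Thread " + threadInstanceNumber + ": " + e.Message);
        }
    }

    public bool HasErrors => !errors.IsEmpty;

    // Copy of the recorded messages, safe to read after the threads have joined.
    public IReadOnlyList<string> Snapshot() => errors.ToArray();
}
```

After the threads are joined in Main, a check such as `if (errors.HasErrors) Dts.TaskResult = (int)ScriptResults.Failure;` would make a failed extract fail the ThreadCreator package as well.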
• ISCPackage.cs
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.SqlServer.Management.IntegrationServices;
using System.Data.SqlClient;
using System.Collections.ObjectModel;
using System.Threading;

namespace ST_b53c9804e3af4c9b9ad265d06980eac2
{
    /* This object is used for execution when the 'IsExecutedFromSSISCatalog'
     * package parameter is set to true */
    class ISCPackage : DWPackage
    {
        private String ssisConnectionString;
        private String folderName;
        private String projectName;
        private String packageName;

        /* Constructor: stores the catalog connection string and the location of the template package */
        public ISCPackage(String ssisConnectionString, String folderName, String projectName, String packageName)
        {
            this.ssisConnectionString = ssisConnectionString;
            this.folderName = folderName;
            this.projectName = projectName;
            this.packageName = packageName;
        }

        public void ExecutePackage(int threadInstanceNumber)
        {
            try
            {
                // Locate the template package in the SSISDB catalog
                IntegrationServices ssisServer = new IntegrationServices(new SqlConnection(this.ssisConnectionString));
                Catalog cat = ssisServer.Catalogs["SSISDB"];
                CatalogFolder catFolder = cat.Folders[this.folderName];
                PackageInfo pkg = catFolder.Projects[this.projectName].Packages[this.packageName];

                // Collection of execution parameters
                Collection<PackageInfo.ExecutionValueParameterSet> executionParameterSet =
                    new Collection<PackageInfo.ExecutionValueParameterSet>();

                // ObjectType 50 = system (execution) parameter; SYNCHRONIZED = 0 starts the execution asynchronously
                executionParameterSet.Add(new PackageInfo.ExecutionValueParameterSet
                {
                    ObjectType = 50,
                    ParameterName = "SYNCHRONIZED",
                    ParameterValue = 0
                });

                // LOGGING_LEVEL = 1 (Basic), the default logging level
                executionParameterSet.Add(new PackageInfo.ExecutionValueParameterSet
                {
                    ObjectType = 50,
                    ParameterName = "LOGGING_LEVEL",
                    ParameterValue = 1
                });

                // ObjectType 30 = package parameter; sets ThreadInstanceNumber in the template package
                executionParameterSet.Add(new PackageInfo.ExecutionValueParameterSet
                {
                    ObjectType = 30,
                    ParameterName = "ThreadInstanceNumber",
                    ParameterValue = threadInstanceNumber
                });

                // Execute the package on the Integration Services Catalog and capture the execution ID
                long executionIdentifier = pkg.Execute(false, null, executionParameterSet);
                ExecutionOperation executionOperation = ssisServer.Catalogs["SSISDB"].Executions[executionIdentifier];

                // The execution runs asynchronously, so poll every 5 seconds until it completes;
                // this keeps the thread (and therefore ThreadCreator) alive until the template package finishes
                while (!executionOperation.Completed)
                {
                    executionOperation.Refresh();
                    Thread.Sleep(5000);
                }
            }
            catch (Exception e)
            {
                System.Windows.Forms.MessageBox.Show("The Execution from the SSIS Catalog Failed on Thread " + threadInstanceNumber + ". With error: " + e.ToString());
            }
        }
    }
}
```
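ISCPackage waits for the asynchronous catalog execution by polling `executionOperation.Completed` every five seconds with no upper bound, so a hung execution would keep its thread, and therefore ThreadCreator, running indefinitely. The hypothetical `Polling.WaitUntil` helper below is a generic version of that loop with a timeout added; it uses only the .NET base class library.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class Polling
{
    // Refresh, check, sleep, until isCompleted() is true or the timeout elapses.
    // Returns true if the work completed, false if the timeout was reached first.
    public static bool WaitUntil(Func<bool> isCompleted, Action refresh,
                                 TimeSpan pollInterval, TimeSpan timeout)
    {
        var stopwatch = Stopwatch.StartNew();
        while (true)
        {
            refresh();
            if (isCompleted()) return true;
            if (stopwatch.Elapsed >= timeout) return false;
            Thread.Sleep(pollInterval);
        }
    }
}
```

In ISCPackage it could be called as `Polling.WaitUntil(() => executionOperation.Completed, () => executionOperation.Refresh(), TimeSpan.FromSeconds(5), TimeSpan.FromHours(2))`, where the two-hour limit is an arbitrary example value.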
Step 4: Execute the Package
After all the C# code has been brought into the Script Task of the ‘ThreadCreator.dtsx’ package, it is time to execute! Refer to one or both of the sections below for the type of execution that you wish your solution to perform.
4.1 – Execute from a local directory
Open up the package parameters inside the ‘ThreadCreator.dtsx’ package and make sure the following parameters are set:
• ‘IsExecutedFromSSISCatalog’ is set to False. This ensures that the package will run from a local directory.
• NumThreads is set to the number of concurrent extract processes, i.e., the number of template packages running at the same time. In my case I chose 3, but you may choose as many threads as you think your system can handle; finding the right number may take some trial and error.
• ‘PackageDirectory’ is set to the local directory that contains your ‘ExtractTable.dtsx’ package (the code combines it with ‘PackageName’ to build the full path).
• ‘PackageName’ is set to the name of the template package. In this case, it is ‘ExtractTable.dtsx’.
Note: The other ‘ISC_XXXX’ parameters are not used for execution from a local directory. You will use them if you choose to execute from the Integration Services Catalog.
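Starting more threads than there are tables to extract gains nothing: a surplus template package simply gets the -1 sentinel from Task 1 and finishes. If you want to avoid those idle executions, a hypothetical helper like the one below could cap NumThreads by the number of unclaimed rows (for example, the result of `SELECT COUNT(*) FROM ThreadConfig.dbo.ThreadQueue WHERE ExtractStatus IS NULL`). It does not tell you the right upper limit, which still depends on your system.

```csharp
using System;

public static class ThreadCount
{
    // Hypothetical helper: never start more threads than there are unclaimed tables.
    public static int Effective(int requestedThreads, int unclaimedTables)
    {
        if (requestedThreads < 1)
            throw new ArgumentOutOfRangeException(nameof(requestedThreads), "NumThreads must be at least 1.");
        if (unclaimedTables < 0)
            throw new ArgumentOutOfRangeException(nameof(unclaimedTables));

        // Zero means there is nothing to extract, so no template package needs to start.
        return Math.Min(requestedThreads, unclaimedTables);
    }
}
```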
After confirming those values, double-check your ThreadQueue configuration table: the ExtractStatus and ThreadInstanceNumber columns should be NULL.
Now you can finally hit the magic execute button and watch the amazing power of multithreading!
Notice the updated values in the ‘ExtractStatus’ and ‘ThreadInstanceNumber’ columns of the ThreadQueue configuration table. Each ‘ExtractStatus’ value should be ‘Success’, and each ‘ThreadInstanceNumber’ value should show the thread instance that extracted the corresponding database table.
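To check the outcome programmatically rather than by eye, you could read TableID, ExtractStatus, and ThreadInstanceNumber from the ThreadQueue configuration table and summarize them. The hypothetical `RunSummary` class below counts how many tables each thread claimed and confirms that every row ended as ‘Success’; the tuple-based row shape is an assumption.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class RunSummary
{
    // Counts how many tables each ThreadInstanceNumber claimed (rows still NULL are skipped).
    public static SortedDictionary<int, int> TablesPerThread(
        IEnumerable<(int TableID, string ExtractStatus, int? ThreadInstanceNumber)> rows)
    {
        var result = new SortedDictionary<int, int>();
        foreach (var row in rows.Where(r => r.ThreadInstanceNumber.HasValue))
        {
            int thread = row.ThreadInstanceNumber.Value;
            result[thread] = result.TryGetValue(thread, out int count) ? count + 1 : 1;
        }
        return result;
    }

    // True only if every row finished with ExtractStatus = 'Success'.
    public static bool AllSucceeded(
        IEnumerable<(int TableID, string ExtractStatus, int? ThreadInstanceNumber)> rows)
        => rows.All(r => r.ExtractStatus == "Success");
}
```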
4.2 – Execute from the Integration Services Catalog
The first step in executing an SSIS project from the Integration Services Catalog is to deploy your solution to a SQL Server instance. I chose to deploy my project by right-clicking the project name in Visual Studio and selecting ‘Deploy’.
After deploying your project, make sure to note the Package Folder Name and Package Project Name. You will need to provide these values as execution parameters at runtime.
Before execution, again double-check your ThreadQueue configuration table: the ExtractStatus and ThreadInstanceNumber columns should be NULL.
To execute from the Integration Services Catalog, right-click the ‘ThreadCreator.dtsx’ package under the Integration Services Catalog on your SQL Server instance and select ‘Execute’.
A window should pop up with a list of parameters. Make sure the following parameters are all filled out as shown in the picture below:
• ‘IsExecutedFromSSISCatalog’ is set to ‘True’. This ensures that the package runs from the Integration Services Catalog; if it is not set to ‘True’, the process will not run from the Catalog.
• ‘NumThreads’ is set to the number of concurrent extract processes, i.e., the number of template packages running at the same time. In my case I chose 3, but you may choose as many threads as you think your system can handle; finding the right number may take some trial and error.
• ‘ISC_ExecutionServerName’ is set to the server name of the SQL Server instance running the packages from the Integration Services Catalog.
• ‘ISC_PackageFolderName’ is set to the name of the folder in which you deployed your solution. You can obtain this from the screenshot provided earlier within step 4.2.
• ‘ISC_PackageProjectName’ is set to the name of the project that contains your SSIS packages. You can obtain this from the screenshot provided earlier within step 4.2.
• ‘PackageName’ is set to the name of the template package. In this case, it is ‘ExtractTable.dtsx’.
Note: The ‘PackageDirectory’ parameter is not used for execution from the Integration Services Catalog. It is used only when executing from a local directory.
After you select ‘OK’, the execution starts. Open the ‘All Executions’ report from the Integration Services Catalog and you will see four concurrent package executions: one instance of the ‘ThreadCreator.dtsx’ package and three instances (one per thread) of the ‘ExtractTable.dtsx’ package.
Each instance of ‘ExtractTable.dtsx’ extracts a different table listed in the ThreadQueue configuration table (via bcp queryout).
After completion, check the ThreadQueue configuration table and notice which ThreadInstanceNumber processed each table.
And finally, checking our output directory…the files have been created! Success!
Thank you for reading. As always, we encourage you to share any thoughts, questions, or ideas you may have regarding this blog post.
Also, if you haven’t subscribed to our email mailing list yet, be sure to do so for Key2 Consulting content and updates!
References
The development approach discussed in this blog follows a structure similar to the one in the MSSQLTips.com article here. The content within this blog expands on this concept by introducing a new business task (Multithreaded Export), an enhanced threading process (ThreadQueue table), and a different method for package execution (SQL Server Integration Services Catalog).
This is awesome… THANK YOU!!
Help? Running VS 15.8.5
Data Tools 15.1.61808.07020
Running local with IsExecutedFromSSISCatalog = False
Error:
There is no project to reference.
There were errors during the task validation.
Location:
ScriptMain.cs
Thread.Sleep(1000);
Hi Jeff, This is really great. I have tried it, and it works with IsExecutedFromSSISCatalog = False, so I can run it from Visual Studio when my package is saved in a local folder. When I deploy the package to the SSIS catalog and execute it with IsExecutedFromSSISCatalog = True, the package runs but does nothing. It does not throw any error and completes successfully. I have checked everything, but it's not working. Can you please help? What could be the reason that I am unable to execute it from the SSIS catalog? I have a package connection for the template package.
Manish,
Thanks so much for reading. Have you gone back through the steps under 4.2? It might be that something got missed. Please make sure of the following:
In the ThreadQueue table the columns ExtractStatus and ThreadInstanceNumber should be NULL
The package ThreadCreator should be executed from the catalog with a right-click and Execute
ISC_ExecutionServerName should be set to your SQL instance
ISC_PackageFolderName is the folder where you deployed the solution on ISC_ExecutionServerName
ISC_PackageProjectName is the name of your project in ISC_PackageFolderName
If all of the above are correct, are you able to detect any activity? Opening the All Executions report will give you some indication of this.
Upon completion does the ThreadQueue table get updated at all? I would expect the ExtractStatus and ThreadInstanceNumber columns to be populated.
Hopefully, it’s something simple and you find it by going through the items above. Let us know how it works out!
– Ken Adams, Senior Consultant
Thanks, Ken, for your reply. I really appreciate it. I have all parameters set exactly as described above; I just use a different template package (a simple package, to test it easily). Now I am able to run it from Visual Studio with IsExecutedFromSSISCatalog = True, and it works. I guess something is wrong with the multithreading: when I run it within Visual Studio with IsExecutedFromSSISCatalog = True, ThreadInstanceNumber is inserted as 1 even though I have set the package parameter NumThreads = 3 in the controller package. When I deploy the packages to the SSIS catalog and run them, it just runs but does nothing. I guess it is unable to get into the multithreading. Do you have any pointers or experience that could help me troubleshoot further?
Hi Ken,
I have found a solution. I was deploying the package using package deployment from Visual Studio while SSMS was open with a connection to the SSIS server, and somehow the changes were not being refreshed in SSMS. You will definitely ask me what I did 🙂 It was a really silly SQL Server issue. I did a project deployment of the whole solution from Visual Studio, then disconnected SSMS and connected again. That was the magic, and the code worked like a charm from SQL Agent and from the SSIS catalog. Thanks so much for your support. Jeff rocks.
Regards,
Manish
This solution is really awesome and is exactly what I was looking for. Thank you so much, Ken.