SqlBulkTool is a command line utility that quickly creates a mirror of a database. It reads its configuration from an XML file containing source and destination connection strings and a list of all the tables to mirror, then handles the work of copying the database in an automated and highly parallelized way.
The parallelism can exploit the partition capabilities of SQL Server 2005: to handle a huge table, it is enough to partition it and the tool will load each partition in a separate thread, dramatically decreasing table load time. When no partitioning is defined, parallelism is handled at the table level.
In the process of extracting data from a host to load your data mart you basically have two choices:
- Extracting data directly from the host into the data mart;
- Creating a mirror of the data in a staging SQL Server database and then loading it into the data mart.
While the first choice seems easier to develop, there are several advantages to the latter:
- You can develop all the packages disconnected from the host,
- You can easily create test cases modifying the staging database,
- You don’t depend on the availability of the host during the Transform and Load (TL) phases of ETL,
- You have the power of the SQL Server 2005 syntax for querying tables,
- You can easily create indexes to make your queries faster.
On the other hand, the main disadvantage of this choice is that your system will spend time loading the host database into the SQL staging; furthermore, you still need to build code to create the mirror. Though this is pretty easy with SSIS, you still have code to manage over time.
Up to now, I don’t think I have said anything interesting or new. Everybody involved in BI knows this situation well. The good news is that we decided to develop a tool to facilitate the process of mirroring several tables from the host to a staging database. By using SQL 2005 partitions we achieve a high level of parallelism and consequently of loading speed, so the interesting part of this article is the description of the tool and the technique behind it.
The naive solution
As you normally don’t need all the columns of the host table, the staging table has fewer fields than the original one. The first (and naive) solution is that of creating an SSIS package that has source SELECT statements fired against the host database and destination components to the SQL staging database. SSIS does the rest of the work for us, managing some kind of parallelism at the table level. A very easy solution that still has severe limitations:
- You have to manually code SELECT statements,
- If you have a lot of tables the package is not so easy to manage,
- The level of parallelism is at the table level; if you have a huge table and several small tables, the final time to run the package is that of the biggest table,
- When you need other columns and/or tables from the host you will have to modify the package with all the implications of making some sort of change to the software.
SqlBulkTool
In writing SqlBulkTool, our goal was to make this process easy and fast by:
- Never depending on manual code to mirror tables,
- Parallelizing the load process of the mirror using partitions.
So, SqlBulkTool is a command line utility that reads its configuration from an XML file containing source and destination connection strings and a list of all the tables to mirror. It handles the work of mirroring the database in an automated and highly parallelized way.
The parallelism is handled using the partition capabilities of SQL Server 2005. To handle a huge table, it is enough to partition it and the tool will load each partition in a separate thread, dramatically decreasing table load time.
Why parallelizing operations improves performance
We observed that gaining speed through partitions is often possible even when all partitions are on the same physical disk: the bottleneck is frequently the protocol/driver connecting to the source database, which does not use the full bandwidth available to transfer data.
We would expect the bottleneck to be the write capacity of the destination disk subsystem, but we often reach neither this limit nor that of the network or CPU. In this situation, chances are that protocol latency, bandwidth limitations on client connections or the quality of the ODBC/OLE DB client driver are the real reasons for low performance.
Another aspect is that bulk insert operations on SQL Server are (strangely?) CPU-intensive: we use ADO.NET 2.0 classes to get bulk functionality, and most of the time is presumably spent marshaling data between the unmanaged and managed worlds. In any case, having many CPU cores is a great performance advantage compared to sequential bulk insert operations.
Long story short, in many situations we get a great advantage using SqlBulkTool. Sometimes this does not happen, for example when you move data from one SQL Server to another: in this case, the main advantage of using SqlBulkTool is less code to write, not increased performance.
Partitioning the table
Let’s say you have a table with 20 million rows. Loading it as a whole is very time-consuming. It would really be better to load it using 20 chunks of one million records each. You achieve a dramatic increase in speed because increasing parallelism on slow network operations optimizes the use of fast CPU and disks. When a thread is waiting for data from the network, other threads can use CPU and disk to load the data that has arrived.
This was our situation: we decided to partition the table on a date field (stored on the host as an int containing yyyymmdd), creating a partition for each quarter, so the partition function is:
CREATE PARTITION FUNCTION [DatePartitionFunction](int)
AS RANGE RIGHT FOR VALUES (
    20030101, 20030401, 20030901,
    20040101, 20040401, 20040901,
    20050101, 20050401, 20050901,
    20060101, 20060401, 20060901,
    20070101, 20070401, 20070901,
    20080101, 20080401, 20080901,
    20090101, 20090401, 20090901)
As you can see, we decided to create ALL the partitions we will need over time. This is so we don’t have to create code to automate partition handling in the future.
The partition scheme is very easy. We don’t need separate data files, we will use partitioning to achieve parallelism, not to manage storage:
CREATE PARTITION SCHEME [DatePartitionSchema]
AS PARTITION [DatePartitionFunction]
ALL TO ([PRIMARY])
The last phase is that of creating the mirror table with all and only the fields we need to handle the TL phases:
CREATE TABLE [BORI200F] (
    .....
    [DATTS] [decimal](18, 0) NOT NULL,
    .....
) ON [DatePartitionSchema]([DATTS])
So, now we have a huge table that can be divided into several smaller partitions; the task of loading them in parallel will be pretty easy and will be shown later.
The config file
The configuration is an XML file that looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration xmlns:xsd="SqlBulkToolConfig.xsd"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               MaxNumberOfThreads="15">
    <DataBases>
        <DataBase Name="AdventureWorks TEST"
                  Run="true"
                  DataBaseClass="SqlServer"
                  SourceConnectionString="Data Source=localhost; Integrated Security=True;Initial Catalog=AdventureWorksDW"
                  DestConnectionString="Data Source=localhost; Integrated Security=True;Initial Catalog=SqlBulkToolTest">
            <Tables>
                <Table SourceTableName="AdventureWorksDWBuildVersion" DestTableSchema="dbo" DestTableName="AdventureWorksDWBuildVersion" TruncateBeforeLoad="true" />
                <Table SourceTableName="DatabaseLog" DestTableSchema="dbo" DestTableName="DatabaseLog" TruncateBeforeLoad="true" />
                <Table SourceTableName="DimAccount" DestTableSchema="dbo" DestTableName="DimAccount" TruncateBeforeLoad="true" />
                <Table SourceTableName="DimCurrency" DestTableSchema="dbo" DestTableName="DimCurrency" TruncateBeforeLoad="true" />
                <Table SourceTableName="DimCustomer" DestTableSchema="dbo" DestTableName="DimCustomer" TruncateBeforeLoad="true" />
                <Table SourceTableName="DimDepartmentGroup" DestTableSchema="dbo" DestTableName="DimDepartmentGroup" TruncateBeforeLoad="true" />
            </Tables>
        </DataBase>
    </DataBases>
</Configuration>
- MaxNumberOfThreads is the maximum number of parallel processes (tables and partitions) that will run. Tweak it to meet your needs depending on the number of CPUs and network speed.
- DataBase is the configuration of one database; you can have as many entries as you need.
- Name is a simple name used in the log to identify your database.
- Run is a Boolean; if false, the database will not be mirrored. It can be used to keep a test database in the configuration that will not run unless you are testing.
- DataBaseClass is the type of the source DB (the destination is always SQL Server). Up to now valid values are:
- SqlServer: Microsoft SQL Server.
- OleDb: Any OleDB provider.
- Navision: OleDB attached to a Navision database (via the ODBC driver).
- Firebird: Firebird database.
- Db2: OleDB attached to a DB2 database.
- SourceConnectionString and DestConnectionString are the connection strings for the source and destination DB. Beware that the destination DB is always SQL Server; the source DB may be any of the databases supported by SqlBulkTool.
- Tables: for each table you want to mirror you must provide:
- SourceTableName: the name used to build the SELECT clause.
- DestTableSchema: the schema of the destination table.
- DestTableName: the name of the destination table.
- TruncateBeforeLoad: if true the table will be truncated before it is loaded.
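To illustrate how such a file maps onto in-memory structures, here is a minimal Python sketch of a parser for the sample configuration above. The element and attribute names come from the sample; `load_config` and the dictionary layout are our own invention for illustration, not the tool's actual code:

```python
import xml.etree.ElementTree as ET

def load_config(source):
    """Parse a SqlBulkTool-style configuration (file path or file object)
    into plain dictionaries."""
    root = ET.parse(source).getroot()
    config = {"max_threads": int(root.get("MaxNumberOfThreads", "1")),
              "databases": []}
    for db in root.iter("DataBase"):
        if db.get("Run", "true").lower() != "true":
            continue  # Run="false" entries are skipped, e.g. test databases
        config["databases"].append({
            "name": db.get("Name"),
            "class": db.get("DataBaseClass"),
            "source": db.get("SourceConnectionString"),
            "dest": db.get("DestConnectionString"),
            "tables": [
                {
                    "source": t.get("SourceTableName"),
                    "dest_schema": t.get("DestTableSchema"),
                    "dest": t.get("DestTableName"),
                    "truncate": t.get("TruncateBeforeLoad") == "true",
                }
                for t in db.iter("Table")
            ],
        })
    return config
```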
We have provided a sample script that creates a database mirroring some of the tables in the AdventureWorksDW database. It is not intended as a demo of a real-world scenario; it is just a sample that makes the configuration of SqlBulkTool easier to understand and lets you test the tool easily.
Here we provide a basic description of the project, for those of you who want to update/modify the code. The tool is built upon several classes:
- DataBase, Table and Partition: these classes contain information about how we can mirror information.
- ThreadManager: this class handles all the threads that do the mirroring. It launches them when needed and guarantees proper synchronization of the operations.
- DatabaseOperation: this is the class used by ThreadManager to achieve parallelism. Each DatabaseOperation will do the atomic mirroring of one table or one partition, depending on the structure of the destination table.
The process of mirroring the database is to:
- Gather information about each table, decide whether it is partitioned and create the operations that will mirror it;
- Start one thread for each operation and let them work in parallel until completion, limiting the maximum number of threads so as not to waste time in useless context switching.
When the system starts:
- It reads an XML file containing a list of databases, and for each database a list of tables to be mirrored.
- It analyzes each table of each database and optionally divides it into partitions. For each table or partition a DatabaseOperation is created which will load that table or partition.
- When the analysis ends, the real work starts, and one thread is launched for each operation. When all the threads have finished, the mirroring process is complete.
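The analysis step can be sketched in Python as follows. The `build_operations` helper and its input layout are hypothetical; the real tool builds `DatabaseOperation` objects rather than tuples:

```python
def build_operations(tables):
    """For each table, create one copy operation per partition if the table
    is partitioned, otherwise a single operation for the whole table.
    Each table is a dict: {"name": str, "partitions": [partition_id, ...]}."""
    operations = []
    for table in tables:
        partitions = table.get("partitions") or []
        if partitions:
            # One operation per partition: these run in parallel threads.
            operations.extend((table["name"], pid) for pid in partitions)
        else:
            # Unpartitioned table: parallelism stays at the table level.
            operations.append((table["name"], None))
    return operations
```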
Let’s see the classes in more detail.
DataBaseCopy is an abstract class designed to make it easy to handle the dialect details of the source database. Its subclasses are specialized implementations that define the details of the dialect used by the source database.
DataBaseCopy manages three important operations:
- iterate over its tables to prepare the loading threads;
- handle dialect-specific issues when building the SELECT statements for the source database;
- run a list of commands in a critical section for the table; indeed, the metadata operations for switching partitions in and out need to be carried out atomically.
TableCopy is a class that holds the information about one mirrored table. If the table is partitioned, TableCopy holds a list of all the partitions in the table; otherwise it contains all the information needed to mirror the table by itself.
TableCopy is designed to:
- analyze the schema of the target table and build a SELECT statement containing all the columns needed to fill the target table. In this way, when you need another field from a host table, you only have to add the field with a proper type to the target table; the tool will recognize it and add it to the SELECT statement.
- determine the partitioning structure of the table by analyzing partition function name, partition column name and partition limits.
- decide which partitions need to be rebuilt, if the table is partitioned.
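The first of these points can be sketched in a few lines of Python. `build_select` and `build_partition_select` are invented names for illustration; the real tool reads the column list from the destination table's schema:

```python
def build_select(source_table, dest_columns):
    """Build the source SELECT from the destination table's column list,
    so adding a column to the staging table is enough to extract it too."""
    return "SELECT {} FROM {}".format(", ".join(dest_columns), source_table)

def build_partition_select(source_table, dest_columns, column, lower, upper):
    """Restrict the SELECT to one partition's value range (RANGE RIGHT:
    lower bound inclusive, upper bound exclusive)."""
    return "{} WHERE {} >= {} AND {} < {}".format(
        build_select(source_table, dest_columns), column, lower, column, upper)
```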
If a table is partitioned, then for each single partition we need to maintain the information that will let us handle the mirror operation. These details are:
- The minimum and maximum values of the partition column allowed to reside in that partition.
- The partition ID.
- Whether the partition needs to be rebuilt. If we decide not to load the entire source table (e.g. we may refresh only the last year and avoid uselessly reloading years that are already consolidated), not all the partitions need rebuilding. The tool will load only the partitions that actually need to be rebuilt.
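A minimal sketch of this per-partition bookkeeping (the `PartitionInfo` name and field names are ours, chosen for illustration):

```python
from dataclasses import dataclass

@dataclass
class PartitionInfo:
    """Per-partition bookkeeping for the mirror operation."""
    partition_id: int     # the partition's ordinal in the partition function
    min_value: int        # lowest partition-column value allowed here
    max_value: int        # highest partition-column value allowed here
    needs_rebuild: bool   # False for consolidated periods we choose to skip
```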
The process of copying a single partition involves:
- Creating a temporary table with the same structure as the target table,
- Switching the partition to rebuild out into the temporary table,
- Truncating the temporary table,
- Bulk loading all the new data into it,
- Switching the temporary table back into the partition,
- Dropping the temporary table.
As you should already know, this is the only sequence that guarantees fast partition handling. All the operations (besides loading) are metadata operations and do not touch the data: switching a partition in and out is really fast.
Metadata operations are carried out in a critical section; loading is handled in parallel.
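The sequence can be sketched as the T-SQL statements the tool has to issue around the bulk load. This is a Python sketch with invented names; note that in real SQL Server the temporary table also needs a check constraint on the partition column before the switch-in will succeed, which we omit here:

```python
def partition_rebuild_commands(table, temp_table, partition_id):
    """Return the metadata T-SQL statements issued before and after the bulk
    load of one partition. SWITCH and TRUNCATE are metadata-only, hence
    nearly instantaneous; only the bulk load in between moves actual data."""
    before = [
        # Temporary table with the same structure as the target (sketched).
        "SELECT * INTO {0} FROM {1} WHERE 1 = 0".format(temp_table, table),
        # Switch the partition out into the temporary table, then empty it.
        "ALTER TABLE {0} SWITCH PARTITION {1} TO {2}".format(
            table, partition_id, temp_table),
        "TRUNCATE TABLE {0}".format(temp_table),
    ]
    after = [
        # After the bulk load, switch the refreshed data back in, clean up.
        "ALTER TABLE {0} SWITCH TO {1} PARTITION {2}".format(
            temp_table, table, partition_id),
        "DROP TABLE {0}".format(temp_table),
    ]
    return before, after
```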
ThreadManager is responsible for coordinating threads. It creates a separate thread for each operation to be carried out and launches it when appropriate.
As it is useless to launch all the threads at once, ThreadManager holds a semaphore that limits the number of running threads to a configured value. This way it is easy to scale the system based on the power of the source or target server.
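The semaphore technique can be sketched in Python with the standard `threading` module (a minimal illustration, not the tool's actual C# code):

```python
import threading

def run_operations(operations, max_threads):
    """Run each operation in its own thread, but allow at most max_threads
    of them to execute at the same time, as ThreadManager does."""
    gate = threading.BoundedSemaphore(max_threads)
    results = []
    lock = threading.Lock()

    def worker(op):
        with gate:                      # blocks while max_threads are running
            outcome = op()              # the actual table/partition copy
        with lock:
            results.append(outcome)

    threads = [threading.Thread(target=worker, args=(op,)) for op in operations]
    for t in threads:
        t.start()
    for t in threads:
        t.join()                        # mirroring ends when all threads finish
    return results
```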
The tool is not intended to be perfect. Its main limitation so far is that it only handles RANGE RIGHT partitions; if someone wants to update it to handle LEFT partitions, he/she will be our guest. Moreover, you may have sources other than the ones we provided; if so, it is enough to derive a class from DataBaseCopy and handle the specific dialect of your DB there.
The project can easily be extended to handle more complex situations. Nevertheless, we believe that it is a good example of the power of SQL 2005 partitions: they can be used to parallelize the process of loading huge tables, gaining a dramatic speed improvement simply by making better use of the resources of the server.
We used the tool in several projects and it worked well, providing us with a dramatic speed improvement in the mirroring of the production database to a first staging DB.
Please note that when using SqlBulkTool you may experience a timeout problem. This can be easily resolved by applying the patch that you can find in the Microsoft Knowledge Base.