Apr 28, 2010

ETL Framework 0.1.0

I've been a bit busy with work projects, so I had to skip last week's post. As a consolation prize, here's the v0.1.0 release of the ETL framework.

-==- What's inside? -==-

Inputs:
- CSV
- SQL select (ADO.Net supported databases)

Outputs:
- CSV
- SQL Insert/Update/Delete (ADO.Net supported databases)
- SQL bulk insert (ADO.Net supported databases)
  * Bulk insert also supports "infinite" data sources with TimeSpan inserts. For example, you can commit everything coming in every five seconds.

This might be a nice solution for StreamInsight (just released) data handling, though I haven't tried it myself.
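
To make the TimeSpan idea concrete, here is a minimal sketch of time-window batching in plain C#. It is not the framework's implementation: `TimeWindowBatcher`, `Batch` and the injected `now` clock are hypothetical names. It is also pull-based, so a batch is only closed when the next item arrives, whereas a push-based (Rx) version could flush on a timer.

```csharp
using System;
using System.Collections.Generic;

public static class TimeWindowBatcher
{
    // Groups items into batches. A batch is closed once `window` has elapsed
    // since its first item arrived. `now` is injected so the clock can be faked.
    public static IEnumerable<List<T>> Batch<T>(
        IEnumerable<T> source, TimeSpan window, Func<DateTime> now)
    {
        var batch = new List<T>();
        DateTime batchStart = DateTime.MinValue;

        foreach (var item in source)
        {
            DateTime arrived = now(); // one clock reading per incoming item
            if (batch.Count > 0 && arrived - batchStart >= window)
            {
                yield return batch;   // hand the finished batch to the consumer
                batch = new List<T>();
            }
            if (batch.Count == 0) batchStart = arrived;
            batch.Add(item);
        }

        if (batch.Count > 0) yield return batch; // flush whatever is left
    }
}
```

Each yielded list would then be handed to one bulk insert, so with a five second window everything that arrived during those five seconds is committed together.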

I tried using NHibernate for input/output, but I just couldn't get it to work fast enough, so I had to skip it and work with ADO.Net instead. Luckily, ADO.Net works much better than NHibernate in dynamic scenarios; NHibernate is better suited to typed scenarios.

Transformations:

- Conditional split - Splits one stream into multiple output streams
- "Derived column" - Used to modify object values and, in the case of dynamic objects, to create new columns and change datatypes [if the object in question supports that].
- Database lookup - Looks up row(s) from the database for each row in the stream. It's usually better to just (bulk) insert everything into the database and use a join, but in some cases that might not be possible.
- Filter - Filters out objects in streams using Linq
- Group by - Just what it says
- Merge join - Joins two streams together using key columns (currently buffers everything before passing it forward; probably not usable with massive amounts of data)
- Merge - Takes two inputs and creates a new object containing the values from both streams. First come, first served, so if you're using a parallel data source the results are not reproducible. If you've already split one stream into two, this is a fast way to merge them back together, as long as they both contain the same number of rows.
- Multicast - Outputs each input row to multiple output streams. If you just want to forward something that you won't modify later, _OR_ you want a modification to show up in all streams, you can forward the same object reference. Otherwise the object must implement the ICloneable interface so we can copy an individual row to each output stream.
- Pivot - Takes an object and creates as many output rows as there are non-key properties.
- Rowcount - Counts how many rows were passed through the component
- Rownumber - Sets an individual row number to each row.
- Sort - Sorts the objects in the stream with Linq expressions. Sorting at the data source (SQL) is recommended where possible, though.
- Truncate - Shortens string columns to a given length if they're too long.
- Union all - Combines two streams in SQL union style.
- "Table difference" - Used for SCD[n] operations. Takes two streams that are similarly sorted and outputs rows that are considered inserted, updated, deleted or unchanged.
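
Of the transformations above, "Table difference" is probably the least obvious, so here is a rough sketch of the idea, assuming both streams are sorted ascending by the same key. This is not the framework's component: `TableDifference`, `RowChange`, `Compare` and the `key` / `sameValues` delegates are hypothetical names made up for the illustration.

```csharp
using System;
using System.Collections.Generic;

public enum RowChange { Inserted, Updated, Deleted, Unchanged }

public static class TableDifference
{
    // Walks two streams sorted ascending by key and classifies every key.
    // `source` is the incoming data, `target` is what the table holds now.
    public static IEnumerable<KeyValuePair<RowChange, TRow>> Compare<TRow, TKey>(
        IEnumerable<TRow> source,
        IEnumerable<TRow> target,
        Func<TRow, TKey> key,
        Func<TRow, TRow, bool> sameValues)
        where TKey : IComparable<TKey>
    {
        using (var s = source.GetEnumerator())
        using (var t = target.GetEnumerator())
        {
            bool hasS = s.MoveNext(), hasT = t.MoveNext();
            while (hasS || hasT)
            {
                // If one side has run out, the other side's rows are all new or all gone.
                int order = !hasT ? -1
                          : !hasS ? 1
                          : key(s.Current).CompareTo(key(t.Current));

                if (order < 0)
                {
                    // Key exists only in the source: it has to be inserted.
                    yield return new KeyValuePair<RowChange, TRow>(RowChange.Inserted, s.Current);
                    hasS = s.MoveNext();
                }
                else if (order > 0)
                {
                    // Key exists only in the target: it has been deleted.
                    yield return new KeyValuePair<RowChange, TRow>(RowChange.Deleted, t.Current);
                    hasT = t.MoveNext();
                }
                else
                {
                    // Same key on both sides: compare the remaining values.
                    var change = sameValues(s.Current, t.Current)
                        ? RowChange.Unchanged
                        : RowChange.Updated;
                    yield return new KeyValuePair<RowChange, TRow>(change, s.Current);
                    hasS = s.MoveNext();
                    hasT = t.MoveNext();
                }
            }
        }
    }
}
```

Because both inputs are consumed in a single forward pass, nothing has to be buffered, which is why the two streams must be sorted the same way.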

With large data volumes, the suggested approach is to use SQL bulk insert to get the rows into the database and perform the updates and deletes from there.

Executing thousands of delete clauses like "delete from table where id = @param" is much slower than executing "delete from table where exists (select 0 from tmp_deleted tmp where tmp.id = table.id)". You do have to remember to empty the temporary table, though: "truncate table tablename".
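
As a sketch of how that could look from plain ADO.Net, assuming the keys to delete have already been bulk inserted into the temporary table: `StagedDelete`, `BuildDeleteSql` and `DeleteStagedRows` are hypothetical helpers, not part of the framework, and "truncate table" is not available on every database.

```csharp
using System.Data.Common;

public static class StagedDelete
{
    // Builds a set-based delete that removes every row of `table` whose key
    // also appears in `stagingTable`. The names are concatenated into the SQL,
    // so they must come from trusted configuration, never from user input.
    public static string BuildDeleteSql(string table, string stagingTable, string keyColumn)
    {
        return "delete from " + table +
               " where exists (select 0 from " + stagingTable + " tmp" +
               " where tmp." + keyColumn + " = " + table + "." + keyColumn + ")";
    }

    // Runs the delete on an already opened connection and then empties the
    // staging table so that the next run starts from a clean slate.
    public static int DeleteStagedRows(DbConnection connection, string table,
                                       string stagingTable, string keyColumn)
    {
        using (var command = connection.CreateCommand())
        {
            command.CommandText = BuildDeleteSql(table, stagingTable, keyColumn);
            int deleted = command.ExecuteNonQuery();

            command.CommandText = "truncate table " + stagingTable;
            command.ExecuteNonQuery();
            return deleted;
        }
    }
}
```

One round trip deletes every staged key, instead of one round trip per row.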

This is a binary release containing two parts:

1) The framework itself
2) A command line ETL runner

-==- Caveat emptor -==-

As this is version 0.1.0 (that's pre-pre-alpha :), the interfaces and components are probably (definitely) going to change.

And yes, there are bugs. The framework is mostly scenario tested and not unit tested.

You can report any bugs you find to toni@tonikielo.com

-==- What's up with vNext -==-

There's just so much you can do with Reactive Extensions.

0) Upgrade to RTM
1) Perhaps I can get enough free time to actually clean up the code to release it.
2) Some of the components are slow; they have to buffer the whole stream so that they can operate on the full dataset. Hopefully I can fix those.
3) Deployment is one of the points that needs refining. Perhaps there'll be a Windows service that will run the package based on triggers (time, new files in a folder etc).
    * Also, packaging the component, even if in zip, would be nice.
4) Logging / "seeing" what the ETL actually does is a must have.
5) Exception handling. Currently, if you've misconfigured a dynamic column, all your rows will go to the error output. That's useful, but usually you just want to blow the whole component sky high (with an exception) instead of getting an error message per row. Of course that's not the only error type, so handling the different types of errors is important.

I'm sure I'll figure out more scenarios that it needs to fit after I've used it in a few more production projects.

If you have any suggestions, criticism (with suggestions for improvement) or something else:
toni // tonikielo.com

-==- Binaries -==-

Framework
ETL Runner

-==- Usage -==-

Here's a simple usage sample: loading a CSV file and inserting everything into a database.

Prerequisites:
- Rx RC is installed on the machine [http://go.microsoft.com/fwlink/?LinkId=182997]
- Visual Studio 2010 RC. Yes, the RTM is out, but I haven't yet had the opportunity to install it and test everything. Also, Rx seems to still be in RC mode.

Create a CSV file containing:

FirstName;LastName
Matti;Mikkonen
Teppo;Mikkonen

// get rows from CSV file as dynamic rows. You can use FirstName and LastName columns as properties in a row.

var input = new DynamicCSVInput(
    // location of the csv file
    @"c:\file\location\file.csv", 
    // separator character
    ';', 
    // file has header
    true
  );
RegisterInput(input); // always remember to register inputs and outputs!

// Static in the name describes how you'll be handling the columns. 
// Static means that the structure (table) is known in advance and Dynamic means that you can configure it on the fly.

var personOutput = new StaticSQLBulkInsertOutput<dynamic>(
  // take the input from csv file
  input.OutputStream, 
  // insert the data into the Persons table
  "dbo.Persons",
  // create a mapping between the CSV and the table
  row => new
  {
    // FirstName column from CSV will be inserted to Persons table's Person_FirstName column
    Person_FirstName = (string)row.FirstName, 
    // CSV properties are cast to strings because of the dynamic nature of the CSV property.
    // As all the data in a CSV is of course in string format, the property returns an object that, when cast,
    // is automatically converted to the given value type, if that's supported and possible.
    // For example, casting "1.1" or "1,1" to a float will work, but casting either to an int won't.
    // If you always want to get strings from the property instead of the "casting object", you can set that in the DynamicCSVInput.
    Person_EntityName = (string)row.LastName
  },
  // connection string to the database. See www.connectionstrings.com for more info.
  @"Data Source=localhost;Initial Catalog=SetInitialCatalog;Integrated Security=SSPI;", 
  // How many rows to insert at a time
  // I've gotten speeds of about 30000 inserts per second with large data volumes. A buffer of 15000 seems to be a fitting size for millions of rows.
  bufferSize: 15000
);
RegisterOutput(personOutput); 
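
The comments in the mapping describe how casting a CSV value behaves. The sketch below shows one way such a lenient conversion could work; it is only an illustration of the rule, not how DynamicCSVInput does it, and `LenientNumber`, `TryParseFloat` and `TryParseInt` are hypothetical names.

```csharp
using System;
using System.Globalization;

public static class LenientNumber
{
    // Accepts both '.' and ',' as the decimal separator by normalising to '.'
    // and parsing with the invariant culture. Thousands separators are not
    // supported, so "1,000.5" is rejected.
    public static bool TryParseFloat(string text, out float value)
    {
        value = 0f;
        if (text == null) return false;
        string normalised = text.Trim().Replace(',', '.');
        return float.TryParse(normalised,
            NumberStyles.AllowLeadingSign | NumberStyles.AllowDecimalPoint,
            CultureInfo.InvariantCulture, out value);
    }

    // Integers may not contain a decimal separator at all, so "1.1" and "1,1" fail.
    public static bool TryParseInt(string text, out int value)
    {
        value = 0;
        if (text == null) return false;
        return int.TryParse(text.Trim(), NumberStyles.AllowLeadingSign,
            CultureInfo.InvariantCulture, out value);
    }
}
```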

So, now we have a simple dataflow.
All we need to do next is put it into a Dataflow class.

public class SampleDataflow : Dataflow
{
  public SampleDataflow() {
    // copy above code here
  }
}

And now we can execute it from anywhere in our code, simply by calling:
new SampleDataflow().Execute();

If you're using a more complex structure and/or you want to use it from the ETLRunner, you'll have to create a controlflow class that calls it.

public class SampleControlflow : ControlFlow
{
  public string Name { get { return "Sample"; } }

  public List<ParameterRequirement> RequiredParameters
  {
    // we have no required parameters
    get { return new List<ParameterRequirement>(); }
  }

  public ControlFlowResult Execute(ETLParameters etlParameters = null, Logging.ETLLogger logger = null)
  {
    var task = new SampleDataflow().Execute();
    // Wait until the dataflow is completed. You can execute multiple dataflows in parallel with TPL, if needed.
    task.Wait();
    return new ControlFlowResult { Result = ControlFlowResultType.Succeeded };
  }
}
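
For the "multiple dataflows in parallel" case mentioned in the comment, a minimal sketch could look like the following. It assumes that a dataflow's Execute() returns a System.Threading.Tasks.Task; `ParallelFlows` and `RunAll` are hypothetical helpers, and each `Func<Task>` stands in for a call such as `() => new SampleDataflow().Execute()`.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelFlows
{
    // Starts every dataflow first and only then blocks until all of them
    // have finished, so the dataflows run side by side.
    public static void RunAll(params Func<Task>[] dataflows)
    {
        var tasks = new Task[dataflows.Length];
        for (int i = 0; i < dataflows.Length; i++)
        {
            tasks[i] = dataflows[i](); // start without waiting
        }

        // Throws an AggregateException if any of the dataflows faulted.
        Task.WaitAll(tasks);
    }
}
```

Inside a controlflow's Execute method, this would replace the single task.Wait() call.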


Now all you have to do is copy the project output (all DLL files, including the framework DLLs) to a directory below where you unpacked the ETLRunner and execute it from the command line:

ETLRunner /name=Sample

-==- Using 3rd party ADO.Net providers -==-

Add the following to your App.Config (the sample is for SQLite; see the other providers' documentation for theirs):

<system.data>
  <DbProviderFactories>
    <remove invariant="System.Data.SQLite" />
    <add name="SQLite Data Provider" invariant="System.Data.SQLite" description=".Net Framework Data Provider for SQLite" type="System.Data.SQLite.SQLiteFactory, System.Data.SQLite" />
  </DbProviderFactories>
</system.data>
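
Once a provider is registered like this, code can resolve it by its invariant name. The sketch below shows that lookup with the standard System.Data.Common API, assuming the classic .NET Framework, where the factory table is read from App.Config. `ProviderConnections` and `Create` are hypothetical names; how the framework's own inputs and outputs pick their provider is not shown here.

```csharp
using System.Data.Common;

public static class ProviderConnections
{
    // Resolves the provider registered under `invariantName` in
    // <DbProviderFactories> and returns an unopened connection for it.
    public static DbConnection Create(string invariantName, string connectionString)
    {
        DbProviderFactory factory = DbProviderFactories.GetFactory(invariantName);
        DbConnection connection = factory.CreateConnection();
        connection.ConnectionString = connectionString;
        return connection;
    }
}
```

With the SQLite registration above, the invariant name to pass would be "System.Data.SQLite".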

-==- Next up... -==-

Probably more samples and how to test your dataflows.
