Sep 1, 2010

Entity Framework Feature CTP4

EFF CTP4 is a technical preview built on EF4, which was released with .Net framework 4.0. Its main purpose is to simplify usage in code first scenarios, so I wanted to go over a simple case here.

-==- Requirements & Preparation -==-
  1. If you don’t already have .Net framework 4.0, install that
  2. Download and install EFF CTP4
  3. Create a new console (or whatever you want to use for testing) project
  4. Add reference to Microsoft.Data.Entity.Ctp and System.Data.Entity

-==- POCOs & DbContext -==-

Creating the mapping and context is pretty easy:
public class Person {
    public long Id { get; set; }
    public string Name { get; set; }
    public virtual IList<Address> Addresses { get; set; }
}

public class Address {
    public long Id { get; set; }
    public string Street { get; set; }
    public string ZipCode { get; set; }
}

public class PersonCatalog : DbContext
{
    public PersonCatalog(string connectionString) : base(connectionString) { }

    public DbSet<Person> Persons { get; set; }
    public DbSet<Address> Addresses { get; set; }
}


The first time you use the DbContext, it creates the database based on the context and the POCOs used in it.

using (var dc = new PersonCatalog("PersonDB")) 
{
    dc.Persons.Add(new Person { Name = "James" });
    dc.SaveChanges(); 
}





As you can see, it automatically deduced primary and foreign keys by convention. You can also affect the creation (or prevent it) by setting the following:


Database.SetInitializer<PersonCatalog>(new CreateDatabaseOnlyIfNotExists<PersonCatalog>());

Other options are:
  • RecreateDatabaseIfModelChanges
  • AlwaysRecreateDatabase
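
To make the "keys by convention" part a bit more concrete, here's a rough sketch of what such a convention could look like. This is not EF's code. I'm assuming the rule is roughly "a property called Id or <TypeName>Id is the key", and every name in it (KeyConventionSketch, FindKeyByConvention, the Sketch* classes) is made up for the example.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Hypothetical sample entities, only here to exercise the sketch.
public class SketchPerson
{
    public long Id { get; set; }
    public string Name { get; set; }
}

public class SketchInvoice
{
    public int SketchInvoiceId { get; set; }
    public decimal Total { get; set; }
}

public class SketchNote
{
    public string Text { get; set; }
}

public static class KeyConventionSketch
{
    // Returns the property a simple naming convention would pick as the key:
    // "Id" or "<TypeName>Id" (assumed rule, compared case-insensitively).
    // Returns null when no property matches.
    public static PropertyInfo FindKeyByConvention(Type entityType)
    {
        return entityType
            .GetProperties(BindingFlags.Public | BindingFlags.Instance)
            .FirstOrDefault(p =>
                string.Equals(p.Name, "Id", StringComparison.OrdinalIgnoreCase) ||
                string.Equals(p.Name, entityType.Name + "Id", StringComparison.OrdinalIgnoreCase));
    }
}
```

A class without a matching property (like SketchNote here) gets no key from the sketch, which is the kind of case where you'd fall back to attributes or explicit mapping.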

-==- Mapping – automatic & attributes -==-

If convention-based mapping is not enough, you can specify more detailed metadata either by adding attributes to your POCO classes or by using custom mapping inside the data context initialization.

For example:

public class Person {
    [Key]
    public string SocialSecurityNumber { get; set; }
    ...
}

Other supported annotations are:

  • Key
  • StringLength
  • ConcurrencyCheck
  • Required
  • Timestamp
  • DataMember
  • RelatedTo
  • MaxLength
  • StoreGenerated

More information on these can be found on the EF Design blog.

-==- Mapping – custom mapping with fluent API -==-

You can add to your existing metadata by overriding the OnModelCreating method in DbContext and using the ModelBuilder's fluent configuration.

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    modelBuilder.Entity<Person>().HasKey(p => p.Id);
    modelBuilder.Entity<Person>().Property(p => p.Name).Optional = false;
}

-==- More resources -==-

This was a very short intro to EFF CTP4, so take a look around for more info on it.

Release notes for EFF CTP4 on Ado.Net blog
More tips & tricks by Ro Miller

Aug 20, 2010

ASP.Net MVC 3 (preview) and dependency injection

The new ASP.Net MVC 3 preview release now supports much easier dependency injection. No more creating your own controller factories to use DI in controllers. All you have to do is just find or implement your own IMvcServiceLocator for the DI framework you're using. As easy as it was to implement your own controller factories, they tend to change the interface between major versions, so this will be a more permanent solution.

Because I'm mostly using StructureMap, here's a simple implementation and use case for it. (I'm sure you can implement almost all of the current DI frameworks as easily and similarly as StructureMap.)


public class StructureMapServiceLocator : IMvcServiceLocator
{
    protected IContainer Container { get; private set; }

    public StructureMapServiceLocator(IContainer container)
    {
        Container = container;
    }

    #region IServiceLocator Members

    public System.Collections.Generic.IEnumerable<object> GetAllInstances(System.Type serviceType)
    {
        return Container.GetAllInstances(serviceType).Cast<object>();
    }

    public System.Collections.Generic.IEnumerable<TService> GetAllInstances<TService>()
    {
        return Container.GetAllInstances<TService>();
    }

    ...

    public TService GetInstance<TService>()
    {
        return Container.GetInstance<TService>();
    }

    #endregion
}

The wrapper implementation is pretty straightforward as StructureMap has basically the same methods available.

Using the StructureMap container is as easy as it would be anywhere else, with only one small addition. We have to explicitly say that the ASP.Net MVC controller factory is the DefaultControllerFactory. Just add the following to your Global.asax / Application_Start and you're good to go.

var container = new Container(configuration => {
 configuration.For<IControllerFactory>().Use(new DefaultControllerFactory());
});

MvcServiceLocator.SetCurrent(new StructureMapServiceLocator(container));

To add dependencies to your controllers, I've implemented a dummy service:

public interface ISampleService
{
    string SayHello(string name);
}

public class ConcreteSampleService : ISampleService
{
    public string SayHello(string name)
    {
        return string.Format("Hello, {0}", name);
    }
}

And modified the default HomeController created by the project template:

public class HomeController : Controller
{
    protected ISampleService Service { get; private set; }

    public HomeController(ISampleService service)
    {
        Service = service;
    }

    public ActionResult Index()
    {
        ViewModel.Message = Service.SayHello("you");

        return View();
    }
}

Now that we have a dependency in our controller, we can simply configure it with the container (just add it before or after the IControllerFactory registration):

configuration.For<ISampleService>().Use<ConcreteSampleService>();

That's it. Just run the web site and you should see a nice "Hello, you" message instead of the standard "Welcome to ASP.NET MVC!".

Hello, you

-==- References -==-

You should also check out Brad Wilson's post, which also covers view engine and filter injection.

Also, if you haven't already seen what else is new in MVC 3, then ScottGu's post is a must read.

-==- Source -==-

Can be downloaded from here.

May 19, 2010

Testing ETL workflows

Finally, some experiences on testing. I'm using the sample code from the previous post, except that I modified it a bit to use more current input/output components. The previous version used the old NHibernate versions, but replacing them was pretty straightforward.

Note that all the test cases are more scenario-based tests than actual unit tests, as the components are much larger than what a single unit test regularly covers _and_ they also require an external component (in this case the database).

-==- Testing the dataflow -==-

Primary parts of the dataflow are:
- loading data from CSV
- skipping invalid rows
- inserting loaded data to table
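
The "skipping invalid rows" step boils down to keeping only the lines that split into the expected number of columns (three in this case, as the test names below suggest). Here's a minimal sketch of that idea with plain LINQ. It's not the framework's implementation: RowFilterSketch and DataRows are hypothetical names, and the tab separator used in the usage note is my assumption about the file format.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RowFilterSketch
{
    // Splits every line on the separator and keeps only the rows that have
    // exactly the expected number of columns. Empty lines and free-text
    // lines fall out because they split into too few columns.
    public static IEnumerable<string[]> DataRows(
        IEnumerable<string> lines, char separator, int expectedColumns)
    {
        return lines
            .Select(line => line.Split(separator))
            .Where(columns => columns.Length == expectedColumns);
    }
}
```

Calling RowFilterSketch.DataRows(File.ReadLines(path), '\t', 3) would then give you only the rows worth inserting. Note that a text line that happens to contain exactly two separators would still slip through, so this is only as strict as the column count.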

Testing the dataflow is the easy part, as it's mostly separated from everything else except the database. Unfortunately, as databases differ, there's no simple way to abstract away the use of a real database, so we'll assume for now that there's a local version of the database on (each) developer's machine.

For testing, I'm using the Visual Studio Unit Testing Framework, i.e. the one that comes with any VS version.

Ok, so first, let's see what's common between all the tests and set that up as initialize/cleanup:

private string workingDir = null;

private const string connectionString = @"Data Source=localhost;Initial Catalog=Test;Integrated Security=SSPI;";

private DynamicSQL sql = null;

[TestInitialize]
public void Initialize()
{
    workingDir = Path.GetDirectoryName(Assembly.GetAssembly(typeof(DataFlowTests)).Location);
    sql = new DynamicSQL(connectionString, "System.Data.SqlClient");

    // clean up the table
    sql.ExecuteSQL("truncate table comic");
}

[TestCleanup]
public void Cleanup()
{
    if (sql != null) sql.Dispose();
}

Here we set up the working directory; that is, where we'll be getting our input files from. We also open a database querying tool, which we'll use to verify the results of the load. And to ensure that the database is in the correct state before each test, we clear the table before each test case.

Then on to the actual tests.

If you remember, our input file is not a straight-up CSV file, but contains rows that are not part of the data we want to import. Therefore, we'll verify that only the data is imported.
I downloaded the latest list from http://previewsworld.com/shipping/archive/2010/051910.txt and took only a part of it for testing. There are 12 correct rows and 12 lines that are either empty or contain some text.
I've added the file to my solution with "Copy to output directory" set to "Copy always", so we can access it from the build directory (that's why we set the working directory to the same directory as the assembly location).

[TestMethod]
public void dataflow_should_skip_rows_that_doesnt_have_3_columns()
{
    // simulate deducing the date from the file name
    var fileDateTime = new DateTime(2010, 5, 19);

    // load data
    var df = new ComicLoadDataflow(
        Path.Combine(workingDir, "051910_partial.txt"),
        fileDateTime,
        connectionString);

    var task = df.Execute();
    task.Wait();

    // verify that we loaded only the 12 correct rows
    Assert.AreEqual(12, sql.Single("select count(0) Count from comic").Count);
}

After verifying that we actually get rows to the database, we'll verify that the data is inserted into the right columns. For this, I'm using an input file containing only one row.

[TestMethod]
public void verify_that_csv_columns_are_placed_in_right_columns()
{
    // simulate deducing the date from the file name
    var fileDateTime = new DateTime(2010, 5, 19);

    // load data
    var df = new ComicLoadDataflow(
        Path.Combine(workingDir, "051910_one_row.txt"),
        fileDateTime,
        connectionString);

    var task = df.Execute();
    task.Wait();

    // load loaded row from database
    var row = sql.Single("select * from comic");

    // verify columns
    Assert.AreEqual<string>("MAR100023", row.DiamondIdentifier);
    Assert.AreEqual<string>("AVP THREE WORLD WAR #4 (OF 6)", row.Name);
    Assert.AreEqual<string>("$3.50", row.Price);
    Assert.AreEqual<DateTime>(fileDateTime, row.ReleaseDate);
}

Then on to the hard part. Our data loading uses SCD (slowly changing dimension) handling, so we'll need two files: an original file and an updated file that contains new rows and updates to existing rows.

[TestMethod]
public void new_rows_in_csv_should_create_new_rows_in_db_and_changed_rows_should_be_updated()
{

    // simulate deducing the date from the file name
    var fileDateTime = new DateTime(2010, 5, 19);

    // load data
    var df = new ComicLoadDataflow(
        Path.Combine(workingDir, "051910_scd_part1.txt"),
        fileDateTime,
        connectionString);

    var task = df.Execute();
    task.Wait();

    Assert.AreEqual(12, sql.Single("select count(0) Count from comic").Count);

    // update the datetime to simulate a new file coming the next day.
    fileDateTime = fileDateTime.AddDays(1);

    // load inserted and updated rows
    df = new ComicLoadDataflow(
        Path.Combine(workingDir, "051910_scd_part2.txt"),
        fileDateTime,
        connectionString);

    task = df.Execute();
    task.Wait();

    // verify updates
    var changedRow = sql.Single("select * from comic where diamondidentifier = 'DEC090058'");

    Assert.IsNotNull(changedRow);
    Assert.AreEqual<string>("OH MY GODDESS RTM TP VOL 35", changedRow.Name);
    Assert.AreEqual<DateTime>(fileDateTime, changedRow.ReleaseDate);

    var insertRows = sql.Single("select count(0) Count from comic where diamondidentifier in ('MAR100302', 'MAR100296', 'JAN100295')");

    // verify that we had 3 new rows
    Assert.AreEqual(3, insertRows.Count);
}

That's it. If you want, you can make the tests more explicit; I've used only row counts to verify inserts, so they might not be 100% accurate.

Running the tests is not as fast as running normal unit tests, as there is a round-trip to the database, but it's still fast enough to run whenever you want to.

-==- Testing the control flow -==-

Primary parts of the control flow are
1. create an archive directory if one does not exist
2. loop through all txt-files in input directory
3. parse DateTime from file name and if not possible, break control flow execution
4. execute dataflow for each file
5. move file to archive

Testing the control flow is a bit harder than testing the dataflow. The dataflow is not currently injectable (which is a problem I need to solve), so I had to rework the control flow a bit. Instead of newing up the dataflow inside the control flow, I made a virtual function that we can override for mocking:

public virtual Dataflow CreateComicLoadDataflow(string file, DateTime parsedDateTime, string connectionString)

How to do this correctly is a problem for a later date. For now this should suffice for this simple exercise.
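
If the virtual factory method trick is new to you, here's the idea stripped down to the bare minimum, without any mocking framework. All the types in it (IDataflowSketch, ControlFlowSketch and so on) are simplified stand-ins I made up for this example. They are not the framework's real classes.

```csharp
using System.Collections.Generic;

public interface IDataflowSketch
{
    void Execute();
}

public class RealDataflowSketch : IDataflowSketch
{
    public void Execute() { /* would load a file into the database */ }
}

public class ControlFlowSketch
{
    // The seam: production code creates the real dataflow here,
    // a test can override this to hand back a fake instead.
    protected virtual IDataflowSketch CreateDataflow(string file)
    {
        return new RealDataflowSketch();
    }

    public void Run(IEnumerable<string> files)
    {
        foreach (string file in files)
            CreateDataflow(file).Execute();
    }
}

// Test doubles: a fake dataflow that only counts how often it was executed...
public class CountingDataflow : IDataflowSketch
{
    public int Executions;
    public void Execute() { Executions++; }
}

// ...and a control flow that returns the fake from the factory method.
public class TestableControlFlow : ControlFlowSketch
{
    public readonly CountingDataflow Fake = new CountingDataflow();

    protected override IDataflowSketch CreateDataflow(string file)
    {
        return Fake;
    }
}
```

Running a TestableControlFlow over two files and checking that Fake.Executions is 2 is the hand-written equivalent of what the mocking framework does in the tests below.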

For the next part, I'm using the open source mocking framework Moq.

Setting up and cleaning up the tests:

private Mock<Dataflow> df = null;
private Mock<ComicLoaderControlFlow> cf = null;

private string tmpInputpath = null;
ETLParameters parameters = null;

[TestInitialize]
public void Initialize()
{
    // create a mockup for dataflow
    df = new Mock<Dataflow>(MockBehavior.Strict);

    // setup dataflow execution to always return a short-running dummy task
    df.Setup(d => d.Execute()).Returns(() => {
        var task = new System.Threading.Tasks.Task(() => { Thread.Sleep(100); });
        task.Start();
        return task;
    });

    // create a partially mocked controlflow
    cf = new Mock<ComicLoaderControlFlow>(MockBehavior.Strict);

    // setup dataflow creation to return previously mocked dataflow
    cf.Setup(c => c.CreateComicLoadDataflow(It.IsAny<string>(), It.IsAny<DateTime>(), It.IsAny<string>())).Returns(
        df.Object
    );

    // setup input path
    tmpInputpath = Path.Combine(Path.GetTempPath(), "TestDir");
    Directory.CreateDirectory(tmpInputpath);

    parameters = new ETLParameters();
}

[TestCleanup]
public void Cleanup()
{
    // delete the whole input path directory recursively. 
    if (Directory.Exists(tmpInputpath))
        Directory.Delete(tmpInputpath, true);
}

Let's verify step 1.

[TestMethod]
public void control_flow_creates_archive_directory()
{
    var parameters = new ETLParameters();

    string archiveDir = Path.Combine(tmpInputpath, "Archive");

    parameters.Set("DirectoryComics", tmpInputpath);
    parameters.Set("ConnectionString", "not empty");

    // execute the control flow
    var result = cf.Object.Execute(parameters);

    // verify that archive directory was created

    Assert.IsTrue(Directory.Exists(archiveDir), "Control flow should have created an archive directory.");
    Assert.AreEqual(ControlFlowResultType.Succeeded, result.Result);
}

Step 2 (looping through (only) text files), step 3 part 1 (parsing the datetime successfully) and step 4 (executing the dataflow for each file).

[TestMethod]
public void control_flow_processes_all_txt_files()
{
    // create a file to be processed
    File.CreateText(Path.Combine(tmpInputpath, "123010.txt")).Close();
    File.CreateText(Path.Combine(tmpInputpath, "123110.txt")).Close();
    File.CreateText(Path.Combine(tmpInputpath, "123110.csv")).Close();

    parameters.Set("DirectoryComics", tmpInputpath);
    parameters.Set("ConnectionString", "not empty");

    var result = cf.Object.Execute(parameters);

    string archiveDir = Path.Combine(tmpInputpath, "Archive");

    // verify that only the two text files are processed
    df.Verify(d => d.Execute(), Times.Exactly(2));
}

Yes, you should probably split this into 2-3 tests, because if one of the cases fails, you won't know which one it was. Unfortunately, we don't have enough seams in the code, so this'll have to suffice for now.


Step 3 part 2.

[TestMethod]
public void control_flow_fails_if_filename_is_not_a_date()
{
    // create a file to be processed
    File.CreateText(Path.Combine(tmpInputpath, "fail.txt")).Close();

    parameters.Set("DirectoryComics", tmpInputpath);
    parameters.Set("ConnectionString", "not empty");

    // execute the control flow
    var result = cf.Object.Execute(parameters);

    Assert.AreEqual(ControlFlowResultType.Failed, result.Result, "Control flow should fail if filename is not a date.");

    df.Verify(d => d.Execute(), Times.Never());
}

Step 5. moving file to archive.

[TestMethod]
public void control_flow_moves_processed_file_to_archive()
{
    string archiveDir = Path.Combine(tmpInputpath, "Archive");

    // create a file to be processed
    File.CreateText(Path.Combine(tmpInputpath, "123110.txt")).Close();

    parameters.Set("DirectoryComics", tmpInputpath);
    parameters.Set("ConnectionString", "not empty");

    // execute the control flow
    var result = cf.Object.Execute(parameters);

    // verify that the file was moved to the archive directory

    Assert.IsTrue(File.Exists(Path.Combine(archiveDir, "123110.txt")), "File wasn't moved to archive.");
    Assert.AreEqual(ControlFlowResultType.Succeeded, result.Result);
}

-==- Conclusion -==-

Dataflow testing is simple. The control flow is still on the wrong level for testability: it needs to be split up and refactored.

May 12, 2010

Tail utility utilizing Reactive extensions

I've had a bit of misfortune with my hardware last week as my desktop's hard drive crashed and I'm just about to begin restoring it from backups. Hopefully they'll work as expected :) There's a good lesson here to verify your backup strategy by actually using the restore functionality before it's critical that it works.

Anyway, being in a bit of a hurry, here's something I've coded for my personal projects: a simple tail utility to follow your logs and whatnot, using Reactive extensions as the component that delivers the changed rows to the caller.

-==- Triggers -==-

Firstly, we'll need something to check that the file has actually changed, as we probably don't want to constantly check it for changes. For this there's already a good utility class called FileSystemWatcher that's included in .Net framework.

// initialize watcher
Watcher = new FileSystemWatcher(Path.GetDirectoryName(filePath), Path.GetFileName(filePath));


// notify only when the size changes
Watcher.NotifyFilter = NotifyFilters.Size;

(Note that I've used a parameter "filePath" that is a full path to a file. That's why I'm splitting it using Path.)

After initializing the watcher, we'll subscribe to the changed event using Rx:

var eventObservable = Observable.FromEvent<FileSystemEventArgs>(Watcher, "Changed");

As you can see, the event that we're following is given as a "magic string" instead of strongly typing it. For strongly typed events there are a few options like from Bobby Diaz and from Clarius Consulting.

The event information itself doesn't convey what has changed, though, so we'll need to work that out ourselves. Also, since a single change may consist of multiple writes to the file, it's probably a good idea not to react to every event. Throttle takes care of that: it lets an event through only after no new events have arrived for the given time (here it's one second).

from e in eventObservable.Throttle(TimeSpan.FromMilliseconds(1000)) // wait for one quiet second before processing
select ProcessChanges();

Watcher.EnableRaisingEvents = true; // begins watching and raising events


To process what's happened during a single event, we'll use a StreamReader that we've kept open from the beginning and read all the complete lines (not unfinished, otherwise the information will be garbage).

protected IEnumerable<string> ProcessChanges()
{
    // prevent processing multiple events at the same time
    lock (lockObject) {
        // read lines until the EOF
        while (!Reader.EndOfStream)
        {
            // save current position, in case we stumble across an unfinished row write
            CurrentPosition = Reader.BaseStream.Position;

            StringWriter writer = new StringWriter();
            // read until finds end of line
            while (!Reader.EndOfStream && (Reader.Peek() != '\n' && Reader.Peek() != '\r'))
            {
                writer.Write((char)Reader.Read());
            }
   
            // if we got to EOF while reading, return back to the beginning of line and we'll wait for the next event that hopefully will have finished it.
            if (Reader.EndOfStream)
            {
                // return to beginning, no end of line found; assuming that line is not fully written.
                Reader.BaseStream.Position = CurrentPosition;
                break;
            }
            else
            {
                // skip the line ending
                while (Reader.Peek() == '\n' || Reader.Peek() == '\r') Reader.Read();

                // return row
                yield return writer.ToString();
            }
        }
    }
}
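
The core rule in ProcessChanges (hand out only complete lines, keep an unfinished one for the next round) can also be written as a small function over an in-memory chunk of text, which makes it easy to test without a file. This is a different technique from the StreamReader position juggling above, not a drop-in replacement, and CompleteLinesSketch and TakeCompleteLines are names I made up for the sketch. Like the code above, it skips empty lines.

```csharp
using System;
using System.Collections.Generic;

public static class CompleteLinesSketch
{
    // Splits 'buffer' into complete, non-empty lines. Whatever follows the
    // last line ending is returned in 'remainder', so the caller can prepend
    // it to the next chunk read from the file.
    public static List<string> TakeCompleteLines(string buffer, out string remainder)
    {
        var lines = new List<string>();
        int start = 0;

        for (int i = 0; i < buffer.Length; i++)
        {
            char c = buffer[i];
            if (c != '\n' && c != '\r') continue;

            // i == start means we are on a second line-ending character
            // (e.g. the '\n' of "\r\n") or on an empty line: skip it
            if (i > start) lines.Add(buffer.Substring(start, i - start));
            start = i + 1;
        }

        remainder = buffer.Substring(start);
        return lines;
    }
}
```

On each change event you would read the newly appended text, call TakeCompleteLines(previousRemainder + newText, out previousRemainder) and pass the returned lines on to the subscribers.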

After all the eventing and processing is done, we can simply subscribe to the observable and wait for things to happen:

eventObservable.Subscribe(
  rows =>
  {
    // print new rows to console
    foreach (var row in rows)
    {
      System.Console.WriteLine(row);
    }
  },
  // print exceptions
  e => System.Console.WriteLine(e));

System.Console.ReadKey();

-==- Source -==-

The full source can be found here. [http://www.tonikielo.com/files/rx_tail.cs]

Apr 28, 2010

ETL Framework 0.1.0

I've been a bit busy with work projects, so I had to skip last week's post. As a consolation prize, here's a v.0.1.0 release of the ETL framework.

-==- What's inside? -==-

Inputs:
- CSV
- SQL select (ADO.Net supported databases)

Outputs:
- CSV
- SQL Insert/Update/Delete (ADO.Net supported databases)
- SQL bulk insert (ADO.Net supported databases)
  * Bulk insert also supports "infinite" data sources with TimeSpan inserts. For example, you can commit everything coming in every five seconds.

This might be a nice solution for StreamInsight (just released) data handling, though I haven't tried it myself.

I tried using NHibernate for input/output, but I just couldn't get it to work fast enough, so I had to skip it and work with ADO.Net instead. Luckily, with dynamic scenarios it works much better than NHibernate, which is better suited to typed scenarios.

Transformations:

- Conditional split - Splits one stream into multiple output streams
- "Derived column" - Used to modify object values and in case of dynamic objects, to create new columns and change datatypes [if the object in question supports that].
- Database lookup - Finds row(s) from database per each row in stream. Recommended to just (bulk) insert everything to database and use join, but in some cases it might not be possible.
- Filter - Filters out objects in streams using Linq
- Group by - Just what it says
- Merge join - Joins two streams together using key columns (currently buffers everything before passing it forward; probably not usable with massive amounts of data)
- Merge - Takes two inputs and creates a new object containing the values from both streams. First come, first served, so if you're using a parallel data source the results are not reproducible. If you've already split one stream into two, this is a fast way to merge them together as long as they both contain the same number of rows.
- Multicast - Outputs one input row to multiple output rows. If you want to just forward something that you won't later modify _OR_ want to modify in all streams, you can just forward the same object reference. Otherwise the object must implement the ICloneable interface so we can copy an individual row to each output stream.
- Pivot - Takes an object and creates as many output rows as there are non-key properties.
- Rowcount - Counts how many rows were passed through the component
- Rownumber - Sets an individual row number to each row.
- Sort - Sorts the objects in stream with Linq expressions. Though recommended to use data source (SQL) sorting.
- Truncate - Shortens string-columns to given length if it's too long.
- Union all - Combines two streams in SQL union style.
- "Table difference" - Used for SCD[n] operations. Takes two streams that are similarly sorted and outputs rows that are considered inserted, updated, deleted or unchanged.
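
Of these, "Table difference" is probably the one that benefits most from an example. Here's a simplified sketch of the idea: walk two streams that are sorted by the same key side by side and classify each key. It's not the framework's component. I'm assuming string keys sorted ordinally, unique keys, and a single string value per row, and all the names (TableDifferenceSketch, Diff, RowChange) are made up for the sketch.

```csharp
using System;
using System.Collections.Generic;

public enum RowChange { Inserted, Updated, Deleted, Unchanged }

public static class TableDifferenceSketch
{
    // Both inputs must be sorted ascending by key (ordinal comparison)
    // and must not contain duplicate keys.
    public static IEnumerable<KeyValuePair<string, RowChange>> Diff(
        IEnumerable<KeyValuePair<string, string>> existing,
        IEnumerable<KeyValuePair<string, string>> incoming)
    {
        using (var oldRows = existing.GetEnumerator())
        using (var newRows = incoming.GetEnumerator())
        {
            bool hasOld = oldRows.MoveNext();
            bool hasNew = newRows.MoveNext();

            while (hasOld || hasNew)
            {
                // Negative: key exists only in the old stream,
                // positive: key exists only in the new stream, zero: in both.
                int order = !hasOld ? 1
                          : !hasNew ? -1
                          : string.CompareOrdinal(oldRows.Current.Key, newRows.Current.Key);

                if (order < 0)
                {
                    yield return new KeyValuePair<string, RowChange>(oldRows.Current.Key, RowChange.Deleted);
                    hasOld = oldRows.MoveNext();
                }
                else if (order > 0)
                {
                    yield return new KeyValuePair<string, RowChange>(newRows.Current.Key, RowChange.Inserted);
                    hasNew = newRows.MoveNext();
                }
                else
                {
                    bool same = oldRows.Current.Value == newRows.Current.Value;
                    yield return new KeyValuePair<string, RowChange>(
                        oldRows.Current.Key, same ? RowChange.Unchanged : RowChange.Updated);
                    hasOld = oldRows.MoveNext();
                    hasNew = newRows.MoveNext();
                }
            }
        }
    }
}
```

Because both streams are read only once and in order, nothing has to be buffered, which is the reason the inputs need to be similarly sorted in the first place.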

Suggested usage with large data masses is to use SQL bulk insert to insert them into a database and from there perform updates and deletes.

Executing thousands of delete clauses like "delete from table where id = @param" is much slower than executing "delete from table where exists (select 0 from tmp_deleted tmp where tmp.id = table.id)". Though you have to remember to empty the table: "truncate table tablename".

This is a binary release containing two parts:

1) The framework itself
2) A command line ETL runner

-==- Caveat emptor -==-

As a version 0.1.0 (that's pre-pre-alpha :) the interfaces and components are probably (definitely) going to change.

And yes, there are bugs. The framework is mostly scenario tested and not unit tested.

You can report any bugs you find to toni@tonikielo.com

-==- What's up with vNext -==-

There's just so much you can do with Reactive Extensions.

0) Upgrade to RTM
1) Perhaps I can get enough free time to actually clean up the code to release it.
2) Some of the components are slow; they have to buffer the whole stream so that they can operate on the full dataset. Hopefully I can fix those.
3) Deployment is one of the points that needs refining. Perhaps there'll be a windows service that will run the package based on triggers (time, new files in folder etc).
    * Also, packaging the component, even if in zip, would be nice.
4) Logging / "seeing" what the ETL actually does is a must have.
5) Exception handling. Now if you've misconfigured a dynamic column, all your rows will go to error output. That's useful, but usually you just want to blow the whole component sky high (with exception) instead of getting an error message per row. Of course that's not the only error type, so handling the different types of errors is important.

I'm sure I'll figure out more scenarios that it needs to fit in after I've used it in a few more production projects.

If you have any suggestions, criticism (with suggestions for improvement) or something else:
toni // tonikielo.com

-==- Binaries -==-

Framework
ETL Runner

-==- Usage -==-

Here's a simple usage sample. Loading a CSV file and inserting everything to database.

Prerequisites:
- Rx RC is installed to the machine [http://go.microsoft.com/fwlink/?LinkId=182997]
- Visual Studio 2010 RC. Yes, the RTM is out, but I haven't yet had the opportunity to install it and test everything. Also, Rx seems to still be in RC mode.

Create a CSV file containing:

FirstName;LastName
Matti;Mikkonen
Teppo;Mikkonen

// get rows from CSV file as dynamic rows. You can use FirstName and LastName columns as properties in a row.

var input = new DynamicCSVInput(
    // location of the csv file
    @"c:\file\location\file.csv", 
    // separator character
    ';', 
    // file has header
    true
  );
RegisterInput(input); // always remember to register inputs and outputs!

// Static in the name describes how you'll be handling the columns. 
// Static means that the structure (table) is known in advance and Dynamic means that you can configure it on the fly.

var personOutput = new StaticSQLBulkInsertOutput<dynamic>(
  // take the input from csv file
  input.OutputStream, 
  // input the data to Persons-table
  "dbo.Persons",
  // create a mapping between the CSV and the table
  row => new
  {
    // FirstName column from CSV will be inserted to Persons table's Person_FirstName column
    Person_FirstName = (string)row.FirstName, 
    // CSV properties are cast to strings because of the dynamic nature of the CSV property.
    // As all the data in a CSV is of course in string format, the property returns an object that, when cast,
    // is automatically converted to the given value type, if that's supported and possible.
    // For example, casting "1.1" or "1,1" to a float will work, but casting it to an int won't.
    // If you always want to get strings from the property instead of the "casting object", you can set that in the DynamicCSVInput.
    Person_EntityName = (string)row.LastName
  },
  // connection string to the database. See www.connectionstrings.com for more info.
  @"Data Source=localhost;Initial Catalog=SetInitialCatalog;Integrated Security=SSPI;", 
  // How many rows to insert at a time
  // I've gotten speeds of about 30000 inserts per second when using large data masses. 15000 buffer seems to be a fitting size for millions of rows.
  bufferSize: 15000
);
RegisterOutput(personOutput); 

So, now we have a simple dataflow.
Now all we need to do is put it to a Dataflow class.

public class SampleDataflow : Dataflow
{
  public SampleDataflow() {
    // copy above code here
  }
}

And now we can execute it from anywhere in our code. Simply by:
new SampleDataflow().Execute();

If you're using a more complex structure and/or you want to use it from the ETLRunner, you'll have to create a control flow class that'll call it.

public class SampleControlflow : ControlFlow
{
 public string Name { get { return "Sample"; } }

 public List<ParameterRequirement> RequiredParameters
 {
  // we have no required parameters
  get { return new List<ParameterRequirement>(); }
 }

 public ControlFlowResult Execute(ETLParameters etlParameters = null, Logging.ETLLogger logger = null)
 {
  var task = new SampleDataflow().Execute();
  // Wait until the dataflow is completed. You can execute multiple dataflows in parallel with TPL, if needed.
  task.Wait();
  return new ControlFlowResult { Result = ControlFlowResultType.Succeeded };
 }
}


Now all you have to do is copy the project results (all dll files including the framework dlls) to a directory below where you unpacked the ETLRunner and execute it from the command line:

ETLRunner /name=Sample

-==- Using 3rd party ADO.Net providers -==-

Add the following to your App.Config (the sample is for SQLite; see the other providers' documentation for theirs):

<system.data>
  <DbProviderFactories>
    <remove invariant="System.Data.SQLite" />
    <add name="SQLite Data Provider" invariant="System.Data.SQLite" description=".Net Framework Data Provider for SQLite" type="System.Data.SQLite.SQLiteFactory, System.Data.SQLite" />
  </DbProviderFactories>
</system.data>

-==- Next up... -==-

Probably more samples and how to test your dataflows.