-==- Scenario -==-
As it's Wednesday, it's as good a time as any to find out which comics are being released today.
We can get (most of) the data from the Diamond Comic Distributors web site.
Releases:
http://previewsworld.com/public/default.asp?t=3
So what we're going to do is download a few comic lists and load them into a database. We'll also check whether we've already loaded a comic previously and, if so, update its release date, as there's a chance its release has been pushed forward for some reason.
-==- Database -==-
Comic
------------
DiamondIdentifier varchar(10)
Name varchar(100)
Price varchar(10)
ReleaseDate date
-==- Control flow -==-
Parameters:
- DirectoryComics - the directory containing the comic list .txt files
- ConnectionString - the connection string to the database
We'll get these parameters from the ETLRunner (see "Executing the ETL" below).
using System;
using System.Globalization;
using System.IO;
using System.Linq;
using FluentNHibernate.Cfg;
using FluentNHibernate.Cfg.Db;
using NHibernate;
// plus the ETL framework's own namespaces (ControlFlow, ETLParameters, ControlFlowResult, ...)
public class ComicLoaderControlFlow : ControlFlow
{
    public string Name
    {
        get { return "ComicLoader"; }
    }
    public ControlFlowResult Execute(ETLParameters etlParameters = null)
    {
        // verify parameters [will be refactored to a simpler one]
        if (etlParameters == null) throw new ArgumentNullException("etlParameters");
        if (!etlParameters.Exists("DirectoryComics")) throw new ParameterException("DirectoryComics", "DirectoryComics parameter is required");
        if (!etlParameters.Exists("ConnectionString")) throw new ParameterException("ConnectionString", "ConnectionString parameter is required");
        string directoryComics = etlParameters.Get<string>("DirectoryComics");
        string connectionString = etlParameters.Get<string>("ConnectionString");
        // verify directories
        if (!Directory.Exists(directoryComics)) throw new ParameterException("DirectoryComics", "DirectoryComics parameter points to a directory that doesn't exist.");
        // create the archive directory (does nothing if it already exists)
        Directory.CreateDirectory(Path.Combine(directoryComics, "Archive"));
        // create an NHibernate connection [hopefully refactored out later]
        ISessionFactory factory = Fluently.Configure().Database(
            MsSqlConfiguration.MsSql2008.ConnectionString(connectionString)
        ).BuildSessionFactory();
        using (ISession session = factory.OpenSession())
        {
            // loop through all .txt files in the directory. ToList() takes a snapshot,
            // because we move the files to the archive while iterating.
            var files = Directory.EnumerateFiles(directoryComics)
                .Where(s => s.EndsWith(".txt", StringComparison.OrdinalIgnoreCase))
                .ToList();
            foreach (var file in files)
            {
                // the files should be named MMddyy.txt
                var date = Path.GetFileNameWithoutExtension(file);
                DateTime parsedDateTime;
                if (!DateTime.TryParseExact(date, "MMddyy", CultureInfo.InvariantCulture, DateTimeStyles.None, out parsedDateTime))
                {
                    // can't parse the date, so we'll have to quit.
                    return new ControlFlowResult { Result = ControlFlowResultType.Failed };
                }
                // we could either pass the file and date explicitly or store them in parameters and pass those
                var task = new ComicLoadDataflow(file, parsedDateTime, session).Execute();
                // executing the dataflow gives us a TPL Task object that's already started.
                // we're loading the files sequentially, not in parallel, so we'll wait for it to finish.
                // to run them all in parallel, we could collect the tasks into a list and wait until
                // everything's done, or use Parallel.ForEach (each dataflow would then need its own
                // ISession, as an NHibernate session isn't thread-safe).
                task.Wait();
                // move the file to the archive
                File.Move(file, Path.Combine(directoryComics, "Archive", Path.GetFileName(file)));
            }
        }
        return new ControlFlowResult { Result = ControlFlowResultType.Succeeded };
    }
}
-==- Data flow -==-
Now that we have the general flow of the ETL figured out, we can do the actual moving of data.
using System;
using NHibernate;
// plus the ETL framework's own namespaces (Dataflow, NHibernateDynamicSelect, TableDifferenceTransformation, ...)
public class ComicLoadDataflow : Dataflow
{
    public ComicLoadDataflow(string file, DateTime fileDate, ISession session)
    {
        // we'll take two inputs: one from the file and one from the database.
        // load the file. the format isn't strictly CSV, so we'll skip every row that doesn't have exactly three columns.
        var inputFile = new DynamicCSV.DynamicCSVInput(
            file,   // filename
            '\t',   // separator character
            false,  // the file has no header
            filter: row => row.ColumnCount() == 3,  // filter out invalid rows
            orderBy: row => (string)row[0]          // order by diamond identifier
        );
        // registering the components lets us follow their status
        RegisterInput(inputFile);
        // load the existing data from the database
        var inputDB = new NHibernateDynamicSelect(
            session,
            "select DiamondIdentifier, Name, Price, ReleaseDate from Comic " +
            "order by DiamondIdentifier"
        );
        RegisterInput(inputDB);
        // check how they match. note! both streams MUST be sorted by the primary key (DiamondIdentifier)
        var diff = new TableDifferenceTransformation<dynamic, dynamic>(
            inputFile.OutputStream,
            inputDB.OutputStream,
            // we'll use the diamond identifier as a natural key for joining the datasets
            new ColumnList<dynamic, dynamic> {
                { f => (string)f[0], db => db.DiamondIdentifier }
            },
            // if the name or the release date changes, we'll count the row as changed and update it
            new ColumnList<dynamic, dynamic> {
                { f => (string)f[1], db => db.Name },
                { f => fileDate, db => db.ReleaseDate },
            });
        // comics that aren't in the database yet are inserted
        var insert = new NHibernateDynamicInsert<dynamic>(
            diff.OutputNew,
            session,
            "Comic",
            input => new
            {
                DiamondIdentifier = (string)input[0],
                Name = (string)input[1],
                Price = (string)input[2],
                ReleaseDate = fileDate
            });
        RegisterOutput(insert);
        // changed comics are updated, matched by their diamond identifier
        var update = new NHibernateDynamicUpdate<dynamic>(
            diff.OutputUpdated,
            session,
            "Comic",
            columns => new { Name = (string)columns[1], Price = (string)columns[2], ReleaseDate = fileDate },
            where => new { DiamondIdentifier = (string)where[0] }
        );
        RegisterOutput(update);
    }
}
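TableDifferenceTransformation belongs to the ETL framework and its internals aren't shown here. Conceptually, a comparison like this is a merge join over two streams sorted by the same key, which is why both inputs in the dataflow above must be ordered by DiamondIdentifier. The sketch below shows that technique in plain C#, under some assumptions: ComicRow, DiffResult and SortedDiff are hypothetical names, keys are unique, both sides are sorted with ordinal string comparison, and rows that exist only in the database are ignored, since the dataflow only handles new and updated rows. ParseFileLines mimics the file input's filter and orderBy arguments.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical row type used on both sides of the comparison.
public sealed class ComicRow
{
    public ComicRow(string diamondIdentifier, string name, string price, DateTime releaseDate)
    {
        DiamondIdentifier = diamondIdentifier;
        Name = name;
        Price = price;
        ReleaseDate = releaseDate;
    }
    public string DiamondIdentifier { get; }
    public string Name { get; }
    public string Price { get; }
    public DateTime ReleaseDate { get; }
}

// Hypothetical result: rows to insert and rows to update.
public sealed class DiffResult
{
    public List<ComicRow> New { get; } = new List<ComicRow>();
    public List<ComicRow> Updated { get; } = new List<ComicRow>();
}

public static class SortedDiff
{
    // File side: split each line on tabs, keep only rows with exactly three
    // columns (id, name, price) and sort them by identifier (ordinal).
    public static List<ComicRow> ParseFileLines(IEnumerable<string> lines, DateTime fileDate)
    {
        var rows = new List<ComicRow>();
        foreach (var line in lines)
        {
            var cols = line.Split('\t');
            if (cols.Length == 3)
                rows.Add(new ComicRow(cols[0], cols[1], cols[2], fileDate));
        }
        rows.Sort((a, b) => string.CompareOrdinal(a.DiamondIdentifier, b.DiamondIdentifier));
        return rows;
    }

    // Merge join of two lists that are both sorted by DiamondIdentifier.
    // A file row without a database match is new; a matched row whose compared
    // columns (Name, ReleaseDate; Price is not compared) differ is updated.
    public static DiffResult Compare(IList<ComicRow> fileRows, IList<ComicRow> dbRows)
    {
        var result = new DiffResult();
        int i = 0, j = 0;
        while (i < fileRows.Count)
        {
            var file = fileRows[i];
            int cmp = j < dbRows.Count
                ? string.CompareOrdinal(file.DiamondIdentifier, dbRows[j].DiamondIdentifier)
                : -1; // database rows exhausted: every remaining file row is new
            if (cmp < 0)
            {
                result.New.Add(file);
                i++;
            }
            else if (cmp > 0)
            {
                j++; // database-only row: skip it
            }
            else
            {
                var db = dbRows[j];
                if (file.Name != db.Name || file.ReleaseDate != db.ReleaseDate)
                    result.Updated.Add(file);
                i++;
                j++;
            }
        }
        return result;
    }
}
```

Each side is read once, so the comparison is linear in the number of rows and never needs the whole database in a lookup table. The catch is that both orders must agree: in the dataflow the file side is ordered by the orderBy lambda in .NET, while the database side is ordered by SQL Server using the column's collation, so if those two orders could ever differ for your identifiers, the merge would misclassify rows.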
-==- Executing the ETL -==-
After we've compiled our ControlFlow library, we'll execute it using a simple ETLRunner cmdline executable.
The ETLRunner uses MEF (Managed Extensibility Framework), which ships with .NET Framework 4.0. (I'm still building this with my VS2010 RC, as I haven't had time to upgrade yet.)
MEF lets us satisfy an object's dependencies (its "imports") much like any other DI (dependency injection) framework, except that it can also scan for classes outside the current assemblies. The ETLRunner scans the subdirectories of the directory it's executing in for all classes that implement the ControlFlow interface, and then picks the right one using the interface's Name property.
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.ComponentModel.Composition.Hosting;
using System.IO;
public class ETLRunner
{
    [ImportMany]
    public IEnumerable<ControlFlow> ControlFlows { get; set; }
    ...
}
var runner = new ETLRunner();
// create a MEF container that seeks ControlFlow components in the immediate subdirectories of the current directory
AggregateCatalog agcatalog = new AggregateCatalog();
foreach (var plugindir in Directory.GetDirectories(Environment.CurrentDirectory))
{
    DirectoryCatalog dcat = new DirectoryCatalog(plugindir);
    agcatalog.Catalogs.Add(dcat);
}
var container = new CompositionContainer(agcatalog);
// initialize the runner's dependencies
container.ComposeParts(runner);
After calling ComposeParts, all the ControlFlow classes that were found are populated into the ControlFlows property.
The counterpart to the runner's [ImportMany] attribute is on the ControlFlow interface itself, so every class implementing the interface is exported automatically and framework users don't need to add any attributes of their own.
[InheritedExport(typeof(ControlFlow))]
public interface ControlFlow
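MEF does the discovery work in the ETLRunner. To illustrate the idea behind it without MEF, here's a sketch that swaps in plain reflection instead; it is not how the ETLRunner works. It scans assemblies for concrete classes implementing an interface, creates an instance of each, and picks one by Name. IControlFlow, ComicLoaderStub and PluginFinder are hypothetical stand-ins, and loading the plugin assemblies from the subdirectories (for example with Assembly.LoadFrom) is left out to keep it self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-in for the framework's ControlFlow interface.
public interface IControlFlow
{
    string Name { get; }
}

// Hypothetical plugin; in the real setup it would live in a separate assembly.
public sealed class ComicLoaderStub : IControlFlow
{
    public string Name => "ComicLoader";
}

public static class PluginFinder
{
    // Creates one instance of every concrete class in the given assemblies
    // that implements IControlFlow and has a parameterless constructor.
    public static IEnumerable<IControlFlow> Discover(IEnumerable<Assembly> assemblies)
    {
        return assemblies
            .SelectMany(a => a.GetTypes())
            .Where(t => typeof(IControlFlow).IsAssignableFrom(t)
                        && t.IsClass && !t.IsAbstract
                        && t.GetConstructor(Type.EmptyTypes) != null)
            .Select(t => (IControlFlow)Activator.CreateInstance(t));
    }

    // Returns the control flow whose Name matches, or null if there is none.
    public static IControlFlow FindByName(IEnumerable<Assembly> assemblies, string name)
    {
        return Discover(assemblies).FirstOrDefault(f => f.Name == name);
    }
}
```

Compared to this, MEF's [InheritedExport]/[ImportMany] pair also loads the assemblies from the directory catalogs and creates and wires up the instances, which is why the runner needs so little code of its own.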
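So, we'll copy the resulting binaries to a folder (with the ControlFlow library in a subdirectory of the ETLRunner's folder, since that's where the runner looks) and run the ETLRunner from the command line: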
etlrunner /name=ComicLoader /parameters=DirectoryComics:c:\diamondlist,ConnectionString:"Data Source=localhost;Initial Catalog=Comics;Integrated Security=SSPI;"
(Yes, that's a lot of parameters; one of the tasks on my endless to-do list is loading parameters from, for example, a text file.)
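The post doesn't show how the ETLRunner parses its arguments, so the following is only a hypothetical sketch (RunnerArguments and Parse are made-up names) of one way to read /name=... and /parameters=key:value,key:value. It splits the parameter list on commas and each pair on the first colon only, so c:\diamondlist keeps its drive colon. It assumes values contain no commas and that the quotes around the connection string have already been removed by Windows argument parsing, which treats a quoted section inside an argument as part of that argument.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical parser for arguments such as:
//   /name=ComicLoader /parameters=DirectoryComics:c:\diamondlist,ConnectionString:Data Source=...
public sealed class RunnerArguments
{
    public string Name { get; private set; }
    public Dictionary<string, string> Parameters { get; } =
        new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

    public static RunnerArguments Parse(string[] args)
    {
        var result = new RunnerArguments();
        foreach (var arg in args)
        {
            if (arg.StartsWith("/name=", StringComparison.OrdinalIgnoreCase))
            {
                result.Name = arg.Substring("/name=".Length);
            }
            else if (arg.StartsWith("/parameters=", StringComparison.OrdinalIgnoreCase))
            {
                // pairs are comma-separated, so values must not contain commas
                foreach (var pair in arg.Substring("/parameters=".Length).Split(','))
                {
                    // split on the FIRST colon only, so "c:\diamondlist" stays intact
                    int colon = pair.IndexOf(':');
                    if (colon <= 0)
                        throw new ArgumentException("Expected key:value but got '" + pair + "'");
                    result.Parameters[pair.Substring(0, colon)] = pair.Substring(colon + 1);
                }
            }
            else
            {
                throw new ArgumentException("Unknown argument '" + arg + "'");
            }
        }
        return result;
    }
}
```

Splitting on the first colon rather than every colon is what keeps Windows paths usable as values; a value that needs a comma would require a quoting or escaping scheme on top of this.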
That's it. If you did everything correctly, you should see:
Execution of control flow was successful. Duration: n seconds.