May 12, 2010

Tail utility using Reactive Extensions

I had a bit of misfortune with my hardware last week: my desktop's hard drive crashed, and I'm just about to begin restoring it from backups. Hopefully they'll work as expected :) There's a good lesson here: verify your backup strategy by actually running a restore before it becomes critical that it works.

Anyway, being in a bit of a hurry, here's something I've coded for my personal projects: a simple tail utility to follow your logs and whatnot, using Reactive Extensions (Rx) as the component that delivers the changed rows to the caller.

-==- Triggers -==-

First, we need something to tell us that the file has actually changed, as we probably don't want to poll it constantly. For this there's already a good utility class, FileSystemWatcher, included in the .NET Framework.

// initialize watcher
Watcher = new FileSystemWatcher(Path.GetDirectoryName(filePath), Path.GetFileName(filePath));

// notify only when the size changes
Watcher.NotifyFilter = NotifyFilters.Size;

(Note that "filePath" is a parameter holding the full path to the file. That's why I'm splitting it using Path.)

After initializing the watcher, we'll subscribe to the changed event using Rx:

var changedEvents = Observable.FromEvent<FileSystemEventArgs>(Watcher, "Changed");

As you can see, the event we're following is given as a "magic string" instead of being strongly typed. For strongly typed events there are a few options, such as the ones from Bobby Diaz and from Clarius Consulting.

The event information doesn't convey how the file has changed, though, so we'll need to work that out ourselves. Also, there might be multiple writes to the file in quick succession, so it's probably a good idea not to react to every single event. Throttle passes an event on only once no further event has followed it within the given time (here one second), so a burst of writes is handled as a single change.

// eventObservable delivers the new rows; this is what we subscribe to later
var eventObservable =
    from e in changedEvents.Throttle(TimeSpan.FromMilliseconds(1000)) // wait until the file has been quiet for one second
    select ProcessChanges();

Watcher.EnableRaisingEvents = true; // begins watching and raising events
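
To make the Throttle behaviour a bit more concrete, here's a small sketch in plain C# that doesn't use Rx at all. As far as I know, Throttle drops an event when another one follows it within the given time, so only the last event of a burst gets through. QuietPeriod and Survivors are names I made up for the illustration, and it works on a list of timestamps in milliseconds instead of live events, so treat it as a model of the idea and not as how Rx implements it.

```csharp
using System;
using System.Collections.Generic;

public static class QuietPeriod
{
    // Hypothetical helper, not part of Rx: returns the timestamps (in ms) of the
    // events that survive when every event that is followed by another one
    // within quietMs is dropped. The last event always survives, as nothing
    // follows it.
    public static List<long> Survivors(IList<long> timestamps, long quietMs)
    {
        var result = new List<long>();
        for (int i = 0; i < timestamps.Count; i++)
        {
            bool isLast = i == timestamps.Count - 1;
            // keep the event only if the gap to the next one is longer than quietMs
            if (isLast || timestamps[i + 1] - timestamps[i] > quietMs)
            {
                result.Add(timestamps[i]);
            }
        }
        return result;
    }
}
```

For example, with events at 0, 300, 600 and 2500 ms and a quiet time of 1000 ms, Survivors returns 600 and 2500: the first three writes form one burst and only its last event gets through.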


To process what has happened since the previous event, we'll use a StreamReader that we've kept open from the beginning and read all the complete lines. Unfinished lines are left for later, as a half-written row would be garbage.

protected IEnumerable<string> ProcessChanges()
{
    // prevent processing multiple events at the same time
    // (the iterator is lazy, so the lock is taken when the rows are enumerated)
    lock (lockObject)
    {
        // read lines until the EOF
        while (!Reader.EndOfStream)
        {
            // save current position, in case we stumble across an unfinished row write
            // caveat: StreamReader reads ahead into a buffer, so BaseStream.Position can be
            // past the characters returned so far and may not mark the exact start of the row
            CurrentPosition = Reader.BaseStream.Position;

            StringWriter writer = new StringWriter();
            // read until the end of line is found
            while (!Reader.EndOfStream && Reader.Peek() != '\n' && Reader.Peek() != '\r')
            {
                writer.Write((char)Reader.Read());
            }

            // if we got to EOF while reading, go back to the beginning of the line
            // and wait for the next event, which hopefully will have finished it
            if (Reader.EndOfStream)
            {
                // no end of line found; assuming that the line is not fully written
                Reader.BaseStream.Position = CurrentPosition;
                // drop the reader's buffered data so it matches the new stream position
                Reader.DiscardBufferedData();
                break;
            }
            else
            {
                // skip the line ending
                while (Reader.Peek() == '\n' || Reader.Peek() == '\r') Reader.Read();

                // return row
                yield return writer.ToString();
            }
        }
    }
}
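
One thing to watch out for in the seek-back approach: StreamReader reads ahead into its own buffer, so BaseStream.Position is not necessarily where the current row started. An alternative that avoids seeking altogether is to keep the unfinished row in memory until the rest of it arrives. Here's a minimal sketch of that idea; RowAssembler and Feed are names I made up for it and they're not in the linked source. It assumes you hand it whatever text was read since the last event, for example the result of Reader.ReadToEnd().

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper (not in the original source): collects characters into
// rows and keeps an unfinished row in memory until its line ending arrives.
public class RowAssembler
{
    private readonly StringBuilder pending = new StringBuilder();

    // Feed the text read since the last call; returns the rows it completed.
    public List<string> Feed(string text)
    {
        var rows = new List<string>();
        foreach (char c in text)
        {
            if (c != '\n' && c != '\r')
            {
                // still inside a row
                pending.Append(c);
            }
            else if (pending.Length > 0)
            {
                // line ending found: the row is complete
                rows.Add(pending.ToString());
                pending.Length = 0;
            }
            // a line ending with nothing pending is skipped, so "\r\n" and
            // empty lines never produce empty rows
        }
        return rows;
    }
}
```

Feeding it "abc" returns nothing, and a following "def\r\nxy" returns the single row "abcdef" while "xy" waits for its line ending.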

After all the eventing and processing is set up, we can simply subscribe to the observable and wait for things to happen:

eventObservable.Subscribe(
  rows =>
  {
    // print new rows to console
    foreach (var row in rows)
    {
      System.Console.WriteLine(row);
    }
  },
  // print exceptions
  e => System.Console.WriteLine(e));

System.Console.ReadKey();

-==- Source -==-

The full source can be found here. [http://www.tonikielo.com/files/rx_tail.cs]
