Reactive Extensions: Elapsed Time

30 Jan 2011 15:05

I’m using SqlBulkCopy to insert a large number of rows into a database table. I used Observable.FromEvent to hook the SqlRowsCopied event. Rx provides the .TimeStamp() method, but I wanted to know the elapsed time, so I did the following:

var bulkCopy = new SqlBulkCopy(connectionString)
{
    DestinationTableName = tableName,
    NotifyAfter = 5000,
    BatchSize = 5000
};

var rowsCopied =
    Observable.FromEvent<SqlRowsCopiedEventHandler, SqlRowsCopiedEventArgs>(
        h => h.Invoke,
        h => bulkCopy.SqlRowsCopied += h,
        h => bulkCopy.SqlRowsCopied -= h);
rowsCopied
    .WithElapsedTime()
    .Sample(TimeSpan.FromSeconds(1))
    .Subscribe(x =>
         Console.WriteLine("{0:N0} rows copied in {1}", x.Value.EventArgs.RowsCopied, x.Elapsed));

bulkCopy.WriteToServer(dataReader);

Obviously, I had to write the WithElapsedTime extension. It was really easy:

static class ElapsedTimeObservable
{
    public static IObservable<ElapsedTime>
        WithElapsedTime(this IObservable source)
    {
        var timer = Stopwatch.StartNew();
        return source.Select(x => new ElapsedTime(x, timer.Elapsed));
    }
}

internal class ElapsedTime
{
    public TSource Value { get; private set; }
    public TimeSpan Elapsed { get; private set; }

    public ElapsedTime(TSource value, TimeSpan elapsed)
    {
        Value = value;
        Elapsed = elapsed;
    }
}</pre>

...and that’s it.