Reactive Extensions: Elapsed Time
I’m using SqlBulkCopy to insert a large number of rows into a database table. I used Observable.FromEvent to hook the SqlRowsCopied event. Rx provides the .TimeStamp() method, but I wanted to know the elapsed time, so I did the following:
var bulkCopy = new SqlBulkCopy(connectionString) { DestinationTableName = tableName, NotifyAfter = 5000, BatchSize = 5000 }; var rowsCopied = Observable.FromEvent<SqlRowsCopiedEventHandler, SqlRowsCopiedEventArgs>( h => h.Invoke, h => bulkCopy.SqlRowsCopied += h, h => bulkCopy.SqlRowsCopied -= h); rowsCopied .WithElapsedTime() .Sample(TimeSpan.FromSeconds(1)) .Subscribe(x => Console.WriteLine("{0:N0} rows copied in {1}", x.Value.EventArgs.RowsCopied, x.Elapsed)); bulkCopy.WriteToServer(dataReader);
Obviously, I had to write the WithElapsedTime extension. It was really easy:
static class ElapsedTimeObservable { public static IObservable<ElapsedTime> WithElapsedTime (this IObservable source) { var timer = Stopwatch.StartNew(); return source.Select(x => new ElapsedTime (x, timer.Elapsed)); } } internal class ElapsedTime { public TSource Value { get; private set; } public TimeSpan Elapsed { get; private set; } public ElapsedTime(TSource value, TimeSpan elapsed) { Value = value; Elapsed = elapsed; } }</pre> ...and that’s it.