where RunAsync essentially encapsulates:
await InitializeAsync(cancellationToken); await StartAsync(cancellationToken); await ReadDataInLoop(buffer, cancellationToken); await StopAsync(cancellationToken); await RemoveTraceAsync(cancellationToken);
We shift the cancellation responsibility to the CancellationToken, allowing us to cancel from any number of sources. The question then becomes, “What is the buffer?” Initially, we experimented with using a delegate in the form of an interface contract, called ITraceRowProcessor. We could then implement that multiple ways, including a BufferedTraceRowProcessor, which for the implementation of ITraceRowProcessor.Process(TraceRow traceRow) just added the trace row to a ConcurrentQueue or BlockingCollection<T>. Something still felt off though.
It turned out, however, that there was an even better way, using Channel<T>. Channel<T> has been covered many times by bloggers more capable than I, so I will just link to some of those posts:
- An Introduction to System.Threading.Channels by Steve Gordon
- An Introduction to System.Threading.Channels by Stephen Toub
By passing in ChannelWriter<TraceRow> as a buffer, a few things happen:
- We can asynchronously write to channels, which means as soon as we are done awaiting data from the trace source, and we get a trace row, we can then await the write to the channel. This is exactly what we want so that we stay on the CPU for as little time as possible since we are dealing with task scheduler threads.
- The channels have built-in buffering behaviors, like max size, and what to do when the max size is reached but expose a common class for both bounded and unbounded channels, allowing for flexibility.
- They are massively optimized for speed, better than anything before them.
- They support multiple writers, so the same channel could serve more than one TraceServer engine for the shared buffer scenario.
- They support multiple readers, so we don’t need to worry about IO stalls or processing backup. We could just add another reader from the same channel.
- They support both wait and peek style operations, allowing you to know if there are any more items and, if so, add them to a batch. This ensures we can do batch writing and just-in-time writing from the same consumer.
- They support async/await from the reader side, ensuring that we don’t need a dedicated thread just waiting for data that may not arrive.
Putting It All Together
Once we migrated to Channel<T>, everything came together. We also had a corresponding discovery of a way to do asynchronous XEvents reading, which meant that we could use the async TraceServer with both legacy SQL Server Trace APIs as well as newer XEvents APIs. The trace engine is fast, scalable, uses few threads, is fully async all the way down, supports cancellation, and has extensible buffer behavior support. Expect the new engine in a later version of our SentryOne monitoring software. It’s also worth noting that the techniques applied above do not just apply to trace, but also serve as an illustration for rapid data processing in general, and how the tools have evolved to support even better use cases with fewer lines of code.