One of the things I enjoy most about computer programming is multi-threading. It’s hard but very rewarding when you find an improvement that makes a tangible difference to a system. It’s also interesting that it is sometimes desirable to allow some redundant processing in order to reduce contention and thus improve overall performance. I have recently written a class that processes a message asynchronously. The requirement is that only the most recently received message should be processed and that messages are never processed concurrently, as in this timeline:
T   Event
0   Message A received.
1   Processing of message A begins asynchronously.
2   Message B received.
3   Message C received.
4   Processing of message A ends.
5   Processing of message C begins asynchronously (message B is never processed).
…   and so on
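The timeline can be modelled as a single-slot mailbox in which each new message overwrites the previous one. The following is a minimal, single-threaded sketch of that semantics only; `LatestSlot` and `TimelineDemo` are hypothetical names used for illustration and are not part of the library, and no thread safety is attempted here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-slot mailbox: Post overwrites, TryTake empties the slot.
// Not thread-safe; it only illustrates the "most recent message wins" rule.
public sealed class LatestSlot<T> where T : class
{
    private T _value;

    public void Post(T value)
    {
        _value = value; // any message still waiting in the slot is discarded
    }

    public bool TryTake(out T value)
    {
        value = _value;
        _value = null;
        return value != null;
    }
}

public static class TimelineDemo
{
    // Replays the timeline sequentially and returns the messages "processed".
    public static List<string> Run()
    {
        var slot = new LatestSlot<string>();
        var processed = new List<string>();
        string message;

        slot.Post("A");                                        // T0: A received
        if (slot.TryTake(out message)) processed.Add(message); // T1: begin A
        slot.Post("B");                                        // T2: B received
        slot.Post("C");                                        // T3: C overwrites B
                                                               // T4: A ends
        if (slot.TryTake(out message)) processed.Add(message); // T5: begin C
        return processed;
    }
}
```

Calling `TimelineDemo.Run()` returns a list containing A and C: message B is overwritten before anything gets the chance to take it.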
It’s a trivial problem, but I enjoyed trying different implementations that produced a wide range of results in terms of speed, thread use and memory footprint. As is more often than not the case, there is a balancing act between various concerns. I want the least possible work done in the receive-message call, so that the client application is delayed as little as possible. I want the lowest possible latency between the receive and process tasks, which rules out any type of polling. Finally, as is standard good practice, I want to block the fewest threads for the shortest possible time. I settled on the following:
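using System;
using System.Threading;

namespace MPM.Threading
{
    public class MostRecentMessageProcessor<TMessage> : IMessageProcessor<TMessage> where TMessage : class
    {
        private delegate void ProcessMessageDelegate();

        private readonly Action<TMessage> _action;
        private readonly ProcessMessageDelegate _processMessageDelegate;
        private readonly object _lock = new object();

        private TMessage _last;      // last message handed to _action
        private TMessage _current;   // most recently received message
        private int _workItemsQueued; // 1 while a work item is waiting to run, else 0

        public MostRecentMessageProcessor(Action<TMessage> action)
        {
            _action = action;
            _processMessageDelegate = ProcessMessage;
        }

        public void Process(TMessage message)
        {
            // Publish the message first; a newer message simply overwrites an older one.
            _current = message;

            // Queue a work item only if none is already waiting.
            // Note: Delegate.BeginInvoke runs on a thread-pool thread and is supported
            // on .NET Framework only; on .NET Core and later it throws
            // PlatformNotSupportedException.
            if (Interlocked.CompareExchange(ref _workItemsQueued, 1, 0) == 0)
                _processMessageDelegate.BeginInvoke(null, null);
        }

        private void ProcessMessage()
        {
            // The lock guarantees that messages are never processed concurrently.
            lock (_lock)
            {
                // From here on, a newly received message may queue another work item.
                Interlocked.Decrement(ref _workItemsQueued);

                // Redundant work item: the latest message has already been processed.
                if (_current == _last) return;

                _action(_last = _current);
            }
        }
    }
}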
I’m not for a moment supposing that this is the best possible solution. I’m wise enough to know that I am not wise enough to know that. This is the best solution I found in the time I spent looking at the problem. I’d be very interested to know of any optimizations or entirely different approaches that might be an improvement over this.
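As one example of an entirely different approach, here is a sketch that replaces the lock with a single "drain" loop and hands messages over with `Interlocked.Exchange`. It is offered under stated assumptions and is not claimed to be faster: `LatestOnlyProcessor` and `Demo` are hypothetical names that are not part of MPM.Threading, `ThreadPool.QueueUserWorkItem` is swapped in for `BeginInvoke`, the action is assumed not to throw, and (unlike the code above) no check is made for the same message instance being submitted twice.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical lock-free variant; assumes the supplied action never throws.
public sealed class LatestOnlyProcessor<TMessage> where TMessage : class
{
    private readonly Action<TMessage> _action;
    private TMessage _pending; // most recent unprocessed message, or null
    private int _running;      // 1 while a drain loop is queued or active

    public LatestOnlyProcessor(Action<TMessage> action)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        _action = action;
    }

    public void Process(TMessage message)
    {
        // Overwrite whatever is pending, then start a drain loop if none exists.
        Interlocked.Exchange(ref _pending, message);
        if (Interlocked.CompareExchange(ref _running, 1, 0) == 0)
            ThreadPool.QueueUserWorkItem(_ => Drain());
    }

    private void Drain()
    {
        do
        {
            TMessage next;
            // Take the pending message (if any) and process it, one at a time.
            while ((next = Interlocked.Exchange(ref _pending, null)) != null)
                _action(next);

            // Release ownership with a full fence, then re-check for a message
            // that arrived after the last take but before the release.
            Interlocked.Exchange(ref _running, 0);
        }
        while (Volatile.Read(ref _pending) != null
               && Interlocked.CompareExchange(ref _running, 1, 0) == 0);
    }
}

public static class Demo
{
    // Sends A, then B and C while A is still being processed.
    public static List<string> Run()
    {
        var processed = new List<string>();
        var started = new ManualResetEventSlim();
        var release = new ManualResetEventSlim();
        var done = new ManualResetEventSlim();

        var processor = new LatestOnlyProcessor<string>(m =>
        {
            processed.Add(m);
            if (m == "A") { started.Set(); release.Wait(); } // hold A "in progress"
            if (m == "C") done.Set();
        });

        processor.Process("A");
        started.Wait();         // A is now being processed
        processor.Process("B");
        processor.Process("C"); // overwrites B before it is taken
        release.Set();          // let A finish
        done.Wait();            // C has been processed
        return processed;
    }
}
```

`Demo.Run()` returns A followed by C. The trade-off in this sketch is that no thread ever blocks waiting for a lock, at the cost of one worker possibly staying busy for a long time when messages arrive continuously.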
I have created and packaged a library that includes this code and will grow over time as I produce more solutions in this space.
https://github.com/myles-mcdonnell/MPM.Threading
http://nuget.org/packages/MPM.Threading
