I’m currently developing a Windows service application that receives data from multiple sources. It isn’t a highly concurrent application, but the incoming messages are come fairly fast after one another.
Incoming messages enter the system via WCF and that part is multithreaded. But these messages must be handled sequentially. What’s more, certain pieces of our application must run at regular intervals.
We can experience problems if component A is running and changing objects in memory, and component B is triggered and starts using these same objects.
That’s why we introduced a simple command queue:
public interface ICommandQueue : IEnumerable<Action> { void Add(Action action); void CompleteAdding(); }
The implementation uses a BlockingCollection:
public class ThreadSafeCommandQueue : ICommandQueue { private readonly BlockingCollection<Action> _actions = new BlockingCollection<Action>(); public void Add(Action action) { _actions.Add(action); } public IEnumerator<Action> GetEnumerator() { return _actions.GetConsumingEnumerable().GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } public void CompleteAdding() { _actions.CompleteAdding(); } }
This means another class can loop over it and execute every action that is added. If the queue is empty, it will wait until a new item is added. You can read more on BlockingCollections for the ins and outs.
This is our implementation:
Task.Factory.StartNew(() => { foreach (var action in _queue) { action(); } }, TaskCreationOptions.LongRunning);
So, what happens is we add stuff that needs to be executed safely to this command queue:
_commandQueue.Add(() => { _something.Do(_input); });
As you can see, you have access to private fields, as you’re just passing in the entire Action to be performed.
Now the cool thing is this is entirely unit-testable. Because we’re using a DI-container (Autofac in our case), we can inject a different implementation when we’re testing. Because we don’t want our Assert methods to start asserting if the Action hasn’t been executed yet, we can inject the following when we’re testing:
public class ImmediateCommandQueue : ICommandQueue { public IEnumerator<action> GetEnumerator() { throw new NotImplementedException(); } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } public void Add(Action action) { action(); } public void CompleteAdding() { throw new NotImplementedException(); } }
When an Action is added, it is immediately executed. So when we’ve done the ‘Act’-part of our tests (see AAA), we can be sure there aren’t any Actions still waiting to be executed.
But what this also allows us to do is control when the Actions are executed. Take this method:
public void Process() { _a = new List<int> { 1, 2, 3 }; _commandQueue.Add(() => { _innerProcessor.Process(_list); }); _list.Clear(); }
This is of course a silly method, but bear with me. The point is that, at runtime, the innerProcessor might receive en empty list because we’re clearing it immediately after we added the Action. But in our tests, this isn’t very clear because we’re immediately executing the Action.
So we can introduce yet another implementation of ICommandQueue, purely for testing purposes:
public class ManuallyTriggeredCommandQueue : ICommandQueue { private readonly IList<Action> _actions = new List<Action>(); public IEnumerator<Action> GetEnumerator() { throw new NotImplementedException(); } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } public void Add(Action action) { _actions.Add(action); } public void CompleteAdding() { } public void ExecuteNextAction() { var action = _actions[0]; _actions.RemoveAt(0); action(); } }
Now, in our tests, we can call the Process method first, and only then execute the Action by calling ExecuteNextAction. This way we can get a failing test (because _innerProcessor.Process was called with an empty list instead of a list with 3 integers). And after a failing test, we can turn it into a green test:
public void Process() { _a = new List<int> { 1, 2, 3 }; _commandQueue.Add(() => { _innerProcessor.Process(_list); _list.Clear(); }); }
I’m very happy with this solution. It’s geared towards our domain, as we’re not going to flood our queue because we won’t be getting thousands of messages every millisecond. But it allows us to avoid threads getting in each others way, and our code remains testable.