
adamfoneil/AsyncWorkflow


This is a web queue worker library inspired by Derek Comartin's video Web-Queue-Worker Architecture Style for Scaling. I've done some work with background services (BackgroundService.Extensions), but hadn't done anything with multiple, coordinated workers. I've played a little recently with Coravel, which has a nice queueing feature. But as usual, I wanted to think through the problem myself just to get my head around it. Derek mentions a project, Wolverine, by name; I have not looked at it.

At the heart of this are some interfaces:

  • IQueue defines fundamental operations on a persistent queue of some kind
  • IStatusRepository defines a standard way of tracking success, failure, or any kind of process outcome for all workers
  • ITrackedPayload is something you'd implement on a payload model that you need to identify uniquely
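A rough sketch of what these three contracts could look like, written in Python rather than the project's C# so it runs standalone. The class and method names (Queue, StatusRepository, TrackedPayload, enqueue, dequeue, set_status, get_status) are hypothetical analogues, not the library's actual signatures. The in-memory queue is only a test stand-in, since IQueue is meant to be persistent.

```python
from abc import ABC, abstractmethod
from collections import deque
from dataclasses import dataclass
from typing import Optional

# Hypothetical Python analogues of IQueue, IStatusRepository and
# ITrackedPayload; names and signatures are illustrative only.


@dataclass
class QueueMessage:
    machine_name: str  # which machine may process this message
    handler: str       # which worker (handler) should process it
    payload: str       # JSON-serialized payload


class Queue(ABC):
    """Operations on a persistent queue (cf. IQueue)."""

    @abstractmethod
    def enqueue(self, message: QueueMessage) -> None: ...

    @abstractmethod
    def dequeue(self, machine_name: str, handler: str) -> Optional[QueueMessage]: ...


class StatusRepository(ABC):
    """Records the outcome of each unit of work (cf. IStatusRepository)."""

    @abstractmethod
    def set_status(self, tracking_id: str, handler: str, status: str, duration_s: float) -> None: ...

    @abstractmethod
    def get_status(self, tracking_id: str, handler: str) -> Optional[str]: ...


class TrackedPayload(ABC):
    """A payload that can be identified uniquely (cf. ITrackedPayload)."""

    @property
    @abstractmethod
    def tracking_id(self) -> str: ...


@dataclass
class UploadPayload(TrackedPayload):
    """Hypothetical payload for a simulated file upload."""
    file_name: str
    upload_id: str

    @property
    def tracking_id(self) -> str:
        return self.upload_id


class InMemoryQueue(Queue):
    """Non-persistent stand-in, suitable only for tests and demos."""

    def __init__(self) -> None:
        self._items = deque()

    def enqueue(self, message: QueueMessage) -> None:
        self._items.append(message)

    def dequeue(self, machine_name: str, handler: str) -> Optional[QueueMessage]:
        # Return the oldest message targeted at this machine and handler.
        for msg in self._items:
            if msg.machine_name == machine_name and msg.handler == handler:
                self._items.remove(msg)
                return msg
        return None
```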

The actual BackgroundService implementation is WorkflowBackgroundService. A few key points:

  • The first thing to look at is the ProcessNextMessageAsync call. This is where the worker does its main work within the BackgroundService.ExecuteAsync loop, which runs continually until canceled.
  • The outcome of that work is recorded with Status.SetAsync. This captures the duration, the status result, and the name of the worker (handler) that did the work.
  • When work is done, the worker can define what happens next in the OnCompletedAsync method. This gives us a way to coordinate sequential and parallel work.
  • Dequeuing messages and parsing JSON payload data happen in ProcessNextMessageAsync. Note that I pass the current MachineName as an argument; this is the best way I know to ensure that concurrent workers don't do redundant work. By filtering queue reads by machine name, you can have as many worker machines as you want, and they won't step on each other. The dequeue implementation also really matters here: when querying queue messages from a database table (for example), there are special considerations to prevent duplicate query results, which we'll see shortly. The HandlerName is just the class name, used as a filter to target queue messages to the appropriate handler.
  • The ProcessMessageAsync call is the abstract method where you supply the implementation of the work you're doing.
  • There's an extension method, EnqueuePayloadAsync, that serializes the payload to JSON and passes the current machine name automatically. It is used throughout the project instead of the interface's EnqueueAsync method.
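The loop described in these bullets can be sketched roughly as follows. This is a single-threaded Python approximation, not the C# WorkflowBackgroundService: a plain list stands in for the queue, another list for the status repository, and WorkerBase, process_next_message, process_message, on_completed, and enqueue_payload are hypothetical names mirroring ProcessNextMessageAsync, ProcessMessageAsync, OnCompletedAsync, and EnqueuePayloadAsync. Whether the real library calls OnCompletedAsync after a failure isn't stated here, so this sketch passes the status to the hook and leaves the decision to the override.

```python
import json
import socket
import time
from typing import Any, Callable, List, Optional

# Hypothetical, single-threaded sketch of the worker loop described above.
# A list of dicts stands in for the persistent queue; another list stands in
# for the status repository. Not the library's actual C# API.


def enqueue_payload(queue: List[dict], handler: str, payload: Any,
                    machine_name: Optional[str] = None) -> None:
    """Serialize the payload to JSON and stamp the machine name (cf. EnqueuePayloadAsync)."""
    queue.append({
        "machine": machine_name or socket.gethostname(),
        "handler": handler,
        "payload": json.dumps(payload),
    })


class WorkerBase:
    def __init__(self, queue: List[dict], statuses: List[dict],
                 machine_name: Optional[str] = None) -> None:
        self.queue = queue
        self.statuses = statuses
        self.machine_name = machine_name or socket.gethostname()

    @property
    def handler_name(self) -> str:
        # The class name routes queue messages to the right worker.
        return type(self).__name__

    def process_message(self, payload: Any) -> None:
        """The unit of work; subclasses must override (cf. ProcessMessageAsync)."""
        raise NotImplementedError

    def on_completed(self, payload: Any, status: str) -> None:
        """Hook for follow-up work, such as starting the next step."""

    def process_next_message(self) -> bool:
        """Take one message for this machine and handler, run it, record the outcome."""
        for i, msg in enumerate(self.queue):
            if msg["machine"] == self.machine_name and msg["handler"] == self.handler_name:
                del self.queue[i]
                break
        else:
            return False  # no matching message

        payload = json.loads(msg["payload"])
        start = time.perf_counter()
        try:
            self.process_message(payload)
            status = "Completed"
        except Exception:
            status = "Failed"
        # Record duration, outcome, and handler name (cf. Status.SetAsync).
        self.statuses.append({
            "handler": self.handler_name,
            "status": status,
            "duration_s": time.perf_counter() - start,
        })
        self.on_completed(payload, status)
        return True

    def run(self, should_stop: Callable[[], bool], idle_delay_s: float = 0.5) -> None:
        """Loop until told to stop (cf. BackgroundService.ExecuteAsync)."""
        while not should_stop():
            if not self.process_next_message():
                time.sleep(idle_delay_s)
```

Filtering on both machine name and handler name is what keeps these workers from picking up each other's messages; in a real multi-process deployment, the atomicity of the dequeue itself matters too, which is what the SQL Server implementation addresses.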

Dapper + SQL Server implementation

You have many options for a queue backing store and data access approach. Dapper is a great choice for working with inline SQL in a type-safe way while having full control of the SQL, so I went with that. I used SQL Server as the backing database because it's easy to get started with.

  • This is the Queue implementation.
  • I have a couple of low-level extension methods relevant here, in particular DequeueAsync, which has the special sauce for preventing duplicate reads in concurrent environments: the WITH (ROWLOCK, READPAST) table hint.
  • Here is the StatusRepository implementation.
  • The backing tables needed by the implementation are defined in DbObjects, which works hand in hand with DbTable. You may be tempted to use EF migrations to create these objects, but I didn't think that would work here.
  • There are a couple of convenience methods to help with startup code in ServiceExtensions. This is how we ensure the backing tables get created at startup and add the necessary services to the DI container.
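To illustrate startup table creation and an atomic dequeue in runnable form, here is a sketch that swaps SQLite in for SQL Server. SQLite has no ROWLOCK or READPAST hints, so instead of letting concurrent readers skip locked rows, this version takes the database write lock up front with BEGIN IMMEDIATE and deletes the row it read in the same transaction: concurrent dequeuers serialize behind the lock rather than skipping ahead, but they still never receive the same message. The table and column names, ensure_tables, enqueue, and dequeue are hypothetical, not the schema in DbObjects. For comparison, a commonly used SQL Server form of the READPAST pattern deletes from a TOP (1) CTE read WITH (ROWLOCK, READPAST) and returns the row through OUTPUT deleted.*; whether DequeueAsync is written exactly that way is an assumption.

```python
import sqlite3
from typing import Optional, Tuple

# Sketch only: SQLite stands in for SQL Server, and table/column names are
# hypothetical. Open the connection with isolation_level=None so that the
# explicit BEGIN IMMEDIATE / COMMIT below control the transaction.

DDL = (
    """CREATE TABLE IF NOT EXISTS Queue (
           Id INTEGER PRIMARY KEY AUTOINCREMENT,
           MachineName TEXT NOT NULL,
           Handler TEXT NOT NULL,
           Payload TEXT NOT NULL)""",
    """CREATE TABLE IF NOT EXISTS Status (
           Id INTEGER PRIMARY KEY AUTOINCREMENT,
           Handler TEXT NOT NULL,
           Status TEXT NOT NULL,
           DurationMs INTEGER NOT NULL)""",
)


def ensure_tables(conn: sqlite3.Connection) -> None:
    """Create the backing tables at startup; safe to call repeatedly."""
    for stmt in DDL:
        conn.execute(stmt)


def enqueue(conn: sqlite3.Connection, machine_name: str, handler: str, payload: str) -> None:
    conn.execute(
        "INSERT INTO Queue (MachineName, Handler, Payload) VALUES (?, ?, ?)",
        (machine_name, handler, payload),
    )


def dequeue(conn: sqlite3.Connection, machine_name: str,
            handler: str) -> Optional[Tuple[int, str]]:
    """Atomically take the oldest message for this machine and handler, or return None."""
    # BEGIN IMMEDIATE acquires SQLite's write lock before reading, so another
    # dequeue() on a different connection cannot begin until this one commits.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT Id, Payload FROM Queue WHERE MachineName = ? AND Handler = ? "
            "ORDER BY Id LIMIT 1",
            (machine_name, handler),
        ).fetchone()
        if row is not None:
            conn.execute("DELETE FROM Queue WHERE Id = ?", (row[0],))
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return row
```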

Sample API

A very simple implementation is in the SampleAPI project.

  • See Program.cs for how an API project would be configured.
  • See the appsettings.json configuration.
  • I added several dummy workers. All they do is delay for a random number of seconds.
  • You can test this by clicking "Debug" on the POST to the /process endpoint in the .http file. This simulates a file upload (although no actual content is uploaded; only a file name is provided).
  • To make Step2 run only when all three Step1 processes complete, I override the OnCompleteAsync method in the Step1* workers (example). This calls Step2.StartWhenReady, which checks whether all the Step1 processes are in a "Completed" status before it initiates Step2.
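The Step1-to-Step2 handoff can be sketched like this. STEP1_HANDLERS, the (tracking id, handler) status keys, and start_when_ready are hypothetical Python stand-ins for Step2.StartWhenReady and the status repository, not the SampleAPI code. The duplicate check is an addition of this sketch: if two Step1 workers finish at nearly the same moment, both may see every Step1 status as Completed.

```python
from typing import Dict, List, Tuple

# Hypothetical names: the three Step1 workers and the status keys are assumed
# for illustration, not taken from the SampleAPI project.
STEP1_HANDLERS = ("Step1A", "Step1B", "Step1C")

# Status store keyed by (tracking id, handler name) -> status string.
Statuses = Dict[Tuple[str, str], str]


def start_when_ready(statuses: Statuses, queue: List[Tuple[str, str]], tracking_id: str) -> bool:
    """Enqueue Step2 for tracking_id once every Step1 handler has completed.

    Returns True only when this call enqueued Step2.
    """
    all_done = all(statuses.get((tracking_id, h)) == "Completed" for h in STEP1_HANDLERS)
    # Guard against a second Step1 completion starting Step2 again.
    already_started = ("Step2", tracking_id) in queue or (tracking_id, "Step2") in statuses
    if all_done and not already_started:
        queue.append(("Step2", tracking_id))
        return True
    return False
```

This in-process guard is not atomic across machines; a production version would need the check and the enqueue protected by the backing store itself (for example, by a transaction or a uniqueness constraint).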

Testing

To test the inline SQL without needing to run actual workers, I wrote tests in DapperWorkflow. These ensure that the tables are created correctly and can be queried without error.
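A similar smoke test can be sketched without any workers: create the schema in a throwaway database, run each inline query once against the empty tables, and report any that fail. This version uses an in-memory SQLite database rather than SQL Server, and the schema, query names, and smoke_test function are hypothetical placeholders, not the contents of DapperWorkflow.

```python
import sqlite3
from typing import Dict, List, Sequence, Tuple

# Hypothetical schema and queries; SQLite stands in for SQL Server.
SCHEMA = [
    "CREATE TABLE Queue (Id INTEGER PRIMARY KEY, MachineName TEXT, Handler TEXT, Payload TEXT)",
    "CREATE TABLE Status (Id INTEGER PRIMARY KEY, Handler TEXT, Status TEXT)",
]

QUERIES: Dict[str, Tuple[str, tuple]] = {
    "next_message": (
        "SELECT Id, Payload FROM Queue WHERE MachineName = ? AND Handler = ? ORDER BY Id LIMIT 1",
        ("m1", "Step1A"),
    ),
    "statuses": ("SELECT Handler, Status FROM Status WHERE Handler = ?", ("Step1A",)),
}


def smoke_test(schema: Sequence[str], queries: Dict[str, Tuple[str, tuple]]) -> List[str]:
    """Create the schema, run every query once, and return the names of queries that failed."""
    conn = sqlite3.connect(":memory:")
    try:
        for stmt in schema:
            conn.execute(stmt)
        failures = []
        for name, (sql, params) in queries.items():
            try:
                conn.execute(sql, params).fetchall()
            except sqlite3.Error:
                failures.append(name)
        return failures
    finally:
        conn.close()
```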

About

A demo web queue worker pattern implementation
