A .NET job runner built for customization.
This aims to be the most flexible and easiest-to-understand job runner in the .NET ecosystem. Implement an IJobStorageRecord and an IJobStorageProvider, and the library’s hosted worker does the rest. Out of the box you get a no-surprises background queue: enqueue work, the worker runs it, that’s it.
Every section below shows an optional way to extend that base behavior. Because you own the record and the provider, you can bolt on batching, continuations, cron schedules, richer retries, and telemetry whenever you need them—and ignore what you don’t.
Boot the stock worker against your schema.
Start exactly where the README starts. Install the NuGet package, implement the bare-minimum storage record, wire up the built-in worker with AddGusto, and configure BatchSize, Concurrency, and PollInterval in appsettings.json. No custom fields, no extra logic—this is the stock loop. Once registered, inject JobQueue<JobRecord> and enqueue work directly.
dotnet add package ByteBard.GUSTO
public class JobRecord : IJobStorageRecord
{
    public Guid TrackingId { get; set; }        // unique handle returned to callers
    public DateTime CreatedOn { get; set; }
    public DateTime? ExecuteAfter { get; set; } // null = run as soon as a worker picks it up
    public DateTime? ExpireOn { get; set; }     // null = never expire
    public bool IsComplete { get; set; }
    public string JobType { get; set; }         // serialized handler type
    public string MethodName { get; set; }      // serialized handler method
    public string ArgumentsJson { get; set; }   // serialized call arguments
}
// Program.cs
builder.Services.AddDbContext<ExampleDbContext>(options =>
options.UseNpgsql(connectionString));
builder.Services.AddScoped<IJobStorageProvider<JobRecord>, ExampleJobStorageProvider>();
builder.Services.AddGusto<JobRecord, ExampleJobStorageProvider>(
builder.Configuration.GetSection("Gusto"),
ServiceLifetime.Scoped);
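For reference, a minimal appsettings.json sketch for the "Gusto" section bound above. The option names follow the BatchSize, Concurrency, and PollInterval settings called out earlier; the values (and whether PollInterval is a TimeSpan or a number of milliseconds) are illustrative assumptions, so check the package docs for the real option types:
{
  "Gusto": {
    "BatchSize": 10,
    "Concurrency": 4,
    "PollInterval": "00:00:05"
  }
}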
public class UserController : ControllerBase
{
private readonly JobQueue<JobRecord> _jobQueue;
public UserController(JobQueue<JobRecord> jobQueue)
{
_jobQueue = jobQueue;
}
[HttpPost("register")]
public async Task<IActionResult> Register(RegisterRequest request)
{
await _jobQueue.EnqueueAsync<EmailService>(
service => service.SendWelcomeEmailAsync(request.Email, request.UserName));
return Ok();
}
}
The rest of this page builds from there: once you’ve seen the default loop, you can start extending the storage record and provider to unlock batching, continuations, cron, and retry behaviors.
Full EF Core provider (Storage + Scheduling brain)
The worker that ships with the package is intentionally thin; all orchestration happens here. The example below mirrors a production deployment that keeps everything in PostgreSQL via EF Core: the baseline only filters for unfinished records, saves completions, and nudges failures forward. Continuations, cron scheduling, and custom retry policies are layered on later as optional extensions.
public class ExampleJobStorageProvider : IJobStorageProvider<JobRecord>
{
private readonly ExampleDbContext _context;
public ExampleJobStorageProvider(ExampleDbContext context) => _context = context;
public async Task StoreJobAsync(JobRecord jobStorageRecord, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(jobStorageRecord);
try
{
_context.JobRecords.Add(jobStorageRecord);
await _context.SaveChangesAsync(cancellationToken);
}
catch (DbUpdateException ex) when (ex.InnerException?.Message?.Contains("duplicate key") == true)
{
throw new InvalidOperationException($"Job with TrackingId {jobStorageRecord.TrackingId} already exists.", ex);
}
}
public async Task<IEnumerable<JobRecord>> GetBatchAsync(
JobSearchParams<JobRecord> parameters,
CancellationToken cancellationToken)
{
        // parameters.Match already contains the worker's baseline predicate (see FAQ).
        // Pass the expression straight to EF Core so it is translated to SQL;
        // calling .Compile() would silently switch to client-side evaluation.
        return await _context.JobRecords
            .Where(parameters.Match)
            .Take(parameters.Limit)
            .ToListAsync(cancellationToken);
}
public async Task MarkJobAsCompleteAsync(JobRecord jobStorageRecord, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(jobStorageRecord);
var job = await _context.JobRecords.FirstOrDefaultAsync(j => j.TrackingId == jobStorageRecord.TrackingId, cancellationToken);
if (job == null) return;
job.IsComplete = true;
job.ExpireOn ??= DateTime.UtcNow.AddDays(30);
await _context.SaveChangesAsync(cancellationToken);
}
public async Task CancelJobAsync(Guid trackingId, CancellationToken cancellationToken) =>
await _context.JobRecords
.Where(j => j.TrackingId == trackingId && !j.IsComplete)
.ExecuteDeleteAsync(cancellationToken);
public async Task OnHandlerExecutionFailureAsync(
JobRecord jobStorageRecord,
Exception exception,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(jobStorageRecord);
ArgumentNullException.ThrowIfNull(exception);
var job = await _context.JobRecords.FirstOrDefaultAsync(j => j.TrackingId == jobStorageRecord.TrackingId, cancellationToken);
if (job == null) return;
job.ExecuteAfter = DateTime.UtcNow.AddMinutes(5);
await _context.SaveChangesAsync(cancellationToken);
}
}
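For completeness, here is a minimal sketch of the ExampleDbContext the provider depends on. The key and index choices are assumptions driven by the queries above (lookups by TrackingId plus the unfinished-work scan), not library requirements:
public class ExampleDbContext : DbContext
{
    public ExampleDbContext(DbContextOptions<ExampleDbContext> options) : base(options) { }

    public DbSet<JobRecord> JobRecords => Set<JobRecord>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        var job = modelBuilder.Entity<JobRecord>();
        job.HasKey(j => j.TrackingId);                           // every provider method looks rows up by TrackingId
        job.HasIndex(j => new { j.IsComplete, j.ExecuteAfter }); // keeps the polling query off a full table scan
    }
}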
Every other scenario on this page extends or calls back into this provider. The built-in worker stays the same; you simply change what it reads and writes.
Fan out work, gate downstream steps.
Optional extension: add a shared BatchId so related jobs run in parallel but unlock dependents together.
A batch is just a Guid written onto each job record. Workers process batch members independently, but continuations won’t fire until CanProcessContinuations reports every row finished (or whatever rule you encode). Here’s the exact helper and provider logic lifted from the reference implementation.
// JobRecord additions for batching
public Guid? BatchId { get; set; }
// Helper extension that fans out expressions and stores them with a shared BatchId
// (declare it in a static class, as with any extension method)
public static async Task<Guid> BatchAsync(
this JobQueue<JobRecord> queue,
params Expression<Func<Task>>[] methodCalls)
{
var batchId = Guid.NewGuid();
foreach (var methodCall in methodCalls)
{
var record = queue.ConstructRecordFromExpression(methodCall, null);
record.BatchId = batchId;
await queue.StorageProvider.StoreJobAsync(record, CancellationToken.None);
}
return batchId;
}
var batchId = await jobQueue.BatchAsync(
() => _onboarding.CreateDirectoryEntryAsync(customerId),
() => _billing.ProvisionPlanAsync(customerId),
() => _messaging.SendWelcomeAsync(customerId));
No provider changes are required beyond what you built in Section 01—the baseline GetBatchAsync already respects IsComplete, ExecuteAfter, and Limit. Add BatchId, enqueue related jobs together, and the default worker handles them independently.
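Batching also pairs with the continuation helper defined in the next section: ProcessContinuations resolves its parent id as BatchId ?? TrackingId, so a continuation whose ParentJobId is the batch id stays parked until every member finishes. A sketch, assuming you adopt both extensions (_reporting is a hypothetical service):
// Gate a summary step on the whole batch from Section 02
await jobQueue.ContinueWithAsync(
    batchId, // the Guid returned by BatchAsync
    () => _reporting.SummarizeOnboardingAsync(customerId));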
Parent/child orchestration managed in the DB.
Optional extension: park follow-up work in WaitingForParent until a parent job (or entire batch) finishes.
Continuations are “waiting” jobs tied to either a specific parent or an entire batch. The helper below writes the row and parks it in JobStatus.WaitingForParent. The provider’s ProcessContinuations and CanProcessContinuations methods determine when to flip those rows back to Ready.
Introduce a simple JobStatus enum (plus four new fields, including the LastUpdatedAt timestamp the helpers below keep current) so the provider can reason about state transitions:
public enum JobStatus
{
Ready,
Processing,
WaitingForParent,
Completed,
Failed
}
// JobRecord additions for continuations
public Guid? ParentJobId { get; set; }
public bool RunEvenIfParentsFail { get; set; }
public JobStatus Status { get; set; } = JobStatus.Ready;
public DateTime LastUpdatedAt { get; set; } // touched by every state transition below
// Helper extension that serializes a continuation and parks it in WaitingForParent
public static async Task<Guid> ContinueWithAsync(
this JobQueue<JobRecord> queue,
Guid parentJobId,
Expression<Func<Task>> methodCall,
CancellationToken cancellationToken = default,
bool runEvenIfParentsFail = false)
{
var record = queue.ConstructRecordFromExpression(methodCall, null);
record.Status = JobStatus.WaitingForParent;
record.ParentJobId = parentJobId;
record.RunEvenIfParentsFail = runEvenIfParentsFail;
record.LastUpdatedAt = DateTime.UtcNow;
await queue.StorageProvider.StoreJobAsync(record, cancellationToken);
return record.TrackingId;
}
Because these jobs now carry an explicit status, tweak GetBatchAsync to only return rows in the Ready state:
// Inside ExampleJobStorageProvider.GetBatchAsync
return await _context.JobRecords
    .Where(j => j.Status == JobStatus.Ready)
    .Where(parameters.Match)
    .Take(parameters.Limit)
    .ToListAsync(cancellationToken);
// ExampleJobStorageProvider additions inside MarkJobAsCompleteAsync
// Call this right after you mark a job complete and before returning.
// Pass the freshly loaded `job` entity (not the worker's copy) so the
// completion and status checks below see the state you just saved.
await ProcessContinuations(job, cancellationToken);
private async Task ProcessContinuations(JobRecord jobRecord, CancellationToken cancellationToken)
{
// If you kept BatchId from Section 02, this lets continuations wait on entire batches.
var parentId = jobRecord.BatchId ?? jobRecord.TrackingId;
if (!await CanProcessContinuations(jobRecord, cancellationToken))
{
return;
}
await _context.JobRecords
.Where(j => j.ParentJobId == parentId && j.Status == JobStatus.WaitingForParent)
.ExecuteUpdateAsync(j => j
.SetProperty(x => x.Status, JobStatus.Ready)
.SetProperty(x => x.ExecuteAfter, DateTime.UtcNow)
.SetProperty(x => x.LastUpdatedAt, DateTime.UtcNow),
cancellationToken);
}
private async Task<bool> CanProcessContinuations(JobRecord jobRecord, CancellationToken cancellationToken)
{
var parentId = jobRecord.BatchId ?? jobRecord.TrackingId;
var hasContinuationThatRunsOnFailure = await _context.JobRecords
.AnyAsync(j => j.ParentJobId == parentId && j.RunEvenIfParentsFail, cancellationToken);
if (hasContinuationThatRunsOnFailure)
{
if (!jobRecord.BatchId.HasValue)
{
return jobRecord.IsComplete;
}
return await _context.JobRecords
.Where(j => j.BatchId == jobRecord.BatchId)
.AllAsync(j => j.IsComplete, cancellationToken);
}
if (jobRecord.Status == JobStatus.Failed)
{
return false;
}
if (!jobRecord.BatchId.HasValue)
{
return jobRecord.IsComplete;
}
return await _context.JobRecords
.Where(j => j.BatchId == jobRecord.BatchId)
.AllAsync(j => j.IsComplete && j.Status != JobStatus.Failed, cancellationToken);
}
var parentId = await jobQueue.EnqueueAsync<IImageJob>(
job => job.RenderAsync(assetId));
await jobQueue.ContinueWithAsync(parentId,
() => _cdn.PublishAsync(assetId),
runEvenIfParentsFail: false);
Because everything is stored server-side, you can troubleshoot or replay flows with SQL alone: inspect batch rows, update statuses, cancel or restart parents.
Recurring jobs without another service.
Optional extension: add cron metadata so the provider reschedules recurring work without external schedulers.
By combining Cronos with provider-level upserts, you can schedule resilient recurring work. Add two things: (1) cron metadata on the record, and (2) provider methods that reschedule rows instead of marking them complete. The worker still just calls into your provider.
Like the continuation section, this drop-in expects the JobStatus field/enum to exist.
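Cronos ships as its own NuGet package, so install it first (the snippets below also assume a using Cronos; directive):
dotnet add package Cronos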
// JobRecord additions
public string CronExpression { get; set; }
public string RecurringJobId { get; set; }
// ExampleJobStorageProvider additions
public async Task<JobRecord?> GetRecurringJobByIdAsync(
    string recurringJobId,
    CancellationToken cancellationToken) =>
    await _context.JobRecords
        .Where(j => j.RecurringJobId == recurringJobId && !j.IsComplete)
        .FirstOrDefaultAsync(cancellationToken);
// Drop this inside MarkJobAsCompleteAsync before you mark the job finished:
if (!string.IsNullOrWhiteSpace(job.CronExpression))
{
var cron = CronExpression.Parse(job.CronExpression, CronFormat.IncludeSeconds);
var next = cron.GetNextOccurrence(DateTime.UtcNow, TimeZoneInfo.Utc);
if (next.HasValue)
{
job.ExecuteAfter = next.Value;
job.Status = JobStatus.Ready;
job.IsComplete = false;
job.LastUpdatedAt = DateTime.UtcNow;
await _context.SaveChangesAsync(cancellationToken);
return;
}
}
public static async Task<Guid> ScheduleRecurringAsync(
this JobQueue<JobRecord> queue,
string recurringJobId,
Expression<Func<Task>> methodCall,
string cronExpression,
CancellationToken cancellationToken = default)
{
var cronFormat = CronFormat.IncludeSeconds;
var parsedCron = CronExpression.Parse(cronExpression, cronFormat);
var nextOccurrence = parsedCron.GetNextOccurrence(DateTime.UtcNow, TimeZoneInfo.Utc)
?? throw new InvalidOperationException($"Cron expression '{cronExpression}' will never execute.");
if (queue.StorageProvider is ExampleJobStorageProvider provider)
{
// Allows idempotent scheduling at startup: cancel the previous job with the same recurring ID.
var existing = await provider.GetRecurringJobByIdAsync(recurringJobId, cancellationToken);
if (existing != null)
{
await queue.StorageProvider.CancelJobAsync(existing.TrackingId, cancellationToken);
}
}
var record = queue.ConstructRecordFromExpression(methodCall, null);
record.RecurringJobId = recurringJobId;
record.CronExpression = cronExpression;
record.ExecuteAfter = nextOccurrence; // already non-null: the ?? throw above handled the never-fires case
record.Status = JobStatus.Ready;
record.LastUpdatedAt = DateTime.UtcNow;
await queue.StorageProvider.StoreJobAsync(record, cancellationToken);
return record.TrackingId;
}
await jobQueue.ScheduleRecurringAsync(
recurringJobId: "nightly-ledger",
methodCall: () => _ledger.CloseDayAsync(),
cronExpression: "0 0 2 * * *");
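Because ScheduleRecurringAsync cancels any existing row with the same recurring id, it is safe to call on every startup. A minimal Program.cs sketch, assuming a hypothetical ILedgerService registered in DI:
// Program.cs, after the app is built but before app.Run()
using (var scope = app.Services.CreateScope())
{
    var jobQueue = scope.ServiceProvider.GetRequiredService<JobQueue<JobRecord>>();
    var ledger = scope.ServiceProvider.GetRequiredService<ILedgerService>();
    await jobQueue.ScheduleRecurringAsync(
        recurringJobId: "nightly-ledger",
        methodCall: () => ledger.CloseDayAsync(),
        cronExpression: "0 0 2 * * *");
}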
After adding those snippets, the provider will re-queue the same row for the next cron occurrence instead of marking it complete. That gives you pause/resume/inspect controls with nothing more than SQL updates.
Exponential backoff handled entirely in SQL.
Optional extension: track failure metadata and control backoff directly in your provider.
The README calls out “customizable strategies.” This is where you plug them in. The worker invokes OnHandlerExecutionFailureAsync when any handler throws, and you decide how to recover. This sample uses exponential backoff (1s, 2s, 4s …), caps at ten attempts, then marks the job failed so continuation logic can react.
Because this strategy updates job.Status, make sure the JobStatus enum/field from Section 03 is present.
// JobRecord additions for advanced retry tracking
public int FailureCount { get; set; }
public string LastFailureReason { get; set; }
public DateTime? LastFailureTime { get; set; }
Swap the simple retry body from Section 01 with a call to the helper below:
public async Task OnHandlerExecutionFailureAsync(
JobRecord jobStorageRecord,
Exception exception,
CancellationToken cancellationToken)
{
await RetryOrFailAsync(jobStorageRecord, exception, cancellationToken);
}
private async Task RetryOrFailAsync(
JobRecord jobStorageRecord,
Exception exception,
CancellationToken cancellationToken)
{
const int MaxRetries = 10;
ArgumentNullException.ThrowIfNull(jobStorageRecord);
ArgumentNullException.ThrowIfNull(exception);
var job = await _context.JobRecords
.FirstOrDefaultAsync(j => j.TrackingId == jobStorageRecord.TrackingId, cancellationToken);
if (job == null) return;
var newFailureCount = job.FailureCount + 1;
if (newFailureCount >= MaxRetries)
{
job.FailureCount = newFailureCount;
job.LastFailureReason = exception.Message;
job.Status = JobStatus.Failed;
job.LastFailureTime = DateTime.UtcNow;
job.LastUpdatedAt = DateTime.UtcNow;
await _context.SaveChangesAsync(cancellationToken);
await MarkJobAsCompleteAsync(jobStorageRecord, cancellationToken);
return;
}
var delaySeconds = Math.Pow(2, newFailureCount - 1);
job.FailureCount = newFailureCount;
job.LastFailureReason = exception.Message;
job.LastFailureTime = DateTime.UtcNow;
job.ExecuteAfter = DateTime.UtcNow.AddSeconds(delaySeconds);
job.Status = JobStatus.Ready;
job.LastUpdatedAt = DateTime.UtcNow;
await _context.SaveChangesAsync(cancellationToken);
}
Want jitter, custom limits, or dead-lettering? Branch on JobType or FailureCount inside this method and store whatever telemetry you need—the worker remains oblivious.
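For example, a jittered variant of the backoff calculation above, spreading retries by up to ±20% so a burst of simultaneous failures does not come back in lockstep (the percentage is an arbitrary choice):
// Replace the fixed delay calculation in RetryOrFailAsync:
var baseDelay = Math.Pow(2, newFailureCount - 1);
var jitter = (Random.Shared.NextDouble() - 0.5) * 0.4 * baseDelay; // uniform in ±20% of the base delay
job.ExecuteAfter = DateTime.UtcNow.AddSeconds(baseDelay + jitter);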
Telemetry out of the box.
Optional extension: plug the built-in ActivitySource and Meter into your existing OpenTelemetry pipeline.
GUSTO ships an OpenTelemetry ActivitySource and Meter named ByteBard.GUSTO.JobQueue. Hook them into your existing pipeline and the worker immediately emits spans and counters for every batch and job your provider executes.
// Program.cs
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddSource(GustoTelemetry.ActivitySourceName)
.AddAspNetCoreInstrumentation()
.AddOtlpExporter())
.WithMetrics(metrics => metrics
.AddMeter(GustoTelemetry.MeterName)
.AddAspNetCoreInstrumentation()
.AddOtlpExporter());
Metrics published by the worker:
| Metric | Type | Description & Tags |
|---|---|---|
| gusto.jobs.processed | Counter | Total successful jobs (job.type, job.method) |
| gusto.jobs.failed | Counter | Total failed jobs (job.type, job.method, exception.type) |
| gusto.job.duration | Histogram | Job execution time in ms (job.type, job.method, job.status) |
| gusto.batch.duration | Histogram | Batch processing duration |
| gusto.batch.size | Histogram | Number of jobs processed per batch |
Tracing emits two spans: ProcessBatch (one per polling cycle) and ExecuteJob (one per job). Use them to correlate queue throughput with handler performance, and extend the meter with your own counters if you need additional signals.
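Custom counters take only a few lines with System.Diagnostics.Metrics; a sketch that counts dead-lettered jobs (the meter and counter names are your own choices, not part of GUSTO):
using System.Diagnostics.Metrics;

public static class JobMetrics
{
    private static readonly Meter Meter = new("MyApp.Jobs");

    public static readonly Counter<long> DeadLettered =
        Meter.CreateCounter<long>("myapp.jobs.dead_lettered");
}

// e.g. inside RetryOrFailAsync, when retries are exhausted:
JobMetrics.DeadLettered.Add(1, new KeyValuePair<string, object?>("job.type", job.JobType));
Remember to register the extra meter with .AddMeter("MyApp.Jobs") in the pipeline above.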
Deterministic integration tests.
Optional extension: use the built-in test barriers to run the worker in-process without sleeps.
The library ships with test barriers built in, so you can run the worker in-process without flakiness. Set BatchStartBarrier to pause the worker right before it drains a batch, and BatchCompletedBarrier to know when the batch is done. This gives you deterministic integration tests without polling.
[Fact]
public async Task JobQueue_ProcessesJobSuccessfully()
{
    var services = new ServiceCollection();
    // Register storage exactly as in Program.cs (DbContext setup omitted for brevity)
    services.AddScoped<IJobStorageProvider<JobRecord>, ExampleJobStorageProvider>();
    services.AddScoped<MyService>();
    services.AddGusto<JobRecord, ExampleJobStorageProvider>(
        configuration.GetSection("Gusto"),
        ServiceLifetime.Scoped);
    var provider = services.BuildServiceProvider();
    var jobQueue = provider.GetRequiredService<JobQueue<JobRecord>>();
    var hostedService = provider.GetRequiredService<IHostedService>();
    JobQueueWorker.BatchStartBarrier = new TaskCompletionSource();
    JobQueueWorker.BatchCompletedBarrier = new TaskCompletionSource();
    await jobQueue.EnqueueAsync<MyService>(svc => svc.DoWork("test"));
    await hostedService.StartAsync(CancellationToken.None);
    JobQueueWorker.BatchStartBarrier.SetResult();
    await JobQueueWorker.BatchCompletedBarrier.Task;
    await hostedService.StopAsync(CancellationToken.None);
    Assert.True(MyService.WorkCompleted);
}
Keep barriers confined to tests—they’re static, so always reset them (e.g., set them back to null) when your test finishes. This pattern makes GUSTO easy to exercise in CI without resorting to sleeps or polling loops.
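A minimal way to guarantee that reset is a try/finally around the test body (or the equivalent in a fixture's Dispose):
try
{
    JobQueueWorker.BatchStartBarrier = new TaskCompletionSource();
    JobQueueWorker.BatchCompletedBarrier = new TaskCompletionSource();
    // ... enqueue, start the host, release the barrier, assert ...
}
finally
{
    // The barriers are static; clear them so later tests start from a clean slate.
    JobQueueWorker.BatchStartBarrier = null;
    JobQueueWorker.BatchCompletedBarrier = null;
}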
FAQ & comparison.
What does GUSTO stand for?
GUSTO is the Generic Utility for Scheduling & Task Orchestration. The name reflects the library’s goal: ship a worker, let you own everything else.
Why build another job runner?
Existing runners hide orchestration behind filters, triggers, or custom DSLs. GUSTO keeps the built-in worker simple and hands storage orchestration back to your C# code—no hidden magic, no framework migrations when requirements change.
How does it compare?
| Project | Extension model | Type safety | Operational feel | Best when… |
|---|---|---|---|---|
| GUSTO | Implement two interfaces; add helpers for batching/continuations | Expression enqueue keeps jobs strongly typed | You control storage, retries, logging | You want to extend behavior with plain C# and own the data model |
| Hangfire | Dashboard filters & attributes | Strongly-typed expression enqueue (similar to GUSTO) | Easy start, complex filters get dense | You want a hosted dashboard and shared storage |
| Quartz.NET | Triggers, calendars, listeners | IoC friendly but verbose scheduling API | Powerful—also heavyweight | You need enterprise-grade calendars and cluster scheduling |
| TickerQ | Pipeline-based timers | Strings for dispatch | Minimal overhead, limited orchestration | You need lightweight distributed timers without custom storage |
What is expected from a GetBatchAsync implementation?
The worker calls your GetBatchAsync every poll and passes a JobSearchParams with a Match predicate that enforces not complete, ready to run, and not expired. You can apply additional filters or ordering (tenancy, priority, status flags) before returning the records, but make sure to honor the provided predicate and the Limit. Returning fewer items than requested is fine; returning jobs that fail the predicate will cause the worker to skip them and log warnings.
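For instance, a tenancy-aware variant that layers a filter and a deterministic order on top of the worker's predicate (the TenantId column and _currentTenant field are assumptions, not part of the base record):
return await _context.JobRecords
    .Where(parameters.Match)                  // baseline: not complete, ready to run, not expired
    .Where(j => j.TenantId == _currentTenant) // your own filter on top
    .OrderBy(j => j.CreatedOn)                // oldest first; any stable ordering works
    .Take(parameters.Limit)
    .ToListAsync(cancellationToken);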