
Worker

The worker runtime polls for jobs, validates payloads, runs handlers, and manages heartbeats and concurrency.

Constructor

import { Worker } from "@osqueue/worker";

const worker = new Worker(options);

Options

interface WorkerOptions<R extends JobTypeRegistry> {
  client: OsqueueClient<R>;
  workerId?: string;
  handlers: JobHandlerMap<R>;
  pollIntervalMs?: number; // default: 1000
  heartbeatIntervalMs?: number; // default: 5000
  concurrency?: number; // default: 1
  onJobClaimed?: (job: ClaimedJobInfo) => void;
  onJobCompleted?: (job: ClaimedJobInfo) => void;
  onJobFailed?: (job: ClaimedJobInfo, error: unknown) => void;
}

| Option | Default | Description |
| --- | --- | --- |
| client | (required) | OsqueueClient instance |
| workerId | crypto.randomUUID() | Unique worker identifier |
| handlers | (required) | Map of job type → handler function |
| pollIntervalMs | 1000 | How often to poll for new jobs, in milliseconds |
| heartbeatIntervalMs | 5000 | How often to send heartbeats for active jobs, in milliseconds |
| concurrency | 1 | Maximum concurrent jobs |
| onJobClaimed | | Callback when a job is claimed |
| onJobCompleted | | Callback when a job completes successfully |
| onJobFailed | | Callback when a job handler throws |

Handler Signature

type TypedJobHandler<P> = (payload: P, signal: AbortSignal) => Promise<void>;

Handlers receive the validated payload and an AbortSignal:

const worker = new Worker({
  client,
  handlers: {
    "email:send": async (payload, signal) => {
      // payload is typed based on the registry
      // signal is aborted when worker.stop() is called
      if (signal.aborted) return;

      await sendEmail(payload.to, payload.subject, payload.body);
    },
  },
});

The AbortSignal is triggered when worker.stop() is called, allowing handlers to clean up gracefully.
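
A handler that checks signal.aborted only once at the start will not notice a stop() issued mid-run. The following is a minimal sketch of a handler body that cooperates with the signal between steps. It uses only the standard AbortController/AbortSignal and timer APIs; abortableDelay and processInSteps are hypothetical helpers introduced for illustration and are not part of @osqueue/worker.

```typescript
// Hypothetical helper: resolves after `ms`, or rejects early if the signal aborts.
function abortableDelay(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error("aborted"));
      return;
    }
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    function onAbort(): void {
      clearTimeout(timer);
      reject(new Error("aborted"));
    }
    signal.addEventListener("abort", onAbort, { once: true });
  });
}

// Hypothetical handler body: works in steps and checks the signal between them.
// Returns how many steps finished before completion or abort.
async function processInSteps(steps: number, signal: AbortSignal): Promise<number> {
  let done = 0;
  for (let i = 0; i < steps; i++) {
    if (signal.aborted) break; // do not start another step after stop()
    try {
      await abortableDelay(10, signal); // stands in for one unit of real work
    } catch {
      break; // aborted in the middle of a step
    }
    done++;
  }
  return done;
}
```

Inside a real handler, the same pattern would wrap whatever long-running calls the job makes, so that stop() does not have to wait for the full job to run.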

Lifecycle Callbacks

const worker = new Worker({
  client,
  handlers: { /* ... */ },
  onJobClaimed: (job) => {
    console.log(`Claimed ${job.type} job ${job.jobId}`);
  },
  onJobCompleted: (job) => {
    console.log(`Completed ${job.type} job ${job.jobId}`);
  },
  onJobFailed: (job, error) => {
    console.error(`Failed ${job.type} job ${job.jobId}:`, error);
  },
});
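
Beyond logging, the callbacks are a natural place to collect simple metrics. The sketch below is one possible approach: createJobCounters is a hypothetical helper, and the local ClaimedJobInfo type is a simplified stand-in (jobId is assumed to be a string here) so that the example is self-contained.

```typescript
// Simplified stand-in for the library type; jobId is assumed to be a string.
interface ClaimedJobInfo {
  jobId: string;
  type: string;
  payload: unknown;
}

// Hypothetical helper: returns running totals plus callbacks that update them.
function createJobCounters() {
  const counts = { claimed: 0, completed: 0, failed: 0 };
  return {
    counts,
    callbacks: {
      onJobClaimed: (_job: ClaimedJobInfo): void => {
        counts.claimed++;
      },
      onJobCompleted: (_job: ClaimedJobInfo): void => {
        counts.completed++;
      },
      onJobFailed: (_job: ClaimedJobInfo, _error: unknown): void => {
        counts.failed++;
      },
    },
  };
}
```

The returned callbacks could then be spread into the worker options, for example new Worker({ client, handlers, ...callbacks }), and counts read periodically by whatever reports metrics.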

ClaimedJobInfo

interface ClaimedJobInfo {
  jobId: JobId;
  type: string;
  payload: unknown;
}

Methods

start()

Start polling for jobs.

worker.start();

stop()

Stop the worker gracefully. Stops polling, aborts active job signals, and waits for active jobs to finish.

await worker.stop();

isRunning

Whether the worker is currently running.

worker.isRunning; // boolean

activeJobCount

Number of jobs currently being processed.

worker.activeJobCount; // number

Execution Flow

  1. Poll: On each interval, check if activeJobs < concurrency
  2. Claim: Call claimJob(workerId, handledTypes) on the broker
  3. Validate: Parse the payload against the registry schema (if available). If validation fails, the job is completed (removed) so that an invalid payload is not retried indefinitely.
  4. Execute: Run the handler with the validated payload and an AbortSignal
  5. Heartbeat: Send heartbeats every heartbeatIntervalMs during execution
  6. Complete: On success, call completeJob() to remove the job
  7. Error: On failure, log the error and invoke onJobFailed (the job's heartbeat stops, and the broker will eventually expire it for retry)
  8. Reconnect: If a request fails, attempt to reconnect to the broker
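
The steps above can be sketched as a single poll iteration. This is an illustrative model only, not the library's implementation: Job, Broker, Handler, TickState and tick are hypothetical stand-ins, the claimJob/completeJob/heartbeat signatures are assumed, and payload validation (step 3) and reconnection (step 8) are left out for brevity.

```typescript
// Simplified stand-ins, not the real osqueue types.
interface Job {
  jobId: string;
  type: string;
  payload: unknown;
}

interface Broker {
  claimJob(workerId: string, types: string[]): Promise<Job | null>;
  completeJob(jobId: string): Promise<void>;
  heartbeat(jobId: string): Promise<void>;
}

type Handler = (payload: unknown, signal: AbortSignal) => Promise<void>;

interface TickState {
  activeJobs: number;
  concurrency: number;
  heartbeatIntervalMs: number;
}

// One poll iteration: claim at most one job and run it to completion.
async function tick(
  broker: Broker,
  workerId: string,
  handlers: Record<string, Handler>,
  state: TickState,
  signal: AbortSignal,
): Promise<"busy" | "idle" | "completed" | "failed"> {
  // 1. Poll: only claim when there is spare capacity
  if (state.activeJobs >= state.concurrency) return "busy";

  // 2. Claim: ask only for job types this worker can handle
  const job = await broker.claimJob(workerId, Object.keys(handlers));
  if (!job) return "idle";

  state.activeJobs++;
  // 5. Heartbeat: keep the claim alive while the handler runs
  const timer = setInterval(() => {
    broker.heartbeat(job.jobId).catch(() => {});
  }, state.heartbeatIntervalMs);

  try {
    const handler = handlers[job.type];
    if (!handler) throw new Error(`no handler for ${job.type}`);
    await handler(job.payload, signal); // 4. Execute
    await broker.completeJob(job.jobId); // 6. Complete
    return "completed";
  } catch (error) {
    // 7. Error: completeJob is not called, so once heartbeats stop
    // the broker can expire the claim and retry the job
    console.error(`job ${job.jobId} failed:`, error);
    return "failed";
  } finally {
    clearInterval(timer);
    state.activeJobs--;
  }
}
```

In this model a worker would call tick on every pollIntervalMs and allow up to concurrency iterations to be in flight at once.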

Graceful Shutdown

process.on("SIGINT", () => {
  worker.stop().then(() => process.exit(0));
});

When stop() is called:

  1. Polling stops immediately
  2. All active job AbortSignals are triggered
  3. Heartbeat timers are cleared
  4. The method waits until activeJobCount reaches 0
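
Because stop() waits for activeJobCount to reach 0, a handler that ignores its AbortSignal can delay shutdown indefinitely. One way to bound the wait is to race stop() against a timer. The sketch below assumes only that the worker exposes stop(): Promise<void> as documented above; Stoppable and stopWithTimeout are hypothetical names, not part of the library.

```typescript
// Minimal shape this sketch needs from a worker.
interface Stoppable {
  stop(): Promise<void>;
}

// Hypothetical helper: resolves "stopped" if stop() finishes within timeoutMs,
// otherwise resolves "timeout". Rejects if stop() itself rejects first.
function stopWithTimeout(
  worker: Stoppable,
  timeoutMs: number,
): Promise<"stopped" | "timeout"> {
  return new Promise<"stopped" | "timeout">((resolve, reject) => {
    const timer = setTimeout(() => resolve("timeout"), timeoutMs);
    worker.stop().then(
      () => {
        clearTimeout(timer);
        resolve("stopped");
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}
```

A SIGINT handler could await stopWithTimeout(worker, 10000) and exit with a non-zero code when the result is "timeout". Jobs still running at that point stop sending heartbeats, so the broker should eventually expire them for retry, as described under Execution Flow.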