Quickstart
This walkthrough runs a complete osqueue setup locally: a broker, a producer that submits jobs, and a worker that processes them.
1. Define a Job Registry
Create a shared registry of job types with Zod schemas. This gives you type-safe payloads across producers and workers.
// registry.ts
import { z } from "zod";
export const registry = {
"email:send": z.object({
to: z.string().email(),
subject: z.string(),
body: z.string(),
}),
"report:generate": z.object({
reportId: z.string(),
format: z.enum(["pdf", "csv"]),
}),
};
2. Start the Broker
The broker is the central coordinator. It manages queue state and exposes REST, WebSocket, and Connect (gRPC) endpoints.
// broker.ts
import { BrokerServer } from "@osqueue/broker";
import { MemoryBackend } from "@osqueue/storage";
const server = new BrokerServer({
storage: new MemoryBackend(),
host: "0.0.0.0",
port: 8080,
});
await server.start();
console.log(`Broker listening on ${server.address}`);
process.on("SIGINT", () => {
server.stop().then(() => process.exit(0));
});
Run it:
bun run broker.ts
# Broker listening on 0.0.0.0:8080
3. Submit Jobs (Producer)
The producer creates an OsqueueClient and submits jobs in a loop.
// producer.ts
import { OsqueueClient } from "@osqueue/client";
import { registry } from "./registry.js";
const client = new OsqueueClient(
{ brokerUrl: "http://localhost:8080" },
registry,
);
const jobId = await client.submitJob("email:send", {
to: "user@example.com",
subject: "Hello!",
body: "This is message #1",
});
console.log(`Submitted email:send job ${jobId}`);
const reportId = await client.submitJob("report:generate", {
reportId: "rpt-1",
format: "pdf",
});
console.log(`Submitted report:generate job ${reportId}`);
Run it:
bun run producer.ts
# Submitted email:send job 550e8400-e29b-41d4-a716-446655440000
# Submitted report:generate job 6ba7b810-9dad-11d1-80b4-00c04fd430c8
4. Process Jobs (Worker)
The worker polls for jobs, validates payloads against the registry, and runs handlers.
// worker.ts
import { OsqueueClient } from "@osqueue/client";
import { Worker } from "@osqueue/worker";
import { registry } from "./registry.js";
const client = new OsqueueClient(
{ brokerUrl: "http://localhost:8080" },
registry,
);
const worker = new Worker({
client,
handlers: {
"email:send": async (payload, signal) => {
console.log(`Sending email to ${payload.to}: "${payload.subject}"`);
// payload is typed as { to: string; subject: string; body: string }
// signal is an AbortSignal, triggered when worker.stop() is called
},
"report:generate": async (payload, signal) => {
console.log(`Generating ${payload.format} report ${payload.reportId}`);
// payload is typed as { reportId: string; format: "pdf" | "csv" }
},
},
});
worker.start();
console.log("Worker started, polling for jobs...");
process.on("SIGINT", () => {
worker.stop().then(() => process.exit(0));
});
Run it:
bun run worker.ts
# Worker started, polling for jobs...
# Sending email to user@example.com: "Hello!"
# Generating pdf report rpt-1
What's Happening
┌──────────┐ ┌──────────┐ ┌───────────────┐
│ Producer │────▶│ Broker │◀────│ Worker │
│ │ │ │ │ │
│ submit() │ │ enqueue │ │ claim() │
│ │ │ claim │ │ heartbeat() │
│ │ │ complete │ │ complete() │
└──────────┘ └────┬─────┘ └───────────────┘
│
▼
┌─────────────┐
│ Storage │
│ queue.json │
└─────────────┘
- The producer calls
submitJob()which sends a request to the broker - The broker batches the enqueue mutation and writes it to
queue.jsonvia CAS - The worker polls
claimJob(), the broker assigns the first unclaimed job - The worker runs the handler, sending periodic heartbeats
- On completion, the worker calls
completeJob()and the job is removed from the queue