FrameworkEvents
Outbox Pattern
Reliable event delivery with the transactional outbox pattern
The outbox pattern ensures reliable event delivery by writing events to a database table within the same transaction as business operations, then processing them asynchronously.
Why Use Outbox?
Without outbox, you risk:
- Lost events - If the message queue fails after DB commit
- Duplicate events - If DB fails after queue publish
- Ordering issues - Events may arrive out of order
The outbox pattern provides at-least-once delivery with transactional consistency.
Architecture
Outbox Adapter
Add the outbox adapter to write events during publishing:
import { createOutboxAdapter } from "@lawhive/framework/adapters"
const outboxAdapter = createOutboxAdapter({
storage: {
createMany: async (records, ctx) => {
const results = await ctx.transaction.outbox.createManyAndReturn({
data: records,
})
return { ids: results.map((r) => r.id) }
},
},
})
const client = createEventClient(router, {
adapters: [
createEventStoreAdapter({ storage: eventStorage }),
outboxAdapter,
],
createContext: async (fn) => prisma.$transaction((tx) => fn({ tx })),
})Filtering Events
Only send certain events to the outbox:
const outboxAdapter = createOutboxAdapter({
storage: outboxStorage,
filter: (event) => {
// Only aggregate events, not ephemeral
return event.type === "aggregate"
},
})Outbox Record Schema
The outbox table should have this structure:
model Outbox {
id String @id @default(cuid())
createdAt DateTime @default(now())
status OutboxStatus @default(pending)
key String
aggregateId String?
aggregateType String?
payload Json
retryCount Int @default(0)
lastError String?
processedAt DateTime?
}
enum OutboxStatus {
pending
processing
completed
failed
}Outbox Processor
Process outbox entries asynchronously:
import { createOutboxProcessor } from "@lawhive/framework/adapters"
const processor = createOutboxProcessor({
storage: {
fetchPending: async (limit) => {
return prisma.outbox.findMany({
where: { status: "pending" },
take: limit,
orderBy: { createdAt: "asc" },
})
},
markProcessing: async (ids) => {
await prisma.outbox.updateMany({
where: { id: { in: ids } },
data: { status: "processing" },
})
},
markCompleted: async (ids) => {
await prisma.outbox.updateMany({
where: { id: { in: ids } },
data: { status: "completed", processedAt: new Date() },
})
},
markFailed: async (ids, error) => {
await prisma.outbox.updateMany({
where: { id: { in: ids } },
data: {
status: "failed",
lastError: error,
retryCount: { increment: 1 },
},
})
},
},
handler: async (entries) => {
for (const entry of entries) {
await messageQueue.publish(entry.key, entry.payload)
}
},
batchSize: 10,
pollInterval: 1000,
maxRetries: 3,
})
// Start polling
processor.start()
// Stop gracefully
processor.stop()Processor Options
| Option | Type | Default | Description |
|---|---|---|---|
batchSize | number | 10 | Entries per poll |
pollInterval | number | 1000 | Milliseconds between polls |
maxRetries | number | 3 | Max retries before dead letter |
onDeadLetter | function | - | Called when max retries exceeded |
Dead Letter Handling
Handle events that fail after all retries:
const processor = createOutboxProcessor({
storage: processorStorage,
handler: publishToQueue,
maxRetries: 3,
onDeadLetter: async (entries, error) => {
// Move to dead letter table
await prisma.deadLetter.createMany({
data: entries.map((e) => ({
originalId: e.id,
key: e.key,
payload: e.payload,
error: error.message,
})),
})
// Alert operations team
await alerting.send({
severity: "high",
message: `${entries.length} events moved to dead letter`,
error: error.message,
})
},
})Manual Processing
For testing or one-off runs:
// Process once without polling
const processed = await processor.processOnce()
console.log(`Processed ${processed} entries`)
// Check if running
if (processor.isRunning()) {
processor.stop()
}Integration with Inngest
Use with Inngest for serverless processing:
import { Inngest } from "inngest"
const inngest = new Inngest({ id: "my-app" })
// Inngest function to process outbox
export const processOutbox = inngest.createFunction(
{ id: "process-outbox" },
{ cron: "* * * * *" }, // Every minute
async () => {
const entries = await prisma.outbox.findMany({
where: { status: "pending" },
take: 100,
})
for (const entry of entries) {
await inngest.send({
name: entry.key,
data: entry.payload,
})
}
await prisma.outbox.updateMany({
where: { id: { in: entries.map((e) => e.id) } },
data: { status: "completed", processedAt: new Date() },
})
},
)Best Practices
- Index status column - For efficient pending queries
- Use transactions - Outbox writes must be in same transaction as business logic
- Idempotent handlers - Consumers should handle duplicates gracefully
- Monitor lag - Alert if outbox queue grows too large
- Cleanup completed - Archive or delete old completed entries