Lawhive Framework

Outbox Pattern

Reliable event delivery with the transactional outbox pattern

The outbox pattern ensures reliable event delivery by writing events to a database table within the same transaction as business operations, then processing them asynchronously.

Why Use Outbox?

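Without an outbox, writing to the database and publishing to a message queue as two separate steps risks:

  • Lost events - If the publish to the message queue fails after the DB transaction commits, the change is saved but the event is never sent
  • Duplicate or phantom events - If the DB write fails after the queue publish, the event describes a change that was never committed, and a retry publishes it again
  • Ordering issues - Events may arrive out of order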

The outbox pattern provides at-least-once delivery with transactional consistency.
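
The all-or-nothing behaviour described above can be illustrated with a small in-memory sketch. It is not framework code: the `Db` type, the `transaction` helper, and the `orders` collection are hypothetical stand-ins for a real database, used only to show that the business write and the outbox write commit or roll back together.

```typescript
// Hypothetical in-memory stand-in for a database with transactions.
type OutboxRow = { key: string; payload: unknown }
type Db = { orders: string[]; outbox: OutboxRow[] }

// Runs fn against a copy of the state and commits only if fn does not throw,
// mimicking all-or-nothing transaction semantics.
function transaction(db: Db, fn: (tx: Db) => void): void {
  const tx: Db = { orders: [...db.orders], outbox: [...db.outbox] }
  fn(tx) // a throw here leaves db untouched (rollback)
  db.orders = tx.orders
  db.outbox = tx.outbox
}

const db: Db = { orders: [], outbox: [] }

// Committed case: the order and its event are stored together.
transaction(db, (tx) => {
  tx.orders.push("order-1")
  tx.outbox.push({ key: "order.created", payload: { id: "order-1" } })
})

// Failed case: the error rolls back both the order and its event.
try {
  transaction(db, (tx) => {
    tx.orders.push("order-2")
    tx.outbox.push({ key: "order.created", payload: { id: "order-2" } })
    throw new Error("constraint violation")
  })
} catch {
  // nothing was committed for order-2
}

console.log(db.orders, db.outbox.length) // only order-1 and its single event remain
```

Because the event row only exists if the business change committed, a separate processor can publish it later without ever announcing a change that did not happen.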

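Architecture

The pattern has two parts: an outbox adapter that writes events to the outbox table inside the same transaction as the business operation, and an outbox processor that polls the table and publishes pending entries to the message queue.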

Outbox Adapter

Add the outbox adapter to write events during publishing:

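import { createOutboxAdapter } from "@lawhive/framework/adapters"
// createEventClient, createEventStoreAdapter, router, eventStorage and prisma
// are assumed to be imported or defined elsewhere in your application.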

const outboxAdapter = createOutboxAdapter({
  storage: {
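    createMany: async (records, ctx) => {
      // ctx is the object built by createContext below, so ctx.tx is the
      // Prisma transaction client shared with the business operation
      const results = await ctx.tx.outbox.createManyAndReturn({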
        data: records,
      })
      return { ids: results.map((r) => r.id) }
    },
  },
})

const client = createEventClient(router, {
  adapters: [
    createEventStoreAdapter({ storage: eventStorage }),
    outboxAdapter,
  ],
  createContext: async (fn) => prisma.$transaction((tx) => fn({ tx })),
})

Filtering Events

Only send certain events to the outbox:

const outboxAdapter = createOutboxAdapter({
  storage: outboxStorage,
  filter: (event) => {
    // Only aggregate events, not ephemeral
    return event.type === "aggregate"
  },
})

Outbox Record Schema

The outbox table should have this structure (shown as a Prisma schema):

model Outbox {
  id            String       @id @default(cuid())
  createdAt     DateTime     @default(now())
  status        OutboxStatus @default(pending)
  key           String
  aggregateId   String?
  aggregateType String?
  payload       Json
  retryCount    Int          @default(0)
  lastError     String?
  processedAt   DateTime?
}

enum OutboxStatus {
  pending
  processing
  completed
  failed
}
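
For code that handles outbox rows outside Prisma (tests, fixtures, queue handlers), the schema above can be mirrored as a plain TypeScript type. The sketch below is illustrative only: `OutboxRecord` and `newOutboxRecord` are hypothetical names, and the helper simply applies the same defaults as the schema. The `id` is supplied by the caller here, whereas the schema generates a cuid.

```typescript
type OutboxStatus = "pending" | "processing" | "completed" | "failed"

// Mirrors the Outbox model field by field; optional columns become nullable.
interface OutboxRecord {
  id: string
  createdAt: Date
  status: OutboxStatus
  key: string
  aggregateId: string | null
  aggregateType: string | null
  payload: unknown
  retryCount: number
  lastError: string | null
  processedAt: Date | null
}

// Hypothetical helper: builds a record with the same defaults as the schema
// (status "pending", retryCount 0, nullable fields unset).
function newOutboxRecord(input: {
  id: string
  key: string
  payload: unknown
  aggregateId?: string
  aggregateType?: string
}): OutboxRecord {
  return {
    id: input.id,
    createdAt: new Date(),
    status: "pending",
    key: input.key,
    aggregateId: input.aggregateId ?? null,
    aggregateType: input.aggregateType ?? null,
    payload: input.payload,
    retryCount: 0,
    lastError: null,
    processedAt: null,
  }
}

const record = newOutboxRecord({
  id: "evt-1",
  key: "case.created",
  payload: { caseId: "c-1" },
})
console.log(record.status, record.retryCount)
```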

Outbox Processor

Process outbox entries asynchronously:

import { createOutboxProcessor } from "@lawhive/framework/adapters"

const processor = createOutboxProcessor({
  storage: {
    fetchPending: async (limit) => {
      return prisma.outbox.findMany({
        where: { status: "pending" },
        take: limit,
        orderBy: { createdAt: "asc" },
      })
    },
    markProcessing: async (ids) => {
      await prisma.outbox.updateMany({
        where: { id: { in: ids } },
        data: { status: "processing" },
      })
    },
    markCompleted: async (ids) => {
      await prisma.outbox.updateMany({
        where: { id: { in: ids } },
        data: { status: "completed", processedAt: new Date() },
      })
    },
    markFailed: async (ids, error) => {
      await prisma.outbox.updateMany({
        where: { id: { in: ids } },
        data: {
          status: "failed",
          lastError: error,
          retryCount: { increment: 1 },
        },
      })
    },
  },
  handler: async (entries) => {
    for (const entry of entries) {
      await messageQueue.publish(entry.key, entry.payload)
    }
  },
  batchSize: 10,
  pollInterval: 1000,
  maxRetries: 3,
})

// Start polling
processor.start()

// Stop gracefully, for example from a shutdown hook
processor.stop()
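
To make the roles of the four storage callbacks concrete, here is a rough sketch of what a single poll cycle might look like. This is an assumption about the general shape of such a loop, not the framework's actual implementation: `pollOnce`, `createMemoryStorage`, and the `Entry` type are hypothetical, and the in-memory storage stands in for the Prisma calls shown above.

```typescript
type Entry = {
  id: string
  key: string
  payload: unknown
  status: string
  retryCount: number
  lastError?: string
}

interface ProcessorStorage {
  fetchPending(limit: number): Promise<Entry[]>
  markProcessing(ids: string[]): Promise<void>
  markCompleted(ids: string[]): Promise<void>
  markFailed(ids: string[], error: string): Promise<void>
}

// One poll cycle: claim a batch, hand it to the handler, record the outcome.
async function pollOnce(
  storage: ProcessorStorage,
  handler: (entries: Entry[]) => Promise<void>,
  batchSize: number,
): Promise<number> {
  const entries = await storage.fetchPending(batchSize)
  if (entries.length === 0) return 0
  const ids = entries.map((e) => e.id)
  await storage.markProcessing(ids)
  try {
    await handler(entries)
    await storage.markCompleted(ids)
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err)
    await storage.markFailed(ids, message)
  }
  return entries.length
}

// In-memory storage used only for this demonstration.
function createMemoryStorage(rows: Entry[]): ProcessorStorage {
  const setStatus = (ids: string[], status: string): void => {
    for (const row of rows) if (ids.includes(row.id)) row.status = status
  }
  return {
    fetchPending: async (limit) =>
      rows.filter((r) => r.status === "pending").slice(0, limit),
    markProcessing: async (ids) => setStatus(ids, "processing"),
    markCompleted: async (ids) => setStatus(ids, "completed"),
    markFailed: async (ids, error) => {
      for (const row of rows) {
        if (!ids.includes(row.id)) continue
        row.status = "failed"
        row.lastError = error
        row.retryCount += 1
      }
    },
  }
}

const rows: Entry[] = [
  { id: "1", key: "case.created", payload: {}, status: "pending", retryCount: 0 },
]
pollOnce(createMemoryStorage(rows), async () => {}, 10).then((count) => {
  // logs the number of entries handled and their final statuses
  console.log(count, rows.map((r) => r.status))
})
```

Note that fetching and then marking entries as processing are two separate steps in this sketch, so running several processors against the same table would need an atomic claim (for example a row lock) to avoid handling the same entry twice.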

Processor Options

Option        Type      Default  Description
batchSize     number    10       Entries per poll
pollInterval  number    1000     Milliseconds between polls
maxRetries    number    3        Max retries before dead letter
onDeadLetter  function  -        Called when max retries exceeded

Dead Letter Handling

Handle events that fail after all retries:

const processor = createOutboxProcessor({
  storage: processorStorage,
  handler: publishToQueue,
  maxRetries: 3,
  onDeadLetter: async (entries, error) => {
    // Move to dead letter table
    await prisma.deadLetter.createMany({
      data: entries.map((e) => ({
        originalId: e.id,
        key: e.key,
        payload: e.payload,
        error: error.message,
      })),
    })

    // Alert operations team
    await alerting.send({
      severity: "high",
      message: `${entries.length} events moved to dead letter`,
      error: error.message,
    })
  },
})
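
The decision of when an entry is handed to `onDeadLetter` can be pictured as a simple partition by retry count. The sketch below is an assumption, not the framework's documented behaviour: it treats an entry as dead-lettered once `retryCount` reaches `maxRetries`, and `partitionByRetries` is a hypothetical helper name.

```typescript
type FailedEntry = { id: string; retryCount: number }

// Splits failed entries into those that can still be retried and those that
// have exhausted their retries.
// Assumption: an entry is dead-lettered once retryCount reaches maxRetries.
function partitionByRetries<T extends FailedEntry>(
  entries: T[],
  maxRetries: number,
): { retry: T[]; deadLetter: T[] } {
  const retry: T[] = []
  const deadLetter: T[] = []
  for (const entry of entries) {
    if (entry.retryCount >= maxRetries) {
      deadLetter.push(entry)
    } else {
      retry.push(entry)
    }
  }
  return { retry, deadLetter }
}

const failed = [
  { id: "a", retryCount: 1 },
  { id: "b", retryCount: 3 },
]
const result = partitionByRetries(failed, 3)
console.log(result.retry.map((e) => e.id), result.deadLetter.map((e) => e.id))
```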

Manual Processing

For testing or one-off runs:

// Process once without polling
const processed = await processor.processOnce()
console.log(`Processed ${processed} entries`)

// Check if running
if (processor.isRunning()) {
  processor.stop()
}

Integration with Inngest

Use with Inngest for serverless processing:

import { Inngest } from "inngest"

const inngest = new Inngest({ id: "my-app" })

// Inngest function to process outbox
export const processOutbox = inngest.createFunction(
  { id: "process-outbox" },
  { cron: "* * * * *" }, // Every minute
  async () => {
    const entries = await prisma.outbox.findMany({
      where: { status: "pending" },
      orderBy: { createdAt: "asc" }, // oldest first, to preserve event order
      take: 100,
    })

    for (const entry of entries) {
      await inngest.send({
        name: entry.key,
        data: entry.payload,
      })
    }

    await prisma.outbox.updateMany({
      where: { id: { in: entries.map((e) => e.id) } },
      data: { status: "completed", processedAt: new Date() },
    })
  },
)

Best Practices

  1. Index the status column - Keeps the pending-entries query efficient as the table grows
  2. Use transactions - Outbox writes must be in the same transaction as the business logic
  3. Idempotent handlers - Delivery is at-least-once, so consumers should handle duplicates gracefully
  4. Monitor lag - Alert if the number or age of pending entries grows too large
  5. Clean up completed entries - Archive or delete old completed entries
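
As one way to approach the idempotent-handlers practice, a consumer can remember which event ids it has already applied and skip repeats. This is a minimal sketch under the assumption that every event carries a stable unique id (for example the outbox row id). `createIdempotentHandler` and `IncomingEvent` are hypothetical names, and a real consumer would keep the seen ids in durable storage, such as a table with a unique constraint, rather than in memory.

```typescript
type IncomingEvent = { id: string; key: string; payload: unknown }

// Wraps a handler so that each event id is applied at most once.
// Returns true when the event was handled and false for a duplicate.
function createIdempotentHandler(
  handle: (event: IncomingEvent) => void,
  seen: Set<string> = new Set(),
): (event: IncomingEvent) => boolean {
  return (event) => {
    if (seen.has(event.id)) return false // duplicate delivery, skip it
    handle(event)
    seen.add(event.id) // record the id only after the handler succeeded
    return true
  }
}

let applied = 0
const handleOnce = createIdempotentHandler(() => {
  applied += 1
})

const event: IncomingEvent = { id: "evt-1", key: "case.created", payload: {} }
const first = handleOnce(event) // handled
const second = handleOnce(event) // same id delivered again, skipped
console.log(first, second, applied)
```

Recording the id only after the handler succeeds means a failed attempt is retried on the next delivery instead of being silently dropped.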