
API Design & Helpers

Building sync endpoints and using server-side helpers

The generator provides two helper functions for implementing server-side sync: applyPush processes client mutations, and pullAndMaterializeLogs fetches server changes.

applyPush: Process Client Mutations

The applyPush function handles incoming outbox events from clients. Here's the actual signature from batch-processor.ts:

export interface ApplyPushOptions {
  events: OutboxEventRecord[];
  scopeKey: string | ((event: OutboxEventRecord) => string);
  prisma: PrismaClient;
  customValidation?: (
    event: EventsFor<typeof validators>
  ) => { errorMessage: string | null } | Promise<{ errorMessage: string | null }>;
}

export async function applyPush({
  events,
  scopeKey,
  prisma,
  customValidation,
}: ApplyPushOptions): Promise<PushResult[]>;

applyPush returns one PushResult per submitted event. Each entry has this structure:

export interface PushResult {
  id: string; // Event ID from outbox
  appliedChangelogId: string | null; // ID if successfully applied
  error: null | {
    type: keyof typeof PushErrorTypes;
    message: string;
    retryable: boolean;
  };
}

Error types include:

  • INVALID_MODEL - Entity type not recognized
  • RECORD_VALIDATION_FAILURE - Record doesn't match schema
  • MISSING_PARENT - Foreign key target doesn't exist
  • SCOPE_VIOLATION - User lacks permission
  • UNKNOWN_OPERATION - Not create/update/delete
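
On the client, the retryable flag usually decides what happens to an outbox event next. The sketch below shows one way to sort a PushResult[] into applied, retry-later, and rejected buckets. triagePushResults and PushTriage are hypothetical names, not part of the generated code, and the error type union only mirrors the list above, which may not be exhaustive.

```typescript
// Local mirror of the PushResult shape shown above, so the sketch is self-contained.
// Assumption: the union lists only the error types named in this section.
type PushErrorType =
  | "INVALID_MODEL"
  | "RECORD_VALIDATION_FAILURE"
  | "MISSING_PARENT"
  | "SCOPE_VIOLATION"
  | "UNKNOWN_OPERATION";

interface PushResult {
  id: string;
  appliedChangelogId: string | null;
  error: null | { type: PushErrorType; message: string; retryable: boolean };
}

// Hypothetical result of sorting a batch of push results.
interface PushTriage {
  applied: string[]; // event IDs the server applied
  retry: string[]; // event IDs worth sending again later
  rejected: { id: string; reason: string }[]; // permanent failures
}

// Hypothetical helper: relies only on `error` and `error.retryable`.
function triagePushResults(results: PushResult[]): PushTriage {
  const triage: PushTriage = { applied: [], retry: [], rejected: [] };
  for (const result of results) {
    if (result.error === null) {
      triage.applied.push(result.id);
    } else if (result.error.retryable) {
      triage.retry.push(result.id);
    } else {
      triage.rejected.push({
        id: result.id,
        reason: `${result.error.type}: ${result.error.message}`,
      });
    }
  }
  return triage;
}
```

Keying the decision on retryable rather than on specific error types keeps the client in step with whatever the server decides is recoverable.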

Push Endpoint Implementation

See Step 3: Push Endpoint for the complete implementation. Here's the core pattern:

import { applyPush } from "$lib/prisma-idb/server/batch-processor";
import { outboxEventSchema } from "$lib/prisma-idb/validators";
import { auth } from "$lib/server/auth";
import { prisma } from "$lib/server/prisma";
import z from "zod";

export async function POST({ request }) {
  let body;
  try {
    body = await request.json();
  } catch {
    return new Response(JSON.stringify({ error: "Malformed JSON" }), { status: 400 });
  }

  const parsed = z.object({ events: z.array(outboxEventSchema) }).safeParse({
    events: body.events,
  });

  if (!parsed.success) {
    return new Response(JSON.stringify({ error: "Invalid request" }), { status: 400 });
  }

  const authResult = await auth.api.getSession({ headers: request.headers });
  if (!authResult?.user.id) {
    return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 });
  }

  try {
    const pushResults = await applyPush({
      events: parsed.data.events,
      scopeKey: authResult.user.id,
      prisma,
    });

    return new Response(JSON.stringify(pushResults), { status: 200 });
  } catch (error) {
    const message = error instanceof Error ? error.message : "Unknown error";
    const status = message.includes("Batch size") ? 413 : 500;
    return new Response(JSON.stringify({ error: message }), { status });
  }
}

pullAndMaterializeLogs: Fetch Server Changes

The pullAndMaterializeLogs function returns changelog entries with their associated records:

export async function pullAndMaterializeLogs({
  prisma,
  scopeKey,
  lastChangelogId,
}: {
  prisma: PrismaClient;
  scopeKey: string;
  lastChangelogId?: string;
}): Promise<LogWithRecord<typeof validators>[]>;

Each result is a changelog entry with materialized record data:

export type LogWithRecord<V extends Partial<Record<string, ZodTypeAny>>> = {
  [M in keyof V & string]: Omit<Changelog, "model" | "keyPath"> & {
    model: M;
    keyPath: Array<string | number>;
    record?: z.infer<V[M]> | null;
    changelogId: string;
  };
}[keyof V & string];
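
Because model is a literal type in each member of the union, switching on it narrows record to the matching model's type. The sketch below demonstrates that with a hand-written stand-in for LogWithRecord. The Board and Todo shapes, SimplifiedLog, and describeLog are all hypothetical, and the real type also carries the remaining Changelog fields.

```typescript
// Hypothetical record shapes; in a real project these come from the generated validators.
type Board = { id: string; name: string };
type Todo = { id: string; title: string; boardId: string };

// Simplified stand-in for LogWithRecord: one union member per model.
type SimplifiedLog =
  | { model: "Board"; keyPath: Array<string | number>; record?: Board | null; changelogId: string }
  | { model: "Todo"; keyPath: Array<string | number>; record?: Todo | null; changelogId: string };

// Switching on `model` narrows `record`, so `name` and `title` type-check without casts.
function describeLog(log: SimplifiedLog): string {
  const missing = `${log.model} [${log.keyPath.join(",")}]: no record attached`;
  switch (log.model) {
    case "Board":
      return log.record ? `Board ${log.record.id}: ${log.record.name}` : missing;
    case "Todo":
      return log.record ? `Todo ${log.record.id}: ${log.record.title}` : missing;
  }
}
```

The record field is optional and nullable, so callers need to handle the case where no record is attached before reading model-specific fields.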

Pull Endpoint Implementation

See Step 4: Pull Endpoint for the complete implementation. Here's the core pattern:

import { pullAndMaterializeLogs } from "$lib/prisma-idb/server/batch-processor";
import { auth } from "$lib/server/auth";
import { prisma } from "$lib/server/prisma";
import z from "zod";

export async function POST({ request }) {
  let body;
  try {
    body = await request.json();
  } catch {
    return new Response(JSON.stringify({ error: "Malformed JSON" }), { status: 400 });
  }

  const parsed = z.object({ lastChangelogId: z.string().uuid().optional() }).safeParse(body);

  if (!parsed.success) {
    return new Response(JSON.stringify({ error: "Invalid request" }), { status: 400 });
  }

  const authResult = await auth.api.getSession({ headers: request.headers });
  if (!authResult?.user.id) {
    return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 });
  }

  try {
    const logsWithRecords = await pullAndMaterializeLogs({
      prisma,
      scopeKey: authResult.user.id,
      lastChangelogId: parsed.data.lastChangelogId,
    });

    return new Response(
      JSON.stringify({
        cursor: logsWithRecords.at(-1)?.id ?? parsed.data.lastChangelogId ?? null,
        logsWithRecords,
      }),
      { status: 200 }
    );
  } catch (error) {
    console.error("Pull failed:", error);
    return new Response(JSON.stringify({ error: "Pull failed" }), { status: 500 });
  }
}
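
The cursor expression in the response covers three cases that are easy to miss on a first read. The sketch below pulls it into a standalone function using the same fallback order as the endpoint above. nextCursor is a hypothetical name, and the function only assumes each log has a string id, as the endpoint code does.

```typescript
// Hypothetical helper mirroring the cursor logic of the pull endpoint.
function nextCursor(logs: { id: string }[], previousCursor?: string): string | null {
  // 1. New entries exist: the cursor advances to the last entry's id.
  // 2. Nothing new, but the client sent a cursor: echo it back unchanged.
  // 3. Nothing new and no cursor was sent: there is no position yet, so return null.
  return logs[logs.length - 1]?.id ?? previousCursor ?? null;
}

const advanced = nextCursor([{ id: "log-1" }, { id: "log-2" }], "log-0");
const unchanged = nextCursor([], "log-0");
const initial = nextCursor([]);
console.log({ advanced, unchanged, initial });
```

Echoing the previous cursor on an empty pull means a client can store whatever the server returns without special-casing "no changes".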

Client Integration

The sync worker calls these endpoints. See Step 5: Sync Worker for how to configure it:

export class TodosState {
  syncWorker = getClient().createSyncWorker({
    push: {
      handler: async (events) => {
        const response = await fetch("/api/sync/push", {
          method: "POST",
          body: JSON.stringify({ events }),
        });
        if (!response.ok) throw new Error(`Push failed: ${response.status}`);
        return response.json();
      },
    },
    pull: {
      handler: async (cursor) => {
        const response = await fetch("/api/sync/pull", {
          method: "POST",
          body: JSON.stringify({ lastChangelogId: cursor?.toString() }),
        });
        if (!response.ok) throw new Error(`Pull failed: ${response.status}`);
        return response.json();
      },
    },
  });
}
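
The push endpoint above maps errors that mention "Batch size" to a 413 response, which suggests applyPush caps how many events it accepts per call. If a client can build up a large outbox, one option is to split the events into smaller batches before posting them. The sketch below is a generic chunking helper. chunkEvents is a hypothetical name, and the limit of 100 is a placeholder, since the real limit is not stated here.

```typescript
// Hypothetical helper: splits events into order-preserving batches of at most maxBatchSize.
function chunkEvents<T>(events: T[], maxBatchSize: number): T[][] {
  if (!Number.isInteger(maxBatchSize) || maxBatchSize < 1) {
    throw new RangeError("maxBatchSize must be a positive integer");
  }
  const batches: T[][] = [];
  for (let start = 0; start < events.length; start += maxBatchSize) {
    batches.push(events.slice(start, start + maxBatchSize));
  }
  return batches;
}

// Assumed limit for illustration only; check the generated batch processor for the real value.
const ASSUMED_MAX_BATCH_SIZE = 100;

const pendingEventIds = Array.from({ length: 250 }, (_, index) => `evt-${index + 1}`);
const batches = chunkEvents(pendingEventIds, ASSUMED_MAX_BATCH_SIZE);
console.log(batches.map((batch) => batch.length));
```

Batches keep the original event order, which matters when later events depend on records created by earlier ones.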

Scope Key

The scopeKey ensures data isolation. It can be:

  • A static string: scopeKey: userId
  • A function: scopeKey: (event) => event.payload.tenantId

The applyPush function validates that mutations belong to the correct scope and rejects modifications to other users' data.
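
To make the two forms concrete, the sketch below shows how a string-or-function scopeKey can be resolved to a single string per event. It illustrates the option's type only and is not the generator's implementation. resolveScopeKey, ScopeKey, and the TenantEvent shape are hypothetical.

```typescript
// Same union as the scopeKey option: a fixed string, or a function of the event.
type ScopeKey<E> = string | ((event: E) => string);

// Hypothetical helper: returns the scope that applies to one event.
function resolveScopeKey<E>(scopeKey: ScopeKey<E>, event: E): string {
  return typeof scopeKey === "function" ? scopeKey(event) : scopeKey;
}

// Hypothetical event shape for a multi-tenant app.
interface TenantEvent {
  id: string;
  payload: { tenantId: string };
}

const sampleEvent: TenantEvent = { id: "evt-1", payload: { tenantId: "tenant-a" } };

// Static form: every event in the batch is checked against the same scope.
const staticScope = resolveScopeKey<TenantEvent>("user-1", sampleEvent);

// Function form: the scope is derived from each event individually.
const derivedScope = resolveScopeKey<TenantEvent>((e) => e.payload.tenantId, sampleEvent);

console.log({ staticScope, derivedScope });
```

The static form fits per-user data where the scope comes from the session, while the function form fits batches that span several scopes.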

Custom Validation

Add domain-specific validation to applyPush:

const result = await applyPush({
  events,
  scopeKey: userId,
  prisma,
  customValidation: async (event) => {
    // Example: Board names must be unique per user
    if (event.entityType === "Board" && event.operation === "create") {
      const existing = await prisma.board.findFirst({
        where: {
          name: event.payload.name,
          userId,
        },
      });
      if (existing) {
        return { errorMessage: "Board name already exists" };
      }
    }
    return { errorMessage: null };
  },
});
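
customValidation accepts a single function, so several independent rules have to be combined into one. The sketch below is one way to do that: it runs rules in order and returns the first non-null errorMessage. composeValidators, Validator, and the sample event shape are hypothetical; only the { errorMessage } return shape comes from the ApplyPushOptions signature.

```typescript
// Return shape required by customValidation.
type ValidationResult = { errorMessage: string | null };

// A rule may be sync or async, matching the customValidation signature.
type Validator<E> = (event: E) => ValidationResult | Promise<ValidationResult>;

// Hypothetical helper: runs rules in order and stops at the first failure.
function composeValidators<E>(...validators: Validator<E>[]): (event: E) => Promise<ValidationResult> {
  return async (event) => {
    for (const validate of validators) {
      const result = await validate(event);
      if (result.errorMessage !== null) return result;
    }
    return { errorMessage: null };
  };
}

// Hypothetical, simplified event shape for the example rules.
interface SampleEvent {
  entityType: string;
  operation: string;
  payload: { name?: string };
}

const requireBoardName: Validator<SampleEvent> = (event) =>
  event.entityType === "Board" && !event.payload.name?.trim()
    ? { errorMessage: "Board name is required" }
    : { errorMessage: null };

const limitNameLength: Validator<SampleEvent> = async (event) =>
  (event.payload.name?.length ?? 0) > 50
    ? { errorMessage: "Name must be 50 characters or fewer" }
    : { errorMessage: null };

// The composed function has the shape customValidation expects.
const validateEvent = composeValidators(requireBoardName, limitNameLength);
```

Ordering cheap synchronous rules before ones that query the database avoids unnecessary round trips for events that fail early.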

Testing

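Test sync endpoints with realistic scenarios. The tests below assume the app is running and the requests are authenticated; without a session, both endpoints return 401. Outside the browser, fetch needs an absolute URL, so prefix the paths with your server's origin: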

import { expect, test } from "vitest";

test("push creates changelog entries", async () => {
  const result = await fetch("/api/sync/push", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      events: [
        {
          id: "evt-1",
          entityType: "Board",
          operation: "create",
          payload: { id: "board-1", name: "Test", createdAt: new Date(), userId: "user-1" },
          createdAt: new Date(),
          tries: 0,
          lastError: null,
          synced: false,
          syncedAt: null,
          retryable: true,
          lastAttemptedAt: null,
        },
      ],
    }),
  });

  expect(result.status).toBe(200);
  const data = await result.json();
  expect(data[0].error).toBeNull();
  expect(data[0].appliedChangelogId).toBeTruthy();
});

test("pull returns changes since cursor", async () => {
  const result = await fetch("/api/sync/pull", {
    method: "POST",
    body: JSON.stringify({}),
  });

  expect(result.status).toBe(200);
  const { logsWithRecords, cursor } = await result.json();
  expect(logsWithRecords).toBeInstanceOf(Array);
});
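
Writing every outbox field by hand gets repetitive once there are more than a couple of tests. The sketch below is a small factory that fills in the defaults used in the push test above and lets each test override only what it cares about. makeOutboxEvent and TestOutboxEvent are hypothetical names, and the field list mirrors the test payload rather than the generated OutboxEventRecord type, which may differ.

```typescript
// Field list copied from the push test above; an assumption, not the generated type.
interface TestOutboxEvent {
  id: string;
  entityType: string;
  operation: "create" | "update" | "delete";
  payload: Record<string, unknown>;
  createdAt: Date;
  tries: number;
  lastError: string | null;
  synced: boolean;
  syncedAt: Date | null;
  retryable: boolean;
  lastAttemptedAt: Date | null;
}

let eventCounter = 0;

// Hypothetical factory: returns a fresh, unsynced "create Board" event unless overridden.
function makeOutboxEvent(overrides: Partial<TestOutboxEvent> = {}): TestOutboxEvent {
  eventCounter += 1;
  return {
    id: `evt-${eventCounter}`,
    entityType: "Board",
    operation: "create",
    payload: { id: `board-${eventCounter}`, name: "Test", userId: "user-1" },
    createdAt: new Date(),
    tries: 0,
    lastError: null,
    synced: false,
    syncedAt: null,
    retryable: true,
    lastAttemptedAt: null,
    ...overrides, // applied last so explicit values always win
  };
}

// Usage: a request body with one default event and one delete.
const requestBody = JSON.stringify({
  events: [makeOutboxEvent(), makeOutboxEvent({ operation: "delete", payload: { id: "board-1" } })],
});
```

Each call produces a unique id, so events built this way do not collide within a test run.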
