API Design & Helpers
Building sync endpoints and using server-side helpers
The generator provides two helper functions for implementing server-side sync: applyPush processes client mutations, and pullAndMaterializeLogs fetches server changes.
applyPush: Process Client Mutations
The applyPush function handles incoming outbox events from clients. Here's the actual signature from batch-processor.ts:
export interface ApplyPushOptions {
events: OutboxEventRecord[];
scopeKey: string | ((event: OutboxEventRecord) => string);
prisma: PrismaClient;
customValidation?: (
event: EventsFor<typeof validators>
) => { errorMessage: string | null } | Promise<{ errorMessage: string | null }>;
}
export async function applyPush({
events,
scopeKey,
prisma,
customValidation,
}: ApplyPushOptions): Promise<PushResult[]>;Each event in the result has this structure:
export interface PushResult {
id: string; // Event ID from outbox
appliedChangelogId: string | null; // ID if successfully applied
error: null | {
type: keyof typeof PushErrorTypes;
message: string;
retryable: boolean;
};
}Error types include:
INVALID_MODEL- Entity type not recognizedRECORD_VALIDATION_FAILURE- Record doesn't match schemaMISSING_PARENT- Foreign key target doesn't existSCOPE_VIOLATION- User lacks permissionUNKNOWN_OPERATION- Not create/update/delete
Push Endpoint Implementation
See Step 3: Push Endpoint for complete implementation. Here's the core pattern:
import { applyPush } from "$lib/prisma-idb/server/batch-processor";
import { outboxEventSchema } from "$lib/prisma-idb/validators";
import { auth } from "$lib/server/auth";
import { prisma } from "$lib/server/prisma";
import z from "zod";
export async function POST({ request }) {
let body;
try {
body = await request.json();
} catch {
return new Response(JSON.stringify({ error: "Malformed JSON" }), { status: 400 });
}
const parsed = z.object({ events: z.array(outboxEventSchema) }).safeParse({
events: body.events,
});
if (!parsed.success) {
return new Response(JSON.stringify({ error: "Invalid request" }), { status: 400 });
}
const authResult = await auth.api.getSession({ headers: request.headers });
if (!authResult?.user.id) {
return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 });
}
try {
const pushResults = await applyPush({
events: parsed.data.events,
scopeKey: authResult.user.id,
prisma,
});
return new Response(JSON.stringify(pushResults), { status: 200 });
} catch (error) {
const message = error instanceof Error ? error.message : "Unknown error";
const status = message.includes("Batch size") ? 413 : 500;
return new Response(JSON.stringify({ error: message }), { status });
}
}pullAndMaterializeLogs: Fetch Server Changes
The pullAndMaterializeLogs function returns changelog entries with their associated records:
export async function pullAndMaterializeLogs({
prisma,
scopeKey,
lastChangelogId,
}: {
prisma: PrismaClient;
scopeKey: string;
lastChangelogId?: string;
}): Promise<LogWithRecord<typeof validators>[]>;Each result is a changelog entry with materialized record data:
export type LogWithRecord<V extends Partial<Record<string, ZodTypeAny>>> = {
[M in keyof V & string]: Omit<Changelog, "model" | "keyPath"> & {
model: M;
keyPath: Array<string | number>;
record?: z.infer<V[M]> | null;
changelogId: string;
};
}[keyof V & string];Pull Endpoint Implementation
See Step 4: Pull Endpoint for complete implementation. Here's the core pattern:
import { pullAndMaterializeLogs } from "$lib/prisma-idb/server/batch-processor";
import { auth } from "$lib/server/auth";
import { prisma } from "$lib/server/prisma";
import z from "zod";
export async function POST({ request }) {
let body;
try {
body = await request.json();
} catch {
return new Response(JSON.stringify({ error: "Malformed JSON" }), { status: 400 });
}
const parsed = z.object({ lastChangelogId: z.string().uuid().optional() }).safeParse(body);
if (!parsed.success) {
return new Response(JSON.stringify({ error: "Invalid request" }), { status: 400 });
}
const authResult = await auth.api.getSession({ headers: request.headers });
if (!authResult?.user.id) {
return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 });
}
try {
const logsWithRecords = await pullAndMaterializeLogs({
prisma,
scopeKey: authResult.user.id,
lastChangelogId: parsed.data.lastChangelogId,
});
return new Response(
JSON.stringify({
cursor: logsWithRecords.at(-1)?.id ?? parsed.data.lastChangelogId ?? null,
logsWithRecords,
}),
{ status: 200 }
);
} catch (error) {
console.error("Pull failed:", error);
return new Response(JSON.stringify({ error: "Pull failed" }), { status: 500 });
}
}Client Integration
The sync worker calls these endpoints. See Step 5: Sync Worker for how to configure:
export class TodosState {
syncWorker = getClient().createSyncWorker({
push: {
handler: async (events) => {
const response = await fetch("/api/sync/push", {
method: "POST",
body: JSON.stringify({ events }),
});
if (!response.ok) throw new Error(`Push failed: ${response.status}`);
return response.json();
},
},
pull: {
handler: async (cursor) => {
const response = await fetch("/api/sync/pull", {
method: "POST",
body: JSON.stringify({ lastChangelogId: cursor?.toString() }),
});
if (!response.ok) throw new Error(`Pull failed: ${response.status}`);
return response.json();
},
},
});
}Scope Key
The scopeKey ensures data isolation. It can be:
- A static string:
scopeKey: userId - A function:
scopeKey: (event) => event.payload.tenantId
The applyPush function validates that mutations belong to the correct scope and rejects modifications to other users' data.
Custom Validation
Add domain-specific validation to applyPush:
const result = await applyPush({
events,
scopeKey: userId,
prisma,
customValidation: async (event) => {
// Example: Board names must be unique per user
if (event.entityType === "Board" && event.operation === "create") {
const existing = await prisma.board.findFirst({
where: {
name: event.payload.name,
userId,
},
});
if (existing) {
return { errorMessage: "Board name already exists" };
}
}
return { errorMessage: null };
},
});Testing
Test sync endpoints with realistic scenarios:
import { expect, test } from "vitest";
test("push creates changelog entries", async () => {
const result = await fetch("/api/sync/push", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
events: [
{
id: "evt-1",
entityType: "Board",
operation: "create",
payload: { id: "board-1", name: "Test", createdAt: new Date(), userId: "user-1" },
createdAt: new Date(),
tries: 0,
lastError: null,
synced: false,
syncedAt: null,
retryable: true,
lastAttemptedAt: null,
},
],
}),
});
expect(result.status).toBe(200);
const data = await result.json();
expect(data[0].error).toBeNull();
expect(data[0].appliedChangelogId).toBeTruthy();
});
test("pull returns changes since cursor", async () => {
const result = await fetch("/api/sync/pull", {
method: "POST",
body: JSON.stringify({}),
});
expect(result.status).toBe(200);
const { logsWithRecords, cursor } = await result.json();
expect(logsWithRecords).toBeInstanceOf(Array);
});