Step 4: Create the Pull Endpoint
Implement the endpoint that sends server changes to clients.
The pull endpoint sends changelog entries and affected records from the server to clients. The sync worker calls this endpoint periodically to fetch changes and stay synchronized.
Endpoint Setup
Create a new API route (e.g., src/routes/api/sync/pull/+server.ts for SvelteKit):
```ts
import { pullAndMaterializeLogs } from "$lib/prisma-idb/server/batch-processor";
import { auth } from "$lib/server/auth";
import { prisma } from "$lib/server/prisma";
import z from "zod";

export async function POST({ request }) {
  // Parse and validate the request body
  let pullRequestBody;
  try {
    pullRequestBody = await request.json();
  } catch {
    return new Response(JSON.stringify({ error: "Malformed JSON" }), { status: 400 });
  }

  const parsed = z.object({ lastChangelogId: z.uuidv7().optional() }).safeParse(pullRequestBody);
  if (!parsed.success) {
    return new Response(JSON.stringify({ error: "Invalid request", details: parsed.error }), {
      status: 400,
    });
  }

  // Authenticate the request
  const authResult = await auth.api.getSession({ headers: request.headers });
  if (!authResult?.user.id) {
    return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 });
  }

  // Fetch and materialize changes since the client's cursor
  const logsWithRecords = await pullAndMaterializeLogs({
    prisma,
    scopeKey: authResult.user.id,
    lastChangelogId: parsed.data.lastChangelogId,
  });

  return new Response(
    JSON.stringify({
      cursor: logsWithRecords.at(-1)?.id ?? parsed.data.lastChangelogId ?? null,
      logsWithRecords,
    }),
    {
      status: 200,
      headers: { "Content-Type": "application/json" },
    }
  );
}
```

How It Works
- Validate Input: Parse the optional `lastChangelogId` cursor
- Authenticate: Get the user ID from the session/auth context
- Materialize Changes: Call `pullAndMaterializeLogs()` to fetch changelog entries and their associated records
- Return Cursor: Send back the latest changelog ID for the client to use on the next pull, as sketched below
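From the client's side, a single pull is just a POST with the persisted cursor. A minimal sketch (the route path matches the SvelteKit route above; `lastCursor` stands in for whatever cursor your client has persisted):

```ts
// Minimal client-side pull (sketch). `lastCursor` is a placeholder for the
// cursor persisted from the previous pull (null on first sync).
const lastCursor: string | null = null;

const res = await fetch("/api/sync/pull", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify(lastCursor ? { lastChangelogId: lastCursor } : {}),
});
if (!res.ok) throw new Error(`Pull failed with status ${res.status}`);
const { cursor, logsWithRecords } = await res.json();
// Persist `cursor` for the next pull and apply `logsWithRecords` locally.
```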
Key Parameters
`pullAndMaterializeLogs()` Options
- `prisma`: Prisma Client instance
- `scopeKey`: User ID. Only returns changes scoped to this user.
- `lastChangelogId` (optional): UUID v7 changelog ID from the client's last pull. Defaults to the beginning of time.
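Put together as a TypeScript shape, the options look roughly like this (an illustrative reconstruction from the list above, not the library's published type declaration):

```ts
import type { PrismaClient } from "@prisma/client";

// Illustrative options shape for pullAndMaterializeLogs(), inferred from
// the parameter descriptions above.
interface PullAndMaterializeLogsOptions {
  prisma: PrismaClient;     // Prisma Client instance
  scopeKey: string;         // user ID; only this user's changes are returned
  lastChangelogId?: string; // UUID v7 cursor; omit to pull from the beginning
}
```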
Response Format
The endpoint returns an object with:
```ts
{
  cursor: string | null,           // Latest changelog ID (use for next pull)
  logsWithRecords: LogWithRecord[] // Array of changes with associated data
}
```

Each `LogWithRecord` contains:
```ts
{
  id: string,                   // Changelog entry ID
  model: string,                // Model name (e.g., "Post")
  keyPath: (string | number)[], // Primary key(s) of the affected record
  operation: "create" | "update" | "delete",
  scopeKey: string,             // User ID that owns this data
  record?: object | null,       // The actual record (null for deletes)
  changelogId: string           // Same as id
}
```
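To make the shape concrete, here is a hypothetical pull response containing a single updated Post (every ID shown is a placeholder value, not real data):

```ts
// Hypothetical pull response (placeholder IDs throughout).
const exampleResponse = {
  cursor: "0190a1b2-c3d4-7def-8a6b-0123456789ab",
  logsWithRecords: [
    {
      id: "0190a1b2-c3d4-7def-8a6b-0123456789ab",
      model: "Post",
      keyPath: ["post_1"],
      operation: "update",
      scopeKey: "user_123",
      record: { id: "post_1", title: "Hello world", authorId: "user_123" },
      changelogId: "0190a1b2-c3d4-7def-8a6b-0123456789ab",
    },
  ],
};
```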
Pagination & Batching
The pull operation is naturally paginated:
- Each pull request is limited by the database query, not the response size
- The client stores the returned `cursor` and uses it for the next pull (see the catch-up loop sketched below)
- Repeated pulls automatically continue from where they left off
- No explicit page size parameter needed
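A sketch of that catch-up loop, assuming hypothetical `loadCursor()`/`saveCursor()` helpers that your app provides to persist the cursor between sessions:

```ts
// Catch-up loop (sketch): pull repeatedly until the server has nothing new.
// loadCursor()/saveCursor() are hypothetical app-provided helpers.
let cursor: string | null = await loadCursor();
while (true) {
  const res = await fetch("/api/sync/pull", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(cursor ? { lastChangelogId: cursor } : {}),
  });
  const { cursor: next, logsWithRecords } = await res.json();
  if (logsWithRecords.length === 0) break; // caught up
  // ...apply logsWithRecords to IndexedDB (see Client Integration below)...
  cursor = next;
  await saveCursor(cursor);
}
```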
Scope Isolation
The `scopeKey` parameter ensures:
- Clients only receive changes to their own data
- A change to a record is visible only to the user whose scope owns that record (sketched conceptually below)
- Multi-user conflicts are impossible (each user sees only their own changes)
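Conceptually, every changelog read is filtered by the caller's scope. The following is an illustration only, not the library's actual internals, and assumes a hypothetical `changelog` Prisma model:

```ts
import { prisma } from "$lib/server/prisma";

// Conceptual sketch of scope isolation (hypothetical `changelog` model).
// UUID v7 IDs sort lexicographically in creation order, so
// `id > lastChangelogId` means "newer than the cursor".
async function readScopedLogs(userId: string, lastChangelogId?: string) {
  return prisma.changelog.findMany({
    where: {
      scopeKey: userId, // only this user's changes are visible
      ...(lastChangelogId ? { id: { gt: lastChangelogId } } : {}),
    },
    orderBy: { id: "asc" },
  });
}
```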
Client Integration
After implementing both endpoints, the client's sync worker will:
- Periodically call the push endpoint with new mutations from the outbox
- Periodically call the pull endpoint with the last cursor
- Apply incoming changes to IndexedDB (a rough sketch follows below)
- Keep the local state in sync with the server
See Sync Engine for how the sync worker orchestrates push and pull operations.
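As a rough sketch of the "apply incoming changes" step, assuming `db` is an open IndexedDB database with one object store per synced model (simplified; real code needs error handling and transaction-completion checks):

```ts
interface LogWithRecord {
  id: string;
  model: string;
  keyPath: (string | number)[];
  operation: "create" | "update" | "delete";
  scopeKey: string;
  record?: object | null;
  changelogId: string;
}

// Apply a batch of pulled changes to IndexedDB in a single transaction.
function applyLogs(db: IDBDatabase, logsWithRecords: LogWithRecord[]) {
  const stores = [...new Set(logsWithRecords.map((log) => log.model))];
  const tx = db.transaction(stores, "readwrite");
  for (const log of logsWithRecords) {
    const store = tx.objectStore(log.model);
    if (log.operation === "delete") {
      // Single-column keys are passed bare; composite keys as arrays.
      store.delete(log.keyPath.length === 1 ? log.keyPath[0] : log.keyPath);
    } else if (log.record) {
      store.put(log.record); // upsert covers both create and update
    }
  }
}
```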
Best Practices
- Authentication: Always verify the user before accessing their data
- Error Recovery: Return proper HTTP status codes so the client knows if an error is retryable
- Performance: The `pullAndMaterializeLogs()` function is optimized; trust it to fetch only the necessary records
- Changelog Cleanup: The `Changelog` table grows over time. Implement a periodic job to clean up old entries (keep at least 7 days of history for active users), as sketched below
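A hypothetical cleanup job might look like this (it assumes a `changelog` Prisma model with a `createdAt` column; adjust the names to your actual generated schema):

```ts
import { prisma } from "$lib/server/prisma";

// Delete changelog entries older than 7 days. Run on a schedule (e.g. a
// daily cron). Model and column names here are assumptions.
export async function cleanupChangelog() {
  const cutoff = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
  const { count } = await prisma.changelog.deleteMany({
    where: { createdAt: { lt: cutoff } },
  });
  console.log(`Deleted ${count} old changelog entries`);
}
```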
Complete Example with Error Handling
```ts
import { pullAndMaterializeLogs } from "$lib/prisma-idb/server/batch-processor";
import { auth } from "$lib/server/auth";
import { prisma } from "$lib/server/prisma";
import z from "zod";

export async function POST({ request }) {
  let pullRequestBody;
  try {
    pullRequestBody = await request.json();
  } catch {
    return new Response(JSON.stringify({ error: "Malformed JSON" }), { status: 400 });
  }

  const parsed = z.object({ lastChangelogId: z.uuidv7().optional() }).safeParse(pullRequestBody);
  if (!parsed.success) {
    return new Response(JSON.stringify({ error: "Invalid request", details: parsed.error }), {
      status: 400,
    });
  }

  const authResult = await auth.api.getSession({ headers: request.headers });
  if (!authResult?.user.id) {
    return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 });
  }

  try {
    const logsWithRecords = await pullAndMaterializeLogs({
      prisma,
      scopeKey: authResult.user.id,
      lastChangelogId: parsed.data.lastChangelogId,
    });

    return new Response(
      JSON.stringify({
        cursor: logsWithRecords.at(-1)?.id ?? parsed.data.lastChangelogId ?? null,
        logsWithRecords,
      }),
      {
        status: 200,
        headers: { "Content-Type": "application/json" },
      }
    );
  } catch (error) {
    console.error("Pull failed:", error);
    return new Response(JSON.stringify({ error: "Failed to fetch changes" }), {
      status: 500,
    });
  }
}
```

For a complete working example, check the pidb-kanban-example.