Sync Worker
Bridge/RESO → Postgres → Typesense sync pipeline specification.
Overview
The sync worker is responsible for:
- Connecting to Bridge Interactive API (RESO Web API / OData)
- Pulling listing data incrementally (using ModificationTimestamp)
- Normalizing data using field mappings
- Upserting to Postgres (canonical listings + media)
- Indexing to Typesense
- Recording sync runs for audit
Worker Interface
interface SyncWorker {
// Main sync operation
runSync(workspaceId: string, mode: 'incremental' | 'full'): Promise<SyncResult>;
// Test API connection (validates token + dataset)
testConnection(workspaceId: string): Promise<ConnectionTestResult>;
// Validate field mapping (samples listings and checks required fields)
validateMapping(workspaceId: string): Promise<MappingValidationResult>;
}
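A caller such as the cron endpoint later in this spec is expected to drive the worker roughly like this (a sketch; the syncWorker instance is hypothetical):
const result = await syncWorker.runSync(workspaceId, 'incremental');
if (result.status === 'error') {
  console.error(`Sync ${result.syncRunId} failed: ${result.error}`);
}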
interface SyncResult {
syncRunId: string;
status: 'success' | 'error';
stats: {
fetched: number;
upserted: number;
indexed: number;
errors: number;
};
error?: string;
duration: number; // ms
}
interface ConnectionTestResult {
success: boolean;
message: string;
sampleCount?: number; // number of listings in dataset
}
interface MappingValidationResult {
valid: boolean;
missingFields: string[];
sampleData: Record<string, any>[];
}
Bridge API Integration
Authentication
Bridge authenticates requests with a server token, passed either as an access_token query parameter or as a Bearer token in the Authorization header.
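A minimal sketch of the header-based form (the helper name is illustrative; the query-param form used by buildBridgeUrl below works equally well):
// Illustrative only: the same server token sent as a Bearer header instead of
// the access_token query parameter.
async function testHeaderAuth(baseUrl: string, datasetId: string, token: string) {
  const response = await fetch(`${baseUrl}/${datasetId}/Property?$top=1`, {
    headers: { Authorization: `Bearer ${token}` },
  });
  if (!response.ok) {
    throw new Error(`Bridge API error: ${response.status} ${response.statusText}`);
  }
  return response.json();
}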
const BRIDGE_BASE_URL = 'https://api.bridgedataoutput.com/api/v2/OData';
interface BridgeConfig {
baseUrl: string;
datasetId: string; // e.g., 'test' or actual dataset ID
accessToken: string; // decrypted from mls_connections.token_ciphertext
}
function buildBridgeUrl(
config: BridgeConfig,
resource: string,
params?: Record<string, string>
) {
const url = new URL(`${config.baseUrl}/${config.datasetId}/${resource}`);
url.searchParams.set('access_token', config.accessToken);
if (params) {
Object.entries(params).forEach(([key, value]) => {
url.searchParams.set(key, value);
});
}
return url.toString();
}
OData Query Parameters
| Parameter | Usage |
|---|---|
| $filter | Filter listings by criteria |
| $select | Choose specific fields |
| $top | Limit results (max 200 per page) |
| $skip | Pagination offset |
| $orderby | Sort results |
| $expand | Include related entities (Media) |
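For reference, these parameters combine with buildBridgeUrl above roughly as follows (buildPageUrl is a hypothetical helper; the production code below inlines this logic in fetchListings):
// Illustrative: build one page request (200 listings, media expanded, ordered by
// ModificationTimestamp) for changes since a given checkpoint.
function buildPageUrl(config: BridgeConfig, checkpointIso: string, skip = 0): string {
  return buildBridgeUrl(config, 'Property', {
    '$top': '200',
    '$skip': skip.toString(),
    '$orderby': 'ModificationTimestamp asc',
    '$expand': 'Media',
    '$filter': `ModificationTimestamp gt ${checkpointIso}`,
  });
}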
Incremental Sync Query
function buildIncrementalQuery(lastSyncedAt: Date | null): string {
if (!lastSyncedAt) {
// Full sync - no filter
return '';
}
// ISO 8601 format for OData
const timestamp = lastSyncedAt.toISOString();
return `ModificationTimestamp gt ${timestamp}`;
}
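For example, a checkpoint of 2024-01-15T00:00:00.000Z produces the filter ModificationTimestamp gt 2024-01-15T00:00:00.000Z.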
async function fetchListings(
config: BridgeConfig,
filter?: string,
skip = 0
): Promise<BridgeResponse> {
const params: Record<string, string> = {
'$top': '200',
'$skip': skip.toString(),
'$orderby': 'ModificationTimestamp asc',
'$expand': 'Media',
};
if (filter) {
params['$filter'] = filter;
}
const url = buildBridgeUrl(config, 'Property', params);
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Bridge API error: ${response.status} ${response.statusText}`);
}
return response.json();
}
Response Structure
interface BridgeResponse {
'@odata.context': string;
'@odata.count'?: number;
'@odata.nextLink'?: string; // URL for next page
value: BridgeListing[];
}
interface BridgeListing {
ListingKey: string;
ListingId?: string;
StandardStatus: string;
ModificationTimestamp: string;
PropertyType: string;
ListPrice: number;
// ... many more RESO fields
Media?: BridgeMedia[];
}
interface BridgeMedia {
MediaKey: string;
MediaURL: string;
Order: number;
MediaCategory?: string;
}
Sync Flow
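The flows below call two helpers that are not specified elsewhere in this document. Minimal sketches follow; the Typesense client configuration, environment variable names, and the per-workspace collection naming (listings_<slug>) are assumptions, and prepareTypesenseDocs is assumed to shape listings into the collection's document schema.
import Typesense from 'typesense';

// Pacing helper used between paginated Bridge requests.
const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Sketch of the Typesense indexing step. Assumes one collection per workspace
// named listings_<slug>; host and API key env vars are placeholders.
async function indexToTypesense(workspaceSlug: string, docs: Record<string, any>[]) {
  if (docs.length === 0) return;
  const client = new Typesense.Client({
    nodes: [{ host: process.env.TYPESENSE_HOST!, port: 443, protocol: 'https' }],
    apiKey: process.env.TYPESENSE_ADMIN_API_KEY!,
  });
  await client
    .collections(`listings_${workspaceSlug}`)
    .documents()
    .import(docs, { action: 'upsert' });
}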
Full Sync
async function runFullSync(workspaceId: string): Promise<SyncResult> {
const startedAt = new Date();
const syncRun = await createSyncRun(workspaceId, 'running');
try {
// 1. Load workspace config
const workspace = await getWorkspace(workspaceId);
const connection = await getMlsConnection(workspaceId);
const mapping = await getActiveMapping(workspaceId);
// 2. Decrypt token
const accessToken = await decryptToken(connection.token_ciphertext);
const config: BridgeConfig = {
baseUrl: connection.base_url,
datasetId: connection.dataset_id,
accessToken,
};
// 3. Fetch all listings (paginated)
const allListings: BridgeListing[] = [];
let skip = 0;
let hasMore = true;
while (hasMore) {
const response = await fetchListings(config, undefined, skip);
allListings.push(...response.value);
hasMore = response['@odata.nextLink'] !== undefined ||
response.value.length === 200;
skip += 200;
// Rate limiting
await sleep(100);
}
// 4. Transform & upsert to Postgres
const normalized = allListings.map(l =>
normalizeListing(l, mapping, workspaceId, connection)
);
const upsertResult = await upsertListings(normalized);
// 5. Upsert media
for (const listing of allListings) {
if (listing.Media?.length) {
await upsertMedia(workspaceId, listing.ListingKey, listing.Media);
}
}
// 6. Index to Typesense
const typesenseDocs = await prepareTypesenseDocs(workspaceId, normalized);
await indexToTypesense(workspace.slug, typesenseDocs);
// 7. Update checkpoint
const latestModification = allListings.reduce((max, l) => {
const ts = new Date(l.ModificationTimestamp);
return ts > max ? ts : max;
}, new Date(0));
await updateMlsConnection(connection.id, {
last_synced_at: latestModification,
status: 'active',
});
// 8. Complete sync run
const stats = {
fetched: allListings.length,
upserted: upsertResult.count,
indexed: typesenseDocs.length,
errors: 0,
};
await completeSyncRun(syncRun.id, 'success', stats);
return {
syncRunId: syncRun.id,
status: 'success',
stats,
duration: Date.now() - startedAt.getTime(),
};
} catch (error) {
await completeSyncRun(
syncRun.id,
'error',
{ fetched: 0, upserted: 0, indexed: 0, errors: 1 },
(error as Error).message
);
throw error;
}
}
Incremental Sync
async function runIncrementalSync(workspaceId: string): Promise<SyncResult> {
const startedAt = new Date();
const syncRun = await createSyncRun(workspaceId, 'running');
try {
const workspace = await getWorkspace(workspaceId);
const connection = await getMlsConnection(workspaceId);
const mapping = await getActiveMapping(workspaceId);
const accessToken = await decryptToken(connection.token_ciphertext);
const config: BridgeConfig = {
baseUrl: connection.base_url,
datasetId: connection.dataset_id,
accessToken,
};
// Build filter from last sync timestamp
const filter = buildIncrementalQuery(connection.last_synced_at);
// Fetch modified listings
const modifiedListings: BridgeListing[] = [];
let skip = 0;
let hasMore = true;
while (hasMore) {
const response = await fetchListings(config, filter, skip);
modifiedListings.push(...response.value);
hasMore = response['@odata.nextLink'] !== undefined ||
response.value.length === 200;
skip += 200;
await sleep(100);
}
if (modifiedListings.length === 0) {
// Nothing to sync
await completeSyncRun(
syncRun.id,
'success',
{ fetched: 0, upserted: 0, indexed: 0, errors: 0 }
);
return {
syncRunId: syncRun.id,
status: 'success',
stats: { fetched: 0, upserted: 0, indexed: 0, errors: 0 },
duration: Date.now() - startedAt.getTime(),
};
}
// Transform, upsert, index (same as full sync)
const normalized = modifiedListings.map(l =>
normalizeListing(l, mapping, workspaceId, connection)
);
const upsertResult = await upsertListings(normalized);
for (const listing of modifiedListings) {
if (listing.Media?.length) {
await upsertMedia(workspaceId, listing.ListingKey, listing.Media);
}
}
const typesenseDocs = await prepareTypesenseDocs(workspaceId, normalized);
await indexToTypesense(workspace.slug, typesenseDocs);
// Update checkpoint to latest modification
const latestModification = modifiedListings.reduce((max, l) => {
const ts = new Date(l.ModificationTimestamp);
return ts > max ? ts : max;
}, connection.last_synced_at || new Date(0));
await updateMlsConnection(connection.id, {
last_synced_at: latestModification,
status: 'active',
});
const stats = {
fetched: modifiedListings.length,
upserted: upsertResult.count,
indexed: typesenseDocs.length,
errors: 0,
};
await completeSyncRun(syncRun.id, 'success', stats);
return {
syncRunId: syncRun.id,
status: 'success',
stats,
duration: Date.now() - startedAt.getTime(),
};
} catch (error) {
await completeSyncRun(
syncRun.id,
'error',
{ fetched: 0, upserted: 0, indexed: 0, errors: 1 },
(error as Error).message
);
throw error;
}
}
Field Normalization
interface FieldMapping {
[canonicalField: string]: string; // RESO field name
}
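For illustration only, a mapping might pair canonical fields with RESO Data Dictionary names like this (actual values come from the workspace's active mapping returned by getActiveMapping above):
const exampleMapping: FieldMapping = {
  mls_number: 'ListingId',
  standard_status: 'StandardStatus',
  list_price: 'ListPrice',
  bedrooms: 'BedroomsTotal',
  bathrooms: 'BathroomsTotalInteger',
  living_area_sqft: 'LivingArea',
  city: 'City',
  state: 'StateOrProvince',
  postal_code: 'PostalCode',
  latitude: 'Latitude',
  longitude: 'Longitude',
};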
function normalizeListing(
raw: BridgeListing,
mapping: FieldMapping,
workspaceId: string,
connection: MlsConnection
): CanonicalListing {
const getValue = (canonical: string): any => {
const resoField = mapping[canonical];
return resoField ? (raw as Record<string, any>)[resoField] : undefined;
};
return {
workspace_id: workspaceId,
source_provider: 'bridge',
source_dataset_id: connection.dataset_id,
source_listing_key: raw.ListingKey,
mls_number: getValue('mls_number') || raw.ListingId,
standard_status: getValue('standard_status') || raw.StandardStatus,
modification_ts: raw.ModificationTimestamp,
list_date: getValue('list_date'),
close_date: getValue('close_date'),
address_full: getValue('address_full'),
street_number: getValue('street_number'),
street_name: getValue('street_name'),
unit_number: getValue('unit_number'),
city: getValue('city'),
state: getValue('state'),
postal_code: getValue('postal_code'),
subdivision: getValue('subdivision'),
building_name: getValue('building_name'),
community: getValue('community'),
county: getValue('county'),
latitude: getValue('latitude'),
longitude: getValue('longitude'),
property_type: getValue('property_type'),
bedrooms: getValue('bedrooms'),
bathrooms: getValue('bathrooms'),
living_area_sqft: getValue('living_area_sqft'),
lot_size_sqft: getValue('lot_size_sqft'),
year_built: getValue('year_built'),
list_price: getValue('list_price'),
close_price: getValue('close_price'),
price_per_sqft: getValue('price_per_sqft'),
public_remarks: getValue('public_remarks'),
photo_count: raw.Media?.length || 0,
is_active: true,
raw: raw, // Keep original for debugging
};
}
Postgres Upsert
async function upsertListings(
listings: CanonicalListing[]
): Promise<{ count: number }> {
const supabase = createServiceClient();
const { data, error } = await supabase
.from('listings')
.upsert(listings, {
onConflict: 'workspace_id,source_provider,source_listing_key',
ignoreDuplicates: false,
});
if (error) throw error;
return { count: listings.length };
}
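Large full syncs can produce request bodies bigger than a single upsert call comfortably handles; chunking is one option (a sketch, using a hypothetical helper with the same conflict target):
async function upsertListingsChunked(
  listings: CanonicalListing[],
  chunkSize = 500
): Promise<{ count: number }> {
  const supabase = createServiceClient();
  // Upsert in fixed-size chunks to keep individual request bodies small.
  for (let i = 0; i < listings.length; i += chunkSize) {
    const chunk = listings.slice(i, i + chunkSize);
    const { error } = await supabase
      .from('listings')
      .upsert(chunk, {
        onConflict: 'workspace_id,source_provider,source_listing_key',
      });
    if (error) throw error;
  }
  return { count: listings.length };
}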
async function upsertMedia(
workspaceId: string,
listingKey: string,
media: BridgeMedia[]
) {
const supabase = createServiceClient();
// Get listing ID
const { data: listing } = await supabase
.from('listings')
.select('id')
.eq('workspace_id', workspaceId)
.eq('source_listing_key', listingKey)
.single();
if (!listing) return;
const mediaRows = media.map((m, idx) => ({
workspace_id: workspaceId,
listing_id: listing.id,
url: m.MediaURL,
sort_order: m.Order ?? idx,
caption: m.MediaCategory || null,
}));
await supabase
.from('listing_media')
.upsert(mediaRows, {
onConflict: 'listing_id,url',
});
}
Error Handling
Retry Strategy
async function withRetry<T>(
fn: () => Promise<T>,
maxAttempts = 3,
baseDelay = 1000
): Promise<T> {
let lastError: Error | undefined;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error as Error;
if (attempt < maxAttempts) {
const delay = baseDelay * Math.pow(2, attempt - 1);
await sleep(delay);
}
}
}
throw lastError;
}
Failure Recovery
- On failure, do not update the checkpoint; the next run will re-fetch the same range
- Log the error to sync_runs.error
- Set mls_connections.status to error
- Alert admin (future: webhook/email)
Scheduling
Options
- Vercel Cron (simplest)
  // vercel.json
  {
    "crons": [
      { "path": "/api/cron/sync", "schedule": "0 * * * *" }
    ]
  }
- Supabase Edge Function + pg_cron
- External scheduler (Railway, Render, etc.)
Cron Endpoint
// app/api/cron/sync/route.ts
import { NextRequest, NextResponse } from 'next/server';
export async function GET(request: NextRequest) {
// Verify cron secret
const authHeader = request.headers.get('authorization');
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
return new Response('Unauthorized', { status: 401 });
}
// Get all active workspaces with hourly cadence
const workspaces = await getWorkspacesForSync('hourly');
const results = [];
for (const workspace of workspaces) {
try {
const result = await runIncrementalSync(workspace.id);
results.push({ workspace: workspace.slug, ...result });
} catch (error) {
results.push({ workspace: workspace.slug, error: (error as Error).message });
}
}
return NextResponse.json({ results });
}
Token Encryption
Option 1: Supabase Vault (recommended)
-- Store token
SELECT vault.create_secret('bridge_token_workspace_123', 'actual_token_here');
-- Retrieve token
SELECT decrypted_secret
FROM vault.decrypted_secrets
WHERE name = 'bridge_token_workspace_123';
Option 2: Application-level encryption
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';
const ALGORITHM = 'aes-256-gcm';
// AES-256 requires a 32-byte key: store TOKEN_ENCRYPTION_KEY as 64 hex characters
const KEY = Buffer.from(process.env.TOKEN_ENCRYPTION_KEY!, 'hex');
export function encryptToken(token: string): string {
const iv = randomBytes(16);
const cipher = createCipheriv(ALGORITHM, KEY, iv);
let encrypted = cipher.update(token, 'utf8', 'hex');
encrypted += cipher.final('hex');
const authTag = cipher.getAuthTag();
return `${iv.toString('hex')}:${authTag.toString('hex')}:${encrypted}`;
}
export function decryptToken(ciphertext: string): string {
const [ivHex, authTagHex, encrypted] = ciphertext.split(':');
const iv = Buffer.from(ivHex, 'hex');
const authTag = Buffer.from(authTagHex, 'hex');
const decipher = createDecipheriv(ALGORITHM, KEY, iv);
decipher.setAuthTag(authTag);
let decrypted = decipher.update(encrypted, 'hex', 'utf8');
decrypted += decipher.final('utf8');
return decrypted;
}
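A quick round-trip sanity check of the two helpers (the token value is a placeholder):
const sample = 'example_server_token';
const roundTripped = decryptToken(encryptToken(sample));
console.assert(roundTripped === sample);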