Sync Worker
Bridge/RESO to Postgres to Typesense sync pipeline specification
Sync Worker Specification
Bridge/RESO → Postgres → Typesense sync pipeline.
Overview
The sync worker is responsible for:
- Connecting to Bridge Interactive API (RESO Web API / OData)
- Pulling listing data incrementally (using ModificationTimestamp)
- Normalizing data using field mappings
- Upserting to Postgres (canonical listings + media)
- Indexing to Typesense
- Recording sync runs for audit
Worker Interface
/**
 * Contract for the sync worker: one operation that runs a sync plus two
 * diagnostic operations, all scoped to a single workspace.
 */
interface SyncWorker {
// Main sync operation; `mode` selects an incremental or a full run and the
// promise resolves with the SyncResult for that run.
runSync(workspaceId: string, mode: 'incremental' | 'full'): Promise<SyncResult>;
// Test API connection (validates token + dataset)
testConnection(workspaceId: string): Promise<ConnectionTestResult>;
// Validate field mapping (samples listings and checks required fields)
validateMapping(workspaceId: string): Promise<MappingValidationResult>;
}
/** Outcome of one sync run, as resolved by SyncWorker.runSync. */
interface SyncResult {
// Identifier of the sync run record this result belongs to.
syncRunId: string;
status: 'success' | 'error';
// Per-stage counters for the run.
stats: {
fetched: number; // listings pulled from the Bridge API
upserted: number; // listings written to Postgres
indexed: number; // documents prepared for Typesense
errors: number;
};
// Optional error message; presumably only set when status is 'error'.
error?: string;
duration: number; // ms
}
/** Result of SyncWorker.testConnection. */
interface ConnectionTestResult {
// Whether the connection test passed.
success: boolean;
// Human-readable description of the outcome.
message: string;
sampleCount?: number; // number of listings in dataset
}
/** Result of SyncWorker.validateMapping. */
interface MappingValidationResult {
// Whether the mapping passed validation.
valid: boolean;
// Names of fields that were found missing (empty when none are).
missingFields: string[];
// Sampled records used for the check; the shape is intentionally untyped.
sampleData: Record<string, any>[];
}Bridge API Integration
Authentication
Bridge uses server tokens (passed as query param or header).
// Root of the Bridge OData v2 endpoint.
// NOTE(review): the snippets visible here read the base URL from the
// connection record instead of this constant — confirm where it is used.
const BRIDGE_BASE_URL = 'https://api.bridgedataoutput.com/api/v2/OData';
/** Everything needed to address and authenticate against one Bridge dataset. */
interface BridgeConfig {
// Endpoint root, without the dataset or resource path segments.
baseUrl: string;
datasetId: string; // e.g., 'test' or actual dataset ID
accessToken: string; // decrypted from mls_connections.token_ciphertext
}
/**
 * Builds the absolute request URL for a Bridge OData resource.
 *
 * The path is `<baseUrl>/<datasetId>/<resource>`. The server token is always
 * sent as the `access_token` query parameter, followed by any extra query
 * parameters supplied by the caller.
 *
 * @param config   Base URL, dataset and token to use.
 * @param resource OData resource name appended to the path (e.g. `Property`).
 * @param params   Optional extra query parameters (e.g. `$filter`, `$top`).
 * @returns The serialized URL string.
 */
function buildBridgeUrl(
  config: BridgeConfig,
  resource: string,
  params?: Record<string, string>
) {
  const endpoint = new URL(`${config.baseUrl}/${config.datasetId}/${resource}`);
  const query = endpoint.searchParams;
  query.set('access_token', config.accessToken);
  for (const [name, value] of Object.entries(params ?? {})) {
    query.set(name, value);
  }
  return endpoint.toString();
}
OData Query Parameters
| Parameter | Usage |
|---|---|
| $filter | Filter listings by criteria |
| $select | Choose specific fields |
| $top | Limit results (max 200) |
| $skip | Pagination offset |
| $orderby | Sort results |
| $expand | Include related entities |
Do NOT use $expand=Media
Using $expand=Media in Bridge API queries causes MediaURL to return null for all media items. Media is already embedded in the Property response as a Media array — no expansion is needed. This was confirmed by Bridge Interactive Support. See Bridge API Reference for details.
Incremental Sync Query
/**
 * Produces the OData `$filter` expression for an incremental pull.
 *
 * @param lastSyncedAt Checkpoint of the previous sync, or `null` when there is none.
 * @returns `ModificationTimestamp gt <ISO 8601 timestamp>` when a checkpoint
 *          exists; an empty string (no filter, i.e. a full pull) otherwise.
 */
function buildIncrementalQuery(lastSyncedAt: Date | null): string {
  // Without a checkpoint there is nothing to filter on.
  return lastSyncedAt
    ? `ModificationTimestamp gt ${lastSyncedAt.toISOString()}`
    : '';
}
/**
 * Fetches one page of `Property` records from the Bridge API.
 *
 * Pages are requested 200 records at a time, ordered by
 * `ModificationTimestamp` ascending.
 *
 * @param config Connection settings passed to `buildBridgeUrl`.
 * @param filter Optional OData `$filter` expression; omitted when empty.
 * @param skip   Number of records to skip (`$skip`), defaults to 0.
 * @returns The parsed JSON response body.
 * @throws Error when the HTTP response status is not OK.
 */
async function fetchListings(
  config: BridgeConfig,
  filter?: string,
  skip = 0
): Promise<BridgeResponse> {
  const query: Record<string, string> = {
    '$top': '200',
    '$skip': skip.toString(),
    '$orderby': 'ModificationTimestamp asc',
    // Only send $filter when there is something to filter on.
    ...(filter ? { '$filter': filter } : {}),
  };
  const res = await fetch(buildBridgeUrl(config, 'Property', query));
  if (res.ok) {
    return res.json();
  }
  throw new Error(`Bridge API error: ${res.status} ${res.statusText}`);
}
Response Structure
/** Envelope of one page of results returned by the Bridge OData endpoint. */
interface BridgeResponse {
'@odata.context': string;
// Optional count annotation.
'@odata.count'?: number;
'@odata.nextLink'?: string; // URL for next page
// The listings contained in this page.
value: BridgeListing[];
}
/** Subset of the RESO Property record that the sync code reads directly. */
interface BridgeListing {
// Key used as `source_listing_key` when the listing is normalized.
ListingKey: string;
// Fallback for the canonical `mls_number` when the mapping yields nothing.
ListingId?: string;
StandardStatus: string;
// Timestamp string; the sync parses it with `new Date(...)` for the checkpoint.
ModificationTimestamp: string;
PropertyType: string;
ListPrice: number;
// ... many more RESO fields
// Media embedded in the Property response (no $expand needed).
Media?: BridgeMedia[];
}
/** One media item embedded in a listing's `Media` array. */
interface BridgeMedia {
MediaKey: string;
// Stored as `listing_media.url`.
// NOTE(review): the $expand=Media warning in this document says this can come back null.
MediaURL: string;
// Stored as `listing_media.sort_order`.
Order: number;
// Stored as `listing_media.caption` when present.
MediaCategory?: string;
}Sync Flow
Full Sync
/**
 * Runs a full sync for one workspace: pulls every listing from Bridge,
 * normalizes and upserts it into Postgres (listings + media), indexes it to
 * Typesense, advances the checkpoint and records the sync run.
 *
 * @param workspaceId Workspace to sync.
 * @returns Run id, counters and duration of the successful run.
 * @throws Rethrows whatever failed, after marking the sync run as `error`.
 *         The checkpoint is not touched on failure.
 */
async function runFullSync(workspaceId: string): Promise<SyncResult> {
  const startedAt = new Date();
  const syncRun = await createSyncRun(workspaceId, 'running');
  try {
    // 1. Load workspace config
    const workspace = await getWorkspace(workspaceId);
    const connection = await getMlsConnection(workspaceId);
    const mapping = await getActiveMapping(workspaceId);

    // 2. Decrypt token
    const accessToken = await decryptToken(connection.token_ciphertext);
    const config: BridgeConfig = {
      baseUrl: connection.base_url,
      datasetId: connection.dataset_id,
      accessToken,
    };

    // 3. Fetch all listings (paginated)
    const allListings: BridgeListing[] = [];
    let skip = 0;
    let hasMore = true;
    while (hasMore) {
      const response = await fetchListings(config, undefined, skip);
      const page = response.value;
      allListings.push(...page);
      // Advance by what was actually returned: if the server hands back a
      // short page together with a nextLink, stepping by a fixed 200 would
      // silently skip records.
      skip += page.length;
      // An empty page always ends the loop, even if a nextLink is present,
      // so the offset can never stall.
      hasMore =
        page.length > 0 &&
        (response['@odata.nextLink'] !== undefined || page.length === 200);
      // Rate limiting
      await sleep(100);
    }

    // 4. Transform & upsert to Postgres
    const normalized = allListings.map(l =>
      normalizeListing(l, mapping, workspaceId, connection)
    );
    const upsertResult = await upsertListings(normalized);

    // 5. Upsert media
    for (const listing of allListings) {
      if (listing.Media?.length) {
        await upsertMedia(workspaceId, listing.ListingKey, listing.Media);
      }
    }

    // 6. Index to Typesense
    const typesenseDocs = await prepareTypesenseDocs(workspaceId, normalized);
    await indexToTypesense(workspace.slug, typesenseDocs);

    // 7. Update checkpoint. When nothing was fetched, keep the stored
    // checkpoint instead of rewinding it to the epoch.
    const latestModification =
      allListings.length > 0
        ? allListings.reduce((max, l) => {
            const ts = new Date(l.ModificationTimestamp);
            return ts > max ? ts : max;
          }, new Date(0))
        : connection.last_synced_at;
    await updateMlsConnection(connection.id, {
      last_synced_at: latestModification,
      status: 'active',
    });

    // 8. Complete sync run
    const stats = {
      fetched: allListings.length,
      upserted: upsertResult.count,
      indexed: typesenseDocs.length,
      errors: 0,
    };
    await completeSyncRun(syncRun.id, 'success', stats);
    return {
      syncRunId: syncRun.id,
      status: 'success',
      stats,
      duration: Date.now() - startedAt.getTime(),
    };
  } catch (error: unknown) {
    // Anything can be thrown; only Error instances carry a `.message`.
    const message = error instanceof Error ? error.message : String(error);
    try {
      await completeSyncRun(
        syncRun.id,
        'error',
        { fetched: 0, upserted: 0, indexed: 0, errors: 1 },
        message
      );
    } catch (recordError: unknown) {
      // Recording the failure is best-effort: the caller must see the
      // original sync error, not the bookkeeping error.
      console.error('Failed to record sync run failure', recordError);
    }
    throw error;
  }
}
Incremental Sync
/**
 * Runs an incremental sync for one workspace: pulls only listings modified
 * after the stored checkpoint, then normalizes, upserts, indexes and advances
 * the checkpoint to the newest `ModificationTimestamp` seen.
 *
 * With no stored checkpoint the filter is empty and every listing is pulled.
 *
 * @param workspaceId Workspace to sync.
 * @returns Run id, counters and duration of the successful run (all counters
 *          are zero when nothing changed).
 * @throws Rethrows whatever failed, after marking the sync run as `error`.
 *         The checkpoint is not touched on failure.
 */
async function runIncrementalSync(workspaceId: string): Promise<SyncResult> {
  const startedAt = new Date();
  const syncRun = await createSyncRun(workspaceId, 'running');
  try {
    const workspace = await getWorkspace(workspaceId);
    const connection = await getMlsConnection(workspaceId);
    const mapping = await getActiveMapping(workspaceId);
    const accessToken = await decryptToken(connection.token_ciphertext);
    const config: BridgeConfig = {
      baseUrl: connection.base_url,
      datasetId: connection.dataset_id,
      accessToken,
    };

    // The checkpoint may come back from the database as an ISO string rather
    // than a Date; normalize once so `.toISOString()` and the `>` comparison
    // below both operate on real Dates.
    const lastSyncedAt = connection.last_synced_at
      ? new Date(connection.last_synced_at)
      : null;

    // Build filter from last sync timestamp
    const filter = buildIncrementalQuery(lastSyncedAt);

    // Fetch modified listings
    const modifiedListings: BridgeListing[] = [];
    let skip = 0;
    let hasMore = true;
    while (hasMore) {
      const response = await fetchListings(config, filter, skip);
      const page = response.value;
      modifiedListings.push(...page);
      // Advance by what was actually returned: a short page accompanied by a
      // nextLink must not cause records to be skipped.
      skip += page.length;
      // An empty page always ends the loop, even if a nextLink is present.
      hasMore =
        page.length > 0 &&
        (response['@odata.nextLink'] !== undefined || page.length === 200);
      // Rate limiting
      await sleep(100);
    }

    if (modifiedListings.length === 0) {
      // Nothing to sync
      const emptyStats = { fetched: 0, upserted: 0, indexed: 0, errors: 0 };
      await completeSyncRun(syncRun.id, 'success', emptyStats);
      return {
        syncRunId: syncRun.id,
        status: 'success',
        stats: emptyStats,
        duration: Date.now() - startedAt.getTime(),
      };
    }

    // Transform, upsert, index (same as full sync)
    const normalized = modifiedListings.map(l =>
      normalizeListing(l, mapping, workspaceId, connection)
    );
    const upsertResult = await upsertListings(normalized);
    for (const listing of modifiedListings) {
      if (listing.Media?.length) {
        await upsertMedia(workspaceId, listing.ListingKey, listing.Media);
      }
    }
    const typesenseDocs = await prepareTypesenseDocs(workspaceId, normalized);
    await indexToTypesense(workspace.slug, typesenseDocs);

    // Update checkpoint to latest modification; it never moves backwards
    // because the fold starts from the previous checkpoint.
    const latestModification = modifiedListings.reduce((max, l) => {
      const ts = new Date(l.ModificationTimestamp);
      return ts > max ? ts : max;
    }, lastSyncedAt ?? new Date(0));
    await updateMlsConnection(connection.id, {
      last_synced_at: latestModification,
      status: 'active',
    });

    const stats = {
      fetched: modifiedListings.length,
      upserted: upsertResult.count,
      indexed: typesenseDocs.length,
      errors: 0,
    };
    await completeSyncRun(syncRun.id, 'success', stats);
    return {
      syncRunId: syncRun.id,
      status: 'success',
      stats,
      duration: Date.now() - startedAt.getTime(),
    };
  } catch (error: unknown) {
    // Anything can be thrown; only Error instances carry a `.message`.
    const message = error instanceof Error ? error.message : String(error);
    try {
      await completeSyncRun(
        syncRun.id,
        'error',
        { fetched: 0, upserted: 0, indexed: 0, errors: 1 },
        message
      );
    } catch (recordError: unknown) {
      // Recording the failure is best-effort: the caller must see the
      // original sync error, not the bookkeeping error.
      console.error('Failed to record sync run failure', recordError);
    }
    throw error;
  }
}
Field Normalization
/**
 * Lookup from a canonical listing field (e.g. `city`) to the name of the RESO
 * field that supplies its value.
 */
interface FieldMapping {
[canonicalField: string]: string; // RESO field name
}
/**
 * Converts one raw Bridge listing into the canonical row shape stored in
 * Postgres.
 *
 * Most canonical fields are resolved through `mapping`; a canonical field
 * with no mapping entry comes out as `undefined`. `mls_number` and
 * `standard_status` fall back to the raw `ListingId` / `StandardStatus` when
 * the mapped value is falsy.
 *
 * @param raw         Listing as returned by the Bridge API.
 * @param mapping     Canonical-field → RESO-field lookup.
 * @param workspaceId Owning workspace, copied onto the row.
 * @param connection  MLS connection; supplies `source_dataset_id`.
 * @returns The canonical listing, including the untouched `raw` payload.
 */
function normalizeListing(
  raw: BridgeListing,
  mapping: FieldMapping,
  workspaceId: string,
  connection: MlsConnection
): CanonicalListing {
  // Resolve a canonical field name to the raw value it is mapped to.
  function mapped(canonical: string): any {
    const resoField = mapping[canonical];
    if (!resoField) {
      return undefined;
    }
    return raw[resoField];
  }

  const photoCount = raw.Media?.length || 0;

  return {
    workspace_id: workspaceId,
    source_provider: 'bridge',
    source_dataset_id: connection.dataset_id,
    source_listing_key: raw.ListingKey,
    mls_number: mapped('mls_number') || raw.ListingId,
    standard_status: mapped('standard_status') || raw.StandardStatus,
    modification_ts: raw.ModificationTimestamp,
    list_date: mapped('list_date'),
    close_date: mapped('close_date'),
    address_full: mapped('address_full'),
    street_number: mapped('street_number'),
    street_name: mapped('street_name'),
    unit_number: mapped('unit_number'),
    city: mapped('city'),
    state: mapped('state'),
    postal_code: mapped('postal_code'),
    subdivision: mapped('subdivision'),
    building_name: mapped('building_name'),
    community: mapped('community'),
    county: mapped('county'),
    latitude: mapped('latitude'),
    longitude: mapped('longitude'),
    property_type: mapped('property_type'),
    bedrooms: mapped('bedrooms'),
    bathrooms: mapped('bathrooms'),
    living_area_sqft: mapped('living_area_sqft'),
    lot_size_sqft: mapped('lot_size_sqft'),
    year_built: mapped('year_built'),
    list_price: mapped('list_price'),
    close_price: mapped('close_price'),
    price_per_sqft: mapped('price_per_sqft'),
    public_remarks: mapped('public_remarks'),
    photo_count: photoCount,
    // NOTE(review): always true, regardless of standard_status — confirm intended.
    is_active: true,
    raw, // Keep original for debugging
  };
}
Postgres Upsert
/**
 * Upserts canonical listings into the `listings` table, keyed on
 * `(workspace_id, source_provider, source_listing_key)`. Existing rows are
 * overwritten rather than skipped.
 *
 * @param listings Rows to write. An empty array is a no-op.
 * @returns The number of rows submitted.
 * @throws The Supabase error when the upsert fails.
 */
async function upsertListings(
  listings: CanonicalListing[]
): Promise<{ count: number }> {
  // Nothing to write: skip creating a client and the network round trip.
  if (listings.length === 0) {
    return { count: 0 };
  }
  const supabase = createServiceClient();
  const { error } = await supabase
    .from('listings')
    .upsert(listings, {
      onConflict: 'workspace_id,source_provider,source_listing_key',
      ignoreDuplicates: false,
    });
  if (error) throw error;
  return { count: listings.length };
}
/**
 * Upserts the media items of one listing into `listing_media`, keyed on
 * `(listing_id, url)`.
 *
 * Returns without writing when the listing row cannot be found or when no
 * media item carries a URL.
 *
 * @param workspaceId Owning workspace.
 * @param listingKey  Bridge `ListingKey` of the listing the media belongs to.
 * @param media       Media items from the Bridge response.
 * @throws The Supabase error when the lookup or the upsert fails.
 */
async function upsertMedia(
  workspaceId: string,
  listingKey: string,
  media: BridgeMedia[]
) {
  const supabase = createServiceClient();

  // Get listing ID. The lookup uses the same three columns as the listings
  // conflict key so it can match at most one row; maybeSingle() yields null
  // (not an error) when the listing does not exist.
  const { data: listing, error: lookupError } = await supabase
    .from('listings')
    .select('id')
    .eq('workspace_id', workspaceId)
    .eq('source_provider', 'bridge')
    .eq('source_listing_key', listingKey)
    .maybeSingle();
  if (lookupError) throw lookupError;
  if (!listing) return;

  const mediaRows = media
    .map((m, idx) => ({
      workspace_id: workspaceId,
      listing_id: listing.id,
      url: m.MediaURL,
      sort_order: m.Order ?? idx,
      caption: m.MediaCategory || null,
    }))
    // A row without a URL cannot be deduplicated on (listing_id, url), so it
    // would pile up on every sync; drop it.
    .filter((row) => Boolean(row.url));
  if (mediaRows.length === 0) return;

  const { error: upsertError } = await supabase
    .from('listing_media')
    .upsert(mediaRows, {
      onConflict: 'listing_id,url',
    });
  // Surface write failures instead of silently reporting success.
  if (upsertError) throw upsertError;
}
Post-Sync Hooks
After upserting listings, the sync worker runs post-sync hooks to detect changes and trigger downstream systems.
Change Detection
upsertListings() compares incoming listings against existing data and returns:
- statusChanges — Listings whose standard_status changed (e.g., Active → Pending, Pending → Closed)
- priceChanges — Listings whose list_price changed, with direction and percentage
/** A listing whose standard_status changed, with the status before and after. */
interface StatusChange { workspace_id: string; listing_id: string; previous_status: string; new_status: string }
/** A listing whose list_price changed, with both prices, the percentage change and its direction. */
interface PriceChange { workspace_id: string; listing_id: string; previous_price: number; new_price: number; percent_change: number; direction: 'increase' | 'decrease' }Automation Trigger Evaluation
If features.automations is enabled, evaluateAutomationTriggers() is called with both status changes and price changes. It:
- Fetches enabled workspace rules for status_change and price_change trigger types
- Matches changes against rule configurations (toStatuses, fromStatuses, direction, minPercentChange)
- Checks cooldown to prevent duplicate triggers
- Creates automation_outbox proposals for matching rules
Only personal and team category listings are eligible. See Automations Reference for full details.
Legacy Draft Social Posts
If features.autoDraftPosts is enabled, createDraftProposals() runs separately for status changes only. This is the legacy system — new workspaces should use Automations.
Error Handling
Retry Strategy
/**
 * Runs `fn`, retrying with exponential backoff when it rejects.
 *
 * The wait before retry k is `baseDelay * 2^(k-1)` milliseconds. Every
 * rejection is retried; there is no filtering by error type.
 *
 * @param fn          Operation to attempt.
 * @param maxAttempts Total number of attempts (not retries); must be at least 1.
 * @param baseDelay   Delay before the first retry, in milliseconds.
 * @returns The value `fn` resolves with.
 * @throws RangeError when `maxAttempts` is below 1 (or NaN).
 * @throws The last rejection from `fn` once all attempts are used up.
 */
async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelay = 1000
): Promise<T> {
  // Without this guard the loop body never runs and `undefined` is thrown.
  if (!(maxAttempts >= 1)) {
    throw new RangeError(`maxAttempts must be at least 1, got ${maxAttempts}`);
  }
  // `unknown`, not `Error`: a rejection can carry any value.
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error: unknown) {
      lastError = error;
      // No point waiting after the final attempt.
      if (attempt < maxAttempts) {
        await sleep(baseDelay * 2 ** (attempt - 1));
      }
    }
  }
  throw lastError;
}
Failure Recovery
- On failure, do not update checkpoint — retry will re-fetch same range
- Log error to sync_runs.error
- Set mls_connections.status to error
- Alert admin (future: webhook/email)
Scheduling
Options
- Vercel Cron (simplest)
  // vercel.json
  { "crons": [ { "path": "/api/cron/sync", "schedule": "0 * * * *" } ] }
- Supabase Edge Function + pg_cron
- External scheduler (Railway, Render, etc.)
Cron Endpoint
// app/api/cron/sync/route.ts
/**
 * Cron entry point: runs an incremental sync for every workspace on the
 * hourly cadence and reports one result entry per workspace.
 *
 * A failing workspace does not stop the others; its entry carries the error
 * message instead of the sync result.
 *
 * @param request Incoming request; must carry `Authorization: Bearer <CRON_SECRET>`.
 * @returns 401 when the secret is missing or does not match, otherwise a JSON
 *          body of the form `{ results: [...] }`.
 */
export async function GET(request: NextRequest) {
  // Verify cron secret. Fail closed when it is not configured: otherwise the
  // expected header would be the literal "Bearer undefined", which anyone
  // can send.
  const cronSecret = process.env.CRON_SECRET;
  if (!cronSecret) {
    console.error('CRON_SECRET is not configured; rejecting cron request');
    return new Response('Unauthorized', { status: 401 });
  }
  const authHeader = request.headers.get('authorization');
  if (authHeader !== `Bearer ${cronSecret}`) {
    return new Response('Unauthorized', { status: 401 });
  }

  // Get all active workspaces with hourly cadence
  const workspaces = await getWorkspacesForSync('hourly');
  const results: Array<Record<string, unknown>> = [];
  for (const workspace of workspaces) {
    try {
      const result = await runIncrementalSync(workspace.id);
      results.push({ workspace: workspace.slug, ...result });
    } catch (error: unknown) {
      // Anything can be thrown; only Error instances carry a `.message`.
      const message = error instanceof Error ? error.message : String(error);
      results.push({ workspace: workspace.slug, error: message });
    }
  }
  return NextResponse.json({ results });
}
Token Encryption
Option 1: Supabase Vault (recommended)
-- Store token. vault.create_secret takes the secret value first and the
-- unique name second; the name is what the lookup below filters on.
SELECT vault.create_secret('actual_token_here', 'bridge_token_workspace_123');
-- Retrieve token
SELECT decrypted_secret
FROM vault.decrypted_secrets
WHERE name = 'bridge_token_workspace_123';
Option 2: Application-level encryption
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';
// Cipher used for token encryption: AES with a 256-bit key in GCM mode.
const ALGORITHM = 'aes-256-gcm';
// Encryption key, decoded from the hex string in TOKEN_ENCRYPTION_KEY when
// this module is loaded.
// NOTE(review): nothing here checks that the variable is set or that it
// decodes to the 32 bytes aes-256 requires — confirm it is validated elsewhere.
const KEY = Buffer.from(process.env.TOKEN_ENCRYPTION_KEY!, 'hex');
/**
 * Encrypts a token with the module-level ALGORITHM and KEY.
 *
 * A fresh random 16-byte IV is generated per call, so encrypting the same
 * token twice yields different output.
 *
 * @param token Plaintext token (UTF-8).
 * @returns `<iv>:<authTag>:<ciphertext>`, each part hex-encoded.
 */
export function encryptToken(token: string): string {
  const iv = randomBytes(16);
  const cipher = createCipheriv(ALGORITHM, KEY, iv);
  const body = cipher.update(token, 'utf8', 'hex') + cipher.final('hex');
  // The auth tag only becomes available once final() has run.
  const tag = cipher.getAuthTag();
  return [iv.toString('hex'), tag.toString('hex'), body].join(':');
}
/**
 * Decrypts a value produced by `encryptToken`.
 *
 * @param ciphertext `<iv>:<authTag>:<ciphertext>`, each part hex-encoded.
 * @returns The plaintext token (UTF-8).
 * @throws Error when the input does not have the expected three-part shape
 *         or the auth tag is not 16 bytes long.
 * @throws The crypto error raised by `final()` when authentication fails
 *         (wrong key or tampered data).
 */
export function decryptToken(ciphertext: string): string {
  const parts = ciphertext.split(':');
  const [ivHex, authTagHex, encrypted] = parts;
  // Reject malformed input up front instead of failing with a cryptic
  // TypeError from Buffer.from(undefined).
  if (parts.length !== 3 || !ivHex || !authTagHex || encrypted === undefined) {
    throw new Error(
      'Invalid token ciphertext: expected "<iv>:<authTag>:<data>" (hex-encoded)'
    );
  }
  const iv = Buffer.from(ivHex, 'hex');
  const authTag = Buffer.from(authTagHex, 'hex');
  // GCM accepts shortened tags unless told otherwise, which weakens
  // authentication; encryptToken always emits full 16-byte tags.
  if (authTag.length !== 16) {
    throw new Error('Invalid token ciphertext: auth tag must be 16 bytes');
  }
  const decipher = createDecipheriv(ALGORITHM, KEY, iv);
  decipher.setAuthTag(authTag);
  // final() verifies the auth tag and throws if the data was tampered with.
  return decipher.update(encrypted, 'hex', 'utf8') + decipher.final('utf8');
}