
Sync Worker Specification

Bridge/RESO → Postgres → Typesense sync pipeline.

Overview

The sync worker is responsible for:

  1. Connecting to Bridge Interactive API (RESO Web API / OData)
  2. Pulling listing data incrementally (using ModificationTimestamp)
  3. Normalizing data using field mappings
  4. Upserting to Postgres (canonical listings + media)
  5. Indexing to Typesense
  6. Recording sync runs for audit

Worker Interface

interface SyncWorker {
  // Main sync operation
  runSync(workspaceId: string, mode: 'incremental' | 'full'): Promise<SyncResult>;

  // Test API connection (validates token + dataset)
  testConnection(workspaceId: string): Promise<ConnectionTestResult>;

  // Validate field mapping (samples listings and checks required fields)
  validateMapping(workspaceId: string): Promise<MappingValidationResult>;
}

interface SyncResult {
  syncRunId: string;
  status: 'success' | 'error';
  stats: {
    fetched: number;
    upserted: number;
    indexed: number;
    errors: number;
  };
  error?: string;
  duration: number; // ms
}

interface ConnectionTestResult {
  success: boolean;
  message: string;
  sampleCount?: number; // number of listings in dataset
}

interface MappingValidationResult {
  valid: boolean;
  missingFields: string[];
  sampleData: Record<string, any>[];
}
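
For example, a scheduled job or admin action might drive the worker like this (a minimal usage sketch; worker stands in for whatever concrete SyncWorker implementation is wired up):

// Minimal usage sketch; "worker" is a placeholder for a concrete SyncWorker.
const result = await worker.runSync('workspace-uuid', 'incremental');
if (result.status === 'success') {
  console.log(`Synced ${result.stats.upserted} listings in ${result.duration}ms`);
}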

Bridge API Integration

Authentication

Bridge uses server tokens, passed either as an access_token query parameter or as an Authorization header.

const BRIDGE_BASE_URL = 'https://api.bridgedataoutput.com/api/v2/OData';

interface BridgeConfig {
  baseUrl: string;
  datasetId: string; // e.g., 'test' or actual dataset ID
  accessToken: string; // decrypted from mls_connections.token_ciphertext
}

function buildBridgeUrl(
  config: BridgeConfig,
  resource: string,
  params?: Record<string, string>
) {
  const url = new URL(`${config.baseUrl}/${config.datasetId}/${resource}`);
  url.searchParams.set('access_token', config.accessToken);

  if (params) {
    Object.entries(params).forEach(([key, value]) => {
      url.searchParams.set(key, value);
    });
  }

  return url.toString();
}
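
The helper above uses the query-parameter form. A sketch of the header form, assuming Bridge's standard Bearer scheme for server tokens:

// Alternative: send the server token as a Bearer header instead of a query param.
// Assumes the dataset accepts the standard Authorization header.
async function fetchWithHeaderAuth(config: BridgeConfig, resource: string): Promise<Response> {
  const url = `${config.baseUrl}/${config.datasetId}/${resource}`;
  return fetch(url, {
    headers: { Authorization: `Bearer ${config.accessToken}` },
  });
}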

OData Query Parameters

Parameter   Usage
$filter     Filter listings by criteria
$select     Choose specific fields
$top        Limit results (max 200)
$skip       Pagination offset
$orderby    Sort results
$expand     Include related entities

Do NOT use $expand=Media

Using $expand=Media in Bridge API queries causes MediaURL to return null for all media items. Media is already embedded in the Property response as a Media array — no expansion is needed. This was confirmed by Bridge Interactive Support. See Bridge API Reference for details.
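
A minimal sketch of the correct pattern, reusing buildBridgeUrl from above (the logged output noted in the comment is illustrative):

// Correct: request Property directly; each record carries its own Media array.
const url = buildBridgeUrl(config, 'Property', { '$top': '1' });
const res = await fetch(url);
const { value } = await res.json();
console.log(value[0]?.Media?.[0]?.MediaURL); // populated, unlike with $expand=Media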

Incremental Sync Query

function buildIncrementalQuery(lastSyncedAt: Date | null): string {
  if (!lastSyncedAt) {
    // Full sync - no filter
    return '';
  }

  // ISO 8601 format for OData
  const timestamp = lastSyncedAt.toISOString();
  return `ModificationTimestamp gt ${timestamp}`;
}

async function fetchListings(
  config: BridgeConfig,
  filter?: string,
  skip = 0
): Promise<BridgeResponse> {
  const params: Record<string, string> = {
    '$top': '200',
    '$skip': skip.toString(),
    '$orderby': 'ModificationTimestamp asc',
  };

  if (filter) {
    params['$filter'] = filter;
  }

  const url = buildBridgeUrl(config, 'Property', params);
  const response = await fetch(url);

  if (!response.ok) {
    throw new Error(`Bridge API error: ${response.status} ${response.statusText}`);
  }

  return response.json();
}
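
Putting the two helpers together, the first page of an incremental pull looks like this (connection.last_synced_at is the checkpoint described under Sync Flow):

// First page of an incremental pull.
// buildIncrementalQuery returns '' when there is no checkpoint, which
// fetchListings treats as "no $filter" (i.e., a full pull).
const filter = buildIncrementalQuery(connection.last_synced_at);
const firstPage = await fetchListings(config, filter || undefined);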

Response Structure

interface BridgeResponse {
  '@odata.context': string;
  '@odata.count'?: number;
  '@odata.nextLink'?: string; // URL for next page
  value: BridgeListing[];
}

interface BridgeListing {
  ListingKey: string;
  ListingId?: string;
  StandardStatus: string;
  ModificationTimestamp: string;
  PropertyType: string;
  ListPrice: number;
  // ... many more RESO fields
  Media?: BridgeMedia[];
}

interface BridgeMedia {
  MediaKey: string;
  MediaURL: string;
  Order: number;
  MediaCategory?: string;
}
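
For orientation, a trimmed example of what one page looks like (all values are illustrative, not taken from a real dataset):

// Illustrative payload only; values are made up.
const examplePage: BridgeResponse = {
  '@odata.context': 'https://api.bridgedataoutput.com/api/v2/OData/test/$metadata#Property',
  '@odata.nextLink': 'https://api.bridgedataoutput.com/api/v2/OData/test/Property?$top=200&$skip=200',
  value: [
    {
      ListingKey: '1034567',
      StandardStatus: 'Active',
      ModificationTimestamp: '2024-05-01T12:34:56Z',
      PropertyType: 'Residential',
      ListPrice: 450000,
      Media: [
        { MediaKey: 'm-1', MediaURL: 'https://example.com/photo-1.jpg', Order: 0 },
      ],
    },
  ],
};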

Sync Flow

Full Sync

async function runFullSync(workspaceId: string): Promise<SyncResult> {
  const startedAt = new Date();
  const syncRun = await createSyncRun(workspaceId, 'running');

  try {
    // 1. Load workspace config
    const workspace = await getWorkspace(workspaceId);
    const connection = await getMlsConnection(workspaceId);
    const mapping = await getActiveMapping(workspaceId);

    // 2. Decrypt token
    const accessToken = await decryptToken(connection.token_ciphertext);
    const config: BridgeConfig = {
      baseUrl: connection.base_url,
      datasetId: connection.dataset_id,
      accessToken,
    };

    // 3. Fetch all listings (paginated)
    const allListings: BridgeListing[] = [];
    let skip = 0;
    let hasMore = true;

    while (hasMore) {
      const response = await fetchListings(config, undefined, skip);
      allListings.push(...response.value);

      hasMore = response['@odata.nextLink'] !== undefined ||
                response.value.length === 200;
      skip += 200;

      // Rate limiting
      await sleep(100);
    }

    // 4. Transform & upsert to Postgres
    const normalized = allListings.map(l =>
      normalizeListing(l, mapping, workspaceId, connection)
    );
    const upsertResult = await upsertListings(normalized);

    // 5. Upsert media
    for (const listing of allListings) {
      if (listing.Media?.length) {
        await upsertMedia(workspaceId, listing.ListingKey, listing.Media);
      }
    }

    // 6. Index to Typesense
    const typesenseDocs = await prepareTypesenseDocs(workspaceId, normalized);
    await indexToTypesense(workspace.slug, typesenseDocs);

    // 7. Update checkpoint (keep the existing checkpoint if nothing newer was fetched)
    const latestModification = allListings.reduce((max, l) => {
      const ts = new Date(l.ModificationTimestamp);
      return ts > max ? ts : max;
    }, connection.last_synced_at || new Date(0));

    await updateMlsConnection(connection.id, {
      last_synced_at: latestModification,
      status: 'active',
    });

    // 8. Complete sync run
    const stats = {
      fetched: allListings.length,
      upserted: upsertResult.count,
      indexed: typesenseDocs.length,
      errors: 0,
    };

    await completeSyncRun(syncRun.id, 'success', stats);

    return {
      syncRunId: syncRun.id,
      status: 'success',
      stats,
      duration: Date.now() - startedAt.getTime(),
    };

  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    await completeSyncRun(
      syncRun.id,
      'error',
      { fetched: 0, upserted: 0, indexed: 0, errors: 1 },
      message
    );
    throw error;
  }
}

Incremental Sync

async function runIncrementalSync(workspaceId: string): Promise<SyncResult> {
  const startedAt = new Date();
  const syncRun = await createSyncRun(workspaceId, 'running');

  try {
    const workspace = await getWorkspace(workspaceId);
    const connection = await getMlsConnection(workspaceId);
    const mapping = await getActiveMapping(workspaceId);

    const accessToken = await decryptToken(connection.token_ciphertext);
    const config: BridgeConfig = {
      baseUrl: connection.base_url,
      datasetId: connection.dataset_id,
      accessToken,
    };

    // Build filter from last sync timestamp
    const filter = buildIncrementalQuery(connection.last_synced_at);

    // Fetch modified listings
    const modifiedListings: BridgeListing[] = [];
    let skip = 0;
    let hasMore = true;

    while (hasMore) {
      const response = await fetchListings(config, filter, skip);
      modifiedListings.push(...response.value);

      hasMore = response['@odata.nextLink'] !== undefined ||
                response.value.length === 200;
      skip += 200;

      await sleep(100);
    }

    if (modifiedListings.length === 0) {
      // Nothing to sync
      await completeSyncRun(
        syncRun.id,
        'success',
        { fetched: 0, upserted: 0, indexed: 0, errors: 0 }
      );
      return {
        syncRunId: syncRun.id,
        status: 'success',
        stats: { fetched: 0, upserted: 0, indexed: 0, errors: 0 },
        duration: Date.now() - startedAt.getTime(),
      };
    }

    // Transform, upsert, index (same as full sync)
    const normalized = modifiedListings.map(l =>
      normalizeListing(l, mapping, workspaceId, connection)
    );
    const upsertResult = await upsertListings(normalized);

    for (const listing of modifiedListings) {
      if (listing.Media?.length) {
        await upsertMedia(workspaceId, listing.ListingKey, listing.Media);
      }
    }

    const typesenseDocs = await prepareTypesenseDocs(workspaceId, normalized);
    await indexToTypesense(workspace.slug, typesenseDocs);

    // Update checkpoint to latest modification
    const latestModification = modifiedListings.reduce((max, l) => {
      const ts = new Date(l.ModificationTimestamp);
      return ts > max ? ts : max;
    }, connection.last_synced_at || new Date(0));

    await updateMlsConnection(connection.id, {
      last_synced_at: latestModification,
      status: 'active',
    });

    const stats = {
      fetched: modifiedListings.length,
      upserted: upsertResult.count,
      indexed: typesenseDocs.length,
      errors: 0,
    };

    await completeSyncRun(syncRun.id, 'success', stats);

    return {
      syncRunId: syncRun.id,
      status: 'success',
      stats,
      duration: Date.now() - startedAt.getTime(),
    };

  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    await completeSyncRun(
      syncRun.id,
      'error',
      { fetched: 0, upserted: 0, indexed: 0, errors: 1 },
      message
    );
    throw error;
  }
}

Field Normalization

interface FieldMapping {
  [canonicalField: string]: string; // RESO field name
}

function normalizeListing(
  raw: BridgeListing,
  mapping: FieldMapping,
  workspaceId: string,
  connection: MlsConnection
): CanonicalListing {
  const getValue = (canonical: string): any => {
    const resoField = mapping[canonical];
    // BridgeListing above elides most RESO fields, so index via a record cast
    return resoField ? (raw as Record<string, any>)[resoField] : undefined;
  };

  return {
    workspace_id: workspaceId,
    source_provider: 'bridge',
    source_dataset_id: connection.dataset_id,
    source_listing_key: raw.ListingKey,
    mls_number: getValue('mls_number') || raw.ListingId,

    standard_status: getValue('standard_status') || raw.StandardStatus,
    modification_ts: raw.ModificationTimestamp,
    list_date: getValue('list_date'),
    close_date: getValue('close_date'),

    address_full: getValue('address_full'),
    street_number: getValue('street_number'),
    street_name: getValue('street_name'),
    unit_number: getValue('unit_number'),
    city: getValue('city'),
    state: getValue('state'),
    postal_code: getValue('postal_code'),

    subdivision: getValue('subdivision'),
    building_name: getValue('building_name'),
    community: getValue('community'),
    county: getValue('county'),

    latitude: getValue('latitude'),
    longitude: getValue('longitude'),

    property_type: getValue('property_type'),
    bedrooms: getValue('bedrooms'),
    bathrooms: getValue('bathrooms'),
    living_area_sqft: getValue('living_area_sqft'),
    lot_size_sqft: getValue('lot_size_sqft'),
    year_built: getValue('year_built'),

    list_price: getValue('list_price'),
    close_price: getValue('close_price'),
    price_per_sqft: getValue('price_per_sqft'),

    public_remarks: getValue('public_remarks'),
    photo_count: raw.Media?.length || 0,

    is_active: true,
    raw: raw, // Keep original for debugging
  };
}
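
For example, a Bridge mapping might pair canonical fields with standard RESO names like this (the pairs below are illustrative; real mappings come from the workspace's active mapping record):

// Illustrative mapping; actual mappings are stored per workspace.
const exampleMapping: FieldMapping = {
  standard_status: 'StandardStatus',
  city: 'City',
  state: 'StateOrProvince',
  postal_code: 'PostalCode',
  bedrooms: 'BedroomsTotal',
  bathrooms: 'BathroomsTotalInteger',
  living_area_sqft: 'LivingArea',
  list_price: 'ListPrice',
  public_remarks: 'PublicRemarks',
};

// rawListing comes from fetchListings; workspaceId/connection from the sync context.
const canonical = normalizeListing(rawListing, exampleMapping, workspaceId, connection);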

Postgres Upsert

async function upsertListings(
  listings: CanonicalListing[]
): Promise<{ count: number }> {
  const supabase = createServiceClient();

  const { data, error } = await supabase
    .from('listings')
    .upsert(listings, {
      onConflict: 'workspace_id,source_provider,source_listing_key',
      ignoreDuplicates: false,
    });

  if (error) throw error;

  return { count: listings.length };
}

async function upsertMedia(
  workspaceId: string,
  listingKey: string,
  media: BridgeMedia[]
) {
  const supabase = createServiceClient();

  // Get listing ID
  const { data: listing } = await supabase
    .from('listings')
    .select('id')
    .eq('workspace_id', workspaceId)
    .eq('source_listing_key', listingKey)
    .single();

  if (!listing) return;

  const mediaRows = media.map((m, idx) => ({
    workspace_id: workspaceId,
    listing_id: listing.id,
    url: m.MediaURL,
    sort_order: m.Order ?? idx,
    caption: m.MediaCategory || null,
  }));

  await supabase
    .from('listing_media')
    .upsert(mediaRows, {
      onConflict: 'listing_id,url',
    });
}

Post-Sync Hooks

After upserting listings, the sync worker runs post-sync hooks to detect changes and trigger downstream systems.

Change Detection

In addition to the row count shown above, upsertListings() compares incoming listings against existing data and returns:

  • statusChanges — Listings whose standard_status changed (e.g., Active → Pending, Pending → Closed)
  • priceChanges — Listings whose list_price changed, with direction and percentage

interface StatusChange {
  workspace_id: string;
  listing_id: string;
  previous_status: string;
  new_status: string;
}

interface PriceChange {
  workspace_id: string;
  listing_id: string;
  previous_price: number;
  new_price: number;
  percent_change: number;
  direction: 'increase' | 'decrease';
}
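
A minimal sketch of how that diff might work, assuming the existing status and price have been loaded per source_listing_key before the upsert (detectChanges and its inputs are hypothetical names, not the actual implementation):

// Hypothetical sketch: diff incoming listings against previously stored rows.
function detectChanges(
  incoming: CanonicalListing[],
  existing: Map<string, { id: string; standard_status: string; list_price: number }>
): { statusChanges: StatusChange[]; priceChanges: PriceChange[] } {
  const statusChanges: StatusChange[] = [];
  const priceChanges: PriceChange[] = [];

  for (const listing of incoming) {
    const prev = existing.get(listing.source_listing_key);
    if (!prev) continue; // brand-new listing, nothing to diff

    if (prev.standard_status !== listing.standard_status) {
      statusChanges.push({
        workspace_id: listing.workspace_id,
        listing_id: prev.id,
        previous_status: prev.standard_status,
        new_status: listing.standard_status,
      });
    }

    if (prev.list_price !== listing.list_price) {
      priceChanges.push({
        workspace_id: listing.workspace_id,
        listing_id: prev.id,
        previous_price: prev.list_price,
        new_price: listing.list_price,
        percent_change: Math.abs((listing.list_price - prev.list_price) / prev.list_price) * 100,
        direction: listing.list_price > prev.list_price ? 'increase' : 'decrease',
      });
    }
  }

  return { statusChanges, priceChanges };
}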

Automation Trigger Evaluation

If features.automations is enabled, evaluateAutomationTriggers() is called with both status changes and price changes. It:

  1. Fetches enabled workspace rules for status_change and price_change trigger types
  2. Matches changes against rule configurations (toStatuses, fromStatuses, direction, minPercentChange); see the sketch after this list
  3. Checks cooldown to prevent duplicate triggers
  4. Creates automation_outbox proposals for matching rules

Only personal and team category listings are eligible. See Automations Reference for full details.
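
As a rough illustration of step 2, matching a price_change rule might reduce to something like this (the rule shape and helper are assumptions; the Automations Reference defines the real contract):

// Hypothetical rule shape and matching logic; illustrative only.
interface PriceChangeRule {
  direction?: 'increase' | 'decrease';
  minPercentChange?: number;
}

function priceRuleMatches(rule: PriceChangeRule, change: PriceChange): boolean {
  if (rule.direction && rule.direction !== change.direction) return false;
  if (rule.minPercentChange && change.percent_change < rule.minPercentChange) return false;
  return true;
}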

Legacy Draft Social Posts

If features.autoDraftPosts is enabled, createDraftProposals() runs separately for status changes only. This is the legacy system — new workspaces should use Automations.

Error Handling

Retry Strategy

async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelay = 1000
): Promise<T> {
  let lastError: unknown;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;

      if (attempt < maxAttempts) {
        const delay = baseDelay * Math.pow(2, attempt - 1);
        await sleep(delay);
      }
    }
  }

  throw lastError;
}
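
For example, wrapping a page fetch so transient Bridge errors are retried with backoff:

// Retry a single page fetch up to 3 times (1s, then 2s between attempts).
const response = await withRetry(() => fetchListings(config, filter, skip));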

Failure Recovery

  • On failure, do not update checkpoint — retry will re-fetch same range
  • Log error to sync_runs.error
  • Set mls_connections.status to error
  • Alert admin (future: webhook/email)

Scheduling

Options

  1. Vercel Cron (simplest)

    // vercel.json
    {
      "crons": [
        {
          "path": "/api/cron/sync",
          "schedule": "0 * * * *"
        }
      ]
    }
  2. Supabase Edge Function + pg_cron

  3. External scheduler (Railway, Render, etc.)

Cron Endpoint

// app/api/cron/sync/route.ts
import { NextRequest, NextResponse } from 'next/server';

export async function GET(request: NextRequest) {
  // Verify cron secret
  const authHeader = request.headers.get('authorization');
  if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
    return new Response('Unauthorized', { status: 401 });
  }

  // Get all active workspaces with hourly cadence
  const workspaces = await getWorkspacesForSync('hourly');

  const results = [];
  for (const workspace of workspaces) {
    try {
      const result = await runIncrementalSync(workspace.id);
      results.push({ workspace: workspace.slug, ...result });
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      results.push({ workspace: workspace.slug, error: message });
    }
  }

  return NextResponse.json({ results });
}

Token Encryption

Option 1: Supabase Vault
-- Store token
SELECT vault.create_secret('bridge_token_workspace_123', 'actual_token_here');

-- Retrieve token
SELECT decrypted_secret
FROM vault.decrypted_secrets
WHERE name = 'bridge_token_workspace_123';

Option 2: Application-level encryption

import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';

const ALGORITHM = 'aes-256-gcm';
const KEY = Buffer.from(process.env.TOKEN_ENCRYPTION_KEY!, 'hex');

export function encryptToken(token: string): string {
  const iv = randomBytes(16);
  const cipher = createCipheriv(ALGORITHM, KEY, iv);

  let encrypted = cipher.update(token, 'utf8', 'hex');
  encrypted += cipher.final('hex');

  const authTag = cipher.getAuthTag();

  return `${iv.toString('hex')}:${authTag.toString('hex')}:${encrypted}`;
}

export function decryptToken(ciphertext: string): string {
  const [ivHex, authTagHex, encrypted] = ciphertext.split(':');

  const iv = Buffer.from(ivHex, 'hex');
  const authTag = Buffer.from(authTagHex, 'hex');

  const decipher = createDecipheriv(ALGORITHM, KEY, iv);
  decipher.setAuthTag(authTag);

  let decrypted = decipher.update(encrypted, 'hex', 'utf8');
  decrypted += decipher.final('utf8');

  return decrypted;
}
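
A quick round-trip example. Note that TOKEN_ENCRYPTION_KEY must decode to 32 bytes for aes-256-gcm, i.e. 64 hex characters (for example, generated with openssl rand -hex 32):

// Round-trip check. TOKEN_ENCRYPTION_KEY must be 64 hex chars (32 bytes).
const ciphertext = encryptToken('bridge-server-token');
const plaintext = decryptToken(ciphertext); // 'bridge-server-token'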
