Sync Worker Specification

Bridge/RESO → Postgres → Typesense sync pipeline.

Overview

The sync worker is responsible for:

  1. Connecting to Bridge Interactive API (RESO Web API / OData)
  2. Pulling listing data incrementally (using ModificationTimestamp)
  3. Normalizing data using field mappings
  4. Upserting to Postgres (canonical listings + media)
  5. Indexing to Typesense
  6. Recording sync runs for audit

Worker Interface

interface SyncWorker {
  // Main sync operation
  runSync(workspaceId: string, mode: 'incremental' | 'full'): Promise<SyncResult>;

  // Test API connection (validates token + dataset)
  testConnection(workspaceId: string): Promise<ConnectionTestResult>;

  // Validate field mapping (samples listings and checks required fields)
  validateMapping(workspaceId: string): Promise<MappingValidationResult>;
}

interface SyncResult {
  syncRunId: string;
  status: 'success' | 'error';
  stats: {
    fetched: number;
    upserted: number;
    indexed: number;
    errors: number;
  };
  error?: string;
  duration: number; // ms
}

interface ConnectionTestResult {
  success: boolean;
  message: string;
  sampleCount?: number; // number of listings in dataset
}

interface MappingValidationResult {
  valid: boolean;
  missingFields: string[];
  sampleData: Record<string, any>[];
}

Bridge API Integration

Authentication

Bridge uses server tokens (passed as query param or header).

const BRIDGE_BASE_URL = 'https://api.bridgedataoutput.com/api/v2/OData';

interface BridgeConfig {
  baseUrl: string;
  datasetId: string; // e.g., 'test' or actual dataset ID
  accessToken: string; // decrypted from mls_connections.token_ciphertext
}

function buildBridgeUrl(
  config: BridgeConfig,
  resource: string,
  params?: Record<string, string>
) {
  const url = new URL(`${config.baseUrl}/${config.datasetId}/${resource}`);
  url.searchParams.set('access_token', config.accessToken);

  if (params) {
    Object.entries(params).forEach(([key, value]) => {
      url.searchParams.set(key, value);
    });
  }

  return url.toString();
}
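
buildBridgeUrl above uses the query-param form; a header-based variant is sketched below and keeps tokens out of logged URLs. The exact Authorization: Bearer scheme is an assumption to confirm against Bridge docs.

// Alternative: pass the token as a header instead of a query param.
// The Bearer scheme here is an assumption; confirm against Bridge docs.
async function fetchWithHeaderAuth(
  config: BridgeConfig,
  resource: string
): Promise<Response> {
  const url = `${config.baseUrl}/${config.datasetId}/${resource}`;
  return fetch(url, {
    headers: { Authorization: `Bearer ${config.accessToken}` },
  });
}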

OData Query Parameters

Parameter    Usage
$filter      Filter listings by criteria
$select      Choose specific fields
$top         Limit results (max 200)
$skip        Pagination offset
$orderby     Sort results
$expand      Include related entities (Media)
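
For example, the parameters combine with buildBridgeUrl like this (dataset ID and token are placeholders):

// Illustrative: first page of active listings with photos expanded,
// oldest modifications first.
const exampleUrl = buildBridgeUrl(
  { baseUrl: BRIDGE_BASE_URL, datasetId: 'test', accessToken: '<token>' },
  'Property',
  {
    '$filter': "StandardStatus eq 'Active'",
    '$top': '200',
    '$orderby': 'ModificationTimestamp asc',
    '$expand': 'Media',
  }
);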

Incremental Sync Query

function buildIncrementalQuery(lastSyncedAt: Date | null): string {
  if (!lastSyncedAt) {
    // Full sync - no filter
    return '';
  }

  // ISO 8601 format for OData
  const timestamp = lastSyncedAt.toISOString();
  return `ModificationTimestamp gt ${timestamp}`;
}

async function fetchListings(
  config: BridgeConfig,
  filter?: string,
  skip = 0
): Promise<BridgeResponse> {
  const params: Record<string, string> = {
    '$top': '200',
    '$skip': skip.toString(),
    '$orderby': 'ModificationTimestamp asc',
    '$expand': 'Media',
  };

  if (filter) {
    params['$filter'] = filter;
  }

  const url = buildBridgeUrl(config, 'Property', params);
  const response = await fetch(url);

  if (!response.ok) {
    throw new Error(`Bridge API error: ${response.status} ${response.statusText}`);
  }

  return response.json();
}

Response Structure

interface BridgeResponse {
  '@odata.context': string;
  '@odata.count'?: number;
  '@odata.nextLink'?: string; // URL for next page
  value: BridgeListing[];
}

interface BridgeListing {
  ListingKey: string;
  ListingId?: string;
  StandardStatus: string;
  ModificationTimestamp: string;
  PropertyType: string;
  ListPrice: number;
  // ... many more RESO fields
  Media?: BridgeMedia[];
}

interface BridgeMedia {
  MediaKey: string;
  MediaURL: string;
  Order: number;
  MediaCategory?: string;
}
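
Since @odata.nextLink already encodes the next page, a loop can also follow it directly instead of computing $skip by hand. A minimal sketch; it assumes the nextLink echoes the access token, so re-append it if Bridge omits it:

// Alternative pagination: follow @odata.nextLink until it disappears.
async function fetchAllPages(firstUrl: string): Promise<BridgeListing[]> {
  const listings: BridgeListing[] = [];
  let nextUrl: string | undefined = firstUrl;

  while (nextUrl) {
    const response = await fetch(nextUrl);
    if (!response.ok) {
      throw new Error(`Bridge API error: ${response.status} ${response.statusText}`);
    }
    const page: BridgeResponse = await response.json();
    listings.push(...page.value);
    nextUrl = page['@odata.nextLink'];
  }

  return listings;
}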

Sync Flow

Full Sync

async function runFullSync(workspaceId: string): Promise<SyncResult> {
  const startedAt = new Date();
  const syncRun = await createSyncRun(workspaceId, 'running');

  try {
    // 1. Load workspace config
    const workspace = await getWorkspace(workspaceId);
    const connection = await getMlsConnection(workspaceId);
    const mapping = await getActiveMapping(workspaceId);

    // 2. Decrypt token
    const accessToken = await decryptToken(connection.token_ciphertext);
    const config: BridgeConfig = {
      baseUrl: connection.base_url,
      datasetId: connection.dataset_id,
      accessToken,
    };

    // 3. Fetch all listings (paginated)
    const allListings: BridgeListing[] = [];
    let skip = 0;
    let hasMore = true;

    while (hasMore) {
      const response = await fetchListings(config, undefined, skip);
      allListings.push(...response.value);

      // More pages remain if Bridge returned a nextLink or the page was full
      // (a full final page costs one extra, empty request).
      hasMore = response['@odata.nextLink'] !== undefined ||
                response.value.length === 200;
      skip += 200;

      // Rate limiting
      await sleep(100);
    }

    // 4. Transform & upsert to Postgres
    const normalized = allListings.map(l =>
      normalizeListing(l, mapping, workspaceId, connection)
    );
    const upsertResult = await upsertListings(normalized);

    // 5. Upsert media
    for (const listing of allListings) {
      if (listing.Media?.length) {
        await upsertMedia(workspaceId, listing.ListingKey, listing.Media);
      }
    }

    // 6. Index to Typesense
    const typesenseDocs = await prepareTypesenseDocs(workspaceId, normalized);
    await indexToTypesense(workspace.slug, typesenseDocs);

    // 7. Update checkpoint
    const latestModification = allListings.reduce((max, l) => {
      const ts = new Date(l.ModificationTimestamp);
      return ts > max ? ts : max;
    }, new Date(0));

    await updateMlsConnection(connection.id, {
      last_synced_at: latestModification,
      status: 'active',
    });

    // 8. Complete sync run
    const stats = {
      fetched: allListings.length,
      upserted: upsertResult.count,
      indexed: typesenseDocs.length,
      errors: 0,
    };

    await completeSyncRun(syncRun.id, 'success', stats);

    return {
      syncRunId: syncRun.id,
      status: 'success',
      stats,
      duration: Date.now() - startedAt.getTime(),
    };

  } catch (error) {
    await completeSyncRun(
      syncRun.id,
      'error',
      { fetched: 0, upserted: 0, indexed: 0, errors: 1 },
      error.message
    );
    throw error;
  }
}

Incremental Sync

async function runIncrementalSync(workspaceId: string): Promise<SyncResult> {
  const startedAt = new Date();
  const syncRun = await createSyncRun(workspaceId, 'running');

  try {
    const workspace = await getWorkspace(workspaceId);
    const connection = await getMlsConnection(workspaceId);
    const mapping = await getActiveMapping(workspaceId);

    const accessToken = await decryptToken(connection.token_ciphertext);
    const config: BridgeConfig = {
      baseUrl: connection.base_url,
      datasetId: connection.dataset_id,
      accessToken,
    };

    // Build filter from last sync timestamp
    const filter = buildIncrementalQuery(connection.last_synced_at);

    // Fetch modified listings
    const modifiedListings: BridgeListing[] = [];
    let skip = 0;
    let hasMore = true;

    while (hasMore) {
      const response = await fetchListings(config, filter, skip);
      modifiedListings.push(...response.value);

      hasMore = response['@odata.nextLink'] !== undefined ||
                response.value.length === 200;
      skip += 200;

      await sleep(100);
    }

    if (modifiedListings.length === 0) {
      // Nothing to sync
      await completeSyncRun(
        syncRun.id,
        'success',
        { fetched: 0, upserted: 0, indexed: 0, errors: 0 }
      );
      return {
        syncRunId: syncRun.id,
        status: 'success',
        stats: { fetched: 0, upserted: 0, indexed: 0, errors: 0 },
        duration: Date.now() - startedAt.getTime(),
      };
    }

    // Transform, upsert, index (same as full sync)
    const normalized = modifiedListings.map(l =>
      normalizeListing(l, mapping, workspaceId, connection)
    );
    const upsertResult = await upsertListings(normalized);

    for (const listing of modifiedListings) {
      if (listing.Media?.length) {
        await upsertMedia(workspaceId, listing.ListingKey, listing.Media);
      }
    }

    const typesenseDocs = await prepareTypesenseDocs(workspaceId, normalized);
    await indexToTypesense(workspace.slug, typesenseDocs);

    // Update checkpoint to latest modification
    const latestModification = modifiedListings.reduce((max, l) => {
      const ts = new Date(l.ModificationTimestamp);
      return ts > max ? ts : max;
    }, connection.last_synced_at || new Date(0));

    await updateMlsConnection(connection.id, {
      last_synced_at: latestModification,
      status: 'active',
    });

    const stats = {
      fetched: modifiedListings.length,
      upserted: upsertResult.count,
      indexed: typesenseDocs.length,
      errors: 0,
    };

    await completeSyncRun(syncRun.id, 'success', stats);

    return {
      syncRunId: syncRun.id,
      status: 'success',
      stats,
      duration: Date.now() - startedAt.getTime(),
    };

  } catch (error) {
    await completeSyncRun(
      syncRun.id,
      'error',
      { fetched: 0, upserted: 0, indexed: 0, errors: 1 },
      error.message
    );
    throw error;
  }
}

Field Normalization

interface FieldMapping {
  [canonicalField: string]: string; // RESO field name
}
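
A concrete mapping, as returned by getActiveMapping, pairs each canonical column with a RESO field name. The values below are illustrative examples, not the project's canonical defaults:

const exampleMapping: FieldMapping = {
  standard_status: 'StandardStatus',
  list_price: 'ListPrice',
  bedrooms: 'BedroomsTotal',
  bathrooms: 'BathroomsTotalInteger',
  living_area_sqft: 'LivingArea',
  city: 'City',
  state: 'StateOrProvince',
  postal_code: 'PostalCode',
  latitude: 'Latitude',
  longitude: 'Longitude',
  public_remarks: 'PublicRemarks',
};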

function normalizeListing(
  raw: BridgeListing,
  mapping: FieldMapping,
  workspaceId: string,
  connection: MlsConnection
): CanonicalListing {
  const getValue = (canonical: string): any => {
    const resoField = mapping[canonical];
    // BridgeListing has no index signature, so cast for dynamic field access.
    return resoField ? (raw as Record<string, any>)[resoField] : undefined;
  };

  return {
    workspace_id: workspaceId,
    source_provider: 'bridge',
    source_dataset_id: connection.dataset_id,
    source_listing_key: raw.ListingKey,
    mls_number: getValue('mls_number') || raw.ListingId,

    standard_status: getValue('standard_status') || raw.StandardStatus,
    modification_ts: raw.ModificationTimestamp,
    list_date: getValue('list_date'),
    close_date: getValue('close_date'),

    address_full: getValue('address_full'),
    street_number: getValue('street_number'),
    street_name: getValue('street_name'),
    unit_number: getValue('unit_number'),
    city: getValue('city'),
    state: getValue('state'),
    postal_code: getValue('postal_code'),

    subdivision: getValue('subdivision'),
    building_name: getValue('building_name'),
    community: getValue('community'),
    county: getValue('county'),

    latitude: getValue('latitude'),
    longitude: getValue('longitude'),

    property_type: getValue('property_type'),
    bedrooms: getValue('bedrooms'),
    bathrooms: getValue('bathrooms'),
    living_area_sqft: getValue('living_area_sqft'),
    lot_size_sqft: getValue('lot_size_sqft'),
    year_built: getValue('year_built'),

    list_price: getValue('list_price'),
    close_price: getValue('close_price'),
    price_per_sqft: getValue('price_per_sqft'),

    public_remarks: getValue('public_remarks'),
    photo_count: raw.Media?.length || 0,

    is_active: true,
    raw: raw, // Keep original for debugging
  };
}

Postgres Upsert

async function upsertListings(
  listings: CanonicalListing[]
): Promise<{ count: number }> {
  const supabase = createServiceClient();

  const { data, error } = await supabase
    .from('listings')
    .upsert(listings, {
      onConflict: 'workspace_id,source_provider,source_listing_key',
      ignoreDuplicates: false,
    });

  if (error) throw error;

  return { count: listings.length };
}

async function upsertMedia(
  workspaceId: string,
  listingKey: string,
  media: BridgeMedia[]
) {
  const supabase = createServiceClient();

  // Get listing ID
  const { data: listing } = await supabase
    .from('listings')
    .select('id')
    .eq('workspace_id', workspaceId)
    .eq('source_listing_key', listingKey)
    .single();

  if (!listing) return;

  const mediaRows = media.map((m, idx) => ({
    workspace_id: workspaceId,
    listing_id: listing.id,
    url: m.MediaURL,
    sort_order: m.Order ?? idx,
    caption: m.MediaCategory || null,
  }));

  await supabase
    .from('listing_media')
    .upsert(mediaRows, {
      onConflict: 'listing_id,url',
    });
}
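
prepareTypesenseDocs and indexToTypesense are referenced in the sync flows but not specified above. A minimal sketch using the typesense JS client, assuming one collection per workspace named listings_<slug> (both the collection naming and the document shape are assumptions):

import Typesense from 'typesense';

// Client config values are assumptions; adjust host/port/key for the deployment.
const typesense = new Typesense.Client({
  nodes: [{ host: process.env.TYPESENSE_HOST!, port: 443, protocol: 'https' }],
  apiKey: process.env.TYPESENSE_ADMIN_API_KEY!,
});

// prepareTypesenseDocs (not specified here) would flatten CanonicalListing rows
// into plain documents with a stable string id.
async function indexToTypesense(workspaceSlug: string, docs: Record<string, any>[]) {
  await typesense
    .collections(`listings_${workspaceSlug}`)
    .documents()
    .import(docs, { action: 'upsert' });
}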

Error Handling

Retry Strategy

async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelay = 1000
): Promise<T> {
  let lastError: unknown;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;

      if (attempt < maxAttempts) {
        const delay = baseDelay * Math.pow(2, attempt - 1);
        await sleep(delay);
      }
    }
  }

  throw lastError;
}
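
The sleep helper used by withRetry and the page loops is not defined elsewhere in this spec; a one-line version:

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Usage: wrap a single Bridge page fetch in the retry helper.
//   const page = await withRetry(() => fetchListings(config, filter, skip));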

Failure Recovery

  • On failure, do not update checkpoint — retry will re-fetch same range
  • Log error to sync_runs.error
  • Set mls_connections.status to error
  • Alert admin (future: webhook/email); see the sketch after this list
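
A minimal sketch of this failure path (completeSyncRun and updateMlsConnection are the helpers already used above; handleSyncFailure and notifyAdmin are hypothetical names):

async function handleSyncFailure(
  syncRunId: string,
  connectionId: string,
  error: Error
) {
  // Record the error on the sync run; the checkpoint is intentionally left
  // untouched so the next run re-fetches the same range.
  await completeSyncRun(
    syncRunId,
    'error',
    { fetched: 0, upserted: 0, indexed: 0, errors: 1 },
    error.message
  );

  // Surface the failure on the connection so the admin UI can show it.
  await updateMlsConnection(connectionId, { status: 'error' });

  // Future: webhook/email alert.
  // await notifyAdmin(connectionId, error.message);
}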

Scheduling

Options

  1. Vercel Cron (simplest)

    // vercel.json
    {
      "crons": [
        {
          "path": "/api/cron/sync",
          "schedule": "0 * * * *"
        }
      ]
    }
  2. Supabase Edge Function + pg_cron

  3. External scheduler (Railway, Render, etc.)

Cron Endpoint

// app/api/cron/sync/route.ts
import { NextRequest, NextResponse } from 'next/server';

export async function GET(request: NextRequest) {
  // Verify cron secret
  const authHeader = request.headers.get('authorization');
  if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
    return new Response('Unauthorized', { status: 401 });
  }

  // Get all active workspaces with hourly cadence
  const workspaces = await getWorkspacesForSync('hourly');

  const results = [];
  for (const workspace of workspaces) {
    try {
      const result = await runIncrementalSync(workspace.id);
      results.push({ workspace: workspace.slug, ...result });
    } catch (error) {
      results.push({ workspace: workspace.slug, error: error.message });
    }
  }

  return NextResponse.json({ results });
}

Token Encryption

Option 1: Supabase Vault

-- Store token
SELECT vault.create_secret('bridge_token_workspace_123', 'actual_token_here');

-- Retrieve token
SELECT decrypted_secret
FROM vault.decrypted_secrets
WHERE name = 'bridge_token_workspace_123';

Option 2: Application-level encryption

import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';

const ALGORITHM = 'aes-256-gcm';
const KEY = Buffer.from(process.env.TOKEN_ENCRYPTION_KEY!, 'hex');

export function encryptToken(token: string): string {
  const iv = randomBytes(16);
  const cipher = createCipheriv(ALGORITHM, KEY, iv);

  let encrypted = cipher.update(token, 'utf8', 'hex');
  encrypted += cipher.final('hex');

  const authTag = cipher.getAuthTag();

  return `${iv.toString('hex')}:${authTag.toString('hex')}:${encrypted}`;
}

export function decryptToken(ciphertext: string): string {
  const [ivHex, authTagHex, encrypted] = ciphertext.split(':');

  const iv = Buffer.from(ivHex, 'hex');
  const authTag = Buffer.from(authTagHex, 'hex');

  const decipher = createDecipheriv(ALGORITHM, KEY, iv);
  decipher.setAuthTag(authTag);

  let decrypted = decipher.update(encrypted, 'hex', 'utf8');
  decrypted += decipher.final('utf8');

  return decrypted;
}
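
TOKEN_ENCRYPTION_KEY is expected to be a 32-byte key, hex-encoded. A quick way to generate one, plus a round-trip sanity check (illustrative only):

// Generate a 32-byte key once and store it as TOKEN_ENCRYPTION_KEY:
//   node -e "console.log(require('crypto').randomBytes(32).toString('hex'))"

// Round-trip sanity check.
const ciphertext = encryptToken('my-bridge-server-token');
console.assert(decryptToken(ciphertext) === 'my-bridge-server-token');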
