# CrunchyBridge Implementation Guide for Reference Data

## 🎯 Overview
This guide walks through migrating your enhanced reference data system from local PostgreSQL to CrunchyBridge, including schema deployment, data migration, and historical change tracking setup.
## 📋 Prerequisites

### 1. CrunchyBridge Account Setup

```bash
# Install CrunchyBridge CLI
npm install -g @crunchydata/bridge-cli

# Login to your account
bridge login
```
### 2. Project Dependencies

```bash
# Core dependencies
npm install drizzle-orm drizzle-kit pg @types/pg
npm install -D tsx dotenv

# Optional: database management tools
npm install postgres-migrations
```
### 3. Environment Configuration

```bash
# .env.production
DATABASE_URL="postgres://username:password@cluster-id.aws-us-east-1.crunchydata.com:5432/dbname?sslmode=require"
CRUNCHY_CLUSTER_ID="your-cluster-id"
CRUNCHY_DATABASE_NAME="your-database-name"

# Development/local (for comparison)
DATABASE_URL_LOCAL="postgres://user:pass@localhost:5432/your_local_db"
```
## 🚀 Phase 1: CrunchyBridge Cluster Setup

### 1. Create Cluster

```bash
# Create a new cluster
bridge clusters create \
  --name reference-data-prod \
  --plan hobby-2 \
  --region us-east-1

# Get connection details
bridge clusters show reference-data-prod
```
### 2. Database Configuration

```sql
-- Connect to your cluster and run the initial setup
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pg_trgm";
CREATE EXTENSION IF NOT EXISTS "btree_gin";

-- Create application user (optional but recommended)
CREATE USER app_user WITH PASSWORD 'secure_password_here';
GRANT CONNECT ON DATABASE your_db_name TO app_user;
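```

`GRANT CONNECT` alone only lets `app_user` reach the database; it still needs privileges on the schema and its tables. A minimal follow-up sketch, assuming the reference tables live in the default `public` schema (adjust to your schema layout):

```sql
-- Assumes the default "public" schema
GRANT USAGE ON SCHEMA public TO app_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO app_user;

-- Cover tables created by future migrations as well
ALTER DEFAULT PRIVILEGES IN SCHEMA public
  GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO app_user;
```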
## 🔧 Phase 2: Drizzle Schema Deployment

### 1. Project Structure

Note that `scripts/` sits at the repository root (the scripts below import from `../src/db/...`):

```text
src/
└── db/
    ├── schema/
    │   ├── reference-data.ts   # Your Drizzle schema
    │   └── index.ts
    ├── migrations/
    ├── migrate.ts
    └── connection.ts
scripts/
├── migrate-data.ts
├── verify-migration.ts
└── setup-historical-tracking.ts
drizzle.config.ts
```
### 2. Drizzle Configuration

```typescript
// drizzle.config.ts
import type { Config } from 'drizzle-kit';
import { config } from 'dotenv';

config({ path: '.env.production' });

export default {
  schema: './src/db/schema/*.ts',
  out: './src/db/migrations',
  driver: 'pg',
  dbCredentials: {
    connectionString: process.env.DATABASE_URL!,
    ssl: true,
  },
  verbose: true,
  strict: true,
} satisfies Config;
```
### 3. Database Connection

```typescript
// src/db/connection.ts
import { drizzle } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';
import * as schema from './schema';

const connectionString = process.env.DATABASE_URL!;

// Configure for CrunchyBridge
const sql = postgres(connectionString, {
  ssl: 'require',
  max: 10,
  idle_timeout: 20,
  connect_timeout: 30,
});

export const db = drizzle(sql, { schema });
export { sql };
```
### 4. Generate and Apply Migrations

```bash
# Generate migration files
npx drizzle-kit generate:pg

# Review the generated migrations in src/db/migrations/

# Push the schema straight to CrunchyBridge (bypasses migration files;
# use the programmatic migrator below to apply the generated files instead)
npx drizzle-kit push:pg
```
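The project structure above lists `src/db/migrate.ts` but the guide doesn't show it. A minimal sketch using Drizzle's programmatic migrator to apply the generated migration files, assuming the same `postgres-js` driver used elsewhere in this guide:

```typescript
// src/db/migrate.ts — a minimal sketch; adjust paths to your layout
import { drizzle } from 'drizzle-orm/postgres-js';
import { migrate } from 'drizzle-orm/postgres-js/migrator';
import postgres from 'postgres';

async function main() {
  // Use a single connection for migrations
  const sql = postgres(process.env.DATABASE_URL!, { ssl: 'require', max: 1 });
  await migrate(drizzle(sql), { migrationsFolder: './src/db/migrations' });
  await sql.end();
  console.log('✅ Migrations applied');
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
```

Run it with `npx tsx src/db/migrate.ts`.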
## 📊 Phase 3: Data Migration

### 1. Export Data from Local PostgreSQL

```typescript
// scripts/export-reference-data.ts
import { sql as localSql } from '../src/db/connection-local';
import fs from 'fs/promises';

const exportTables = [
  'original_sources',
  'country_iso',
  'fao_major_areas',
  'gear_types_fao',
  'gear_types_cbp',
  'vessel_types',
  'vessel_hull_material',
  'rfmos',
  'gear_types_relationship_fao_cbp',
  'country_iso_eu',
];

async function exportReferenceData() {
  const exportData: Record<string, any[]> = {};

  for (const table of exportTables) {
    console.log(`📤 Exporting ${table}...`);

    // Note: assumes every exported table has a created_at column
    const result = await localSql.unsafe(`
      SELECT * FROM ${table}
      ORDER BY created_at ASC
    `);

    exportData[table] = result;
    console.log(`✅ Exported ${result.length} records from ${table}`);
  }

  // Save to a timestamped JSON file
  const timestamp = new Date().toISOString().split('T')[0];
  const filename = `reference-data-export-${timestamp}.json`;
  await fs.writeFile(filename, JSON.stringify(exportData, null, 2));

  console.log(`💾 Export completed: ${filename}`);
  return exportData;
}

exportReferenceData().catch(console.error);
```
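The export script imports from `src/db/connection-local.ts`, which the guide doesn't show. A minimal sketch, assuming it mirrors `connection.ts` but points at `DATABASE_URL_LOCAL` without TLS:

```typescript
// src/db/connection-local.ts — hypothetical mirror of connection.ts for local use
import { drizzle } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';
import * as schema from './schema';

// Local PostgreSQL typically runs without TLS
const sql = postgres(process.env.DATABASE_URL_LOCAL!, { max: 5 });

export const db = drizzle(sql, { schema });
export { sql };
```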
### 2. Import Data to CrunchyBridge

```typescript
// scripts/import-reference-data.ts
import { db } from '../src/db/connection';
import * as schema from '../src/db/schema';
import fs from 'fs/promises';

async function importReferenceData(filename: string) {
  console.log(`📥 Loading data from ${filename}...`);
  const data = JSON.parse(await fs.readFile(filename, 'utf-8'));

  // Import in dependency order
  const importOrder = [
    { table: 'original_sources', schema: schema.originalSources },
    { table: 'country_iso', schema: schema.countryIso },
    { table: 'fao_major_areas', schema: schema.faoMajorAreas },
    { table: 'gear_types_fao', schema: schema.gearTypesFao },
    { table: 'gear_types_cbp', schema: schema.gearTypesCbp },
    { table: 'vessel_types', schema: schema.vesselTypes },
    { table: 'vessel_hull_material', schema: schema.vesselHullMaterial },
    { table: 'rfmos', schema: schema.rfmos },
    { table: 'gear_types_relationship_fao_cbp', schema: schema.gearTypesRelationshipFaoCbp },
    { table: 'country_iso_eu', schema: schema.countryIsoEu },
  ];

  for (const { table, schema: tableSchema } of importOrder) {
    const records = data[table];
    if (!records || records.length === 0) {
      console.log(`⚠️ No data found for ${table}`);
      continue;
    }

    console.log(`📤 Importing ${records.length} records to ${table}...`);

    // Import in batches of 100
    const batchSize = 100;
    for (let i = 0; i < records.length; i += batchSize) {
      const batch = records.slice(i, i + batchSize);
      await db.insert(tableSchema).values(batch);
    }

    console.log(`✅ Successfully imported ${records.length} records to ${table}`);
  }

  console.log('🎉 Data import completed!');
}

// Run the import
const exportFile =
  process.argv[2] || `reference-data-export-${new Date().toISOString().split('T')[0]}.json`;
importReferenceData(exportFile).catch(console.error);
```
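One caveat: the export script's raw `SELECT *` returns rows keyed by database column names (snake_case) with timestamps serialized as ISO strings, while Drizzle's `values()` expects the schema's property names (camelCase, as used in the verification script) and `Date` objects. A hedged normalization helper, assuming a straightforward snake_case-to-camelCase mapping:

```typescript
// scripts/import-helpers.ts — hypothetical helper, not shown in the original guide
const ISO_TIMESTAMP = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}/;

export function normalizeRecord(record: Record<string, unknown>) {
  const out: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(record)) {
    // snake_case column names -> camelCase Drizzle property names
    const camel = key.replace(/_([a-z])/g, (_, c: string) => c.toUpperCase());
    // Revive ISO timestamp strings back into Date objects
    out[camel] =
      typeof value === 'string' && ISO_TIMESTAMP.test(value) ? new Date(value) : value;
  }
  return out;
}

// Usage inside the batch loop:
//   await db.insert(tableSchema).values(batch.map(normalizeRecord));
```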
### 3. Verification Script

```typescript
// scripts/verify-migration.ts
import { db } from '../src/db/connection';
import * as schema from '../src/db/schema';
import { count, eq, sql } from 'drizzle-orm'; // sql is needed for the ANY() query below

async function verifyMigration() {
  console.log('🔍 Verifying migration...\n');

  const tables = [
    { name: 'original_sources', schema: schema.originalSources },
    { name: 'country_iso', schema: schema.countryIso },
    { name: 'fao_major_areas', schema: schema.faoMajorAreas },
    { name: 'gear_types_fao', schema: schema.gearTypesFao },
    { name: 'gear_types_cbp', schema: schema.gearTypesCbp },
    { name: 'vessel_types', schema: schema.vesselTypes },
    { name: 'vessel_hull_material', schema: schema.vesselHullMaterial },
    { name: 'rfmos', schema: schema.rfmos },
    { name: 'gear_types_relationship_fao_cbp', schema: schema.gearTypesRelationshipFaoCbp },
    { name: 'country_iso_eu', schema: schema.countryIsoEu },
  ];

  let totalRecords = 0;
  for (const { name, schema: tableSchema } of tables) {
    const [result] = await db.select({ count: count() }).from(tableSchema);
    console.log(`📊 ${name}: ${result.count} records`);
    totalRecords += result.count;
  }
  console.log(`\n📈 Total records: ${totalRecords}`);

  // Test source tracking
  console.log('\n🔗 Testing source relationships...');
  const sourcesWithCounts = await db
    .select({
      shortname: schema.originalSources.sourceShortname,
      status: schema.originalSources.status,
      sizeApprox: schema.originalSources.sizeApprox,
      refreshDate: schema.originalSources.refreshDate,
    })
    .from(schema.originalSources)
    .where(eq(schema.originalSources.status, 'LOADED'));

  sourcesWithCounts.forEach(source => {
    console.log(`✅ ${source.shortname}: ${source.status} (${source.sizeApprox} records, refreshed: ${source.refreshDate})`);
  });

  // Test multi-type queries
  console.log('\n🔍 Testing multi-type source queries...');
  const gearSources = await db
    .select({ shortname: schema.originalSources.sourceShortname })
    .from(schema.originalSources)
    .where(sql`'GEAR_DATA' = ANY(${schema.originalSources.sourceTypes})`);

  console.log(`🔧 Sources with GEAR_DATA: ${gearSources.map(s => s.shortname).join(', ')}`);
  console.log('\n🎉 Migration verification completed!');
}

verifyMigration().catch(console.error);
```
## 📈 Phase 4: Historical Change Tracking Setup

### 1. Historical Tables Schema

```typescript
// src/db/schema/historical-tracking.ts
import { pgTable, uuid, text, timestamp, jsonb, index } from 'drizzle-orm/pg-core';

export const historicalChanges = pgTable('historical_changes', {
  id: uuid('id').primaryKey().defaultRandom(),
  tableName: text('table_name').notNull(),
  recordId: uuid('record_id').notNull(),
  operation: text('operation').notNull(), // 'INSERT', 'UPDATE', 'DELETE'
  oldValues: jsonb('old_values'),
  newValues: jsonb('new_values'),
  changedBy: text('changed_by'),
  changedAt: timestamp('changed_at').defaultNow(),
  changeReason: text('change_reason'),
}, (table) => ({
  tableNameIdx: index('idx_historical_table_name').on(table.tableName),
  recordIdIdx: index('idx_historical_record_id').on(table.recordId),
  changedAtIdx: index('idx_historical_changed_at').on(table.changedAt),
  tableRecordIdx: index('idx_historical_table_record').on(table.tableName, table.recordId),
}));

export type HistoricalChange = typeof historicalChanges.$inferSelect;
export type NewHistoricalChange = typeof historicalChanges.$inferInsert;
```
### 2. Trigger Functions Setup

```sql
-- scripts/setup-historical-triggers.sql
-- Generic function to track changes
CREATE OR REPLACE FUNCTION track_historical_changes()
RETURNS TRIGGER AS $$
BEGIN
  IF TG_OP = 'DELETE' THEN
    INSERT INTO historical_changes (table_name, record_id, operation, old_values, changed_at)
    VALUES (TG_TABLE_NAME, OLD.id, 'DELETE', to_jsonb(OLD), NOW());
    RETURN OLD;
  ELSIF TG_OP = 'UPDATE' THEN
    INSERT INTO historical_changes (table_name, record_id, operation, old_values, new_values, changed_at)
    VALUES (TG_TABLE_NAME, NEW.id, 'UPDATE', to_jsonb(OLD), to_jsonb(NEW), NOW());
    RETURN NEW;
  ELSIF TG_OP = 'INSERT' THEN
    INSERT INTO historical_changes (table_name, record_id, operation, new_values, changed_at)
    VALUES (TG_TABLE_NAME, NEW.id, 'INSERT', to_jsonb(NEW), NOW());
    RETURN NEW;
  END IF;
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Create triggers for each table you want to track
CREATE TRIGGER country_iso_history_trigger
  AFTER INSERT OR UPDATE OR DELETE ON country_iso
  FOR EACH ROW EXECUTE FUNCTION track_historical_changes();

CREATE TRIGGER fao_major_areas_history_trigger
  AFTER INSERT OR UPDATE OR DELETE ON fao_major_areas
  FOR EACH ROW EXECUTE FUNCTION track_historical_changes();

-- Add more triggers as needed...
```
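The project structure lists `scripts/setup-historical-tracking.ts` but the guide doesn't show it. A minimal sketch that applies the SQL file above using postgres.js's `file()` helper (multi-statement files work when no parameters are passed):

```typescript
// scripts/setup-historical-tracking.ts — a minimal sketch
import { sql } from '../src/db/connection';

async function setupHistoricalTracking() {
  // Executes the whole SQL file in one round trip
  await sql.file('scripts/setup-historical-triggers.sql');
  console.log('✅ Historical change tracking triggers installed');
  await sql.end();
}

setupHistoricalTracking().catch((err) => {
  console.error(err);
  process.exit(1);
});
```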
### 3. Historical Tracking Service

```typescript
// src/services/historical-tracking.ts
import { db } from '../db/connection';
import { historicalChanges } from '../db/schema/historical-tracking';
import { gte, lte, eq, and } from 'drizzle-orm';

export class HistoricalTrackingService {
  async getTableHistory(tableName: string, recordId?: string, days: number = 30) {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - days);

    const whereConditions = [
      eq(historicalChanges.tableName, tableName),
      gte(historicalChanges.changedAt, cutoffDate),
    ];
    if (recordId) {
      whereConditions.push(eq(historicalChanges.recordId, recordId));
    }

    return await db
      .select()
      .from(historicalChanges)
      .where(and(...whereConditions))
      .orderBy(historicalChanges.changedAt);
  }

  async getChangesByDateRange(startDate: Date, endDate: Date) {
    return await db
      .select()
      .from(historicalChanges)
      .where(and(
        gte(historicalChanges.changedAt, startDate),
        lte(historicalChanges.changedAt, endDate)
      ))
      .orderBy(historicalChanges.changedAt);
  }

  async cleanupOldChanges(retentionDays: number = 365) {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - retentionDays);

    const result = await db
      .delete(historicalChanges)
      .where(lte(historicalChanges.changedAt, cutoffDate));

    console.log(`🧹 Cleaned up historical changes older than ${retentionDays} days`);
    return result;
  }
}
```
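A quick usage sketch, e.g. from a one-off script (the record UUID is a placeholder for illustration):

```typescript
import { HistoricalTrackingService } from '../src/services/historical-tracking';

async function demo() {
  const tracking = new HistoricalTrackingService();

  // Last 30 days of changes to country_iso (default window)
  const recent = await tracking.getTableHistory('country_iso');
  console.log(`${recent.length} recent changes`);

  // Full year of history for a single record (placeholder UUID)
  const audit = await tracking.getTableHistory(
    'country_iso',
    '00000000-0000-0000-0000-000000000000',
    365,
  );
  console.log(`${audit.length} changes for that record`);
}

demo().catch(console.error);
```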
## 🔧 Phase 5: Production Setup

### 1. Connection Pooling

```typescript
// src/db/connection-pool.ts
import { drizzle } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';
import * as schema from './schema';

// Production connection with proper pooling
const sql = postgres(process.env.DATABASE_URL!, {
  ssl: 'require',
  max: 20,             // Maximum connections
  idle_timeout: 20,    // Close idle connections after 20s
  connect_timeout: 30, // Connection timeout in seconds
  prepare: false,      // Disable prepared statements (needed behind transaction-mode poolers)
});

export const db = drizzle(sql, {
  schema,
  logger: process.env.NODE_ENV === 'development',
});

// Graceful shutdown
process.on('SIGINT', async () => {
  console.log('🔌 Closing database connections...');
  await sql.end();
  process.exit(0);
});
```
### 2. Environment-Specific Configuration

```typescript
// src/config/database.ts
interface DatabaseConfig {
  connectionString: string;
  ssl: boolean;
  maxConnections: number;
  idleTimeout: number;
  connectTimeout: number;
}

const configs: Record<string, DatabaseConfig> = {
  development: {
    connectionString: process.env.DATABASE_URL_LOCAL!,
    ssl: false,
    maxConnections: 5,
    idleTimeout: 20,
    connectTimeout: 10,
  },
  production: {
    connectionString: process.env.DATABASE_URL!,
    ssl: true,
    maxConnections: 20,
    idleTimeout: 20,
    connectTimeout: 30,
  },
};

export const dbConfig = configs[process.env.NODE_ENV || 'development'];
```
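A sketch of wiring `dbConfig` into the postgres.js client, so the same connection module works in both environments (the file name is an assumption):

```typescript
// src/db/connection-from-config.ts — hypothetical wiring example
import { drizzle } from 'drizzle-orm/postgres-js';
import postgres from 'postgres';
import * as schema from './schema';
import { dbConfig } from '../config/database';

const sql = postgres(dbConfig.connectionString, {
  ssl: dbConfig.ssl ? 'require' : false,
  max: dbConfig.maxConnections,
  idle_timeout: dbConfig.idleTimeout,
  connect_timeout: dbConfig.connectTimeout,
});

export const db = drizzle(sql, { schema });
```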
### 3. Health Check Endpoint

```typescript
// src/health/database.ts
import { db } from '../db/connection';
import { originalSources } from '../db/schema';
import { count } from 'drizzle-orm';

export async function checkDatabaseHealth() {
  const startedAt = Date.now();
  try {
    // Test basic connectivity
    const [result] = await db.select({ count: count() }).from(originalSources);

    return {
      status: 'healthy',
      timestamp: new Date().toISOString(),
      recordCount: result.count,
      latencyMs: Date.now() - startedAt, // Round-trip time for the probe query
    };
  } catch (error) {
    return {
      status: 'unhealthy',
      timestamp: new Date().toISOString(),
      error: error instanceof Error ? error.message : 'Unknown error',
    };
  }
}
```
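The function above still needs to be exposed over HTTP. A minimal sketch using Node's built-in `http` module (the port and path are assumptions; adapt to whatever framework the application uses):

```typescript
// src/health/server.ts — hypothetical standalone health server
import http from 'node:http';
import { checkDatabaseHealth } from './database';

const server = http.createServer(async (req, res) => {
  if (req.url === '/health') {
    const health = await checkDatabaseHealth();
    res.writeHead(health.status === 'healthy' ? 200 : 503, {
      'Content-Type': 'application/json',
    });
    res.end(JSON.stringify(health));
    return;
  }
  res.writeHead(404);
  res.end();
});

server.listen(3000, () => console.log('Health check listening on :3000/health'));
```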
## 📊 Phase 6: Monitoring & Maintenance

### 1. Monitoring Script

```typescript
// scripts/monitor-reference-data.ts
import { db } from '../src/db/connection';
import * as schema from '../src/db/schema';
import { count } from 'drizzle-orm';

async function monitorReferenceData() {
  console.log('📊 Reference Data Health Check');
  console.log('==============================\n');

  // Check source statuses
  const sources = await db
    .select({
      shortname: schema.originalSources.sourceShortname,
      status: schema.originalSources.status,
      refreshDate: schema.originalSources.refreshDate,
      sizeApprox: schema.originalSources.sizeApprox,
    })
    .from(schema.originalSources);

  const pendingSources = sources.filter(s => s.status === 'PENDING');
  const failedSources = sources.filter(s => s.status === 'FAILED');
  const staleSources = sources.filter(s => {
    if (!s.refreshDate) return true;
    const daysSinceRefresh = (Date.now() - s.refreshDate.getTime()) / (1000 * 60 * 60 * 24);
    return daysSinceRefresh > 90; // 3 months
  });

  console.log(`✅ Total sources: ${sources.length}`);
  console.log(`⏳ Pending sources: ${pendingSources.length}`);
  console.log(`❌ Failed sources: ${failedSources.length}`);
  console.log(`⚠️ Stale sources (>90 days): ${staleSources.length}\n`);

  if (failedSources.length > 0) {
    console.log('❌ Failed Sources:');
    failedSources.forEach(s => console.log(`  - ${s.shortname}`));
    console.log();
  }

  if (staleSources.length > 0) {
    console.log('⚠️ Stale Sources:');
    staleSources.forEach(s => console.log(`  - ${s.shortname} (last refresh: ${s.refreshDate})`));
    console.log();
  }

  // Check data integrity
  const [countriesCount] = await db.select({ count: count() }).from(schema.countryIso);
  const [gearRelCount] = await db.select({ count: count() }).from(schema.gearTypesRelationshipFaoCbp);

  console.log('📈 Data Integrity:');
  console.log(`  Countries: ${countriesCount.count} (expected: ~249)`);
  console.log(`  Gear relationships: ${gearRelCount.count} (expected: ~69)`);

  console.log('\n🎉 Health check completed!');
}

// Schedule this to run daily (see the crontab sketch at the end of this phase)
monitorReferenceData().catch(console.error);
```
### 2. Backup Strategy

```bash
#!/bin/bash
# scripts/backup-reference-data.sh
# Daily backup of reference data

BACKUP_DIR="/backups/reference-data"
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/reference-data-$DATE.sql"

mkdir -p "$BACKUP_DIR"

# Use the CrunchyBridge connection string
pg_dump "$DATABASE_URL" \
  --verbose \
  --no-owner \
  --no-privileges \
  --table="original_sources*" \
  --table="country_iso*" \
  --table="fao_major_areas*" \
  --table="gear_types_*" \
  --table="vessel_*" \
  --table="rfmos*" \
  --table="historical_changes*" \
  > "$BACKUP_FILE"

# Compress the backup
gzip "$BACKUP_FILE"

# Keep only the last 30 days of backups
find "$BACKUP_DIR" -name "*.sql.gz" -mtime +30 -delete

echo "✅ Backup completed: $BACKUP_FILE.gz"
```
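A hypothetical crontab for the daily backup and monitoring runs (paths, times, and log locations are assumptions — adjust to your deployment):

```bash
# crontab -e
# Daily backup at 02:00
0 2 * * * /srv/app/scripts/backup-reference-data.sh >> /var/log/reference-data-backup.log 2>&1

# Daily reference-data health check at 06:00
0 6 * * * cd /srv/app && npx tsx scripts/monitor-reference-data.ts >> /var/log/reference-data-monitor.log 2>&1
```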
## 🚀 Deployment Checklist

### Pre-Migration

- CrunchyBridge cluster created and configured
- Environment variables set up
- Local data export completed and verified
- Schema migrations generated and reviewed

### Migration Day

- Apply schema migrations to CrunchyBridge
- Import reference data in correct order
- Run verification scripts
- Set up historical change tracking
- Configure monitoring and health checks

### Post-Migration

- Update application connection strings
- Test all reference data queries
- Verify foreign key relationships
- Set up automated backups
- Configure alerts for failed sources
- Document rollback procedures

### Ongoing Maintenance

- Daily health checks
- Weekly source status reviews
- Monthly historical data cleanup
- Quarterly performance analysis
## 🔄 Next Steps for Historical Change Tracking

The next implementation pass should focus on:

- Deploy the historical tracking schema to CrunchyBridge
- Set up trigger functions for change capture
- Implement the change tracking service in TypeScript
- Create historical data API endpoints for querying changes
- Test change tracking with sample data updates

This foundation provides a robust, scalable reference data system, ready for production use with full historical tracking capabilities! 🎉