Source: ebisu/docs/adr/0058-phase1-ongoing-data-management.md
ADR-0058: Phase 1 Architecture for Ongoing Data Management
Status
Accepted
Context
We need to ensure our Phase 1 staged intelligence import architecture can handle:
- New data imports - Monthly/quarterly updates from RFMOs
- Historical preservation - Keep all versions for trend analysis
- Change detection - Identify what's new/changed/removed
- Incremental updates - Don't reprocess unchanged data
- Data lineage - Track source, version, and import history
Current state:
- intelligence_reports stores raw data with timestamps
- intelligence_import_batches tracks import runs
- data_hash enables change detection and confirmation tracking (NOT duplicate removal); a counting sketch follows this list
- Multiple identical reports increase confidence - they are confirmations, not duplicates
- Missing: Version tracking, change detection, update strategies
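For example, treating repeated hashes as confirmations rather than duplicates can be checked with a simple aggregate; the query below is an illustrative sketch against the existing intelligence_reports table, assuming only the data_hash and import_batch_id columns described above.
-- Illustrative sketch: repeated identical reports are confirmations, not duplicates
SELECT data_hash,
COUNT(*) AS confirmation_count,
COUNT(DISTINCT import_batch_id) AS batches_seen_in
FROM intelligence_reports
GROUP BY data_hash
HAVING COUNT(*) > 1
ORDER BY confirmation_count DESC;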
Decision
Enhance Phase 1 architecture to support ongoing data management without disrupting existing imports.
1. Version Tracking Enhancement
-- Add versioning to intelligence_import_batches
ALTER TABLE intelligence_import_batches
ADD COLUMN IF NOT EXISTS source_version TEXT, -- "2025-09", "2025-Q3", version identifier
ADD COLUMN IF NOT EXISTS previous_batch_id UUID REFERENCES intelligence_import_batches(batch_id),
ADD COLUMN IF NOT EXISTS is_full_refresh BOOLEAN DEFAULT TRUE,
ADD COLUMN IF NOT EXISTS is_incremental BOOLEAN DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS is_current BOOLEAN DEFAULT TRUE; -- used by the current-data view and supersede step below (added here in case it does not already exist)
-- Create source version tracking
CREATE TABLE IF NOT EXISTS source_data_versions (
version_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_id UUID REFERENCES original_sources_vessels(source_id),
version_identifier TEXT NOT NULL, -- "2025-09", "v3.2", etc
version_date DATE NOT NULL,
file_path TEXT,
file_hash TEXT,
record_count INTEGER,
download_date TIMESTAMP DEFAULT NOW(),
is_current BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(source_id, version_identifier)
);
-- Index for finding current version
CREATE INDEX idx_source_versions_current
ON source_data_versions(source_id, is_current)
WHERE is_current = TRUE;
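As a usage sketch (not part of the migration itself), registering a newly downloaded monthly file and marking it current might look like the following; the source UUID, file path, hash, and record count are placeholders.
-- Hypothetical usage: register a new source file version and flip is_current
-- All literal values below are placeholders, not real data
UPDATE source_data_versions
SET is_current = FALSE
WHERE source_id = '00000000-0000-0000-0000-000000000000'
AND is_current = TRUE;
INSERT INTO source_data_versions (
source_id, version_identifier, version_date,
file_path, file_hash, record_count, is_current
) VALUES (
'00000000-0000-0000-0000-000000000000',
'2025-09',
'2025-09-01',
'/data/rfmo/example_2025-09.csv',
'0123abcd_placeholder_hash',
1234,
TRUE
);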
2. Change Detection System
-- Change classification enum (must exist before the table that references it)
CREATE TYPE change_type_enum AS ENUM ('NEW', 'UPDATED', 'REMOVED', 'UNCHANGED');
-- Track changes between imports
CREATE TABLE IF NOT EXISTS intelligence_change_log (
change_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_id UUID REFERENCES original_sources_vessels(source_id),
current_batch_id UUID REFERENCES intelligence_import_batches(batch_id),
previous_batch_id UUID REFERENCES intelligence_import_batches(batch_id),
vessel_identifier JSONB, -- {imo: "1234567", name: "VESSEL NAME"}
change_type change_type_enum NOT NULL, -- NEW, UPDATED, REMOVED, UNCHANGED
changed_fields JSONB, -- ["owner_name", "flag", "status"]
previous_data JSONB,
current_data JSONB,
change_detected_at TIMESTAMP DEFAULT NOW()
);
-- Index for change analysis
CREATE INDEX idx_change_log_vessel ON intelligence_change_log
USING gin(vessel_identifier);
CREATE INDEX idx_change_log_type ON intelligence_change_log(change_type);
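Once the log is populated, changes can be reviewed per batch or per vessel. Two illustrative queries follow; the batch UUID and IMO value are placeholders, and the per-vessel lookup relies on the JSONB containment operator, which the GIN index above supports.
-- What changed in one import run (batch UUID is a placeholder)
SELECT change_type, COUNT(*) AS records
FROM intelligence_change_log
WHERE current_batch_id = '00000000-0000-0000-0000-000000000000'
GROUP BY change_type;
-- Change history for a single vessel, served by the GIN index on vessel_identifier
SELECT change_detected_at, change_type, changed_fields
FROM intelligence_change_log
WHERE vessel_identifier @> '{"imo": "1234567"}'
ORDER BY change_detected_at DESC;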
3. Update Strategy Functions
-- Function to handle incremental updates
CREATE OR REPLACE FUNCTION process_incremental_import(
p_source_id UUID,
p_new_batch_id UUID,
p_previous_batch_id UUID
) RETURNS TABLE (
new_records INTEGER,
updated_records INTEGER,
removed_records INTEGER,
unchanged_records INTEGER
) AS $$
DECLARE
v_new_count INTEGER := 0;
v_updated_count INTEGER := 0;
v_removed_count INTEGER := 0;
v_unchanged_count INTEGER := 0;
BEGIN
-- Find NEW records (exist in new but not in previous)
INSERT INTO intelligence_change_log (
source_id, current_batch_id, previous_batch_id,
vessel_identifier, change_type, current_data
)
SELECT
p_source_id,
p_new_batch_id,
p_previous_batch_id,
jsonb_build_object(
'imo', new.raw_vessel_data->>'imo',
'name', new.raw_vessel_data->>'vessel_name'
),
'NEW'::change_type_enum,
new.raw_vessel_data
FROM intelligence_reports new
WHERE new.import_batch_id = p_new_batch_id
AND NOT EXISTS (
SELECT 1 FROM intelligence_reports old
WHERE old.import_batch_id = p_previous_batch_id
AND (
-- Same vessel already present in the previous batch: match by IMO, else by name,
-- so that updated vessels are not double-counted as NEW
(old.raw_vessel_data->>'imo' = new.raw_vessel_data->>'imo'
AND new.raw_vessel_data->>'imo' IS NOT NULL)
OR old.raw_vessel_data->>'vessel_name' = new.raw_vessel_data->>'vessel_name'
)
);
GET DIAGNOSTICS v_new_count = ROW_COUNT;
-- Find UPDATED records (hash changed)
WITH vessel_matches AS (
SELECT
new.report_id as new_report_id,
old.report_id as old_report_id,
new.raw_vessel_data as new_data,
old.raw_vessel_data as old_data
FROM intelligence_reports new
JOIN intelligence_reports old ON
-- Match by IMO if available
(new.raw_vessel_data->>'imo' = old.raw_vessel_data->>'imo'
AND new.raw_vessel_data->>'imo' IS NOT NULL)
OR
-- Otherwise match by name
(new.raw_vessel_data->>'vessel_name' = old.raw_vessel_data->>'vessel_name')
WHERE new.import_batch_id = p_new_batch_id
AND old.import_batch_id = p_previous_batch_id
AND new.data_hash != old.data_hash
)
INSERT INTO intelligence_change_log (
source_id, current_batch_id, previous_batch_id,
vessel_identifier, change_type,
changed_fields, previous_data, current_data
)
SELECT
p_source_id,
p_new_batch_id,
p_previous_batch_id,
jsonb_build_object(
'imo', COALESCE(new_data->>'imo', old_data->>'imo'),
'name', COALESCE(new_data->>'vessel_name', old_data->>'vessel_name')
),
'UPDATED'::change_type_enum,
-- Detect which fields changed
(SELECT jsonb_agg(key)
FROM jsonb_each(old_data) o
FULL OUTER JOIN jsonb_each(new_data) n USING (key)
WHERE o.value IS DISTINCT FROM n.value),
old_data,
new_data
FROM vessel_matches;
GET DIAGNOSTICS v_updated_count = ROW_COUNT;
-- Find REMOVED records
INSERT INTO intelligence_change_log (
source_id, current_batch_id, previous_batch_id,
vessel_identifier, change_type, previous_data
)
SELECT
p_source_id,
p_new_batch_id,
p_previous_batch_id,
jsonb_build_object(
'imo', old.raw_vessel_data->>'imo',
'name', old.raw_vessel_data->>'vessel_name'
),
'REMOVED'::change_type_enum,
old.raw_vessel_data
FROM intelligence_reports old
WHERE old.import_batch_id = p_previous_batch_id
AND NOT EXISTS (
SELECT 1 FROM intelligence_reports new
WHERE new.import_batch_id = p_new_batch_id
AND (
-- Match by IMO or name
(new.raw_vessel_data->>'imo' = old.raw_vessel_data->>'imo'
AND old.raw_vessel_data->>'imo' IS NOT NULL)
OR
new.raw_vessel_data->>'vessel_name' = old.raw_vessel_data->>'vessel_name'
)
);
GET DIAGNOSTICS v_removed_count = ROW_COUNT;
-- Calculate unchanged (same hash = no changes, still valuable confirmation)
SELECT COUNT(*) INTO v_unchanged_count
FROM intelligence_reports new
JOIN intelligence_reports old ON old.data_hash = new.data_hash
WHERE new.import_batch_id = p_new_batch_id
AND old.import_batch_id = p_previous_batch_id;
-- Update batch statistics (merged into any existing metadata rather than overwriting it)
UPDATE intelligence_import_batches
SET metadata = COALESCE(metadata, '{}'::jsonb) || jsonb_build_object(
'new_records', v_new_count,
'updated_records', v_updated_count,
'removed_records', v_removed_count,
'unchanged_records', v_unchanged_count
)
WHERE batch_id = p_new_batch_id;
RETURN QUERY SELECT v_new_count, v_updated_count, v_removed_count, v_unchanged_count;
END;
$$ LANGUAGE plpgsql;
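Calling the function after an import returns the per-category counts in a single row; a minimal sketch with placeholder UUIDs:
-- Placeholder UUIDs for illustration only
SELECT new_records, updated_records, removed_records, unchanged_records
FROM process_incremental_import(
'11111111-1111-1111-1111-111111111111'::uuid, -- source_id
'22222222-2222-2222-2222-222222222222'::uuid, -- new batch
'33333333-3333-3333-3333-333333333333'::uuid  -- previous batch
);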
4. Historical Data Preservation
-- All intelligence_reports are preserved forever
-- Add archival status to avoid reprocessing
ALTER TABLE intelligence_reports
ADD COLUMN IF NOT EXISTS is_archived BOOLEAN DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS archived_at TIMESTAMP,
ADD COLUMN IF NOT EXISTS superseded_by_report_id UUID REFERENCES intelligence_reports(report_id);
-- View for current active data only
CREATE OR REPLACE VIEW current_intelligence_reports AS
SELECT * FROM intelligence_reports
WHERE is_archived = FALSE
AND import_batch_id IN (
SELECT batch_id
FROM intelligence_import_batches b1
WHERE is_current = TRUE
OR NOT EXISTS (
SELECT 1 FROM intelligence_import_batches b2
WHERE b2.rfmo_shortname = b1.rfmo_shortname
AND b2.import_date > b1.import_date
)
);
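To illustrate the split between full history and the active snapshot, the base table and the view can be compared for a single vessel; the IMO value is a placeholder and only columns described above are used.
-- Full history for one vessel: every report ever imported (IMO is a placeholder)
SELECT report_id, import_batch_id, is_archived
FROM intelligence_reports
WHERE raw_vessel_data->>'imo' = '1234567';
-- Active snapshot only: non-archived records from the current batches
SELECT report_id, import_batch_id
FROM current_intelligence_reports
WHERE raw_vessel_data->>'imo' = '1234567';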
5. Import Workflow Enhancement
#!/bin/bash
# load_[source]_staged_update.sh: enhanced import script pattern
# 1. Check for previous import (-tA returns just the UUID; empty on the first import)
PREVIOUS_BATCH=$(psql -tAc "
SELECT batch_id
FROM intelligence_import_batches
WHERE rfmo_shortname = '$RFMO'
ORDER BY import_date DESC
LIMIT 1
")
# 2. Create new batch with linkage
NEW_BATCH=$(psql -tAc "
INSERT INTO intelligence_import_batches (
rfmo_shortname,
import_date,
previous_batch_id,
is_incremental
) VALUES (
'$RFMO',
CURRENT_DATE,
'$PREVIOUS_BATCH',
TRUE
) RETURNING batch_id
")
# 3. Import new data as normal
# ... existing import process ...
# 4. Run change detection
psql -c "SELECT * FROM process_incremental_import(
'$SOURCE_ID'::uuid,
'$NEW_BATCH'::uuid,
'$PREVIOUS_BATCH'::uuid
)"
# 5. Mark old batch as superseded
psql -c "UPDATE intelligence_import_batches
SET is_current = FALSE
WHERE batch_id = '$PREVIOUS_BATCH'"
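With previous_batch_id populated by the script, the lineage of a source's imports can be walked backwards with a recursive query; a sketch starting from a placeholder batch UUID:
-- Walk the import lineage backwards from the newest batch (placeholder UUID)
WITH RECURSIVE lineage AS (
SELECT batch_id, previous_batch_id, import_date, 0 AS depth
FROM intelligence_import_batches
WHERE batch_id = '22222222-2222-2222-2222-222222222222'
UNION ALL
SELECT b.batch_id, b.previous_batch_id, b.import_date, l.depth + 1
FROM intelligence_import_batches b
JOIN lineage l ON b.batch_id = l.previous_batch_id
)
SELECT * FROM lineage ORDER BY depth;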
6. Benefits of This Approach
- No Data Loss: All historical imports preserved
- Change Tracking: Know exactly what changed between versions
- Efficient Updates: Only process changed records in Phase 2
- Audit Trail: Complete lineage of data changes
- Rollback Capability: Can revert to previous versions
- Trend Analysis: Track vessel changes over time
7. Integration with Phase 2
Phase 2 can leverage this by:
- Only processing records marked as NEW or UPDATED (see the sketch after this list)
- Maintaining vessel history through change log
- Updating trust scores based on consistency over time
- Detecting suspicious changes (flag hopping, name changes)
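For the first point, a Phase 2 work queue could be derived directly from the change log; an illustrative sketch, with a placeholder batch UUID:
-- Sketch of a Phase 2 work queue: only vessels that are new or changed in this batch
SELECT vessel_identifier, change_type, changed_fields, current_data
FROM intelligence_change_log
WHERE current_batch_id = '22222222-2222-2222-2222-222222222222'
AND change_type IN ('NEW', 'UPDATED');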
Consequences
Positive
- Full historical preservation
- Efficient incremental updates
- Complete audit trail
- Change intelligence for risk assessment
- Supports compliance requirements
Negative
- Increased storage over time
- More complex import scripts
- Need to manage data retention
Neutral
- Requires import script updates
- Additional indexes for performance
- Change detection adds processing time
Implementation Steps
- Add version tracking tables
- Update existing import scripts to link batches
- Create change detection functions
- Build incremental import templates
- Create data retention policies
Future Enhancements
- Automated Updates: Scheduled imports with change notifications
- API Integration: Direct API pulls where available
- Change Alerts: Notify on significant changes (sanctions, IUU listing)
- Data Quality Metrics: Track completeness/quality over time