ADR-0058: Phase 1 Architecture for Ongoing Data Management

Status

Accepted

Context

We need to ensure our Phase 1 staged intelligence import architecture can handle:

  1. New data imports - Monthly/quarterly updates from RFMOs
  2. Historical preservation - Keep all versions for trend analysis
  3. Change detection - Identify what's new/changed/removed
  4. Incremental updates - Don't reprocess unchanged data
  5. Data lineage - Track source, version, and import history

Current state:

  • intelligence_reports stores raw data with timestamps
  • intelligence_import_batches tracks import runs
  • data_hash enables change detection and confirmation tracking (NOT duplicate removal)
  • Multiple identical reports increase confidence - they are confirmations, not duplicates (see the sketch after this list)
  • Missing: Version tracking, cross-batch change detection, and update strategies
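
For example, the confirmation count behind any payload can be read directly from the existing columns (a sketch; it assumes only the data_hash and import_batch_id columns described above):

-- Count how many distinct import batches carry an identical payload.
-- Repeats are confirmations that raise confidence; they are never deleted.
SELECT data_hash,
       COUNT(*)                        AS total_reports,
       COUNT(DISTINCT import_batch_id) AS confirming_batches
FROM intelligence_reports
GROUP BY data_hash
ORDER BY confirming_batches DESC;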

Decision

Enhance Phase 1 architecture to support ongoing data management without disrupting existing imports.

1. Version Tracking Enhancement

-- Add versioning to intelligence_import_batches
ALTER TABLE intelligence_import_batches
  ADD COLUMN IF NOT EXISTS source_version TEXT,  -- "2025-09", "2025-Q3", version identifier
  ADD COLUMN IF NOT EXISTS previous_batch_id UUID REFERENCES intelligence_import_batches(batch_id),
  ADD COLUMN IF NOT EXISTS is_full_refresh BOOLEAN DEFAULT TRUE,
  ADD COLUMN IF NOT EXISTS is_incremental BOOLEAN DEFAULT FALSE,
  ADD COLUMN IF NOT EXISTS is_current BOOLEAN DEFAULT TRUE;  -- used by the current-data view and import workflow below

-- Create source version tracking
CREATE TABLE IF NOT EXISTS source_data_versions (
version_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_id UUID REFERENCES original_sources_vessels(source_id),
version_identifier TEXT NOT NULL, -- "2025-09", "v3.2", etc
version_date DATE NOT NULL,
file_path TEXT,
file_hash TEXT,
record_count INTEGER,
download_date TIMESTAMP DEFAULT NOW(),
is_current BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(source_id, version_identifier)
);

-- Index for finding current version
CREATE INDEX idx_source_versions_current
ON source_data_versions(source_id, is_current)
WHERE is_current = TRUE;
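
As an illustration, registering a new monthly drop and promoting it to the current version could look like this (a sketch only; the placeholder source_id, file path, and hash are hypothetical, not part of the schema above):

-- Register the new file for a source and mark it as the current version.
-- Doing both steps in one transaction keeps exactly one current row per source.
BEGIN;

UPDATE source_data_versions
SET is_current = FALSE
WHERE source_id = '00000000-0000-0000-0000-000000000000'  -- placeholder source_id
  AND is_current = TRUE;

INSERT INTO source_data_versions (
  source_id, version_identifier, version_date,
  file_path, file_hash, record_count, is_current
) VALUES (
  '00000000-0000-0000-0000-000000000000',    -- placeholder source_id
  '2025-10', '2025-10-01',
  '/data/rfmo/example/2025-10/vessels.csv',  -- hypothetical path
  'sha256-of-downloaded-file', 1234,         -- placeholder hash and count
  TRUE
);

COMMIT;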

2. Change Detection System

-- Change classification enum (must exist before the table that references it)
CREATE TYPE change_type_enum AS ENUM ('NEW', 'UPDATED', 'REMOVED', 'UNCHANGED');

-- Track changes between imports
CREATE TABLE IF NOT EXISTS intelligence_change_log (
  change_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  source_id UUID REFERENCES original_sources_vessels(source_id),
  current_batch_id UUID REFERENCES intelligence_import_batches(batch_id),
  previous_batch_id UUID REFERENCES intelligence_import_batches(batch_id),
  vessel_identifier JSONB, -- {imo: "1234567", name: "VESSEL NAME"}
  change_type change_type_enum NOT NULL, -- NEW, UPDATED, REMOVED, UNCHANGED
  changed_fields JSONB, -- ["owner_name", "flag", "status"]
  previous_data JSONB,
  current_data JSONB,
  change_detected_at TIMESTAMP DEFAULT NOW()
);

-- Index for change analysis
CREATE INDEX idx_change_log_vessel ON intelligence_change_log
USING gin(vessel_identifier);
CREATE INDEX idx_change_log_type ON intelligence_change_log(change_type);
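
With the log in place, change analysis between imports reduces to simple queries, for example (a sketch; the batch ID and IMO are placeholders):

-- Summarize what changed in a given import
SELECT change_type, COUNT(*) AS records
FROM intelligence_change_log
WHERE current_batch_id = '00000000-0000-0000-0000-000000000000'  -- placeholder batch_id
GROUP BY change_type;

-- Full change history for one vessel by IMO (served by the GIN index above)
SELECT change_detected_at, change_type, changed_fields
FROM intelligence_change_log
WHERE vessel_identifier @> '{"imo": "1234567"}'
ORDER BY change_detected_at;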

3. Update Strategy Functions

-- Function to handle incremental updates
CREATE OR REPLACE FUNCTION process_incremental_import(
p_source_id UUID,
p_new_batch_id UUID,
p_previous_batch_id UUID
) RETURNS TABLE (
new_records INTEGER,
updated_records INTEGER,
removed_records INTEGER,
unchanged_records INTEGER
) AS $$
DECLARE
v_new_count INTEGER := 0;
v_updated_count INTEGER := 0;
v_removed_count INTEGER := 0;
v_unchanged_count INTEGER := 0;
BEGIN
-- Find NEW records (exist in new but not in previous)
INSERT INTO intelligence_change_log (
source_id, current_batch_id, previous_batch_id,
vessel_identifier, change_type, current_data
)
SELECT
p_source_id,
p_new_batch_id,
p_previous_batch_id,
jsonb_build_object(
'imo', new.raw_vessel_data->>'imo',
'name', new.raw_vessel_data->>'vessel_name'
),
'NEW'::change_type_enum,
new.raw_vessel_data
FROM intelligence_reports new
WHERE new.import_batch_id = p_new_batch_id
AND NOT EXISTS (
  SELECT 1 FROM intelligence_reports old
  WHERE old.import_batch_id = p_previous_batch_id
  AND (
    -- Match by IMO when present, otherwise by name; vessels whose data changed
    -- still match here and are classified as UPDATED below rather than NEW
    (old.raw_vessel_data->>'imo' = new.raw_vessel_data->>'imo'
      AND new.raw_vessel_data->>'imo' IS NOT NULL)
    OR old.raw_vessel_data->>'vessel_name' = new.raw_vessel_data->>'vessel_name'
  )
);

GET DIAGNOSTICS v_new_count = ROW_COUNT;

-- Find UPDATED records (hash changed)
WITH vessel_matches AS (
SELECT
new.report_id as new_report_id,
old.report_id as old_report_id,
new.raw_vessel_data as new_data,
old.raw_vessel_data as old_data
FROM intelligence_reports new
JOIN intelligence_reports old ON
-- Match by IMO if available
(new.raw_vessel_data->>'imo' = old.raw_vessel_data->>'imo'
AND new.raw_vessel_data->>'imo' IS NOT NULL)
OR
-- Otherwise match by name
(new.raw_vessel_data->>'vessel_name' = old.raw_vessel_data->>'vessel_name')
WHERE new.import_batch_id = p_new_batch_id
AND old.import_batch_id = p_previous_batch_id
AND new.data_hash != old.data_hash
)
INSERT INTO intelligence_change_log (
source_id, current_batch_id, previous_batch_id,
vessel_identifier, change_type,
changed_fields, previous_data, current_data
)
SELECT
p_source_id,
p_new_batch_id,
p_previous_batch_id,
jsonb_build_object(
'imo', COALESCE(new_data->>'imo', old_data->>'imo'),
'name', COALESCE(new_data->>'vessel_name', old_data->>'vessel_name')
),
'UPDATED'::change_type_enum,
-- Detect which fields changed
(SELECT jsonb_agg(key)
FROM jsonb_each(old_data) o
FULL OUTER JOIN jsonb_each(new_data) n USING (key)
WHERE o.value IS DISTINCT FROM n.value),
old_data,
new_data
FROM vessel_matches;

GET DIAGNOSTICS v_updated_count = ROW_COUNT;

-- Find REMOVED records
INSERT INTO intelligence_change_log (
source_id, current_batch_id, previous_batch_id,
vessel_identifier, change_type, previous_data
)
SELECT
p_source_id,
p_new_batch_id,
p_previous_batch_id,
jsonb_build_object(
'imo', old.raw_vessel_data->>'imo',
'name', old.raw_vessel_data->>'vessel_name'
),
'REMOVED'::change_type_enum,
old.raw_vessel_data
FROM intelligence_reports old
WHERE old.import_batch_id = p_previous_batch_id
AND NOT EXISTS (
SELECT 1 FROM intelligence_reports new
WHERE new.import_batch_id = p_new_batch_id
AND (
-- Match by IMO or name
(new.raw_vessel_data->>'imo' = old.raw_vessel_data->>'imo'
AND old.raw_vessel_data->>'imo' IS NOT NULL)
OR
new.raw_vessel_data->>'vessel_name' = old.raw_vessel_data->>'vessel_name'
)
);

GET DIAGNOSTICS v_removed_count = ROW_COUNT;

-- Calculate unchanged (same hash = no changes, still valuable confirmation)
SELECT COUNT(*) INTO v_unchanged_count
FROM intelligence_reports new
JOIN intelligence_reports old ON old.data_hash = new.data_hash
WHERE new.import_batch_id = p_new_batch_id
AND old.import_batch_id = p_previous_batch_id;

-- Update batch statistics
UPDATE intelligence_import_batches
SET metadata = jsonb_build_object(
'new_records', v_new_count,
'updated_records', v_updated_count,
'removed_records', v_removed_count,
'unchanged_records', v_unchanged_count
)
WHERE batch_id = p_new_batch_id;

RETURN QUERY SELECT v_new_count, v_updated_count, v_removed_count, v_unchanged_count;
END;
$$ LANGUAGE plpgsql;
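
A typical invocation and follow-up check might look like this (the UUIDs are placeholders; the metadata keys are the ones written by the function above):

-- Run change detection for a new batch against its predecessor
SELECT * FROM process_incremental_import(
  '00000000-0000-0000-0000-000000000001'::uuid,  -- source_id
  '00000000-0000-0000-0000-000000000002'::uuid,  -- new batch
  '00000000-0000-0000-0000-000000000003'::uuid   -- previous batch
);

-- The same statistics are persisted on the batch for later reporting
SELECT batch_id,
       metadata->>'new_records'       AS new_records,
       metadata->>'updated_records'   AS updated_records,
       metadata->>'removed_records'   AS removed_records,
       metadata->>'unchanged_records' AS unchanged_records
FROM intelligence_import_batches
WHERE batch_id = '00000000-0000-0000-0000-000000000002';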

4. Historical Data Preservation

-- All intelligence_reports are preserved forever
-- Add archival status to avoid reprocessing
ALTER TABLE intelligence_reports
  ADD COLUMN IF NOT EXISTS is_archived BOOLEAN DEFAULT FALSE,
  ADD COLUMN IF NOT EXISTS archived_at TIMESTAMP,
  ADD COLUMN IF NOT EXISTS superseded_by_report_id UUID REFERENCES intelligence_reports(report_id);

-- View for current active data only
CREATE OR REPLACE VIEW current_intelligence_reports AS
SELECT * FROM intelligence_reports
WHERE is_archived = FALSE
AND import_batch_id IN (
SELECT batch_id
FROM intelligence_import_batches b1
WHERE is_current = TRUE
OR NOT EXISTS (
SELECT 1 FROM intelligence_import_batches b2
WHERE b2.rfmo_shortname = b1.rfmo_shortname
AND b2.import_date > b1.import_date
)
);
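
Once a newer batch has been fully processed, the older batch's reports could be flagged so downstream jobs skip them. A minimal sketch (whether archival happens per batch or per record, and how superseded_by_report_id is populated, is left to the import scripts):

-- Archive all reports belonging to a batch that has been superseded
UPDATE intelligence_reports
SET is_archived = TRUE,
    archived_at = NOW()
WHERE import_batch_id = '00000000-0000-0000-0000-000000000000'  -- superseded batch_id (placeholder)
  AND is_archived = FALSE;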

5. Import Workflow Enhancement

#!/bin/bash
# load_[source]_staged_update.sh -- enhanced import script pattern

# 1. Check for previous import (-qAt returns just the bare UUID, if any)
PREVIOUS_BATCH=$(psql -qAt -c "
SELECT batch_id
FROM intelligence_import_batches
WHERE rfmo_shortname = '$RFMO'
ORDER BY import_date DESC
LIMIT 1
")

# 2. Create new batch with linkage
NEW_BATCH=$(psql -qAt -c "
INSERT INTO intelligence_import_batches (
rfmo_shortname,
import_date,
previous_batch_id,
is_incremental
) VALUES (
'$RFMO',
CURRENT_DATE,
NULLIF('$PREVIOUS_BATCH', '')::uuid, -- NULL on the first-ever import for this source
TRUE
) RETURNING batch_id
")

# 3. Import new data as normal
# ... existing import process ...

# 4. Run change detection
psql -c "SELECT * FROM process_incremental_import(
'$SOURCE_ID'::uuid,
'$NEW_BATCH'::uuid,
'$PREVIOUS_BATCH'::uuid
)"

# 5. Mark old batch as superseded
psql -c "UPDATE intelligence_import_batches
SET is_current = FALSE
WHERE batch_id = '$PREVIOUS_BATCH'"

6. Benefits of This Approach

  1. No Data Loss: All historical imports preserved
  2. Change Tracking: Know exactly what changed between versions
  3. Efficient Updates: Only process changed records in Phase 2
  4. Audit Trail: Complete lineage of data changes
  5. Rollback Capability: Can revert to previous versions
  6. Trend Analysis: Track vessel changes over time

7. Integration with Phase 2

Phase 2 can leverage this by:

  • Only processing records marked as NEW or UPDATED (see the query sketch after this list)
  • Maintaining vessel history through change log
  • Updating trust scores based on consistency over time
  • Detecting suspicious changes (flag hopping, name changes)
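
For instance, a Phase 2 job could restrict its work queue to actionable changes with a query along these lines (a sketch; the batch ID is a placeholder):

-- Feed Phase 2 only the vessels that are new or changed in this batch
SELECT vessel_identifier->>'imo'  AS imo,
       vessel_identifier->>'name' AS vessel_name,
       change_type,
       changed_fields,
       current_data
FROM intelligence_change_log
WHERE current_batch_id = '00000000-0000-0000-0000-000000000000'  -- placeholder batch_id
  AND change_type IN ('NEW', 'UPDATED');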

Consequences

Positive

  • Full historical preservation
  • Efficient incremental updates
  • Complete audit trail
  • Change intelligence for risk assessment
  • Supports compliance requirements

Negative

  • Increased storage over time
  • More complex import scripts
  • Need to manage data retention

Neutral

  • Requires import script updates
  • Additional indexes for performance
  • Change detection adds processing time

Implementation Steps

  1. Add version tracking tables
  2. Update existing import scripts to link batches
  3. Create change detection functions
  4. Build incremental import templates
  5. Create data retention policies

Future Enhancements

  1. Automated Updates: Scheduled imports with change notifications
  2. API Integration: Direct API pulls where available
  3. Change Alerts: Notify on significant changes (sanctions, IUU listing)
  4. Data Quality Metrics: Track completeness/quality over time