ADR-0060: Phase 1 Improvements Required Before Phase 2
Status
Accepted
Context
Before proceeding to Phase 2 (Cross-Source Identity Resolution), we need to address critical gaps in Phase 1 that will impact our ability to:
- Track vessel changes over time
- Identify confirming sources
- Handle incremental updates efficiently
- Maintain data lineage and audit trails
- Scale to hundreds of data sources
Decision
Implement these critical improvements to Phase 1 infrastructure before moving to Phase 2.
1. Add Temporal Tracking to Intelligence Tables
-- Add temporal columns to intelligence_reports
ALTER TABLE intelligence_reports
ADD COLUMN IF NOT EXISTS valid_from DATE,
ADD COLUMN IF NOT EXISTS valid_to DATE,
ADD COLUMN IF NOT EXISTS is_current BOOLEAN DEFAULT TRUE;
-- Add temporal columns to vessel_intelligence
ALTER TABLE vessel_intelligence
ADD COLUMN IF NOT EXISTS valid_from DATE,
ADD COLUMN IF NOT EXISTS valid_to DATE,
ADD COLUMN IF NOT EXISTS is_current BOOLEAN DEFAULT TRUE;
-- Index for temporal queries
CREATE INDEX IF NOT EXISTS idx_intelligence_reports_temporal
ON intelligence_reports(source_id, valid_from, valid_to)
WHERE is_current = TRUE;
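The temporal columns support a supersede-and-query pattern. A minimal sketch follows (the source_id value is a placeholder; report_date and source_id on intelligence_reports are assumed, matching their use in update_vessel_confirmations below):
-- Close out superseded rows when a newer report arrives
UPDATE intelligence_reports
SET valid_to = CURRENT_DATE,
    is_current = FALSE
WHERE source_id = '00000000-0000-0000-0000-000000000001'::uuid  -- placeholder
  AND is_current = TRUE
  AND report_date < CURRENT_DATE;
-- Point-in-time query: what was current for this source on a given date?
SELECT *
FROM intelligence_reports
WHERE source_id = '00000000-0000-0000-0000-000000000001'::uuid  -- placeholder
  AND valid_from <= DATE '2024-06-01'
  AND (valid_to IS NULL OR valid_to > DATE '2024-06-01');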
2. Implement Change Detection System
-- Add change tracking to intelligence_import_batches
ALTER TABLE intelligence_import_batches
ADD COLUMN IF NOT EXISTS previous_batch_id UUID REFERENCES intelligence_import_batches(batch_id),
ADD COLUMN IF NOT EXISTS source_version TEXT,
ADD COLUMN IF NOT EXISTS source_file_hash TEXT,
ADD COLUMN IF NOT EXISTS changes_detected JSONB DEFAULT '{}',
ADD COLUMN IF NOT EXISTS is_incremental BOOLEAN DEFAULT FALSE;
-- Create change detection view
CREATE MATERIALIZED VIEW vessel_changes AS
WITH batch_pairs AS (
SELECT
current_batch.batch_id as current_batch_id,
current_batch.rfmo_shortname,
current_batch.import_date as current_import_date,  -- CURRENT_DATE is a reserved word, so don't use it as an alias
previous_batch.batch_id as previous_batch_id,
previous_batch.import_date as previous_import_date
FROM intelligence_import_batches current_batch
LEFT JOIN intelligence_import_batches previous_batch
ON current_batch.previous_batch_id = previous_batch.batch_id
)
SELECT
bp.*,
-- New vessels
(SELECT COUNT(*) FROM vessel_intelligence vi_new
WHERE vi_new.report_id IN (
SELECT report_id FROM intelligence_reports
WHERE import_batch_id = bp.current_batch_id
)
AND NOT EXISTS (
SELECT 1 FROM vessel_intelligence vi_old
WHERE vi_old.report_id IN (
SELECT report_id FROM intelligence_reports
WHERE import_batch_id = bp.previous_batch_id
)
AND vi_old.reported_imo = vi_new.reported_imo
)) as new_vessels,
-- Removed vessels
(SELECT COUNT(*) FROM vessel_intelligence vi_old
WHERE vi_old.report_id IN (
SELECT report_id FROM intelligence_reports
WHERE import_batch_id = bp.previous_batch_id
)
AND NOT EXISTS (
SELECT 1 FROM vessel_intelligence vi_new
WHERE vi_new.report_id IN (
SELECT report_id FROM intelligence_reports
WHERE import_batch_id = bp.current_batch_id
)
AND vi_new.reported_imo = vi_old.reported_imo
)) as removed_vessels
FROM batch_pairs bp;
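Because this is a materialized view, it must be refreshed after each import before the counts are meaningful. A usage sketch:
-- Refresh after each import, then review the latest deltas per source
REFRESH MATERIALIZED VIEW vessel_changes;
SELECT rfmo_shortname,
       current_import_date,
       new_vessels,
       removed_vessels
FROM vessel_changes
ORDER BY current_import_date DESC
LIMIT 20;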
3. Add Confirmation Tracking
-- Create confirmation tracking table
CREATE TABLE vessel_data_confirmations (
confirmation_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
data_hash TEXT NOT NULL UNIQUE,  -- must be unique for ON CONFLICT (data_hash) in update_vessel_confirmations()
data_type TEXT NOT NULL, -- 'full_vessel', 'imo', 'ownership', etc.
confirming_sources UUID[] NOT NULL,
confirmation_count INTEGER GENERATED ALWAYS AS (array_length(confirming_sources, 1)) STORED,
first_seen_date DATE NOT NULL,
last_confirmed_date DATE NOT NULL,
confidence_boost NUMERIC GENERATED ALWAYS AS (
CASE
WHEN array_length(confirming_sources, 1) >= 5 THEN 0.5
WHEN array_length(confirming_sources, 1) >= 3 THEN 0.3
WHEN array_length(confirming_sources, 1) >= 2 THEN 0.1
ELSE 0
END
) STORED,
created_at TIMESTAMP DEFAULT NOW()
);
-- Function to update confirmations after import
CREATE OR REPLACE FUNCTION update_vessel_confirmations(p_batch_id UUID)
RETURNS void AS $$
BEGIN
-- Track full vessel data confirmations
INSERT INTO vessel_data_confirmations (
data_hash,
data_type,
confirming_sources,
first_seen_date,
last_confirmed_date
)
SELECT
data_hash,
'full_vessel',
array_agg(DISTINCT source_id),
MIN(report_date),
MAX(report_date)
FROM intelligence_reports
WHERE data_hash IN (
SELECT data_hash
FROM intelligence_reports
WHERE import_batch_id = p_batch_id
)
GROUP BY data_hash
ON CONFLICT (data_hash) DO UPDATE SET
confirming_sources = array_unique(
vessel_data_confirmations.confirming_sources || EXCLUDED.confirming_sources
),
last_confirmed_date = GREATEST(
vessel_data_confirmations.last_confirmed_date,
EXCLUDED.last_confirmed_date
);
END;
$$ LANGUAGE plpgsql;
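Note that update_vessel_confirmations() relies on array_unique(), defined in item 6 below, so the utility functions must exist before this function is called. A usage sketch with a placeholder batch_id:
-- Run after a batch import, then inspect multi-source confirmations
SELECT update_vessel_confirmations('00000000-0000-0000-0000-0000000000aa'::uuid);  -- placeholder
SELECT data_type,
       confirmation_count,
       confidence_boost,
       first_seen_date,
       last_confirmed_date
FROM vessel_data_confirmations
WHERE confirmation_count >= 2
ORDER BY confidence_boost DESC, last_confirmed_date DESC;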
4. Create Modular Import Framework
-- Source import configurations
CREATE TABLE source_import_configs (
config_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_id UUID REFERENCES original_sources_vessels(source_id),
-- Import settings
import_type TEXT NOT NULL, -- 'csv', 'json', 'api', 'excel'
file_pattern TEXT, -- Regex for finding files
-- Column mappings
field_mappings JSONB NOT NULL, -- {"source_field": "target_field"}
-- Data quality rules
required_fields TEXT[] DEFAULT '{}',
validation_rules JSONB DEFAULT '{}',
-- Processing
pre_processing_script TEXT,
post_processing_script TEXT,
-- Schedule
update_frequency INTERVAL,
last_successful_import TIMESTAMP,
next_scheduled_import TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Generic import function
CREATE OR REPLACE FUNCTION import_vessel_data(
p_source_id UUID,
p_file_path TEXT,
p_import_config JSONB
) RETURNS UUID AS $$
DECLARE
v_batch_id UUID;
v_field_mappings JSONB;
BEGIN
-- Create import batch
INSERT INTO intelligence_import_batches (
rfmo_shortname,
import_date,
source_file_path,
is_incremental
)
SELECT
source_shortname,
CURRENT_DATE,
p_file_path,
FALSE
FROM original_sources_vessels
WHERE source_id = p_source_id
RETURNING batch_id INTO v_batch_id;
-- Get field mappings
SELECT field_mappings INTO v_field_mappings
FROM source_import_configs
WHERE source_id = p_source_id;
-- Import logic here (simplified)
-- This would be expanded to handle different file types
RETURN v_batch_id;
END;
$$ LANGUAGE plpgsql;
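A registration-and-run sketch ties the two together. The source-side column names and the vessel_name/flag_state target fields are illustrative only (reported_imo is the field used elsewhere in this ADR); the UUID and file path are placeholders:
-- Register a CSV source with illustrative field mappings
INSERT INTO source_import_configs (
    source_id,
    import_type,
    file_pattern,
    field_mappings,
    required_fields,
    update_frequency
) VALUES (
    '00000000-0000-0000-0000-000000000001'::uuid,  -- placeholder
    'csv',
    '^vessel_registry_\d{4}-\d{2}\.csv$',
    '{"Vessel Name": "vessel_name", "IMO Number": "reported_imo", "Flag": "flag_state"}'::jsonb,
    ARRAY['vessel_name', 'reported_imo'],
    INTERVAL '1 month'
);
-- Kick off an import using that configuration
SELECT import_vessel_data(
    '00000000-0000-0000-0000-000000000001'::uuid,  -- placeholder
    '/data/incoming/vessel_registry_2024-06.csv',  -- placeholder
    '{}'::jsonb
);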
5. Add Data Lineage Tracking
-- Track complete data lineage
CREATE TABLE data_lineage (
lineage_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_id UUID REFERENCES original_sources_vessels(source_id),
-- File tracking
source_file_path TEXT NOT NULL,
source_file_hash TEXT NOT NULL,
source_file_size BIGINT,
source_file_modified TIMESTAMP,
download_timestamp TIMESTAMP DEFAULT NOW(),
-- Processing tracking
import_batch_id UUID REFERENCES intelligence_import_batches(batch_id),
records_in_file INTEGER,
records_imported INTEGER,
records_failed INTEGER,
-- Lineage chain
derived_from_lineage_id UUID REFERENCES data_lineage(lineage_id),
transformation_applied TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
-- Index for lineage queries
CREATE INDEX idx_lineage_source ON data_lineage(source_id, created_at DESC);
CREATE INDEX idx_lineage_chain ON data_lineage(derived_from_lineage_id);
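A recursive query over derived_from_lineage_id reconstructs the full derivation chain, for example:
-- Walk each chain from the original file down through derived artifacts
WITH RECURSIVE lineage_chain AS (
    SELECT lineage_id, source_file_path, transformation_applied,
           derived_from_lineage_id, 0 AS depth
    FROM data_lineage
    WHERE derived_from_lineage_id IS NULL  -- start at original files
    UNION ALL
    SELECT dl.lineage_id, dl.source_file_path, dl.transformation_applied,
           dl.derived_from_lineage_id, lc.depth + 1
    FROM data_lineage dl
    JOIN lineage_chain lc ON dl.derived_from_lineage_id = lc.lineage_id
)
SELECT * FROM lineage_chain ORDER BY depth;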
6. Implement Array Utility Functions
-- PostgreSQL 17 doesn't have array_unique built-in
CREATE OR REPLACE FUNCTION array_unique(arr anyarray)
RETURNS anyarray AS $$
SELECT array_agg(DISTINCT elem)
FROM unnest(arr) elem
$$ LANGUAGE SQL IMMUTABLE;
-- Function to calculate array intersection
CREATE OR REPLACE FUNCTION array_intersection(arr1 anyarray, arr2 anyarray)
RETURNS anyarray AS $$
SELECT array_agg(elem)
FROM (
SELECT unnest(arr1)
INTERSECT
SELECT unnest(arr2)
) t(elem)
$$ LANGUAGE SQL IMMUTABLE;
7. Update Import Scripts Template
#!/bin/bash
# Enhanced import template with Phase 1 improvements
# 1. Record file metadata
FILE_HASH=$( (sha256sum "$INPUT_FILE" 2>/dev/null || shasum -a 256 "$INPUT_FILE") | cut -d' ' -f1)
FILE_SIZE=$(stat -f%z "$INPUT_FILE" 2>/dev/null || stat -c%s "$INPUT_FILE")
FILE_MODIFIED=$(stat -f%m "$INPUT_FILE" 2>/dev/null || stat -c%Y "$INPUT_FILE")
# 2. Check if this exact file was already imported
EXISTING_IMPORT=$(execute_sql "
SELECT import_batch_id
FROM data_lineage
WHERE source_file_hash = '$FILE_HASH'
LIMIT 1;
" "-t")
if [[ -n "$EXISTING_IMPORT" ]]; then
log_success "File already imported in batch: $EXISTING_IMPORT"
exit 0
fi
# 3. Get previous batch for change detection
PREVIOUS_BATCH=$(execute_sql "
SELECT batch_id
FROM intelligence_import_batches
WHERE rfmo_shortname = '$RFMO'
AND stage_1_raw_complete = TRUE
ORDER BY import_date DESC
LIMIT 1;
" "-t")
# 4. Create new batch with linkage
BATCH_ID=$(execute_sql "
INSERT INTO intelligence_import_batches (
rfmo_shortname,
import_date,
source_file_path,
source_file_hash,
previous_batch_id,
source_version
) VALUES (
'$RFMO',
CURRENT_DATE,
'$INPUT_FILE',
'$FILE_HASH',
$([ -z "$PREVIOUS_BATCH" ] && echo "NULL" || echo "'$PREVIOUS_BATCH'::uuid"),
'$(date +%Y-%m)'
) RETURNING batch_id;
" "-t")
# 5. Record lineage
execute_sql "
INSERT INTO data_lineage (
source_id,
source_file_path,
source_file_hash,
source_file_size,
source_file_modified,
import_batch_id,
records_in_file
) VALUES (
'$SOURCE_ID'::uuid,
'$INPUT_FILE',
'$FILE_HASH',
$FILE_SIZE,
to_timestamp($FILE_MODIFIED),
'$BATCH_ID'::uuid,
$INPUT_COUNT
);
"
# ... rest of import logic ...
# 6. Update confirmations
execute_sql "SELECT update_vessel_confirmations('$BATCH_ID'::uuid);"
# 7. Detect changes if incremental
if [[ -n "$PREVIOUS_BATCH" ]]; then
execute_sql "
UPDATE intelligence_import_batches
SET changes_detected = (
SELECT jsonb_build_object(
'new_vessels', COUNT(*) FILTER (WHERE change_type = 'NEW'),
'updated_vessels', COUNT(*) FILTER (WHERE change_type = 'UPDATED'),
'removed_vessels', COUNT(*) FILTER (WHERE change_type = 'REMOVED')
)
FROM detect_vessel_changes('$BATCH_ID'::uuid, '$PREVIOUS_BATCH'::uuid)
)
WHERE batch_id = '$BATCH_ID'::uuid;
"
fi
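The template calls detect_vessel_changes(), which is not defined in this ADR. A minimal sketch of what it could look like, mirroring the IMO-based comparison in the vessel_changes view and treating a changed data_hash as an update (column types are assumed):
-- Sketch only: per-vessel change classification between two batches
CREATE OR REPLACE FUNCTION detect_vessel_changes(p_current_batch UUID, p_previous_batch UUID)
RETURNS TABLE (reported_imo TEXT, change_type TEXT) AS $$
    WITH current_vessels AS (
        SELECT vi.reported_imo, ir.data_hash
        FROM vessel_intelligence vi
        JOIN intelligence_reports ir ON ir.report_id = vi.report_id
        WHERE ir.import_batch_id = p_current_batch
    ),
    previous_vessels AS (
        SELECT vi.reported_imo, ir.data_hash
        FROM vessel_intelligence vi
        JOIN intelligence_reports ir ON ir.report_id = vi.report_id
        WHERE ir.import_batch_id = p_previous_batch
    )
    SELECT
        COALESCE(c.reported_imo, p.reported_imo) AS reported_imo,
        CASE
            WHEN p.reported_imo IS NULL THEN 'NEW'
            WHEN c.reported_imo IS NULL THEN 'REMOVED'
            WHEN c.data_hash IS DISTINCT FROM p.data_hash THEN 'UPDATED'
            ELSE 'UNCHANGED'
        END AS change_type
    FROM current_vessels c
    FULL OUTER JOIN previous_vessels p ON p.reported_imo = c.reported_imo;
$$ LANGUAGE sql STABLE;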
Consequences
Positive
- Complete data lineage and audit trail
- Efficient incremental updates
- Automatic confirmation tracking
- Change detection between imports
- Modular framework for new sources
- Ready for Phase 2 advanced features
Negative
- More complex import process
- Additional storage for tracking tables
- Need to update existing import scripts
Neutral
- Requires one-time migration
- More metadata to manage
- Performance impact minimal with proper indexes
Migration Steps
- Add new columns and tables (non-breaking)
- Update import script template
- Gradually update each source importer
- Backfill confirmation data from existing imports
- Create monitoring dashboards
Priority Order
- Critical: Temporal tracking and change detection
- Important: Confirmation tracking and lineage
- Nice to have: Modular framework (can evolve over time)
These improvements ensure Phase 1 provides a solid foundation for the complex vessel matching and trust scoring in Phase 2.