Source:
ebisu/docs/adr/0059-phase1-postgresql17-native-architecture.md
ADR-0059: PostgreSQL 17 Native Architecture for Future-Proof Phase 1
Status
Accepted
Context
We need to redesign Phase 1 to leverage PostgreSQL 17's powerful native features for:
- Modular data management
- Comprehensive history tracking
- Efficient change detection
- Schema evolution without migrations
- Native JSON operations and indexing
PostgreSQL 17 provides powerful features we should utilize:
- JSONB with advanced path operations
- Native partitioning for time-series data
- Generated columns for materialized computations
- Multirange types for temporal validity
- Advanced JSON indexing (GIN, including jsonb_path_ops, plus expression and trigram indexes)
- SQL/JSON path expressions
- MERGE for upsert operations
Decision
Redesign Phase 1 architecture to be fully modular, leveraging PostgreSQL 17's native capabilities for a future-proof intelligence platform.
Core Architecture Principles
- JSONB-First Design: All vessel data stored as JSONB documents
- Event Sourcing: Immutable event stream of all changes
- Temporal Tables: Native time-travel queries
- Modular Sources: Each source is a self-contained module
- Schema-Free Evolution: Add fields without migrations
- Intelligence Preservation: EVERY report preserved - multiple reports = confirmations, NOT duplicates
1. Modular Source Registry
-- Source modules can be added/removed dynamically.
-- Registry of every intelligence feed; one row per pluggable source module.
CREATE TABLE source_modules (
module_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
module_name TEXT UNIQUE NOT NULL, -- 'rfmo_iccat', 'baddie_wro', etc.
module_type TEXT NOT NULL, -- 'vessel_list', 'sanctions', 'iuu', 'fleet_registry'
module_config JSONB NOT NULL DEFAULT '{}', -- Parser config, field mappings, etc.
-- Module capabilities
capabilities JSONB DEFAULT '[]'::jsonb, -- ['vessel_tracking', 'ownership', 'crew', 'sanctions']
-- Schema definition for this module's data
expected_schema JSONB, -- JSON Schema for validation
field_mappings JSONB, -- Map source fields to standard fields
-- Module status
is_active BOOLEAN DEFAULT TRUE,
last_import_at TIMESTAMPTZ, -- presumably set by the import pipeline after each run — confirm
last_import_version TEXT,
-- Audit timestamps.
-- NOTE(review): updated_at has no maintaining trigger in this schema — confirm
-- the application layer sets it on UPDATE, or add an ON UPDATE trigger.
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- GIN index for capability searches
CREATE INDEX idx_source_modules_capabilities ON source_modules USING GIN (capabilities);
CREATE INDEX idx_source_modules_config ON source_modules USING GIN (module_config);
2. Universal Event Stream (Event Sourcing)
-- Immutable event stream - the source of truth.
-- Requires: CREATE EXTENSION IF NOT EXISTS pgcrypto;  (digest)
--           CREATE EXTENSION IF NOT EXISTS pg_trgm;   (gin_trgm_ops)
CREATE TABLE intelligence_events (
    event_id UUID DEFAULT gen_random_uuid(),
    event_timestamp TIMESTAMPTZ DEFAULT NOW(),
    event_type TEXT NOT NULL, -- 'vessel_reported', 'vessel_removed', 'status_changed'
    source_module_id UUID REFERENCES source_modules(module_id),
    -- The actual event data
    event_data JSONB NOT NULL,
    -- Metadata
    import_run_id UUID,
    file_source TEXT,
    file_line_number INTEGER,
    -- Hash for change detection and confirmation tracking (NOT duplicate removal!)
    -- When multiple sources report same hash = increased confidence.
    -- digest() returns BYTEA, so hex-encode before storing in a TEXT column.
    data_hash TEXT GENERATED ALWAYS AS (encode(digest(event_data::text, 'sha256'), 'hex')) STORED,
    -- Temporal validity as a single range (tstzrange); multirange validity
    -- periods live on vessel_history
    valid_period tstzrange DEFAULT tstzrange(NOW(), NULL),
    -- Partitioning by month for performance
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- On a partitioned table any PK/unique constraint must include the
    -- partition key, hence the composite key
    PRIMARY KEY (event_id, created_at)
) PARTITION BY RANGE (created_at);
-- Monthly partitions must be provisioned per month (e.g. via pg_partman or a
-- scheduled job); one example partition follows
CREATE TABLE intelligence_events_2024_09 PARTITION OF intelligence_events
FOR VALUES FROM ('2024-09-01') TO ('2024-10-01');
-- Advanced JSONB indexes
CREATE INDEX idx_events_data_gin ON intelligence_events USING GIN (event_data);
CREATE INDEX idx_events_data_path ON intelligence_events USING GIN (event_data jsonb_path_ops);
-- Specialized index for IMO lookups
CREATE INDEX idx_events_imo ON intelligence_events ((event_data->>'imo'))
WHERE event_data->>'imo' IS NOT NULL;
-- Index for fuzzy vessel-name searches (needs pg_trgm)
CREATE INDEX idx_events_vessel_name ON intelligence_events
USING GIN ((event_data->>'vessel_name') gin_trgm_ops);
3. Materialized Current State with History
-- Current state view materialized from events.
-- Requires: CREATE EXTENSION IF NOT EXISTS pg_trgm;  (gin_trgm_ops)
CREATE TABLE vessel_current_state (
    vessel_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Composite of all source data
    vessel_data JSONB NOT NULL,
    -- Extracted identifiers for fast lookup.
    -- NOTE(review): jsonb_build_object is not flagged IMMUTABLE in every PG
    -- release; if CREATE TABLE rejects this generation expression, wrap it in
    -- an IMMUTABLE SQL wrapper function.
    identifiers JSONB GENERATED ALWAYS AS (
        jsonb_build_object(
            'imo', vessel_data->>'imo',
            'mmsi', vessel_data->'identifiers'->>'mmsi',
            'ircs', vessel_data->'identifiers'->>'ircs',
            'national_id', vessel_data->'identifiers'->>'national_id'
        )
    ) STORED,
    -- Source tracking
    contributing_sources JSONB DEFAULT '[]'::jsonb, -- Array of source_module_ids
    last_seen_by_source JSONB DEFAULT '{}'::jsonb, -- {source_id: timestamp}
    -- Change tracking
    version INTEGER DEFAULT 1,
    previous_version_id UUID,
    changed_fields TEXT[] DEFAULT '{}',
    -- Temporal
    first_seen_at TIMESTAMPTZ DEFAULT NOW(),
    last_modified_at TIMESTAMPTZ DEFAULT NOW(),
    -- Status
    is_active BOOLEAN DEFAULT TRUE,
    -- Confidence score. This CANNOT be a GENERATED column: the recency term
    -- depends on NOW(), which is not IMMUTABLE, so the original generated
    -- definition is rejected by PostgreSQL. The formula is preserved in
    -- vessel_confidence_score() below; recompute on write or on a schedule.
    confidence_score NUMERIC
);
-- Confidence formula (more sources = CONFIRMATIONS, not duplicates!) kept as a
-- STABLE function so it can be used in queries, triggers, or batch refreshes.
CREATE OR REPLACE FUNCTION vessel_confidence_score(v vessel_current_state)
RETURNS NUMERIC AS $$
SELECT
    -- More sources = higher confidence
    jsonb_array_length(v.contributing_sources) * 0.1 +
    -- Recent data = higher confidence
    CASE
        WHEN v.last_modified_at > NOW() - INTERVAL '30 days' THEN 0.3
        WHEN v.last_modified_at > NOW() - INTERVAL '90 days' THEN 0.2
        ELSE 0.1
    END +
    -- Complete data = higher confidence; absent quality metadata counts as
    -- fully missing rather than yielding a NULL score
    (1 - COALESCE((v.vessel_data->'data_quality'->>'missing_fields')::numeric, 100) / 100.0) * 0.3;
$$ LANGUAGE SQL STABLE;
-- Multi-column GIN index for identifier searches
CREATE INDEX idx_vessel_identifiers ON vessel_current_state USING GIN (identifiers);
-- Trigram index for fuzzy name matching (needs pg_trgm)
CREATE INDEX idx_vessel_names_trgm ON vessel_current_state
USING GIN ((vessel_data->>'name') gin_trgm_ops);
4. Temporal History Table (Time Travel)
-- Full history with PostgreSQL temporal features.
-- btree_gist lets the GiST exclusion constraint and index combine plain
-- equality on a UUID column with the range overlap operator.
CREATE EXTENSION IF NOT EXISTS btree_gist;
CREATE TABLE vessel_history (
    history_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- NOTE(review): deliberately no FK so history can outlive current-state
    -- rows — confirm that is the intent
    vessel_id UUID NOT NULL,
    vessel_data JSONB NOT NULL,
    version INTEGER NOT NULL,
    -- What changed; CHECK makes the documented value set enforceable
    operation TEXT NOT NULL CHECK (operation IN ('INSERT', 'UPDATE', 'DELETE')),
    changed_fields TEXT[],
    changed_by_source UUID REFERENCES source_modules(module_id),
    -- System temporal columns
    valid_from TIMESTAMPTZ NOT NULL,
    valid_to TIMESTAMPTZ,
    -- Multirange for complex (non-contiguous) validity periods; tstzmultirange
    -- matches the TIMESTAMPTZ columns (tsmultirange is for timestamps without
    -- time zone and would be inconsistent here)
    validity_periods tstzmultirange,
    -- No two versions of the same vessel may have overlapping validity windows
    EXCLUDE USING GIST (vessel_id WITH =, tstzrange(valid_from, valid_to) WITH &&)
);
-- Index for time-travel queries
CREATE INDEX idx_vessel_history_temporal ON vessel_history
USING GIST (vessel_id, tstzrange(valid_from, valid_to));
-- Point-in-time lookup: returns the vessel_history snapshot whose validity
-- window covers p_timestamp (an open-ended valid_to counts as still valid),
-- or NULL when no version covers that instant.
CREATE FUNCTION vessel_at_time(p_vessel_id UUID, p_timestamp TIMESTAMPTZ)
RETURNS JSONB AS $$
    SELECT h.vessel_data
    FROM vessel_history AS h
    WHERE h.vessel_id = p_vessel_id
      AND h.valid_from <= p_timestamp
      AND (h.valid_to IS NULL OR h.valid_to > p_timestamp)
    ORDER BY h.valid_from DESC
    LIMIT 1;
$$ LANGUAGE SQL STABLE;
5. Change Detection with JSONB Diff
-- Native JSONB diff: returns the top-level keys that were added, removed, or
-- whose values changed between old_data and new_data (NULL if none).
-- NOTE: UNION and EXCEPT have equal precedence and evaluate left-to-right, so
-- each EXCEPT pair must be parenthesized; without the parentheses the
-- "removed" keys were subtracted back out and silently lost.
CREATE OR REPLACE FUNCTION jsonb_diff_keys(old_data JSONB, new_data JSONB)
RETURNS TEXT[] AS $$
SELECT array_agg(DISTINCT key)
FROM (
    -- keys removed: present only in old_data
    (SELECT key FROM jsonb_each(old_data)
     EXCEPT
     SELECT key FROM jsonb_each(new_data))
    UNION
    -- keys added: present only in new_data
    (SELECT key FROM jsonb_each(new_data)
     EXCEPT
     SELECT key FROM jsonb_each(old_data))
    UNION
    -- keys changed: present in both with different values
    SELECT key
    FROM jsonb_each(old_data) o
    JOIN jsonb_each(new_data) n USING (key)
    WHERE o.value IS DISTINCT FROM n.value
) changes;
$$ LANGUAGE SQL IMMUTABLE;
-- Semantic diff between two JSONB documents, returning
-- {key: {old, new, operation}} for every differing entry ({} if identical).
-- NOTE(review): despite "deep" in the name, only TOP-LEVEL keys are compared;
-- a changed nested object is reported as a single 'modified' entry carrying
-- the whole old/new sub-document. True recursion would be needed for a deep diff.
CREATE OR REPLACE FUNCTION jsonb_deep_diff(old_data JSONB, new_data JSONB)
RETURNS JSONB AS $$
DECLARE
result JSONB := '{}';
key TEXT; -- current top-level key under comparison
old_value JSONB;
new_value JSONB;
BEGIN
-- Find all keys in either object
FOR key IN
SELECT DISTINCT k FROM (
SELECT jsonb_object_keys(old_data) k
UNION
SELECT jsonb_object_keys(new_data) k
) keys
LOOP
-- '->' yields SQL NULL for a missing key but 'null'::jsonb for an explicit
-- JSON null, so an explicit null value is not misreported as added/removed
old_value := old_data->key;
new_value := new_data->key;
IF old_value IS DISTINCT FROM new_value THEN
result := result || jsonb_build_object(
key, jsonb_build_object(
'old', old_value,
'new', new_value,
'operation', CASE
WHEN old_value IS NULL THEN 'added'
WHEN new_value IS NULL THEN 'removed'
ELSE 'modified'
END
)
);
END IF;
END LOOP;
RETURN result;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
6. Modular Import Framework
-- Import runs with full traceability; one row per execution of a source module
CREATE TABLE import_runs (
    run_id UUID DEFAULT gen_random_uuid(),
    source_module_id UUID REFERENCES source_modules(module_id),
    run_type TEXT NOT NULL, -- 'full_refresh', 'incremental', 'recovery'
    -- File tracking
    source_files JSONB NOT NULL DEFAULT '[]', -- Array of {path, hash, size, modified}
    -- Statistics
    stats JSONB DEFAULT '{}'::jsonb,
    -- State machine
    status TEXT NOT NULL DEFAULT 'pending',
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    error_details JSONB,
    -- Partitioning
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- On a partitioned table the PK must include the partition key
    PRIMARY KEY (run_id, created_at)
) PARTITION BY RANGE (created_at);
-- NOTE(review): no partitions are declared for this table; create them on the
-- same monthly cadence as intelligence_events or every insert will fail.
-- MERGE-based upsert of a single vessel report (MERGE is PG15+;
-- MERGE ... RETURNING, used below, requires PG17).
-- Records the raw report as an immutable event, then folds it into
-- vessel_current_state, treating repeat reports as confirmations.
-- Returns the vessel_id of the inserted/updated current-state row.
CREATE OR REPLACE FUNCTION import_vessel_event(
p_module_id UUID,
p_run_id UUID,
p_event_data JSONB
) RETURNS UUID AS $$
DECLARE
v_vessel_id UUID;
v_event_id UUID;
v_old_data JSONB; -- NOTE(review): declared but never used — candidate for removal
v_identifiers JSONB;
BEGIN
-- Extract identifiers
v_identifiers := jsonb_build_object(
'imo', p_event_data->>'imo',
'mmsi', p_event_data->>'mmsi',
'ircs', p_event_data->>'ircs',
'name', p_event_data->>'vessel_name'
);
-- Store immutable event (the event stream is the source of truth)
INSERT INTO intelligence_events (
event_type,
source_module_id,
import_run_id,
event_data
) VALUES (
'vessel_reported',
p_module_id,
p_run_id,
p_event_data
) RETURNING event_id INTO v_event_id;
-- Update current state using MERGE.
-- NOTE(review): if the IMO/name predicates match more than one target row,
-- MERGE raises "cannot affect row a second time" — confirm identifiers are
-- unique in vessel_current_state or tighten this match condition.
MERGE INTO vessel_current_state t
USING (
SELECT
v_identifiers,
p_event_data,
p_module_id
) s ON (
-- Match by IMO if available
(t.identifiers->>'imo' = s.v_identifiers->>'imo'
AND s.v_identifiers->>'imo' IS NOT NULL)
OR
-- Otherwise by name (exact string equality; no fuzzy matching here)
(t.vessel_data->>'name' = s.v_identifiers->>'name')
)
WHEN MATCHED THEN
UPDATE SET
-- Shallow JSONB merge: the new report wins on conflicting top-level keys
vessel_data = t.vessel_data || s.p_event_data,
contributing_sources =
CASE
WHEN t.contributing_sources @> to_jsonb(ARRAY[s.p_module_id])
THEN t.contributing_sources
ELSE t.contributing_sources || to_jsonb(ARRAY[s.p_module_id])
END,
last_seen_by_source = t.last_seen_by_source ||
jsonb_build_object(s.p_module_id::text, NOW()),
version = t.version + 1,
-- NOTE(review): this stores the row's own vessel_id, not a pointer to a
-- prior version row — confirm intent or link to vessel_history instead
previous_version_id = t.vessel_id,
last_modified_at = NOW()
WHEN NOT MATCHED THEN
INSERT (vessel_data, contributing_sources, last_seen_by_source)
VALUES (
s.p_event_data,
to_jsonb(ARRAY[s.p_module_id]),
jsonb_build_object(s.p_module_id::text, NOW())
)
RETURNING vessel_id INTO v_vessel_id;
RETURN v_vessel_id;
END;
$$ LANGUAGE plpgsql;
7. Confirmation Tracking System
-- Track confirmations across sources (NOT duplicates!): one row per distinct
-- report payload hash, with how many modules have reported it and when.
CREATE OR REPLACE VIEW vessel_confirmations AS
WITH grouped AS (
    -- Aggregate every 'vessel_reported' event by its payload hash
    SELECT
        ie.data_hash,
        COUNT(DISTINCT ie.source_module_id) AS confirming_sources_count,
        array_agg(DISTINCT sm.module_name) AS confirming_sources,
        COUNT(*) AS total_reports,
        MIN(ie.event_timestamp) AS first_reported,
        MAX(ie.event_timestamp) AS last_confirmed
    FROM intelligence_events AS ie
    INNER JOIN source_modules AS sm
        ON sm.module_id = ie.source_module_id
    WHERE ie.event_type = 'vessel_reported'
    GROUP BY ie.data_hash
)
SELECT
    g.*,
    -- Confirmation score: more sources = higher confidence
    CASE
        WHEN g.confirming_sources_count >= 5 THEN 'VERY_HIGH'
        WHEN g.confirming_sources_count >= 3 THEN 'HIGH'
        WHEN g.confirming_sources_count >= 2 THEN 'MODERATE'
        ELSE 'LOW'
    END AS confirmation_level,
    -- Intelligence value: multiple reports = valuable confirmation
    g.confirming_sources_count * 0.2 AS confirmation_score
FROM grouped AS g;
-- Function to get vessel confirmation status.
-- For each top-level field of the vessel's current document, reports which
-- source modules have ever sent an event containing that exact key/value pair.
-- Fixes vs. original: a SELECT-list alias ('field') cannot be referenced from
-- WHERE at the same query level, so the document is expanded with a LATERAL
-- jsonb_each instead; COUNT() returns BIGINT and must be cast to match the
-- declared INTEGER result column or RETURN QUERY fails at runtime.
CREATE OR REPLACE FUNCTION get_vessel_confirmation_status(p_vessel_id UUID)
RETURNS TABLE (
    data_point TEXT,
    confirming_sources TEXT[],
    confirmation_count INTEGER,
    confidence_level TEXT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        kv.key,
        array_agg(DISTINCT sm.module_name),
        COUNT(DISTINCT ie.source_module_id)::INTEGER,
        CASE
            WHEN COUNT(DISTINCT ie.source_module_id) >= 3 THEN 'HIGH_CONFIDENCE'
            WHEN COUNT(DISTINCT ie.source_module_id) >= 2 THEN 'MODERATE_CONFIDENCE'
            ELSE 'SINGLE_SOURCE'
        END
    FROM vessel_current_state v
    -- Expand the current-state document into (key, value) pairs
    CROSS JOIN LATERAL jsonb_each(v.vessel_data) AS kv(key, value)
    -- Any event containing the identical key/value pair counts as a confirmation
    INNER JOIN intelligence_events ie
        ON ie.event_data @> jsonb_build_object(kv.key, kv.value)
    INNER JOIN source_modules sm
        ON sm.module_id = ie.source_module_id
    WHERE v.vessel_id = p_vessel_id
    GROUP BY kv.key;
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON VIEW vessel_confirmations IS
'Tracks how many sources confirm the same vessel data. More confirmations = higher confidence.
This is NOT duplicate detection - multiple reports are valuable intelligence confirmations!';
8. Query Interface
-- High-performance vessel search across all sources.
-- Requires pg_trgm for the % similarity operator and similarity().
-- NOTE(review): p_point_in_time is accepted but not yet applied — the search
-- only covers current state. Wire it to vessel_history for time-travel search.
CREATE OR REPLACE FUNCTION search_vessels(
    p_criteria JSONB,
    p_point_in_time TIMESTAMPTZ DEFAULT NOW()
) RETURNS TABLE (
    vessel_id UUID,
    vessel_data JSONB,
    match_score NUMERIC
) AS $$
BEGIN
    RETURN QUERY
    WITH search_params AS (
        SELECT
            p_criteria->>'imo' AS imo,
            p_criteria->>'name' AS name,
            p_criteria->>'flag' AS flag,
            p_criteria->'sources' AS sources
    )
    SELECT
        v.vessel_id,
        v.vessel_data,
        -- Cast so the row type matches the declared NUMERIC column:
        -- similarity() returns REAL and would otherwise skew the CASE type
        (CASE
            WHEN v.identifiers->>'imo' = sp.imo THEN 1.0
            WHEN v.vessel_data->>'name' ILIKE '%' || sp.name || '%' THEN 0.8
            ELSE similarity(v.vessel_data->>'name', sp.name)
        END)::NUMERIC
    FROM vessel_current_state v, search_params sp
    WHERE
        -- Use indexes efficiently: each criterion is optional
        (sp.imo IS NULL OR v.identifiers->>'imo' = sp.imo)
        AND (sp.name IS NULL OR v.vessel_data->>'name' % sp.name) -- Trigram similarity
        AND (sp.flag IS NULL OR v.vessel_data->>'flag' = sp.flag)
        AND (sp.sources IS NULL OR v.contributing_sources @> sp.sources)
    -- Positional reference: naming this 'match_score' would be ambiguous with
    -- the plpgsql OUT variable of the same name and raise an error
    ORDER BY 3 DESC;
END;
$$ LANGUAGE plpgsql STABLE;
-- Get vessel history with changes.
-- Pairs each history row with its immediate predecessor (version - 1) and
-- emits the field-level diff between them; the first version diffs against
-- NULL, so all its fields appear as 'added'.
-- NOTE(review): assumes version numbers are gapless per vessel — a missing
-- version number makes 'changes' compute against NULL. Confirm the writer
-- guarantees contiguous versions.
CREATE OR REPLACE FUNCTION vessel_history_timeline(p_vessel_id UUID)
RETURNS TABLE (
version INTEGER,
valid_from TIMESTAMPTZ,
valid_to TIMESTAMPTZ,
changed_by TEXT,
changes JSONB
) AS $$
BEGIN
RETURN QUERY
SELECT
h1.version,
h1.valid_from,
h1.valid_to,
sm.module_name as changed_by,
-- jsonb_deep_diff(old, new): top-level diff helper defined earlier in this schema
jsonb_deep_diff(h2.vessel_data, h1.vessel_data) as changes
FROM vessel_history h1
-- self-join to the immediately preceding version (NULL for the first)
LEFT JOIN vessel_history h2 ON h1.vessel_id = h2.vessel_id
AND h2.version = h1.version - 1
-- LEFT JOIN keeps rows whose changed_by_source is NULL
LEFT JOIN source_modules sm ON h1.changed_by_source = sm.module_id
WHERE h1.vessel_id = p_vessel_id
ORDER BY h1.version DESC;
END;
$$ LANGUAGE plpgsql;
9. Performance Optimizations
-- Parallel processing for large imports
ALTER TABLE intelligence_events SET (parallel_workers = 4);
ALTER TABLE vessel_current_state SET (parallel_workers = 4);
-- Enable JIT compilation for complex JSONB operations.
-- NOTE(review): SET is session-scoped only; persist these via ALTER SYSTEM or
-- postgresql.conf if they are meant to apply cluster-wide.
SET jit = on;
SET jit_above_cost = 100000;
-- Expression statistics for query planning.
-- A statistics-kind list such as (ndistinct) requires two or more columns;
-- for a single expression the kind list must be omitted, which builds
-- standard per-expression statistics instead.
CREATE STATISTICS vessel_imo_stats
ON (vessel_data->>'imo') FROM vessel_current_state;
-- Bloom filters for existence checks (PG13+; ships as the contrib 'bloom' extension)
CREATE EXTENSION IF NOT EXISTS bloom;
CREATE INDEX idx_vessel_imo_bloom ON vessel_current_state
USING bloom ((vessel_data->>'imo')) WITH (length=80);
Benefits
PostgreSQL 17 Native Features Utilized
- JSONB Performance: GIN indexes (including jsonb_path_ops), SQL/JSON path operators
- Partitioning: Automatic monthly partitions for events
- Generated Columns: Materialized computed fields
- MERGE: Efficient upserts without procedures
- Multirange Types: Complex temporal validity
- Statistics Objects: Better query planning
- Parallel Processing: Native parallelization
Architecture Benefits
- Schema-free Evolution: Add fields without migrations
- Complete History: Every change preserved
- Time Travel: Query any point in time
- Modular Sources: Add/remove sources dynamically
- Change Intelligence: Automatic diff detection
- Performance: Native indexes and partitioning
Implementation Migration
-- Migrate existing data to new architecture.
-- Idempotent: re-running skips modules already registered (module_name is UNIQUE).
INSERT INTO source_modules (module_name, module_type, capabilities)
SELECT
    source_shortname,
    CASE
        WHEN source_types[1] = 'RFMO' THEN 'vessel_list'
        WHEN source_types[1] IN ('WRO', 'SDN_GLOMAG') THEN 'sanctions'
        ELSE 'other'
    END,
    -- There is no text[] -> jsonb cast; to_jsonb() converts the array to a
    -- proper JSONB array (the original ::jsonb cast fails)
    to_jsonb(source_types)
FROM original_sources_vessels
ON CONFLICT (module_name) DO NOTHING;
-- Convert existing intelligence_reports to events.
-- NOTE(review): created_at is the partition key — monthly partitions covering
-- every historical report date must exist before this runs.
INSERT INTO intelligence_events (
    event_type,
    source_module_id,
    event_data,
    created_at
)
SELECT
    'vessel_reported',
    sm.module_id,
    ir.raw_vessel_data,
    ir.created_at
FROM intelligence_reports ir
INNER JOIN original_sources_vessels osv ON ir.source_id = osv.source_id
INNER JOIN source_modules sm ON osv.source_shortname = sm.module_name;
Consequences
Positive
- Future-proof architecture using PG17 native features
- No schema migrations needed
- Complete audit trail
- High-performance queries
- Modular and extensible
Negative
- Initial complexity
- Storage requirements (mitigated by partitioning)
- Learning curve for JSONB operations
Neutral
- Different from traditional relational design
- Requires PostgreSQL 17
- Event sourcing mindset needed
Next Steps
- Create migration scripts from current schema
- Build import adapters for each source type
- Create monitoring dashboards
- Document query patterns
- Performance benchmark against current system