Source: ebisu/docs/adr/0059-phase1-postgresql17-native-architecture.md

ADR-0059: PostgreSQL 17 Native Architecture for Future-Proof Phase 1

Status

Accepted

Context

We need to redesign Phase 1 to leverage PostgreSQL 17's powerful native features for:

  • Modular data management
  • Comprehensive history tracking
  • Efficient change detection
  • Schema evolution without migrations
  • Native JSON operations and indexing

PostgreSQL 17 provides native features we should put to work:

  • JSONB with advanced path operations
  • Native partitioning for time-series data
  • Generated columns for materialized computations
  • Multirange types for temporal validity
  • Advanced JSON indexing (GIN, GIST, SP-GIST)
  • SQL/JSON path expressions
  • MERGE for upsert operations
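As a taste of what these features look like in practice, a minimal sketch of a SQL/JSON path query (the `intelligence_events` table is defined later in this ADR; the `tonnage` field and threshold are invented for illustration):

```sql
-- Find events whose JSONB payload reports a tonnage over 500,
-- using a SQL/JSON path expression instead of manual key extraction
SELECT event_data
FROM intelligence_events
WHERE jsonb_path_exists(event_data, '$.tonnage ? (@ > 500)');
```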

Decision

Redesign Phase 1 architecture to be fully modular, leveraging PostgreSQL 17's native capabilities for a future-proof intelligence platform.

Core Architecture Principles

  1. JSONB-First Design: All vessel data stored as JSONB documents
  2. Event Sourcing: Immutable event stream of all changes
  3. Temporal Tables: Native time-travel queries
  4. Modular Sources: Each source is a self-contained module
  5. Schema-Free Evolution: Add fields without migrations
  6. Intelligence Preservation: EVERY report preserved - multiple reports = confirmations, NOT duplicates
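Principle 5 in practice: a new source attribute simply travels inside the JSONB document, so no `ALTER TABLE` is needed. A sketch against the `intelligence_events` table defined below (field values are made up; assumes a partition covering `NOW()` exists):

```sql
-- 'hull_color' is a brand-new attribute from some source; it lands in the
-- payload without any schema migration, and existing queries are unaffected
INSERT INTO intelligence_events (event_type, event_data)
VALUES ('vessel_reported',
        '{"imo": "9123456", "vessel_name": "SEA HARVEST", "hull_color": "red"}');
```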

1. Modular Source Registry

-- Source modules can be added/removed dynamically
CREATE TABLE source_modules (
    module_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    module_name TEXT UNIQUE NOT NULL,          -- 'rfmo_iccat', 'baddie_wro', etc.
    module_type TEXT NOT NULL,                 -- 'vessel_list', 'sanctions', 'iuu', 'fleet_registry'
    module_config JSONB NOT NULL DEFAULT '{}', -- Parser config, field mappings, etc.
    -- Module capabilities
    capabilities JSONB DEFAULT '[]'::jsonb,    -- ['vessel_tracking', 'ownership', 'crew', 'sanctions']
    -- Schema definition for this module's data
    expected_schema JSONB,                     -- JSON Schema for validation
    field_mappings JSONB,                      -- Map source fields to standard fields
    -- Module status
    is_active BOOLEAN DEFAULT TRUE,
    last_import_at TIMESTAMPTZ,
    last_import_version TEXT,
    -- Audit timestamps
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- GIN index for capability searches
CREATE INDEX idx_source_modules_capabilities ON source_modules USING GIN (capabilities);
CREATE INDEX idx_source_modules_config ON source_modules USING GIN (module_config);

2. Universal Event Stream (Event Sourcing)

-- Immutable event stream - the source of truth
CREATE TABLE intelligence_events (
    event_id UUID NOT NULL DEFAULT gen_random_uuid(),
    event_timestamp TIMESTAMPTZ DEFAULT NOW(),
    event_type TEXT NOT NULL,                 -- 'vessel_reported', 'vessel_removed', 'status_changed'
    source_module_id UUID REFERENCES source_modules(module_id),
    -- The actual event data
    event_data JSONB NOT NULL,
    -- Metadata
    import_run_id UUID,
    file_source TEXT,
    file_line_number INTEGER,
    -- Hash for change detection and confirmation tracking (NOT duplicate removal!)
    -- When multiple sources report the same hash = increased confidence.
    -- digest() comes from the pgcrypto extension and returns bytea, so encode to hex text.
    data_hash TEXT GENERATED ALWAYS AS (encode(digest(event_data::text, 'sha256'), 'hex')) STORED,
    -- Temporal validity using a range type
    valid_period tstzrange DEFAULT tstzrange(NOW(), NULL),
    -- Partitioning by month for performance
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- On a partitioned table the primary key must include the partition key
    PRIMARY KEY (event_id, created_at)
) PARTITION BY RANGE (created_at);

-- Example monthly partition; automate creation (e.g., with pg_partman or a scheduled job)
CREATE TABLE intelligence_events_2024_09 PARTITION OF intelligence_events
FOR VALUES FROM ('2024-09-01') TO ('2024-10-01');

-- Advanced JSONB indexes
CREATE INDEX idx_events_data_gin ON intelligence_events USING GIN (event_data);
CREATE INDEX idx_events_data_path ON intelligence_events USING GIN (event_data jsonb_path_ops);
-- Specialized index for IMO lookups
CREATE INDEX idx_events_imo ON intelligence_events ((event_data->>'imo'))
WHERE event_data->>'imo' IS NOT NULL;
-- Trigram index for vessel name searches (requires the pg_trgm extension)
CREATE INDEX idx_events_vessel_name ON intelligence_events
    USING GIN ((event_data->>'vessel_name') gin_trgm_ops);
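The GIN indexes above make containment lookups cheap. A minimal sketch (the IMO value is invented):

```sql
-- Served by idx_events_data_path: jsonb_path_ops GIN indexes support the
-- containment operator @>
SELECT event_id, event_timestamp, source_module_id
FROM intelligence_events
WHERE event_data @> '{"imo": "9123456"}';
```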

3. Materialized Current State with History

-- Current state view materialized from events
CREATE TABLE vessel_current_state (
    vessel_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Composite of all source data
    vessel_data JSONB NOT NULL,
    -- Extracted identifiers for fast lookup
    identifiers JSONB GENERATED ALWAYS AS (
        jsonb_build_object(
            'imo', vessel_data->>'imo',
            'mmsi', vessel_data->'identifiers'->>'mmsi',
            'ircs', vessel_data->'identifiers'->>'ircs',
            'national_id', vessel_data->'identifiers'->>'national_id'
        )
    ) STORED,
    -- Source tracking
    contributing_sources JSONB DEFAULT '[]'::jsonb,  -- Array of source_module_ids
    last_seen_by_source JSONB DEFAULT '{}'::jsonb,   -- {source_id: timestamp}
    -- Change tracking
    version INTEGER DEFAULT 1,
    previous_version_id UUID,
    changed_fields TEXT[] DEFAULT '{}',
    -- Temporal
    first_seen_at TIMESTAMPTZ DEFAULT NOW(),
    last_modified_at TIMESTAMPTZ DEFAULT NOW(),
    -- Status
    is_active BOOLEAN DEFAULT TRUE,
    -- Confidence cannot be a generated column: generated columns require
    -- IMMUTABLE expressions, and the recency term depends on NOW(). Maintain
    -- it from a trigger (or compute it in a view) as:
    --   sources:      jsonb_array_length(contributing_sources) * 0.1
    --                 (more sources = CONFIRMATIONS, not duplicates!)
    --   recency:      +0.3 if modified within 30 days, +0.2 within 90 days, else +0.1
    --   completeness: (1 - (vessel_data->'data_quality'->>'missing_fields')::numeric / 100.0) * 0.3
    confidence_score NUMERIC
);

-- Multi-column GIN index for identifier searches
CREATE INDEX idx_vessel_identifiers ON vessel_current_state USING GIN (identifiers);
-- Trigram index for fuzzy name matching (requires the pg_trgm extension)
CREATE INDEX idx_vessel_names_trgm ON vessel_current_state
    USING GIN ((vessel_data->>'name') gin_trgm_ops);
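With the generated `identifiers` column and its GIN index, cross-identifier lookups become simple containment queries (IMO value invented):

```sql
-- Served by idx_vessel_identifiers; nulls in the generated object
-- simply never match the containment test
SELECT vessel_id, vessel_data->>'name' AS name
FROM vessel_current_state
WHERE identifiers @> '{"imo": "9123456"}';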

4. Temporal History Table (Time Travel)

-- Full history with PostgreSQL temporal features
CREATE TABLE vessel_history (
    history_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    vessel_id UUID NOT NULL,
    vessel_data JSONB NOT NULL,
    version INTEGER NOT NULL,
    -- What changed
    operation TEXT NOT NULL,                  -- 'INSERT', 'UPDATE', 'DELETE'
    changed_fields TEXT[],
    changed_by_source UUID REFERENCES source_modules(module_id),
    -- System temporal columns
    valid_from TIMESTAMPTZ NOT NULL,
    valid_to TIMESTAMPTZ,
    -- Multirange for complex validity periods (tstzmultirange matches TIMESTAMPTZ;
    -- tsmultirange is for timestamps without time zone)
    validity_periods tstzmultirange,
    -- Prevent overlapping validity windows per vessel
    -- (UUID equality in an exclusion constraint requires the btree_gist extension)
    EXCLUDE USING GIST (vessel_id WITH =, tstzrange(valid_from, valid_to) WITH &&)
);

-- Index for time-travel queries
CREATE INDEX idx_vessel_history_temporal ON vessel_history
USING GIST (vessel_id, tstzrange(valid_from, valid_to));

-- Function for point-in-time queries
CREATE FUNCTION vessel_at_time(p_vessel_id UUID, p_timestamp TIMESTAMPTZ)
RETURNS JSONB AS $$
    SELECT vessel_data
    FROM vessel_history
    WHERE vessel_id = p_vessel_id
      AND valid_from <= p_timestamp
      AND (valid_to IS NULL OR valid_to > p_timestamp)
    ORDER BY valid_from DESC
    LIMIT 1;
$$ LANGUAGE SQL STABLE;
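A hypothetical call (the UUID is a placeholder, not real data):

```sql
-- What did we believe about this vessel at the start of 2024?
SELECT vessel_at_time(
    'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid,
    '2024-01-01T00:00:00Z'::timestamptz
);
```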

5. Change Detection with JSONB Diff

-- Native JSONB diff: keys added, removed, or whose values changed.
-- EXCEPT and UNION are left-associative, so the branches must be
-- parenthesized or keys present only in old_data are silently lost.
CREATE OR REPLACE FUNCTION jsonb_diff_keys(old_data JSONB, new_data JSONB)
RETURNS TEXT[] AS $$
    SELECT array_agg(DISTINCT key)
    FROM (
        (SELECT key FROM jsonb_each(old_data)
         EXCEPT
         SELECT key FROM jsonb_each(new_data))
        UNION
        (SELECT key FROM jsonb_each(new_data)
         EXCEPT
         SELECT key FROM jsonb_each(old_data))
        UNION
        SELECT key
        FROM jsonb_each(old_data) o
        JOIN jsonb_each(new_data) n USING (key)
        WHERE o.value IS DISTINCT FROM n.value
    ) changes;
$$ LANGUAGE SQL IMMUTABLE;

-- Value-level diff with before/after detail. Nested objects are compared
-- as whole values; the function does not recurse into them.
CREATE OR REPLACE FUNCTION jsonb_deep_diff(old_data JSONB, new_data JSONB)
RETURNS JSONB AS $$
DECLARE
    result JSONB := '{}';
    key TEXT;
    old_value JSONB;
    new_value JSONB;
BEGIN
    -- Walk every key present in either object
    FOR key IN
        SELECT DISTINCT k FROM (
            SELECT jsonb_object_keys(old_data) k
            UNION
            SELECT jsonb_object_keys(new_data) k
        ) keys
    LOOP
        old_value := old_data->key;
        new_value := new_data->key;

        IF old_value IS DISTINCT FROM new_value THEN
            result := result || jsonb_build_object(
                key, jsonb_build_object(
                    'old', old_value,
                    'new', new_value,
                    'operation', CASE
                        WHEN old_value IS NULL THEN 'added'
                        WHEN new_value IS NULL THEN 'removed'
                        ELSE 'modified'
                    END
                )
            );
        END IF;
    END LOOP;

    RETURN result;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
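The two helpers answer "what changed?" at different granularities. A hypothetical call with invented values:

```sql
-- Which keys differ between two snapshots of a vessel document?
-- (flag changed, imo added)
SELECT jsonb_diff_keys(
    '{"name": "SEA HARVEST", "flag": "PAN"}'::jsonb,
    '{"name": "SEA HARVEST", "flag": "BLZ", "imo": "9123456"}'::jsonb
);

-- Full before/after detail for the same pair
SELECT jsonb_deep_diff(
    '{"name": "SEA HARVEST", "flag": "PAN"}'::jsonb,
    '{"name": "SEA HARVEST", "flag": "BLZ", "imo": "9123456"}'::jsonb
);
```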

6. Modular Import Framework

-- Import runs with full traceability
CREATE TABLE import_runs (
    run_id UUID NOT NULL DEFAULT gen_random_uuid(),
    source_module_id UUID REFERENCES source_modules(module_id),
    run_type TEXT NOT NULL,                   -- 'full_refresh', 'incremental', 'recovery'
    -- File tracking
    source_files JSONB NOT NULL DEFAULT '[]', -- Array of {path, hash, size, modified}
    -- Statistics
    stats JSONB DEFAULT '{}'::jsonb,
    -- State machine
    status TEXT NOT NULL DEFAULT 'pending',
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    error_details JSONB,
    -- Partitioning
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- On a partitioned table the primary key must include the partition key
    PRIMARY KEY (run_id, created_at)
) PARTITION BY RANGE (created_at);

-- Efficient upserts with MERGE (PG15+); MERGE ... RETURNING requires PG17
CREATE OR REPLACE FUNCTION import_vessel_event(
    p_module_id UUID,
    p_run_id UUID,
    p_event_data JSONB
) RETURNS UUID AS $$
DECLARE
    v_vessel_id UUID;
    v_event_id UUID;
    v_identifiers JSONB;
BEGIN
    -- Extract identifiers
    v_identifiers := jsonb_build_object(
        'imo', p_event_data->>'imo',
        'mmsi', p_event_data->>'mmsi',
        'ircs', p_event_data->>'ircs',
        'name', p_event_data->>'vessel_name'
    );

    -- Store immutable event
    INSERT INTO intelligence_events (
        event_type,
        source_module_id,
        import_run_id,
        event_data
    ) VALUES (
        'vessel_reported',
        p_module_id,
        p_run_id,
        p_event_data
    ) RETURNING event_id INTO v_event_id;

    -- Update current state using MERGE. The source row needs explicit
    -- aliases; bare variable references would not yield usable column names.
    MERGE INTO vessel_current_state t
    USING (
        SELECT
            v_identifiers AS identifiers,
            p_event_data  AS event_data,
            p_module_id   AS module_id
    ) s ON (
        -- Match by IMO if available
        (t.identifiers->>'imo' = s.identifiers->>'imo'
         AND s.identifiers->>'imo' IS NOT NULL)
        OR
        -- Otherwise by name
        (t.vessel_data->>'name' = s.identifiers->>'name')
    )
    WHEN MATCHED THEN
        UPDATE SET
            vessel_data = t.vessel_data || s.event_data,
            contributing_sources =
                CASE
                    WHEN t.contributing_sources @> to_jsonb(ARRAY[s.module_id])
                        THEN t.contributing_sources
                    ELSE t.contributing_sources || to_jsonb(ARRAY[s.module_id])
                END,
            last_seen_by_source = t.last_seen_by_source ||
                jsonb_build_object(s.module_id::text, NOW()),
            version = t.version + 1,
            previous_version_id = t.vessel_id,
            last_modified_at = NOW()
    WHEN NOT MATCHED THEN
        INSERT (vessel_data, contributing_sources, last_seen_by_source)
        VALUES (
            s.event_data,
            to_jsonb(ARRAY[s.module_id]),
            jsonb_build_object(s.module_id::text, NOW())
        )
    RETURNING vessel_id INTO v_vessel_id;

    RETURN v_vessel_id;
END;
$$ LANGUAGE plpgsql;
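A hedged sketch of a caller: create a run, then import one parsed row. It assumes an `import_runs` partition covering `NOW()` exists; `'rfmo_iccat'` is the registry example from earlier, and the vessel values are invented:

```sql
WITH run AS (
    -- Open an import run for the module
    INSERT INTO import_runs (source_module_id, run_type, status, started_at)
    SELECT module_id, 'incremental', 'running', NOW()
    FROM source_modules
    WHERE module_name = 'rfmo_iccat'
    RETURNING run_id, source_module_id
)
SELECT import_vessel_event(
    run.source_module_id,
    run.run_id,
    '{"imo": "9123456", "vessel_name": "SEA HARVEST", "flag": "PAN"}'::jsonb
) AS vessel_id
FROM run;
```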

7. Confirmation Tracking System

-- Track confirmations across sources (NOT duplicates!)
CREATE OR REPLACE VIEW vessel_confirmations AS
WITH confirmation_groups AS (
    SELECT
        ie.data_hash,
        COUNT(DISTINCT ie.source_module_id) AS confirming_sources_count,
        array_agg(DISTINCT sm.module_name) AS confirming_sources,
        COUNT(*) AS total_reports,
        MIN(ie.event_timestamp) AS first_reported,
        MAX(ie.event_timestamp) AS last_confirmed
    FROM intelligence_events ie
    JOIN source_modules sm ON ie.source_module_id = sm.module_id
    WHERE ie.event_type = 'vessel_reported'
    GROUP BY ie.data_hash
)
SELECT
    cg.*,
    -- Confirmation level: more sources = higher confidence
    CASE
        WHEN confirming_sources_count >= 5 THEN 'VERY_HIGH'
        WHEN confirming_sources_count >= 3 THEN 'HIGH'
        WHEN confirming_sources_count >= 2 THEN 'MODERATE'
        ELSE 'LOW'
    END AS confirmation_level,
    -- Intelligence value: multiple reports = valuable confirmation
    confirming_sources_count * 0.2 AS confirmation_score
FROM confirmation_groups cg;

-- Function to get vessel confirmation status
CREATE OR REPLACE FUNCTION get_vessel_confirmation_status(p_vessel_id UUID)
RETURNS TABLE (
    data_point TEXT,
    confirming_sources TEXT[],
    confirmation_count INTEGER,
    confidence_level TEXT
) AS $$
BEGIN
    RETURN QUERY
    WITH vessel AS (
        SELECT vcs.vessel_data
        FROM vessel_current_state vcs
        WHERE vcs.vessel_id = p_vessel_id
    ),
    confirmations AS (
        -- Expand the document into keys with a LATERAL set-returning call.
        -- A SELECT-list alias cannot be referenced in WHERE, so join on the
        -- expanded key instead.
        SELECT k.field, ie.source_module_id
        FROM vessel v
        CROSS JOIN LATERAL jsonb_object_keys(v.vessel_data) AS k(field)
        JOIN intelligence_events ie
          ON ie.event_data @> jsonb_build_object(k.field, v.vessel_data->k.field)
    )
    SELECT
        c.field,
        array_agg(DISTINCT sm.module_name),
        COUNT(DISTINCT c.source_module_id)::integer,  -- COUNT returns bigint
        CASE
            WHEN COUNT(DISTINCT c.source_module_id) >= 3 THEN 'HIGH_CONFIDENCE'
            WHEN COUNT(DISTINCT c.source_module_id) >= 2 THEN 'MODERATE_CONFIDENCE'
            ELSE 'SINGLE_SOURCE'
        END
    FROM confirmations c
    JOIN source_modules sm ON c.source_module_id = sm.module_id
    GROUP BY c.field;
END;
$$ LANGUAGE plpgsql;

COMMENT ON VIEW vessel_confirmations IS
'Tracks how many sources confirm the same vessel data. More confirmations = higher confidence.
This is NOT duplicate detection - multiple reports are valuable intelligence confirmations!';
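An illustrative analyst query against the view above, surfacing well-confirmed reports:

```sql
-- Data points confirmed by three or more independent sources
SELECT confirming_sources, total_reports, first_reported, last_confirmed
FROM vessel_confirmations
WHERE confirmation_level IN ('HIGH', 'VERY_HIGH')
ORDER BY confirming_sources_count DESC;
```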

8. Query Interface

-- High-performance vessel search across all sources
-- (requires pg_trgm for the % operator and similarity())
CREATE OR REPLACE FUNCTION search_vessels(
    p_criteria JSONB,
    p_point_in_time TIMESTAMPTZ DEFAULT NOW()  -- reserved for time-travel filtering
) RETURNS TABLE (
    vessel_id UUID,
    vessel_data JSONB,
    match_score NUMERIC
) AS $$
BEGIN
    RETURN QUERY
    WITH search_params AS (
        SELECT
            p_criteria->>'imo' AS imo,
            p_criteria->>'name' AS name,
            p_criteria->>'flag' AS flag,
            p_criteria->'sources' AS sources
    )
    SELECT
        v.vessel_id,
        v.vessel_data,
        -- Calculate match score; similarity() returns real, so cast to
        -- match the declared NUMERIC output column
        CASE
            WHEN v.identifiers->>'imo' = sp.imo THEN 1.0
            WHEN v.vessel_data->>'name' ILIKE '%' || sp.name || '%' THEN 0.8
            ELSE similarity(v.vessel_data->>'name', sp.name)::numeric
        END AS match_score
    FROM vessel_current_state v, search_params sp
    WHERE
        -- Use indexes efficiently
        (sp.imo IS NULL OR v.identifiers->>'imo' = sp.imo)
        AND (sp.name IS NULL OR v.vessel_data->>'name' % sp.name) -- Trigram similarity
        AND (sp.flag IS NULL OR v.vessel_data->>'flag' = sp.flag)
        AND (sp.sources IS NULL OR v.contributing_sources @> sp.sources)
    -- Order by position: the bare name would be ambiguous with the
    -- function's match_score output variable in PL/pgSQL
    ORDER BY 3 DESC;
END;
$$ LANGUAGE plpgsql;
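A hypothetical call with invented criteria (keys follow the function's contract):

```sql
-- Fuzzy trigram match on name, exact match on flag
SELECT vessel_id, vessel_data->>'name' AS name, match_score
FROM search_vessels('{"name": "sea harvest", "flag": "PAN"}'::jsonb);
```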

-- Get vessel history with changes
CREATE OR REPLACE FUNCTION vessel_history_timeline(p_vessel_id UUID)
RETURNS TABLE (
    version INTEGER,
    valid_from TIMESTAMPTZ,
    valid_to TIMESTAMPTZ,
    changed_by TEXT,
    changes JSONB
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        h1.version,
        h1.valid_from,
        h1.valid_to,
        sm.module_name,
        jsonb_deep_diff(h2.vessel_data, h1.vessel_data)
    FROM vessel_history h1
    LEFT JOIN vessel_history h2 ON h1.vessel_id = h2.vessel_id
        AND h2.version = h1.version - 1
    LEFT JOIN source_modules sm ON h1.changed_by_source = sm.module_id
    WHERE h1.vessel_id = p_vessel_id
    ORDER BY h1.version DESC;
END;
$$ LANGUAGE plpgsql;

9. Performance Optimizations

-- Parallel processing for large imports
ALTER TABLE intelligence_events SET (parallel_workers = 4);
ALTER TABLE vessel_current_state SET (parallel_workers = 4);

-- Enable JIT compilation for complex JSONB operations
SET jit = on;
SET jit_above_cost = 100000;

-- Expression statistics for query planning. A single expression cannot take
-- a statistics-kinds list such as (ndistinct); that form needs two or more
-- columns/expressions.
CREATE STATISTICS vessel_imo_stats
    ON ((vessel_data->>'imo')) FROM vessel_current_state;

-- Bloom filter index for existence checks (requires the bloom extension;
-- bloom indexes pay off most when several columns share one filter)
CREATE INDEX idx_vessel_imo_bloom ON vessel_current_state
    USING bloom ((vessel_data->>'imo')) WITH (length=80);
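A setup note (not part of the original decision): several statements in this ADR rely on optional extensions that must be installed once per database:

```sql
CREATE EXTENSION IF NOT EXISTS pgcrypto;   -- digest() for data_hash
CREATE EXTENSION IF NOT EXISTS pg_trgm;    -- gin_trgm_ops, similarity(), %
CREATE EXTENSION IF NOT EXISTS btree_gist; -- UUID equality in EXCLUDE constraints
CREATE EXTENSION IF NOT EXISTS bloom;      -- bloom index access method
```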

Benefits

PostgreSQL 17 Native Features Utilized

  1. JSONB Performance: GIN indexes (including jsonb_path_ops), SQL/JSON path operators
  2. Partitioning: Monthly range partitions for events
  3. Generated Columns: Materialized computed fields
  4. MERGE: Efficient upserts without procedures
  5. Multirange Types: Complex temporal validity
  6. Statistics Objects: Better query planning
  7. Parallel Processing: Native parallelization

Architecture Benefits

  1. Schema-free Evolution: Add fields without migrations
  2. Complete History: Every change preserved
  3. Time Travel: Query any point in time
  4. Modular Sources: Add/remove sources dynamically
  5. Change Intelligence: Automatic diff detection
  6. Performance: Native indexes and partitioning

Implementation Migration

-- Migrate existing data to new architecture
INSERT INTO source_modules (module_name, module_type, capabilities)
SELECT
    source_shortname,
    CASE
        WHEN source_types[1] = 'RFMO' THEN 'vessel_list'
        WHEN source_types[1] IN ('WRO', 'SDN_GLOMAG') THEN 'sanctions'
        ELSE 'other'
    END,
    to_jsonb(source_types)  -- text[] has no direct ::jsonb cast
FROM original_sources_vessels;

-- Convert existing intelligence_reports to events
INSERT INTO intelligence_events (
    event_type,
    source_module_id,
    event_data,
    created_at
)
SELECT
    'vessel_reported',
    sm.module_id,
    ir.raw_vessel_data,
    ir.created_at
FROM intelligence_reports ir
JOIN original_sources_vessels osv ON ir.source_id = osv.source_id
JOIN source_modules sm ON osv.source_shortname = sm.module_name;

Consequences

Positive

  • Future-proof architecture using PG17 native features
  • No schema migrations needed
  • Complete audit trail
  • High-performance queries
  • Modular and extensible

Negative

  • Initial complexity
  • Storage requirements (mitigated by partitioning)
  • Learning curve for JSONB operations

Neutral

  • Different from traditional relational design
  • Requires PostgreSQL 17 (e.g., MERGE ... RETURNING is new in 17)
  • Event sourcing mindset needed

Next Steps

  1. Create migration scripts from current schema
  2. Build import adapters for each source type
  3. Create monitoring dashboards
  4. Document query patterns
  5. Performance benchmark against current system