Data Pipelines & Scheduler

A three-tier ingestion architecture feeds a five-check quality gate before storage. TimescaleDB holds time-series market data; Redis caches computed results and routes events between workers via pub/sub.

Data Pipeline Diagram

Ingestion → Quality Gate → TimescaleDB → Redis → Consumers

[Figure: SuperIntel data pipeline diagram]

3-Tier Ingestion Architecture

UnifiedIngestionService routes each request to the correct adapter based on asset class, then passes output through the Data Quality Gate before persisting to TimescaleDB.

| Tier | Assets | Source | Frequency | Trigger |
|---|---|---|---|---|
| Tier 1: Crypto | BTC-USDT, ETH-USDT | Binance CCXT | Every 5 min | APScheduler crypto_ingestion |
| Tier 2: Macro / Equity | SPY, QQQ, VIX, US10Y, DXY | YFinance | Hourly (market hours) | APScheduler equity_ingestion |
| Tier 3: Gap Fill (on-demand) | Any asset with detected gap | YFinance or CCXT | On-demand | DataContinuityModule GAP_FILL_REQUEST |
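
The routing step described above might look like the following minimal sketch. The source does not show the internals of UnifiedIngestionService, so the adapter functions, the `ADAPTERS` mapping, and `route_request` are hypothetical names used only for illustration, and the adapters are stubs rather than real CCXT or yfinance calls.

```python
from typing import Callable, Dict, List

# Hypothetical adapters: real ones would call CCXT or yfinance.
# These stubs only tag the source so the routing is visible.
def fetch_crypto(ticker: str) -> List[dict]:
    return [{"ticker": ticker, "source": "binance"}]

def fetch_equity(ticker: str) -> List[dict]:
    return [{"ticker": ticker, "source": "yfinance"}]

# Assumed mapping from asset class to adapter (Tier 1 and Tier 2).
ADAPTERS: Dict[str, Callable[[str], List[dict]]] = {
    "CRYPTO": fetch_crypto,
    "EQUITY": fetch_equity,
    "MACRO": fetch_equity,
}

def route_request(ticker: str, asset_class: str) -> List[dict]:
    """Pick the adapter for the asset class and return its candles."""
    try:
        adapter = ADAPTERS[asset_class]
    except KeyError:
        raise ValueError(f"no adapter registered for {asset_class!r}") from None
    return adapter(ticker)
```

In this sketch the returned candles would then be handed to the Data Quality Gate before anything is written to TimescaleDB.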

Data Quality Gate — 5 Checks

Any candle that fails a check is quarantined and logged. Each failed check increments a metric counter; three or more failures on the same ticker trigger a DATA_QUALITY_ALERT.

| # | Check | Validates |
|---|---|---|
| 1 | Completeness | All OHLCV fields present, no null values |
| 2 | Freshness | Timestamp within expected recency window for asset class |
| 3 | Validity | high ≥ low, volume > 0, no negative prices |
| 4 | Consistency | close price between high and low bounds |
| 5 | Volume | Volume meets minimum threshold; zero-volume candles flagged as suspicious |
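
As a rough illustration, the five checks and the alert threshold could be expressed as below. This is a sketch under stated assumptions, not the project's implementation: candles are plain dicts with timezone-aware timestamps, `max_age` and `min_volume` are hypothetical per-asset-class parameters, and the counter is incremented once per failing candle (the source does not say whether it counts candles or individual checks).

```python
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import List, Optional

FIELDS = ("open", "high", "low", "close", "volume")
ALERT_THRESHOLD = 3   # from the text: 3+ failures on a ticker raise an alert
failure_counts: Counter = Counter()

def failed_checks(candle: dict, max_age: timedelta, min_volume: float,
                  now: Optional[datetime] = None) -> List[str]:
    """Return the names of the checks this candle fails (empty list = pass)."""
    now = now or datetime.now(timezone.utc)
    # 1. Completeness: stop early, the later checks need every field.
    if any(candle.get(f) is None for f in FIELDS + ("timestamp",)):
        return ["completeness"]
    failed = []
    o, h, l, c, v = (candle[f] for f in FIELDS)
    if now - candle["timestamp"] > max_age:        # 2. Freshness
        failed.append("freshness")
    if h < l or v <= 0 or min(o, h, l, c) < 0:     # 3. Validity
        failed.append("validity")
    if not (l <= c <= h):                          # 4. Consistency
        failed.append("consistency")
    if v < min_volume:                             # 5. Volume
        failed.append("volume")
    return failed

def gate(candle: dict, **kwargs) -> bool:
    """Return True if the candle passes; otherwise count a failure for its ticker."""
    failed = failed_checks(candle, **kwargs)
    if failed:
        failure_counts[candle.get("ticker")] += 1
    return not failed

def needs_alert(ticker: str) -> bool:
    """True once the ticker has reached the alert threshold."""
    return failure_counts[ticker] >= ALERT_THRESHOLD
```

A caller would quarantine any candle for which `gate` returns False and emit DATA_QUALITY_ALERT when `needs_alert` becomes true.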

MarketData Value Object

MarketData is a frozen dataclass: instances are immutable after creation and pass through the entire pipeline unchanged, which prevents accidental mutation in multi-stage processing.

from dataclasses import dataclass
from datetime import datetime

@dataclass(frozen=True)
class MarketData:
    ticker: str
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float
    asset_class: str  # EQUITY | CRYPTO | FX | MACRO
    source: str       # binance | yfinance | databento
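
To make the immutability guarantee concrete, the short example below shows what happens when a stage tries to modify a candle, and how a stage can derive a new object with `dataclasses.replace` instead. The class is repeated so the snippet runs on its own; the price and volume values are made up for illustration.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from datetime import datetime, timezone

@dataclass(frozen=True)
class MarketData:
    ticker: str
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float
    asset_class: str
    source: str

# Illustrative values only.
candle = MarketData("BTC-USDT", datetime(2024, 1, 1, tzinfo=timezone.utc),
                    42000.0, 42100.0, 41950.0, 42050.0, 12.5, "CRYPTO", "binance")

try:
    candle.close = 0.0           # attribute assignment on a frozen instance
except FrozenInstanceError:
    mutation_blocked = True
else:
    mutation_blocked = False

# A stage that needs a different value builds a new object;
# the original candle is left untouched.
corrected = replace(candle, volume=13.0)
```

Because every field here is hashable, frozen instances can also be used as dict keys or set members, which can be handy for deduplicating candles.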

DataContinuityModule

Background service that periodically scans TimescaleDB hypertable partitions for temporal gaps in time-series data. When a gap is detected, it publishes a GAP_FILL_REQUEST event to Redis pub/sub, triggering a Tier 3 on-demand ingestion for the affected ticker and time window.

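-- Gap detection query (simplified; market_data is a placeholder table name)
SELECT ticker, prev_bucket, bucket
FROM (
    SELECT ticker, bucket, LAG(bucket) OVER (PARTITION BY ticker ORDER BY bucket) AS prev_bucket
    FROM (SELECT DISTINCT ticker, time_bucket('5 minutes', "timestamp") AS bucket FROM market_data) b
) g
WHERE bucket - prev_bucket > INTERVAL '5 minutes';  -- expected_interval for 5-minute candles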
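
The same gap logic can be sketched in Python, which may be easier to unit-test than the SQL. The function names and the JSON payload fields (`ticker`, `start`, `end`) are assumptions for illustration; the source does not specify the shape of a GAP_FILL_REQUEST payload.

```python
import json
from datetime import datetime, timedelta
from typing import Iterable, List, Tuple

def find_gaps(timestamps: Iterable[datetime],
              expected_interval: timedelta) -> List[Tuple[datetime, datetime]]:
    """Return (last_seen, next_seen) pairs where consecutive candles are too far apart."""
    ordered = sorted(timestamps)
    return [(prev, cur) for prev, cur in zip(ordered, ordered[1:])
            if cur - prev > expected_interval]

def gap_fill_message(ticker: str, start: datetime, end: datetime) -> str:
    """Serialize one gap as JSON; the field names are an assumed payload shape."""
    return json.dumps({"ticker": ticker,
                       "start": start.isoformat(),
                       "end": end.isoformat()})
```

With redis-py, the resulting string could be sent with `client.publish("GAP_FILL_REQUEST", message)`, where `client` is a `redis.Redis` instance.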

TimescaleDB Configuration

Instance Configuration

base: PostgreSQL 17
extension: timescaledb
port: 5433
data_size: 9 GB+
chunks: 35,000+

Performance Settings

shared_buffers = 1GB
max_locks_per_transaction = 4096
timescaledb.max_background_workers = 8
effective_cache_size = 3GB

Redis — Cache & Event Bus

Cache Key Patterns & TTLs

| Key Pattern | TTL | Set By |
|---|---|---|
| sentiment:{ticker} | 1 hour | PerplexitySentimentService |
| price:{ticker}:live | 30 seconds | UnifiedIngestionService |
| cointegration:{pair} | 4 hours | StatisticalArbitrageService |
| correlation_matrix | 1 hour | PortfolioOptimizer |
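
One way to keep key patterns and TTLs in a single place is a small helper like the one below. It is a sketch, not the project's code: `TTL_SECONDS`, `SetexClient`, and `cache_set` are hypothetical names. The helper accepts any object with a `setex(name, time, value)` method, which is the signature redis-py's `Redis.setex` uses, so it can be exercised without a running Redis server.

```python
from typing import Dict, Protocol

# TTLs in seconds, taken from the table above.
TTL_SECONDS: Dict[str, int] = {
    "sentiment:{ticker}": 3600,         # 1 hour
    "price:{ticker}:live": 30,          # 30 seconds
    "cointegration:{pair}": 4 * 3600,   # 4 hours
    "correlation_matrix": 3600,         # 1 hour
}

class SetexClient(Protocol):
    """Anything with a redis-style setex method."""
    def setex(self, name: str, time: int, value: str): ...

def cache_set(client: SetexClient, pattern: str, value: str, **parts: str) -> str:
    """Fill in the key pattern, store the value with that pattern's TTL, return the key."""
    key = pattern.format(**parts)
    client.setex(key, TTL_SECONDS[pattern], value)
    return key
```

For example, `cache_set(client, "price:{ticker}:live", "42050.0", ticker="BTC-USDT")` writes the key `price:BTC-USDT:live` with a 30-second expiry.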

Pub/Sub Channels

| Channel | Consumer | Payload |
|---|---|---|
| ARBITRAGE_DISCOVERY | event_worker | New arbitrage opportunity for immediate evaluation |
| SCHEDULED_RETRAINING_TRIGGER | ml_trainer_worker | Nightly ML model retraining kickoff |
| GAP_FILL_REQUEST | ingestion_worker | DataContinuityModule detected a temporal gap |
| FILL_EVENT | portfolio_worker | Order fill confirmation from broker adapter |
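
The channel-to-consumer mapping above can be sketched as a small dispatcher. This is an illustration under assumptions: payloads are taken to be JSON, the handlers are hypothetical stand-ins that only record what they receive, and the message dicts follow the shape redis-py's pub/sub returns (`type`, `channel`, `data`).

```python
import json
from typing import Callable, Dict, List, Tuple

# Channel -> consumer, as listed in the table above.
CONSUMERS: Dict[str, str] = {
    "ARBITRAGE_DISCOVERY": "event_worker",
    "SCHEDULED_RETRAINING_TRIGGER": "ml_trainer_worker",
    "GAP_FILL_REQUEST": "ingestion_worker",
    "FILL_EVENT": "portfolio_worker",
}

handled: List[Tuple[str, dict]] = []

def make_handler(worker: str) -> Callable[[dict], None]:
    """Hypothetical handler factory: records which worker got which payload."""
    def handler(payload: dict) -> None:
        handled.append((worker, payload))
    return handler

HANDLERS = {channel: make_handler(worker) for channel, worker in CONSUMERS.items()}

def dispatch(message: dict) -> bool:
    """Route one pub/sub message dict to its channel's handler; return True if handled."""
    if message.get("type") != "message":   # skip subscribe confirmations
        return False
    channel = message["channel"]
    if isinstance(channel, bytes):         # bytes unless decode_responses=True
        channel = channel.decode()
    handler = HANDLERS.get(channel)
    if handler is None:
        return False
    handler(json.loads(message["data"]))
    return True
```

In a worker process, the messages would typically come from a redis-py `PubSub` object subscribed to these channels, with each non-empty result of `get_message()` passed to `dispatch`.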