Data Pipelines & Scheduler
A three-tier ingestion architecture feeds a five-check quality gate before storage. TimescaleDB holds time-series market data; Redis caches computed results and routes events between workers via pub/sub.
Data Pipeline Diagram
Ingestion → Quality Gate → TimescaleDB → Redis → Consumers
3-Tier Ingestion Architecture
UnifiedIngestionService routes each request to the correct adapter based on asset class, then passes output through the Data Quality Gate before persisting to TimescaleDB.

| Tier | Assets | Source | Frequency | Trigger |
|---|---|---|---|---|
| Tier 1: Crypto | BTC-USDT, ETH-USDT | Binance CCXT | Every 5 min | APScheduler crypto_ingestion |
| Tier 2: Macro / Equity | SPY, QQQ, VIX, US10Y, DXY | YFinance | Hourly (market hours) | APScheduler equity_ingestion |
| Tier 3: Gap Fill (on-demand) | Any asset with detected gap | YFinance or CCXT | On-demand | DataContinuityModule GAP_FILL_REQUEST |
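
The routing step can be sketched roughly as follows. This is a minimal illustration, not the real UnifiedIngestionService: the class name `UnifiedIngestionSketch`, the adapter signature, the asset-class labels used as keys, and the `quality_gate` and `persist` callables are all assumptions made for the example.

```python
from typing import Callable, Dict, List

# Hypothetical adapter signature: ticker -> list of raw candle dicts.
Adapter = Callable[[str], List[dict]]

# Source per asset class, following the tier table; the labels are assumed.
SOURCE_BY_ASSET_CLASS: Dict[str, str] = {
    "CRYPTO": "binance",   # Tier 1, Binance via CCXT
    "EQUITY": "yfinance",  # Tier 2, YFinance
    "MACRO": "yfinance",   # Tier 2, YFinance
}


class UnifiedIngestionSketch:
    """Illustrative stand-in for the routing logic only."""

    def __init__(self, adapters: Dict[str, Adapter],
                 quality_gate: Callable[[dict], bool],
                 persist: Callable[[dict], None]) -> None:
        self.adapters = adapters
        self.quality_gate = quality_gate
        self.persist = persist

    def ingest(self, ticker: str, asset_class: str) -> int:
        """Fetch candles, gate them, persist those that pass; return the count stored."""
        source = SOURCE_BY_ASSET_CLASS.get(asset_class)
        if source is None:
            raise ValueError(f"no adapter configured for asset class {asset_class!r}")
        stored = 0
        for candle in self.adapters[source](ticker):
            if self.quality_gate(candle):
                self.persist(candle)
                stored += 1
        return stored
```

Keeping the adapters, the gate, and the persistence step as injected callables means each tier can be exercised with fakes, without a live exchange connection or database.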
Data Quality Gate — 5 Checks
Any candle failing a check is quarantined and logged. Failed checks increment a metric counter; 3+ failures on the same ticker trigger a DATA_QUALITY_ALERT.

| Check | Validates |
|---|---|
| 1. Completeness | All OHLCV fields present, no null values |
| 2. Freshness | Timestamp within expected recency window for asset class |
| 3. Validity | high ≥ low, volume > 0, no negative prices |
| 4. Consistency | close price between high and low bounds |
| 5. Volume | Volume meets minimum threshold; zero-volume candles flagged as suspicious |
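
One way the five checks could be expressed is sketched below. The recency windows in `MAX_AGE`, the `MIN_VOLUME` value, the dict-shaped candle, and the choice to count one failure per quarantined candle are assumptions for illustration; the source does not specify them. The alert is reduced to a `print` call.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import List, Optional

FIELDS = ("open", "high", "low", "close", "volume")
# Assumed recency windows per asset class (not given in the source).
MAX_AGE = {"CRYPTO": timedelta(minutes=10), "EQUITY": timedelta(hours=2)}
MIN_VOLUME = 1.0      # assumed minimum-volume threshold
ALERT_THRESHOLD = 3   # "3+ failures on the same ticker"

failure_counts: Counter = Counter()


def failed_checks(candle: dict, now: Optional[datetime] = None) -> List[str]:
    """Return the names of the checks this candle fails (empty list means pass)."""
    now = now or datetime.now(timezone.utc)
    # 1. Completeness: later checks need every field, so stop early if one is missing.
    if candle.get("timestamp") is None or any(candle.get(f) is None for f in FIELDS):
        return ["completeness"]
    failed = []
    open_, high, low, close, volume = (candle[f] for f in FIELDS)
    # 2. Freshness: timestamps are assumed to be timezone-aware.
    max_age = MAX_AGE.get(candle.get("asset_class"), timedelta(hours=24))
    if now - candle["timestamp"] > max_age:
        failed.append("freshness")
    # 3. Validity
    if high < low or volume <= 0 or min(open_, high, low, close) < 0:
        failed.append("validity")
    # 4. Consistency
    if not (low <= close <= high):
        failed.append("consistency")
    # 5. Volume
    if volume < MIN_VOLUME:
        failed.append("volume")
    return failed


def gate(candle: dict, now: Optional[datetime] = None) -> bool:
    """Return True if the candle passes; otherwise count the failure and maybe alert."""
    failed = failed_checks(candle, now)
    if not failed:
        return True
    ticker = candle.get("ticker")
    failure_counts[ticker] += 1
    if failure_counts[ticker] >= ALERT_THRESHOLD:
        print(f"DATA_QUALITY_ALERT {ticker}: {failed}")
    return False
```

Returning the list of failed check names, rather than a bare boolean, keeps the quarantine log informative about which rule rejected the candle.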
MarketData Value Object
Frozen dataclass — immutable after creation. Passed through the entire pipeline unchanged. Prevents accidental mutation in multi-stage processing.
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class MarketData:
    ticker: str
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float
    asset_class: str  # EQUITY | CRYPTO | FX | MACRO
    source: str       # binance | yfinance | databento
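
The immutability guarantee can be checked directly. The sketch below repeats the class definition so that it runs on its own; the sample values are made up. Assigning to a field raises `dataclasses.FrozenInstanceError`, and a stage that needs a modified value builds a new object with `dataclasses.replace` instead.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from datetime import datetime, timezone


@dataclass(frozen=True)
class MarketData:
    ticker: str
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float
    asset_class: str
    source: str


# Illustrative values only.
candle = MarketData(
    ticker="SPY",
    timestamp=datetime(2024, 1, 2, 15, 0, tzinfo=timezone.utc),
    open=470.0, high=471.5, low=469.8, close=471.0, volume=1_200_000.0,
    asset_class="EQUITY", source="yfinance",
)

try:
    candle.close = 0.0  # attribute assignment on a frozen dataclass raises
except FrozenInstanceError:
    mutation_blocked = True
else:
    mutation_blocked = False

# A new instance with one field changed; the original is left untouched.
adjusted = replace(candle, close=470.5)
```

Because the class is frozen and uses the default generated `__eq__`, instances are also hashable, so they can be used as set members or dict keys when deduplicating candles.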
DataContinuityModule
Background service that periodically scans TimescaleDB hypertable partitions for temporal gaps in time-series data. When a gap is detected, it publishes a GAP_FILL_REQUEST event to Redis pub/sub, triggering a Tier 3 on-demand ingestion for the affected ticker and time window.
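-- Gap detection query (simplified; the source table is elided and expected_interval is a placeholder)
SELECT ticker, prev_bucket, bucket
FROM (
    SELECT ticker, bucket, LAG(bucket) OVER (PARTITION BY ticker ORDER BY bucket) AS prev_bucket
    FROM (SELECT DISTINCT ticker, time_bucket('5 minutes', timestamp) AS bucket FROM ...) AS b
) AS g
WHERE bucket - prev_bucket > expected_interval;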
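
The same lag-and-compare idea can be sketched in plain Python, which may help when testing the gap logic without a database. The function names and the JSON field names in the GAP_FILL_REQUEST payload (`ticker`, `start`, `end`) are assumptions; the source does not define the payload schema.

```python
import json
from datetime import datetime, timedelta
from typing import Iterable, Iterator, Tuple


def find_gaps(timestamps: Iterable[datetime],
              expected_interval: timedelta) -> Iterator[Tuple[datetime, datetime]]:
    """Yield (previous, current) pairs whose spacing exceeds expected_interval."""
    ordered = sorted(set(timestamps))
    for prev, curr in zip(ordered, ordered[1:]):
        if curr - prev > expected_interval:
            yield prev, curr


def gap_fill_request(ticker: str, prev: datetime, curr: datetime,
                     expected_interval: timedelta) -> str:
    """Build a JSON payload for GAP_FILL_REQUEST (hypothetical field names)."""
    return json.dumps({
        "ticker": ticker,
        "start": (prev + expected_interval).isoformat(),  # first missing bucket
        "end": curr.isoformat(),                          # exclusive upper bound
    })
```

For example, five-minute buckets at 00:00, 00:05, 00:10, 00:25 yield a single gap between 00:10 and 00:25, and the resulting request covers the window starting at 00:15 and ending before 00:25.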
TimescaleDB Configuration
Instance Configuration
base: PostgreSQL 17
extension: timescaledb
port: 5433
data_size: 9 GB+
chunks: 35,000+
Performance Settings
shared_buffers = 1GB
max_locks_per_transaction = 4096
timescaledb.max_background_workers = 8
effective_cache_size = 3GB
Redis — Cache & Event Bus
Cache Key Patterns & TTLs
| Key Pattern | TTL | Set By |
|---|---|---|
| sentiment:{ticker} | 1 hour | PerplexitySentimentService |
| price:{ticker}:live | 30 seconds | UnifiedIngestionService |
| cointegration:{pair} | 4 hours | StatisticalArbitrageService |
| correlation_matrix | 1 hour | PortfolioOptimizer |
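
A small helper can keep key patterns and TTLs in one place. The sketch below is an assumption about how the table might be encoded, not the services' actual code; `cache_set` and `SupportsSetex` are hypothetical names. It relies only on a client exposing `setex(name, time, value)`, which is the signature redis-py uses for that command.

```python
from typing import Dict, Protocol


class SupportsSetex(Protocol):
    """Anything with a redis-py style setex(name, time, value) method."""

    def setex(self, name: str, time: int, value: str): ...


# TTLs in seconds, taken from the table above.
TTL_SECONDS: Dict[str, int] = {
    "sentiment:{ticker}": 3600,
    "price:{ticker}:live": 30,
    "cointegration:{pair}": 4 * 3600,
    "correlation_matrix": 3600,
}


def cache_set(client: SupportsSetex, pattern: str, value: str, **parts: str) -> str:
    """Fill in the key pattern, store the value with its TTL, and return the key."""
    key = pattern.format(**parts)
    client.setex(key, TTL_SECONDS[pattern], value)
    return key
```

With a real client this might be called as `cache_set(redis_client, "price:{ticker}:live", "42000.5", ticker="BTC-USDT")`, where `redis_client` is whatever connection object the service already holds.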
Pub/Sub Channels
| Channel | Consumer | Payload |
|---|---|---|
| ARBITRAGE_DISCOVERY | event_worker | New arbitrage opportunity for immediate evaluation |
| SCHEDULED_RETRAINING_TRIGGER | ml_trainer_worker | Nightly ML model retraining kickoff |
| GAP_FILL_REQUEST | ingestion_worker | DataContinuityModule detected a temporal gap |
| FILL_EVENT | portfolio_worker | Order fill confirmation from broker adapter |
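
The channel-to-consumer mapping could drive a simple dispatcher, sketched below. It assumes messages arrive in the dict shape redis-py's pub/sub yields (`type`, `channel`, `data`) and that payloads are JSON-encoded, which the source does not state; `route` and the `handlers` registry are hypothetical names.

```python
import json
from typing import Callable, Dict, Optional

# Channel -> consuming worker, as listed in the table above.
CONSUMERS: Dict[str, str] = {
    "ARBITRAGE_DISCOVERY": "event_worker",
    "SCHEDULED_RETRAINING_TRIGGER": "ml_trainer_worker",
    "GAP_FILL_REQUEST": "ingestion_worker",
    "FILL_EVENT": "portfolio_worker",
}


def route(message: dict,
          handlers: Dict[str, Callable[[dict], None]]) -> Optional[str]:
    """Dispatch one pub/sub message; return the worker name, or None if ignored."""
    if message.get("type") != "message":
        return None  # e.g. subscribe/unsubscribe confirmations
    channel = message["channel"]
    if isinstance(channel, bytes):
        channel = channel.decode()
    worker = CONSUMERS.get(channel)
    if worker is None or worker not in handlers:
        return None  # unknown channel, or no handler registered in this process
    handlers[worker](json.loads(message["data"]))  # payload assumed to be JSON
    return worker
```

Each worker process would register only its own handler, so a message for another worker's channel is ignored rather than treated as an error.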