Time Series Databases
Time-series data is everywhere — infrastructure metrics, IoT sensor readings, financial prices. Specialized databases handle its characteristic patterns (append-mostly writes, time-range queries, downsampling and retention) efficiently.
Time Series Data Patterns
InfluxDB
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import pandas as pd
from datetime import datetime, timedelta
class InfluxDBManager:
    """Convenience wrapper around ``influxdb_client`` bound to one org and bucket.

    Writes use the ``SYNCHRONOUS`` write options, i.e. write calls block instead
    of batching in the background. Use the manager as a context manager, or call
    :meth:`close`, to release the client's resources.
    """

    # Flux result columns that query() promotes to dedicated keys; they are
    # excluded from the per-record extras so they do not appear twice.
    _PROMOTED_COLUMNS = frozenset({'_time', '_measurement', '_field', '_value'})

    def __init__(self, url, token, org, bucket):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.org = org
        self.bucket = bucket

    def close(self):
        """Close the write API and the underlying client."""
        self.write_api.close()
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    @staticmethod
    def _flux_string(value):
        """Escape *value* for interpolation inside a double-quoted Flux string."""
        return (
            str(value)
            .replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('${', '\\${')  # Flux string interpolation marker
        )

    @staticmethod
    def _build_point(measurement, tags, fields, timestamp=None):
        """Build a ``Point`` from tag/field mappings and an optional timestamp."""
        point = Point(measurement)
        for tag_key, tag_value in tags.items():
            point = point.tag(tag_key, tag_value)
        for field_key, field_value in fields.items():
            point = point.field(field_key, field_value)
        # Compare against None so a legitimate timestamp of 0 (the epoch) is kept.
        if timestamp is not None:
            point = point.time(timestamp)
        return point

    def write_point(self, measurement, tags, fields, timestamp=None):
        """Write a single data point.

        Args:
            measurement: Measurement name.
            tags: Mapping of tag key -> tag value.
            fields: Mapping of field key -> field value.
            timestamp: Optional point time; omitted from the point when None.
        """
        point = self._build_point(measurement, tags, fields, timestamp)
        self.write_api.write(self.bucket, self.org, point)

    def write_batch(self, measurement, data_list):
        """Write many points of one measurement in a single request.

        Each item of *data_list* is a dict with a required ``'fields'`` mapping
        and optional ``'tags'`` mapping and ``'timestamp'``.
        """
        points = [
            self._build_point(
                measurement,
                data.get('tags', {}),
                data['fields'],
                data.get('timestamp'),
            )
            for data in data_list
        ]
        if not points:
            return  # nothing to send
        self.write_api.write(self.bucket, self.org, points)

    def query(self, flux_query):
        """Run a Flux query and flatten every record into one DataFrame row.

        Each row has ``time``, ``measurement``, ``field`` and ``value`` keys plus
        every other column the record carries (e.g. tags and Flux bookkeeping
        columns such as ``_start``/``_stop``). Returns an empty DataFrame when
        nothing matches.
        """
        tables = self.query_api.query(flux_query, org=self.org)
        rows = []
        for table in tables:
            for record in table.records:
                extras = {
                    key: val
                    for key, val in record.values.items()
                    if key not in self._PROMOTED_COLUMNS
                }
                rows.append({
                    'time': record.get_time(),
                    'measurement': record.get_measurement(),
                    'field': record.get_field(),
                    'value': record.get_value(),
                    **extras,
                })
        return pd.DataFrame(rows)

    def _cpu_metrics_flux(self, host, hours=24):
        """Return the Flux query used by :meth:`get_cpu_metrics`."""
        return f'''
from(bucket: "{self._flux_string(self.bucket)}")
  |> range(start: -{hours}h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> filter(fn: (r) => r.host == "{self._flux_string(host)}")
  |> filter(fn: (r) => r._field == "usage_percent")
  |> aggregateWindow(every: 1m, fn: mean)
'''

    def get_cpu_metrics(self, host, hours=24):
        """Return 1-minute mean ``cpu.usage_percent`` for *host* over the last *hours*."""
        return self.query(self._cpu_metrics_flux(host, hours))

    def downsample(self, measurement, from_interval, to_interval, dest_bucket=None):
        """Build (but do not register) a Flux task script that downsamples data.

        The script runs every *to_interval*, reads the last *from_interval* of
        *measurement* and writes per-window means with ``to()``. Register the
        returned script as an InfluxDB task to activate it.

        Args:
            measurement: Measurement to downsample.
            from_interval: Flux duration literal for the look-back window (e.g. ``"1h"``).
            to_interval: Flux duration literal for the task period and window size.
            dest_bucket: Target bucket. Defaults to this manager's bucket for
                backward compatibility, but a separate bucket is recommended:
                writing means back into the source bucket mixes them with the
                raw points, and later runs aggregate them again.

        Returns:
            The Flux task source as a string.
        """
        target = self.bucket if dest_bucket is None else dest_bucket
        task_name = f"downsample_{measurement}_{from_interval}_{to_interval}"
        return f'''
option task = {{name: "{self._flux_string(task_name)}", every: {to_interval}}}
from(bucket: "{self._flux_string(self.bucket)}")
  |> range(start: -{from_interval})
  |> filter(fn: (r) => r._measurement == "{self._flux_string(measurement)}")
  |> aggregateWindow(every: {to_interval}, fn: mean, createEmpty: false)
  |> to(bucket: "{self._flux_string(target)}", org: "{self._flux_string(self.org)}")
'''
# Usage
import os

# Connection settings are read from the environment when set; the literal
# fallbacks are placeholders for a local development instance only. Never
# commit real tokens to source.
manager = InfluxDBManager(
    url=os.environ.get("INFLUXDB_URL", "http://localhost:8086"),
    token=os.environ.get("INFLUXDB_TOKEN", "my-token"),
    org=os.environ.get("INFLUXDB_ORG", "my-org"),
    bucket=os.environ.get("INFLUXDB_BUCKET", "metrics"),
)
# Write one CPU sample for server-01 (no explicit timestamp)
manager.write_point(
    measurement="cpu",
    tags={"host": "server-01", "region": "us-east"},
    fields={"usage_percent": 75.3, "cores": 8},
)
# Query the last 24 hours of CPU usage for that host
df = manager.get_cpu_metrics("server-01", hours=24)
TimescaleDB
import psycopg2
from psycopg2 import sql
import pandas as pd
from datetime import datetime
class TimescaleDBManager:
    """Helper for common TimescaleDB administration and query tasks.

    Wraps a single psycopg2 connection in autocommit mode. Table and view names
    must be interpolated into the SQL text, so they are first checked against a
    conservative unquoted-identifier pattern; every other value is sent as a
    query parameter.

    Several methods assume a ``sensor_data``-style schema with ``time``,
    ``device_id``, ``temperature`` (and, for query_timeseries, ``humidity``)
    columns.
    """

    def __init__(self, dbname, user, password, host, port):
        self.conn = psycopg2.connect(
            dbname=dbname, user=user, password=password,
            host=host, port=port,
        )
        # Each statement commits immediately. This is also needed because
        # TimescaleDB does not allow creating a continuous aggregate (WITH DATA,
        # the default) inside a transaction block.
        self.conn.autocommit = True

    def close(self):
        """Close the database connection."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    @staticmethod
    def _validate_identifier(name):
        """Return *name* if it is a plain (optionally schema-qualified) identifier.

        Only unquoted names such as ``sensor_data`` or ``public.sensor_data``
        are accepted. They keep PostgreSQL's usual lower-case folding.

        Raises:
            ValueError: if *name* is not such an identifier (e.g. it contains
                spaces, quotes or semicolons).
        """
        import re
        pattern = r'[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*)?'
        if not isinstance(name, str) or re.fullmatch(pattern, name) is None:
            raise ValueError(f"invalid SQL identifier: {name!r}")
        return name

    def create_hypertable(self, table_name, time_column='time'):
        """Convert *table_name* into a hypertable partitioned on *time_column*.

        A no-op if the table is already a hypertable (``if_not_exists``).
        """
        with self.conn.cursor() as cur:
            cur.execute(
                "SELECT create_hypertable(%s, %s, if_not_exists => TRUE);",
                (table_name, time_column),
            )

    def create_continuous_aggregate(self, view_name, source_table,
                                    time_bucket='1 hour'):
        """Create a continuous aggregate of per-device temperature statistics.

        Args:
            view_name: Name of the materialized view to create.
            source_table: Hypertable to aggregate.
            time_bucket: Bucket width as an interval string (e.g. ``'1 hour'``).

        Raises:
            ValueError: if *view_name* or *source_table* is not a plain identifier.
        """
        view = self._validate_identifier(view_name)
        source = self._validate_identifier(source_table)
        query = f"""
            CREATE MATERIALIZED VIEW {view}
            WITH (timescaledb.continuous) AS
            SELECT
                time_bucket(%s, time) AS bucket,
                device_id,
                AVG(temperature) AS avg_temp,
                MAX(temperature) AS max_temp,
                MIN(temperature) AS min_temp,
                COUNT(*) AS sample_count
            FROM {source}
            GROUP BY bucket, device_id;
        """
        with self.conn.cursor() as cur:
            cur.execute(query, (time_bucket,))

    def add_retention_policy(self, table_name, interval='30 days',
                             if_not_exists=True):
        """Schedule automatic dropping of chunks older than *interval*.

        With *if_not_exists* (the default, consistent with create_hypertable),
        re-running setup does not fail when a policy already exists.
        """
        with self.conn.cursor() as cur:
            cur.execute(
                "SELECT add_retention_policy(%s, INTERVAL %s, if_not_exists => %s);",
                (table_name, interval, bool(if_not_exists)),
            )

    def add_compression(self, table_name, compress_after='7 days',
                        if_not_exists=True):
        """Enable native compression and compress chunks older than *compress_after*.

        Compressed data is segmented by ``device_id`` and ordered by ``time DESC``.
        Both statements are sent in one execute call, as in a single request.

        Raises:
            ValueError: if *table_name* is not a plain identifier.
        """
        table = self._validate_identifier(table_name)
        query = f"""
            ALTER TABLE {table} SET (
                timescaledb.compress,
                timescaledb.compress_segmentby = 'device_id',
                timescaledb.compress_orderby = 'time DESC'
            );
            SELECT add_compression_policy(%s, INTERVAL %s, if_not_exists => %s);
        """
        with self.conn.cursor() as cur:
            cur.execute(query, (table_name, compress_after, bool(if_not_exists)))

    def query_timeseries(self, device_id, start_time, end_time,
                         bucket='5 minutes'):
        """Return time-bucketed stats from ``sensor_data`` for one device.

        Rows between *start_time* and *end_time* (inclusive, SQL ``BETWEEN``)
        are grouped into *bucket*-wide intervals, ordered by bucket.
        """
        query = """
            SELECT
                time_bucket(%s, time) AS bucket,
                AVG(temperature) AS avg_temp,
                MAX(humidity) AS max_humidity,
                COUNT(*) AS samples
            FROM sensor_data
            WHERE device_id = %s
              AND time BETWEEN %s AND %s
            GROUP BY bucket
            ORDER BY bucket;
        """
        return pd.read_sql(
            query, self.conn, params=(bucket, device_id, start_time, end_time)
        )

    def detect_anomalies(self, table_name, z_threshold=3.0):
        """Return rows whose temperature z-score exceeds *z_threshold*.

        Mean and standard deviation come from the last 24 hours of data. A zero
        (or undefined) standard deviation yields NULL z-scores and therefore no
        anomalies, instead of a division-by-zero error.
        NOTE(review): every row of the table is scored, not only the last 24
        hours; add a time filter to the outer query if only recent anomalies
        are wanted.

        Raises:
            ValueError: if *table_name* is not a plain identifier.
        """
        table = self._validate_identifier(table_name)
        query = f"""
            WITH stats AS (
                SELECT
                    AVG(temperature) AS mean_temp,
                    NULLIF(STDDEV(temperature), 0) AS std_temp
                FROM {table}
                WHERE time > NOW() - INTERVAL '24 hours'
            )
            SELECT time, temperature, device_id,
                   (temperature - mean_temp) / std_temp AS z_score
            FROM {table}, stats
            WHERE ABS((temperature - mean_temp) / std_temp) > %s
            ORDER BY time DESC;
        """
        return pd.read_sql(query, self.conn, params=(float(z_threshold),))
# Usage
import os

# Connection settings are read from the environment when set; the literal
# fallbacks are placeholders for a local development database only. Never
# commit real passwords to source.
db = TimescaleDBManager(
    dbname=os.environ.get("TIMESCALE_DB", "metrics"),
    user=os.environ.get("TIMESCALE_USER", "admin"),
    password=os.environ.get("TIMESCALE_PASSWORD", "secret"),
    host=os.environ.get("TIMESCALE_HOST", "localhost"),
    port=int(os.environ.get("TIMESCALE_PORT", "5432")),
)
db.create_hypertable("sensor_data", "time")
db.add_retention_policy("sensor_data", "90 days")
db.add_compression("sensor_data", "7 days")
db.create_continuous_aggregate("hourly_stats", "sensor_data", "1 hour")
Prometheus
import requests
import json
from datetime import datetime, timedelta
class PrometheusClient:
    """Small client for the Prometheus HTTP query API (``/api/v1``).

    Query methods return the decoded JSON response as-is, including error
    responses (``{"status": "error", ...}``), so callers can inspect ``status``.
    """

    # Prometheus rejects range queries whose (end - start) / step exceeds this
    # many points per series ("exceeded maximum resolution").
    _MAX_POINTS_PER_SERIES = 11000

    def __init__(self, base_url, timeout=10.0):
        self.base_url = base_url.rstrip('/')
        # Seconds to wait for a response. requests has no default timeout and
        # would otherwise block indefinitely on an unresponsive server.
        self.timeout = timeout

    @staticmethod
    def _escape_label(value):
        """Escape *value* for use inside a double-quoted PromQL label matcher."""
        return (
            str(value)
            .replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\n', '\\n')
        )

    @classmethod
    def _default_step(cls, span_seconds):
        """Return a step of at least 15s that keeps *span_seconds* under the point limit."""
        import math
        seconds = max(15, math.ceil(span_seconds / cls._MAX_POINTS_PER_SERIES))
        return f"{seconds}s"

    def _get(self, path, params):
        """GET ``base_url + path`` with *params* and return the decoded JSON."""
        response = requests.get(
            f"{self.base_url}{path}", params=params, timeout=self.timeout
        )
        return response.json()

    def query_instant(self, promql):
        """Evaluate *promql* at the current time (``/api/v1/query``)."""
        return self._get("/api/v1/query", {"query": promql})

    def query_range(self, promql, start, end, step="15s"):
        """Evaluate *promql* over [start, end] (``/api/v1/query_range``).

        *start*/*end* may be unix timestamps or RFC 3339 strings with a UTC
        offset; *step* is a duration string or a number of seconds.
        """
        return self._get(
            "/api/v1/query_range",
            {"query": promql, "start": start, "end": end, "step": step},
        )

    def get_cpu_usage(self, instance, hours=1, step=None):
        """Return CPU-usage series (percent) for *instance* over the last *hours*.

        *step* defaults to 15s, widened automatically for long ranges so the
        query stays within Prometheus' per-series point limit. Returns ``[]``
        if the query did not succeed.
        """
        label = self._escape_label(instance)
        promql = (
            '100 - (avg by(instance) '
            f'(irate(node_cpu_seconds_total{{instance="{label}",mode="idle"}}[5m])) * 100)'
        )
        # Unix timestamps are unambiguous; naive isoformat() strings carry no
        # UTC offset and are not valid RFC 3339 times for the Prometheus API.
        end = datetime.now().timestamp()
        start = end - hours * 3600
        if step is None:
            step = self._default_step(end - start)
        result = self.query_range(promql, start, end, step)
        if result.get('status') == 'success':
            return result['data']['result']
        return []

    def get_memory_usage(self, instance):
        """Return the instant-query response for memory usage (percent) of *instance*."""
        label = self._escape_label(instance)
        promql = (
            f'(1 - node_memory_MemAvailable_bytes{{instance="{label}"}} '
            f'/ node_memory_MemTotal_bytes{{instance="{label}"}}) * 100'
        )
        return self.query_instant(promql)

    def get_disk_usage(self, instance, mountpoint='/'):
        """Return the instant-query response for disk usage (percent) of *mountpoint*."""
        selector = (
            f'instance="{self._escape_label(instance)}",'
            f'mountpoint="{self._escape_label(mountpoint)}"'
        )
        promql = (
            f'(1 - node_filesystem_avail_bytes{{{selector}}} '
            f'/ node_filesystem_size_bytes{{{selector}}}) * 100'
        )
        return self.query_instant(promql)
# Alerting rules: a Prometheus rule-file document (groups -> rules).
# YAML is indentation-sensitive, so the nesting below is part of the content.
# NOTE(review): presumably written to a file listed under `rule_files` in
# prometheus.yml — confirm how this constant is consumed.
ALERT_RULES = """
groups:
  - name: ml_system_alerts
    rules:
      - alert: HighPredictionLatency
        expr: histogram_quantile(0.99, rate(ml_prediction_duration_seconds_bucket[5m])) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High prediction latency"
      - alert: ModelAccuracyDrop
        expr: ml_model_accuracy < 0.85
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Model accuracy below threshold"
      - alert: DataDriftDetected
        expr: ml_data_drift_score > 0.3
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data drift detected"
"""
Query Patterns
# Time series specific queries
# Resampling
def resample_timeseries(df, rule='5min', method='mean'):
    """Resample *df* to a new frequency without modifying the caller's frame.

    Args:
        df: DataFrame whose index is datetime-like (or parseable by
            ``pd.to_datetime``).
        rule: pandas offset alias for the target frequency. ``'5min'`` is the
            same 5-minute frequency as the deprecated ``'5T'`` alias.
        method: Aggregation passed to ``Resampler.agg`` (e.g. ``'mean'``, ``'sum'``).

    Returns:
        A new, resampled DataFrame.
    """
    # set_axis returns a new object, so the caller's index is left untouched
    # (the previous version assigned to df.index in place).
    indexed = df.set_axis(pd.to_datetime(df.index), axis=0)
    return indexed.resample(rule).agg(method)
# Rolling statistics
def rolling_features(df, windows=(7, 14, 30), column='value'):
    """Compute rolling mean/std/min/max of one column for several window sizes.

    Args:
        df: DataFrame containing *column*; the result shares its index.
        windows: Window lengths in rows. The default is a tuple to avoid the
            shared mutable-default pitfall; any iterable of ints works.
        column: Column to summarize (defaults to ``'value'``).

    Returns:
        DataFrame with ``rolling_{stat}_{window}`` columns, ordered
        mean/std/min/max for each window. Leading rows are NaN until a window
        is full.
    """
    series = df[column]
    features = pd.DataFrame(index=df.index)
    for window in windows:
        roller = series.rolling(window)  # build the window object once per size
        features[f'rolling_mean_{window}'] = roller.mean()
        features[f'rolling_std_{window}'] = roller.std()
        features[f'rolling_min_{window}'] = roller.min()
        features[f'rolling_max_{window}'] = roller.max()
    return features
# Lag features
def lag_features(df, lags=(1, 7, 14, 30), column='value'):
    """Create shifted copies of one column as lag features.

    Args:
        df: DataFrame containing *column*; the result shares its index.
        lags: Shift amounts in rows. The default is a tuple to avoid the shared
            mutable-default pitfall; any iterable of ints works.
        column: Column to lag (defaults to ``'value'``).

    Returns:
        DataFrame with one ``lag_{n}`` column per lag. The first *n* rows of
        each column are NaN.
    """
    series = df[column]
    features = pd.DataFrame(index=df.index)
    for lag in lags:
        features[f'lag_{lag}'] = series.shift(lag)
    return features
# Exponential moving average
def ewm_features(df, spans=(7, 14, 30), column='value'):
    """Compute exponentially weighted moving averages of one column.

    Args:
        df: DataFrame containing *column*; the result shares its index.
        spans: EWM spans, where ``alpha = 2 / (span + 1)``. The default is a
            tuple to avoid the shared mutable-default pitfall.
        column: Column to smooth (defaults to ``'value'``).

    Returns:
        DataFrame with one ``ewm_{span}`` column per span.
    """
    series = df[column]
    features = pd.DataFrame(index=df.index)
    for span in spans:
        features[f'ewm_{span}'] = series.ewm(span=span).mean()
    return features
Best Practices
- Use hypertables in TimescaleDB for automatic partitioning
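- Downsample older data aggressively to save storage, and write the rollups to a separate bucket or table so they are not mixed with (and re-aggregated alongside) the raw points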
- Set retention policies to automatically clean up old data
- Create continuous aggregates for common query patterns
- Monitor query performance and add indexes as needed