Log Analytics Platform
CloudWatch + Lambda + Elasticsearch + Kibana
ℹ️
Project Difficulty: Advanced | Duration: 2-3 weeks | Cloud: AWS
Build a production-grade log analytics platform that processes millions of log entries daily, with real-time search, visualization, and alerting.
Project Overview
Problem Statement
Applications generate massive volumes of logs that are difficult to search, analyze, and correlate. Without centralized log analytics, troubleshooting production issues takes hours, security incidents go undetected, and operational insights are lost.
Objectives
- Ingest 1M+ log entries per minute from multiple sources
- Provide sub-second search across all log data
- Enable real-time alerting on error patterns
- Support correlation across distributed systems
- Maintain 30-day log retention with cost optimization
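To put the ingestion objective in perspective, a rough sizing check against the per-shard write limits of Kinesis Data Streams in provisioned mode (1,000 records per second and 1 MB per second) might look like the sketch below. The 500-byte average entry size and the function name `required_shards` are assumptions for illustration, not figures from this project.

```python
import math

# Per-shard write limits for Kinesis Data Streams in provisioned mode.
RECORDS_PER_SHARD_PER_SEC = 1_000
BYTES_PER_SHARD_PER_SEC = 1_000_000  # 1 MB/s, taken conservatively as 10**6 bytes


def required_shards(entries_per_minute: int, avg_entry_bytes: int,
                    entries_per_record: int = 1) -> int:
    """Estimate the shard count needed to absorb a steady write rate."""
    entries_per_sec = entries_per_minute / 60
    records_per_sec = entries_per_sec / entries_per_record
    bytes_per_sec = entries_per_sec * avg_entry_bytes
    # A shard is saturated by whichever limit is hit first.
    by_records = math.ceil(records_per_sec / RECORDS_PER_SHARD_PER_SEC)
    by_bytes = math.ceil(bytes_per_sec / BYTES_PER_SHARD_PER_SEC)
    return max(1, by_records, by_bytes)


if __name__ == "__main__":
    # Assumed workload: 1M entries/minute at 500 bytes each.
    print(required_shards(1_000_000, 500))                          # 17
    print(required_shards(1_000_000, 500, entries_per_record=100))  # 9
```

Under these assumptions the record-count limit dominates when every entry is its own record (17 shards), while batching 100 entries per record shifts the bottleneck to bytes (9 shards). Either way, the shard count is worth re-deriving from measured entry sizes before provisioning.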
Tech Stack
| Component | Technology | Purpose |
|---|---|---|
| Log Collection | CloudWatch + Fluentd | Log aggregation |
| Processing | Lambda + Kinesis | Stream processing |
| Search & Analytics | Elasticsearch | Full-text search |
| Visualization | Kibana | Dashboards and alerts |
| Storage | S3 + Glacier | Long-term retention |
Architecture Diagram
Data Source Setup and Schema
Log Schema Definition
# schemas/log_schema.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Dict, List
from enum import Enum
import json
class LogLevel(Enum):
DEBUG = "DEBUG"
INFO = "INFO"
WARN = "WARN"
ERROR = "ERROR"
FATAL = "FATAL"
class LogSource(Enum):
APPLICATION = "application"
ACCESS = "access"
SYSTEM = "system"
SECURITY = "security"
AUDIT = "audit"
@dataclass
class LogEntry:
timestamp: datetime
level: LogLevel
source: LogSource
message: str
service: str
environment: str
instance_id: Optional[str] = None
request_id: Optional[str] = None
user_id: Optional[str] = None
session_id: Optional[str] = None
trace_id: Optional[str] = None
span_id: Optional[str] = None
metadata: Dict = field(default_factory=dict)
stack_trace: Optional[str] = None
def to_dict(self) -> Dict:
return {
'timestamp': self.timestamp.isoformat(),
'level': self.level.value,
'source': self.source.value,
'message': self.message,
'service': self.service,
'environment': self.environment,
'instance_id': self.instance_id,
'request_id': self.request_id,
'user_id': self.user_id,
'session_id': self.session_id,
'trace_id': self.trace_id,
'span_id': self.span_id,
'metadata': self.metadata,
'stack_trace': self.stack_trace
}
def to_elasticsearch_doc(self) -> Dict:
"""Convert to Elasticsearch document format."""
doc = self.to_dict()
doc['@timestamp'] = doc.pop('timestamp')
doc['log_level'] = doc.pop('level')
return doc
@dataclass
class AccessLogEntry:
timestamp: datetime
method: str
path: str
status_code: int
response_time_ms: float
request_size_bytes: int
response_size_bytes: int
client_ip: str
user_agent: str
referer: Optional[str] = None
user_id: Optional[str] = None
request_id: Optional[str] = None
metadata: Dict = field(default_factory=dict)
def to_elasticsearch_doc(self) -> Dict:
return {
'@timestamp': self.timestamp.isoformat(),
'http_method': self.method,
'http_path': self.path,
'http_status_code': self.status_code,
'response_time_ms': self.response_time_ms,
'request_size_bytes': self.request_size_bytes,
'response_size_bytes': self.response_size_bytes,
'client_ip': self.client_ip,
'user_agent': self.user_agent,
'referer': self.referer,
'user_id': self.user_id,
'request_id': self.request_id,
'metadata': self.metadata,
'log_type': 'access'
}
class LogSchemaManager:
"""Manage Elasticsearch index mappings for log data."""
LOG_MAPPING = {
"mappings": {
"properties": {
"@timestamp": {"type": "date"},
"log_level": {
"type": "keyword"
},
"source": {
"type": "keyword"
},
"message": {
"type": "text",
"analyzer": "standard"
},
"service": {
"type": "keyword"
},
"environment": {
"type": "keyword"
},
"instance_id": {
"type": "keyword"
},
"request_id": {
"type": "keyword"
},
"user_id": {
"type": "keyword"
},
"session_id": {
"type": "keyword"
},
"trace_id": {
"type": "keyword"
},
"span_id": {
"type": "keyword"
},
"stack_trace": {
"type": "text",
"analyzer": "standard"
},
"metadata": {
"type": "object",
"dynamic": True
},
"host": {
"properties": {
"name": {"type": "keyword"},
"ip": {"type": "ip"},
"geo": {
"properties": {
"location": {"type": "geo_point"},
"country": {"type": "keyword"},
"city": {"type": "keyword"}
}
}
}
}
}
},
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"refresh_interval": "5s",
"index.lifecycle.name": "log-policy",
"index.lifecycle.rollover_alias": "logs"
}
}
ACCESS_LOG_MAPPING = {
"mappings": {
"properties": {
"@timestamp": {"type": "date"},
"http_method": {"type": "keyword"},
"http_path": {
"type": "text",
"fields": {
"keyword": {"type": "keyword"}
}
},
"http_status_code": {"type": "integer"},
"response_time_ms": {"type": "float"},
"request_size_bytes": {"type": "integer"},
"response_size_bytes": {"type": "integer"},
"client_ip": {"type": "ip"},
"user_agent": {
"type": "text",
"fields": {
"keyword": {"type": "keyword"}
}
},
"referer": {"type": "keyword"},
"user_id": {"type": "keyword"},
"request_id": {"type": "keyword"},
"metadata": {"type": "object", "dynamic": True},
"log_type": {"type": "keyword"}
}
},
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1,
"refresh_interval": "1s",
"index.lifecycle.name": "access-log-policy"
}
}
@classmethod
def get_ilm_policy(cls) -> Dict:
"""Get Index Lifecycle Management policy."""
return {
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_primary_shard_size": "30gb",
"max_age": "1d"
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": {
"number_of_shards": 1
},
"forcemerge": {
"max_num_segments": 1
},
"set_priority": {
"priority": 50
}
}
},
"cold": {
"min_age": "30d",
"actions": {
"set_priority": {
"priority": 0
}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}
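The lifecycle policy above moves an index through phases based on `min_age`. As a way to reason about where an index of a given age would sit, here is a minimal sketch that evaluates the same thresholds locally. The helper names `parse_min_age` and `phase_for_age` are hypothetical, and the sketch ignores one detail of real ILM: when rollover is used, `min_age` is measured from the rollover time rather than from index creation.

```python
from datetime import timedelta
from typing import Dict

# Duration suffixes used by the policy above, mapped to timedelta keywords.
_UNITS = {"ms": "milliseconds", "s": "seconds", "m": "minutes",
          "h": "hours", "d": "days"}

EXAMPLE_POLICY = {
    "policy": {
        "phases": {
            "hot": {"min_age": "0ms"},
            "warm": {"min_age": "7d"},
            "cold": {"min_age": "30d"},
            "delete": {"min_age": "90d"},
        }
    }
}


def parse_min_age(value: str) -> timedelta:
    """Convert a duration such as '7d' or '0ms' to a timedelta."""
    # Try the two-character suffix 'ms' before the one-character ones.
    for suffix in sorted(_UNITS, key=len, reverse=True):
        if value.endswith(suffix):
            return timedelta(**{_UNITS[suffix]: int(value[: -len(suffix)])})
    raise ValueError(f"Unsupported duration: {value!r}")


def phase_for_age(policy: Dict, age: timedelta) -> str:
    """Return the latest phase whose min_age the given age has reached."""
    reached = []
    for name, cfg in policy["policy"]["phases"].items():
        threshold = parse_min_age(cfg.get("min_age", "0ms"))
        if threshold <= age:
            reached.append((threshold, name))
    if not reached:
        raise ValueError("Age is below every phase's min_age")
    return max(reached)[1]


if __name__ == "__main__":
    for days in (1, 10, 45, 120):
        print(days, phase_for_age(EXAMPLE_POLICY, timedelta(days=days)))
```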
Step-by-Step Implementation Guide
Step 1: CloudWatch Log Subscription
# cloudwatch/log_subscription.py
import boto3
import json
import re
from datetime import datetime
from typing import Dict, List, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CloudWatchLogSubscription:
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.cloudwatch = boto3.client('logs', region_name=region)
self.kinesis = boto3.client('kinesis', region_name=region)
def create_log_group(self, log_group_name: str, retention_days: int = 30):
"""Create CloudWatch log group with retention policy."""
try:
self.cloudwatch.create_log_group(
logGroupName=log_group_name,
tags={
'Environment': 'production',
'Project': 'log-analytics'
}
)
# Set retention policy
self.cloudwatch.put_retention_policy(
logGroupName=log_group_name,
retentionInDays=retention_days
)
logger.info(f"Created log group: {log_group_name}")
except self.cloudwatch.exceptions.ResourceAlreadyExistsException:
logger.info(f"Log group {log_group_name} already exists")
def create_subscription_filter(self, log_group_name: str,
destination_arn: str,
filter_pattern: str = '',
role_arn: Optional[str] = None):
"""Create subscription filter to forward logs to Kinesis."""
filter_name = f"{log_group_name.replace('/', '-')}-subscription"
# A Kinesis destination needs roleArn, the IAM role CloudWatch Logs assumes
# to write to the stream; a Lambda destination must omit it.
extra_args = {'roleArn': role_arn} if role_arn else {}
try:
self.cloudwatch.put_subscription_filter(
logGroupName=log_group_name,
filterName=filter_name,
filterPattern=filter_pattern,
destinationArn=destination_arn,
distribution='ByLogStream',
**extra_args
)
logger.info(f"Created subscription filter: {filter_name}")
except Exception as e:
logger.error(f"Failed to create subscription filter: {e}")
raise
def setup_log_groups(self, config: Dict):
"""Set up all log groups and subscriptions."""
log_groups = [
'/app/errors',
'/app/access',
'/app/application',
'/system/metrics',
'/security/audit'
]
for log_group in log_groups:
full_path = f"{config['prefix']}{log_group}"
self.create_log_group(full_path, config.get('retention_days', 30))
self.create_subscription_filter(
full_path,
config['kinesis_stream_arn'],
config.get('filter_patterns', {}).get(log_group, ''),
# 'subscription_role_arn' is an assumed config key for the delivery role
role_arn=config.get('subscription_role_arn')
)
def create_metric_filter(self, log_group_name: str,
filter_pattern: str,
metric_namespace: str,
metric_name: str):
"""Create CloudWatch metric filter for monitoring."""
self.cloudwatch.put_metric_filter(
logGroupName=log_group_name,
filterName=f"{metric_name}-filter",
filterPattern=filter_pattern,
metricTransformations=[
{
'metricNamespace': metric_namespace,
'metricName': metric_name,
'metricValue': '1',
'defaultValue': 0
}
]
)
logger.info(f"Created metric filter: {metric_name}")
def create_log_anomaly_detector(self, log_group_arn: str):
"""Create a CloudWatch Logs anomaly detector for a log group."""
# The logs client exposes create_log_anomaly_detector, which takes log
# group ARNs; put_anomaly_detector belongs to the CloudWatch metrics API.
self.cloudwatch.create_log_anomaly_detector(
logGroupArnList=[log_group_arn],
detectorName=f'{log_group_arn.split("/")[-1]}-anomaly',
evaluationFrequency='FIVE_MIN'
)
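When the subscription destination is a Kinesis data stream, CloudWatch Logs has to assume an IAM role that is allowed to write to that stream. A minimal sketch of the two policy documents such a role needs is shown below; the helper names and the example ARN are hypothetical, and a production trust policy would usually also restrict the caller with a condition on the source account or ARN.

```python
import json
from typing import Dict


def logs_trust_policy() -> Dict:
    """Trust policy letting the CloudWatch Logs service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "logs.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }


def kinesis_put_policy(stream_arn: str) -> Dict:
    """Permissions policy allowing writes to a single Kinesis stream."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "kinesis:PutRecord",
            "Resource": stream_arn,
        }],
    }


if __name__ == "__main__":
    # Placeholder ARN for illustration only.
    arn = "arn:aws:kinesis:us-east-1:123456789012:stream/log-analytics-log-stream"
    print(json.dumps(logs_trust_policy(), indent=2))
    print(json.dumps(kinesis_put_policy(arn), indent=2))
```

The JSON-serialized documents can be passed to `iam.create_role(AssumeRolePolicyDocument=...)` and `iam.put_role_policy(PolicyDocument=...)`, and the resulting role ARN supplied as `roleArn` when creating the subscription filter.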
Step 2: Lambda Log Processor
# lambda/log_processor.py
import json
import base64
import boto3
import re
from datetime import datetime
from typing import Dict, List, Optional
import gzip
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LogProcessor:
def __init__(self):
self.es_client = None
self.s3_client = boto3.client('s3')
# Grok patterns for log parsing
self.grok_patterns = {
# Python's re module spells named groups (?P<name>...), not (?<name>...)
'access_log': r'(?P<client_ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status_code>\d+) (?P<response_size>\d+)',
'error_log': r'(?P<timestamp>\S+ \S+) \[(?P<level>\S+)\] \[(?P<module>\S+)\] (?P<message>.+)',
'application_log': r'(?P<timestamp>\S+ \S+) (?P<level>\S+) (?P<service>\S+) \[(?P<request_id>\S+)\] (?P<message>.+)'
}
# Compile patterns
self.compiled_patterns = {
name: re.compile(pattern)
for name, pattern in self.grok_patterns.items()
}
def initialize_elasticsearch(self, endpoint: str,
username: str, password: str):
"""Initialize Elasticsearch client."""
from elasticsearch import Elasticsearch
self.es_client = Elasticsearch(
[endpoint],
http_auth=(username, password),
use_ssl=True,
verify_certs=True
)
logger.info(f"Connected to Elasticsearch: {endpoint}")
def parse_log_line(self, log_line: str, log_type: str) -> Optional[Dict]:
"""Parse a single log line using grok patterns."""
pattern = self.compiled_patterns.get(log_type)
if not pattern:
return None
match = pattern.match(log_line)
if not match:
return None
parsed = match.groupdict()
# Add metadata
parsed['raw_message'] = log_line
parsed['log_type'] = log_type
parsed['parsed_at'] = datetime.utcnow().isoformat()
return parsed
def enrich_log_entry(self, parsed_log: Dict) -> Dict:
"""Enrich log entry with additional context."""
enriched = parsed_log.copy()
# Add geo information from IP
if 'client_ip' in enriched:
enriched['geo'] = self._get_geo_from_ip(enriched['client_ip'])
# Add service name extraction
if 'path' in enriched:
enriched['service'] = self._extract_service_from_path(enriched['path'])
# Add severity scoring
if 'level' in enriched:
enriched['severity_score'] = self._calculate_severity(enriched['level'])
# Add timestamp parsing
if 'timestamp' in enriched:
enriched['parsed_timestamp'] = self._parse_timestamp(enriched['timestamp'])
return enriched
def _get_geo_from_ip(self, ip_address: str) -> Dict:
"""Get geographical information from IP address."""
# Simplified geo lookup - in production use MaxMind GeoIP
return {
'country': 'Unknown',
'city': 'Unknown',
'location': {'lat': 0.0, 'lon': 0.0}
}
def _extract_service_from_path(self, path: str) -> str:
"""Extract service name from URL path."""
parts = path.strip('/').split('/')
# split() always returns at least one element, so test for an empty first segment
return parts[0] or 'unknown'
def _calculate_severity(self, level: str) -> int:
"""Calculate severity score for log level."""
severity_map = {
'DEBUG': 0,
'INFO': 1,
'WARN': 2,
'ERROR': 3,
'FATAL': 4
}
return severity_map.get(level.upper(), 0)
def _parse_timestamp(self, timestamp_str: str) -> str:
"""Parse various timestamp formats."""
formats = [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S.%fZ',
'%d/%b/%Y:%H:%M:%S %z'
]
for fmt in formats:
try:
dt = datetime.strptime(timestamp_str, fmt)
return dt.isoformat()
except ValueError:
continue
return timestamp_str
def batch_index_to_elasticsearch(self, logs: List[Dict],
index_name: str):
"""Batch index logs to Elasticsearch."""
if not self.es_client or not logs:
return
# Prepare bulk request
actions = []
for log in logs:
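action = {
"index": {
# Mapping types ("_type") are deprecated in 7.x and removed in 8.0
"_index": index_name
}
}
actions.append(action)
actions.append(log)
# Execute bulk request
try:
# elasticsearch-py 7.x takes the payload as `body`; 8.x renamed it to `operations`
response = self.es_client.bulk(body=actions)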
if response['errors']:
error_count = sum(1 for item in response['items']
if item['index']['status'] >= 400)
logger.warning(f"Bulk indexing had {error_count} errors")
else:
logger.info(f"Successfully indexed {len(logs)} documents")
except Exception as e:
logger.error(f"Bulk indexing failed: {e}")
raise
def process_kinesis_records(self, event: Dict) -> Dict:
"""Main Lambda handler for processing Kinesis records."""
processed_count = 0
error_count = 0
batch_logs = []
for record in event['Records']:
try:
# Decode Kinesis record
payload = base64.b64decode(record['kinesis']['data'])
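# CloudWatch Logs gzip-compresses subscription data, and Kinesis records carry
# no 'compression' field, so detect gzip by its magic number instead
if payload[:2] == b'\x1f\x8b':
payload = gzip.decompress(payload)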
log_data = json.loads(payload)
# Parse log line
log_line = log_data.get('message', '')
log_type = log_data.get('log_type', 'application_log')
parsed = self.parse_log_line(log_line, log_type)
if parsed:
# Enrich the log
enriched = self.enrich_log_entry(parsed)
# Add source metadata
enriched['source'] = {
'log_group': log_data.get('logGroup', ''),
'log_stream': log_data.get('logStream', ''),
'account_id': log_data.get('owner', '')
}
batch_logs.append(enriched)
processed_count += 1
except Exception as e:
error_count += 1
logger.error(f"Error processing record: {e}")
# Batch index to Elasticsearch
if batch_logs:
index_name = f"logs-{datetime.utcnow().strftime('%Y.%m.%d')}"
self.batch_index_to_elasticsearch(batch_logs, index_name)
return {
'statusCode': 200,
'body': {
'processed': processed_count,
'errors': error_count
}
}
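Records that arrive through a CloudWatch Logs subscription are not single log lines: each Kinesis record holds a gzip-compressed JSON envelope with a `messageType`, the `logGroup`, `logStream`, and `owner`, and a `logEvents` array of individual entries. A minimal sketch of unpacking that envelope before parsing follows; the function name `iter_cloudwatch_events` and the output field `timestamp_ms` are assumptions, not part of the processor above.

```python
import base64
import gzip
import json
from typing import Dict, Iterator


def iter_cloudwatch_events(kinesis_data_b64: str) -> Iterator[Dict]:
    """Yield one dict per log event from a CloudWatch Logs subscription record."""
    raw = base64.b64decode(kinesis_data_b64)
    if raw[:2] == b"\x1f\x8b":  # gzip magic number
        raw = gzip.decompress(raw)
    envelope = json.loads(raw)
    # CONTROL_MESSAGE records only check that the destination is reachable.
    if envelope.get("messageType") != "DATA_MESSAGE":
        return
    for event in envelope.get("logEvents", []):
        yield {
            "message": event["message"],
            "timestamp_ms": event["timestamp"],
            "logGroup": envelope.get("logGroup", ""),
            "logStream": envelope.get("logStream", ""),
            "owner": envelope.get("owner", ""),
        }


if __name__ == "__main__":
    # Build a sample record the same way the subscription would encode it.
    sample = {
        "messageType": "DATA_MESSAGE",
        "owner": "123456789012",
        "logGroup": "/app/application",
        "logStream": "app-stream",
        "logEvents": [
            {"id": "1", "timestamp": 1705314645000,
             "message": "2024-01-15 10:30:45 INFO user-service [req-123] User created"},
        ],
    }
    data = base64.b64encode(gzip.compress(json.dumps(sample).encode())).decode()
    for entry in iter_cloudwatch_events(data):
        print(entry["logGroup"], entry["message"])
```

Each yielded `message` can then go through `parse_log_line` and `enrich_log_entry` as a single line, with the envelope fields filling in the source metadata.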
Step 3: Elasticsearch Cluster Setup
# elasticsearch/cluster_setup.py
from elasticsearch import Elasticsearch, RequestsHttpConnection
from elasticsearch.client import IndicesClient, ClusterClient
import json
from typing import Dict, List
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ElasticsearchClusterManager:
def __init__(self, endpoints: List[str],
username: str, password: str):
self.es = Elasticsearch(
endpoints,
http_auth=(username, password),
use_ssl=True,
verify_certs=True,
# Pass the class itself (7.x client API), not its name as a string
connection_class=RequestsHttpConnection
)
self.indices = IndicesClient(self.es)
self.cluster = ClusterClient(self.es)
def create_log_indices(self):
"""Create index templates for log data."""
# Application logs template
app_logs_template = {
"index_patterns": ["logs-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"refresh_interval": "5s",
"index.lifecycle.name": "log-policy",
"index.lifecycle.rollover_alias": "logs"
},
"mappings": {
"properties": {
"@timestamp": {"type": "date"},
"log_level": {"type": "keyword"},
"source": {"type": "keyword"},
"message": {"type": "text"},
"service": {"type": "keyword"},
"environment": {"type": "keyword"},
"host": {"type": "keyword"},
"trace_id": {"type": "keyword"},
"span_id": {"type": "keyword"},
"metadata": {"type": "object", "dynamic": True}
}
}
},
"priority": 100
}
self.indices.put_index_template(
name="logs-template",
body=app_logs_template
)
logger.info("Created logs index template")
# Access logs template
access_logs_template = {
"index_patterns": ["access-logs-*"],
"template": {
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1,
"refresh_interval": "1s"
},
"mappings": {
"properties": {
"@timestamp": {"type": "date"},
"client_ip": {"type": "ip"},
"http_method": {"type": "keyword"},
"http_path": {"type": "text"},
"http_status_code": {"type": "integer"},
"response_time_ms": {"type": "float"},
"user_agent": {"type": "text"}
}
}
},
"priority": 200
}
self.indices.put_index_template(
name="access-logs-template",
body=access_logs_template
)
def setup_ilm_policy(self):
"""Set up Index Lifecycle Management policy."""
ilm_policy = {
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_primary_shard_size": "30gb",
"max_age": "1d"
},
"set_priority": {"priority": 100}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": {"number_of_shards": 1},
"forcemerge": {"max_num_segments": 1},
"set_priority": {"priority": 50}
}
},
"cold": {
"min_age": "30d",
"actions": {
"set_priority": {"priority": 0}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}
# elasticsearch-py 7.x: ilm.put_lifecycle(policy=<policy name>, body=<definition>)
self.es.ilm.put_lifecycle(
policy="log-policy",
body=ilm_policy
)
logger.info("Created ILM policy: log-policy")
def create_dashboards_and_visualizations(self):
"""Create Kibana saved objects."""
# This would typically be done via Kibana API or saved objects export
visualizations = [
{
"name": "Log Volume Over Time",
"type": "line",
"index": "logs-*",
"aggs": {
"time_buckets": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "5m"
}
}
}
},
{
"name": "Error Rate by Service",
"type": "pie",
"index": "logs-*",
"filters": {
"log_level": "ERROR"
},
"aggs": {
"by_service": {
"terms": {
"field": "service",
"size": 10
}
}
}
},
{
"name": "Response Time Distribution",
"type": "histogram",
"index": "access-logs-*",
"aggs": {
"response_time": {
"histogram": {
"field": "response_time_ms",
"interval": 100
}
}
}
}
]
return visualizations
def setup_alerting_rules(self):
"""Set up Elasticsearch alerting rules."""
alert_rules = [
{
"name": "High Error Rate",
"trigger": {
"condition": {
"script": {
"source": "ctx.payload.hits.total > 100"
}
},
"schedule": {
"interval": "5m"
}
},
"actions": [
{
"name": "Slack Notification",
"throttle_period": "15m",
"webhook": {
"scheme": "https",
"host": "hooks.slack.com",
"port": 443,
"path": "/services/xxx",
"body": '{"text": "High error rate detected: {{ctx.payload.hits.total}} errors"}'
}
}
]
},
{
"name": "Slow Response Times",
"trigger": {
"condition": {
"script": {
"source": "ctx.payload.aggregations.avg_response.value > 1000"
}
},
"schedule": {
"interval": "10m"
}
},
"actions": [
{
"name": "Email Alert",
"email": {
"to": ["ops@company.com"],
"subject": "Slow Response Times Detected",
"body": "Average response time: {{ctx.payload.aggregations.avg_response.value}}ms"
}
}
]
}
]
return alert_rules
def create_saved_queries(self):
"""Create saved queries for common log analysis."""
saved_queries = [
{
"name": "Errors in Last Hour",
"query": {
"bool": {
"filter": [
{"range": {"@timestamp": {"gte": "now-1h"}}},
{"term": {"log_level": "ERROR"}}
]
}
}
},
{
"name": "Slow API Calls",
"query": {
"bool": {
"filter": [
{"range": {"response_time_ms": {"gte": 1000}}},
{"term": {"log_type": "access"}}
]
}
}
},
{
"name": "Failed Login Attempts",
"query": {
"bool": {
"filter": [
{"match_phrase": {"message": "failed login"}},
{"range": {"@timestamp": {"gte": "now-24h"}}}
]
}
}
}
]
return saved_queries
def health_check(self) -> Dict:
"""Perform comprehensive cluster health check."""
health = self.cluster.health()
indices_stats = self.indices.stats(index="logs-*")
return {
'cluster_status': health['status'],
'node_count': health['number_of_nodes'],
'active_shards': health['active_primary_shards'],
'relocating_shards': health['relocating_shards'],
'pending_tasks': health['number_of_pending_tasks'],
'indices_count': len(indices_stats['indices']),
'total_docs': sum(
stats['primaries']['docs']['count']
for stats in indices_stats['indices'].values()
),
'total_size_bytes': sum(
stats['primaries']['store']['size_in_bytes']
for stats in indices_stats['indices'].values()
)
}
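The dictionary returned by `health_check` is only useful if something acts on it. One possible way to turn it into a list of findings is sketched below; the function name `evaluate_health` and the pending-task threshold are assumptions to be tuned per cluster.

```python
from typing import Dict, List


def evaluate_health(report: Dict, max_pending_tasks: int = 50) -> List[str]:
    """Translate a health_check() report into human-readable problems."""
    problems: List[str] = []
    status = report.get("cluster_status")
    if status == "red":
        problems.append("cluster is red: at least one primary shard is unassigned")
    elif status == "yellow":
        problems.append("cluster is yellow: at least one replica shard is unassigned")
    pending = report.get("pending_tasks", 0)
    if pending > max_pending_tasks:
        problems.append(f"{pending} pending cluster tasks (limit {max_pending_tasks})")
    return problems


if __name__ == "__main__":
    # Example report using the same keys that health_check() returns.
    report = {
        "cluster_status": "yellow",
        "node_count": 6,
        "active_shards": 42,
        "relocating_shards": 0,
        "pending_tasks": 3,
    }
    for problem in evaluate_health(report):
        print(problem)
```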
Infrastructure Setup (Terraform)
# infrastructure/log_analytics.tf
terraform {
required_version = ">= 1.5.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
elastic = {
source = "elastic/ec"
version = "~> 0.5.0"
}
}
}
# Variables
variable "environment" {
description = "Environment name"
type = string
default = "production"
}
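variable "project_name" {
description = "Project name"
type = string
default = "log-analytics"
}
# Consumed by the log processor Lambda's environment block
variable "elasticsearch_password" {
description = "Password for the Elasticsearch master user"
type = string
sensitive = true
}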
# VPC for Elasticsearch
module "vpc" {
source = "terraform-aws-modules/vpc/aws"
version = "5.0.0"
name = "${var.project_name}-vpc"
cidr = "10.0.0.0/16"
azs = ["us-east-1a", "us-east-1b", "us-east-1c"]
private_subnets = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
public_subnets = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]
enable_nat_gateway = true
single_nat_gateway = var.environment != "production"
enable_dns_hostnames = true
enable_dns_support = true
tags = {
Project = var.project_name
}
}
# Elasticsearch Domain
resource "aws_elasticsearch_domain" "logs" {
domain_name = "${var.project_name}-logs"
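elasticsearch_version = "7.10" # newest Elasticsearch version the AWS managed service offers
cluster_config {
instance_type = "m6g.large.elasticsearch"
instance_count = var.environment == "production" ? 6 : 3
dedicated_master_enabled = var.environment == "production"
dedicated_master_type = "m6g.large.elasticsearch"
dedicated_master_count = 3
zone_awareness_enabled = true
# Three private subnets are attached below, so spread nodes across three AZs
zone_awareness_config {
availability_zone_count = 3
}
}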
ebs_options {
ebs_enabled = true
volume_type = "gp3"
volume_size = 100
}
vpc_options {
subnet_ids = module.vpc.private_subnets
security_group_ids = [aws_security_group.elasticsearch.id]
}
encrypt_at_rest {
enabled = true
}
node_to_node_encryption {
enabled = true
}
domain_endpoint_options {
enforce_https = true
tls_security_policy = "Policy-Min-TLS-1-2-2019-07"
}
access_policies = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = {
AWS = aws_iam_role.elasticsearch.arn
}
Action = "es:*"
# A resource cannot reference its own arn attribute, so match the domain by name
Resource = "arn:aws:es:*:*:domain/${var.project_name}-logs/*"
}
]
})
tags = {
Environment = var.environment
Project = var.project_name
}
}
# Security Group for Elasticsearch
resource "aws_security_group" "elasticsearch" {
name_prefix = "${var.project_name}-es-"
vpc_id = module.vpc.vpc_id
ingress {
from_port = 443
to_port = 443
protocol = "tcp"
cidr_blocks = [module.vpc.vpc_cidr_block]
description = "HTTPS"
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
tags = {
Name = "${var.project_name}-es-sg"
}
}
# IAM Role for Elasticsearch
resource "aws_iam_role" "elasticsearch" {
name = "${var.project_name}-elasticsearch-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "es.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy" "elasticsearch" {
name = "${var.project_name}-elasticsearch-policy"
role = aws_iam_role.elasticsearch.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"es:ESHttp*"
]
Resource = "${aws_elasticsearch_domain.logs.arn}/*"
}
]
})
}
# Kinesis Data Streams for log ingestion
resource "aws_kinesis_stream" "log_stream" {
name = "${var.project_name}-log-stream"
shard_count = var.environment == "production" ? 10 : 2
retention_period = 48
stream_mode_details {
stream_mode = "PROVISIONED"
}
tags = {
Environment = var.environment
Project = var.project_name
}
}
# Lambda function for log processing
resource "aws_lambda_function" "log_processor" {
function_name = "${var.project_name}-processor"
role = aws_iam_role.lambda_processor.arn
handler = "log_processor.process_kinesis_records"
runtime = "python3.9"
filename = data.archive_file.lambda_processor.output_path
source_code_hash = data.archive_file.lambda_processor.output_base64sha256
memory_size = 1024
timeout = 300
vpc_config {
subnet_ids = module.vpc.private_subnets
security_group_ids = [aws_security_group.lambda.id]
}
environment {
variables = {
ELASTICSEARCH_ENDPOINT = aws_elasticsearch_domain.logs.endpoint
ELASTICSEARCH_USERNAME = "master"
ELASTICSEARCH_PASSWORD = var.elasticsearch_password
}
}
tags = {
Environment = var.environment
Project = var.project_name
}
}
# Lambda event source mapping
resource "aws_lambda_event_source_mapping" "kinesis_trigger" {
event_source_arn = aws_kinesis_stream.log_stream.arn
function_name = aws_lambda_function.log_processor.arn
starting_position = "LATEST"
batch_size = 100
maximum_batching_window_in_seconds = 5
enabled = true
}
# IAM Role for Lambda
resource "aws_iam_role" "lambda_processor" {
name = "${var.project_name}-lambda-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "lambda.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy" "lambda_processor" {
name = "${var.project_name}-lambda-policy"
role = aws_iam_role.lambda_processor.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListStreams"
]
Resource = aws_kinesis_stream.log_stream.arn
},
{
Effect = "Allow"
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Resource = "arn:aws:logs:*:*:*"
},
{
Effect = "Allow"
Action = [
"ec2:CreateNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DeleteNetworkInterface"
]
Resource = "*"
}
]
})
}
# CloudWatch Log Groups
resource "aws_cloudwatch_log_group" "app_errors" {
name = "/app/errors"
retention_in_days = 30
tags = {
Environment = var.environment
Project = var.project_name
}
}
resource "aws_cloudwatch_log_group" "app_access" {
name = "/app/access"
retention_in_days = 30
tags = {
Environment = var.environment
Project = var.project_name
}
}
# Subscription filters
resource "aws_cloudwatch_log_subscription_filter" "errors_to_kinesis" {
name = "errors-to-kinesis"
log_group_name = aws_cloudwatch_log_group.app_errors.name
filter_pattern = ""
destination_arn = aws_kinesis_stream.log_stream.arn
}
resource "aws_cloudwatch_log_subscription_filter" "access_to_kinesis" {
name = "access-to-kinesis"
log_group_name = aws_cloudwatch_log_group.app_access.name
filter_pattern = ""
destination_arn = aws_kinesis_stream.log_stream.arn
}
# S3 bucket for long-term log storage
resource "aws_s3_bucket" "log_archive" {
bucket = "${var.project_name}-log-archive-${var.environment}"
tags = {
Environment = var.environment
Project = var.project_name
}
}
resource "aws_s3_bucket_lifecycle_configuration" "log_archive" {
bucket = aws_s3_bucket.log_archive.id
rule {
id = "transition-to-glacier"
status = "Enabled"
transition {
days = 30
storage_class = "STANDARD_IA"
}
transition {
days = 90
storage_class = "GLACIER"
}
expiration {
days = 365
}
}
}
# Outputs
output "elasticsearch_endpoint" {
description = "Elasticsearch endpoint"
value = aws_elasticsearch_domain.logs.endpoint
}
output "kinesis_stream_arn" {
description = "Kinesis stream ARN"
value = aws_kinesis_stream.log_stream.arn
}
output "lambda_function_name" {
description = "Lambda function name"
value = aws_lambda_function.log_processor.function_name
}
Testing and Validation
# tests/test_log_analytics.py
import pytest
import json
import time
from datetime import datetime, timedelta
from unittest.mock import Mock, patch, MagicMock
class TestLogAnalytics:
@pytest.fixture
def sample_logs(self):
return [
{
"timestamp": "2024-01-15 10:30:45",
"level": "ERROR",
"service": "user-service",
"message": "Failed to connect to database",
"request_id": "req-123",
"stack_trace": "java.sql.SQLException: Connection refused"
},
{
"timestamp": "2024-01-15 10:30:46",
"level": "INFO",
"service": "api-gateway",
"message": "GET /api/users 200 45ms",
"request_id": "req-124",
"http_method": "GET",
"http_path": "/api/users",
"response_time_ms": 45
}
]
@pytest.fixture
def log_processor(self):
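# `lambda` is a reserved word, so `from lambda.log_processor import ...` is a
# syntax error; load the module by name instead
import importlib
LogProcessor = importlib.import_module('lambda.log_processor').LogProcessor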
return LogProcessor()
def test_log_parsing(self, log_processor):
"""Test log line parsing."""
test_lines = {
'access_log': '192.168.1.1 - - [15/Jan/2024:10:30:45 +0000] "GET /api/users HTTP/1.1" 200 1234',
'error_log': '2024-01-15 10:30:45 [ERROR] [database] Connection failed',
'application_log': '2024-01-15 10:30:45 INFO user-service [req-123] User created successfully'
}
for log_type, log_line in test_lines.items():
parsed = log_processor.parse_log_line(log_line, log_type)
assert parsed is not None
assert 'timestamp' in parsed
# access_log has no 'message' capture group; every parsed entry carries 'raw_message'
assert 'raw_message' in parsed
def test_log_enrichment(self, log_processor):
"""Test log entry enrichment."""
parsed_log = {
'timestamp': '2024-01-15 10:30:45',
'level': 'ERROR',
'message': 'Connection failed',
'client_ip': '192.168.1.1',
'path': '/api/users'
}
enriched = log_processor.enrich_log_entry(parsed_log)
assert 'geo' in enriched
assert 'service' in enriched
assert 'severity_score' in enriched
assert enriched['severity_score'] == 3 # ERROR level
def test_batch_indexing(self, log_processor):
"""Test batch indexing to Elasticsearch."""
mock_es = Mock()
log_processor.es_client = mock_es
mock_es.bulk.return_value = {'errors': False, 'items': []}
logs = [
{'@timestamp': datetime.now().isoformat(), 'message': 'Test log 1'},
{'@timestamp': datetime.now().isoformat(), 'message': 'Test log 2'}
]
log_processor.batch_index_to_elasticsearch(logs, 'test-logs')
mock_es.bulk.assert_called_once()
def test_kinesis_processing(self, log_processor):
"""Test Kinesis record processing."""
import base64
# Create mock Kinesis event
log_data = json.dumps({
# Must match the application_log pattern: timestamp level service [request_id] message
'message': '2024-01-15 10:30:45 ERROR app [req-125] Test error message',
'logGroup': '/app/errors',
'logStream': 'app-stream',
'owner': '123456789'
})
event = {
'Records': [
{
'kinesis': {
'data': base64.b64encode(log_data.encode()).decode(),
'sequenceNumber': '12345',
'partitionKey': 'test'
}
}
]
}
mock_es = Mock()
log_processor.es_client = mock_es
mock_es.bulk.return_value = {'errors': False, 'items': []}
result = log_processor.process_kinesis_records(event)
assert result['statusCode'] == 200
assert result['body']['processed'] == 1
def test_cloudwatch_metric_filter(self):
"""Test CloudWatch metric filter creation."""
import boto3
from moto import mock_logs
@mock_logs
def test_metric_filter():
client = boto3.client('logs', region_name='us-east-1')
# Create log group
client.create_log_group(logGroupName='/test/app')
# Create metric filter
client.put_metric_filter(
logGroupName='/test/app',
filterName='error-count',
filterPattern='[level=ERROR, message]',
metricTransformations=[{
'metricNamespace': 'LogAnalytics',
'metricName': 'ErrorCount',
'metricValue': '1'
}]
)
# Verify filter was created
filters = client.describe_metric_filters(
logGroupNamePrefix='/test/app'
)
assert len(filters['metricFilters']) == 1
test_metric_filter()
def test_elasticsearch_index_lifecycle(self):
"""Test ILM policy configuration."""
ilm_policy = {
"policy": {
"phases": {
"hot": {"min_age": "0ms"},
"warm": {"min_age": "7d"},
"cold": {"min_age": "30d"},
"delete": {"min_age": "90d"}
}
}
}
# Validate policy structure
assert 'policy' in ilm_policy
assert 'phases' in ilm_policy['policy']
assert len(ilm_policy['policy']['phases']) == 4
# Validate phase order
phases = list(ilm_policy['policy']['phases'].keys())
assert phases == ['hot', 'warm', 'cold', 'delete']
def test_alerting_rule_configuration(self):
"""Test alerting rule configuration."""
alert_rule = {
"name": "High Error Rate",
"condition": {
"threshold": 100,
"time_window": "5m"
},
"actions": [
{"type": "slack", "channel": "#alerts"},
{"type": "email", "recipients": ["ops@company.com"]}
]
}
# Validate rule structure
assert 'name' in alert_rule
assert 'condition' in alert_rule
assert 'actions' in alert_rule
assert len(alert_rule['actions']) == 2
def test_log_search_query(self):
"""Test Elasticsearch search query construction."""
search_query = {
"query": {
"bool": {
"must": [
{"match_phrase": {"service": "user-service"}},
{"range": {"@timestamp": {"gte": "now-1h"}}}
],
"filter": [
{"term": {"log_level": "ERROR"}}
]
}
},
"aggs": {
"errors_over_time": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "5m"
}
}
}
}
# Validate query structure
assert 'query' in search_query
assert 'bool' in search_query['query']
assert 'must' in search_query['query']['bool']
assert 'filter' in search_query['query']['bool']
assert 'aggs' in search_query
def test_log_aggregation_pipeline(self):
"""Test Elasticsearch aggregation request for the top error-producing services."""
aggregation_request = {
"size": 0,
"query": {"term": {"log_level": "ERROR"}},
"aggs": {
"by_service": {
"terms": {"field": "service", "size": 10, "order": {"_count": "desc"}},
"aggs": {
"first_error": {"min": {"field": "@timestamp"}},
"last_error": {"max": {"field": "@timestamp"}}
}
}
}
}
# Validate aggregation structure
by_service = aggregation_request['aggs']['by_service']
assert by_service['terms']['field'] == 'service'
assert by_service['terms']['size'] == 10
assert set(by_service['aggs']) == {'first_error', 'last_error'}
Cost Analysis
Monthly Cost Breakdown (Production)
| Component | Specification | Monthly Cost |
|---|---|---|
| Elasticsearch | 6x m6g.large | $1,200 |
| Kinesis | 10 shards | $300 |
| Lambda | 1M invocations | $50 |
| CloudWatch | 100GB logs | $50 |
| S3 Storage | 5TB archive | $115 |
| Data Transfer | Cross-AZ | $100 |
| Total | | $1,815 |
Cost Optimization Strategies
💡
Tip: Optimize log analytics costs with these strategies:
- Log Sampling: Sample verbose logs at around 10%
- Compression: Use gzip; repetitive text logs can shrink by up to about 90%
- Index Lifecycle: Auto-delete old indices
- Hot-Warm-Cold: Tiered storage architecture
- Reserved Instances: Up to roughly 40% savings on Elasticsearch nodes, depending on term and payment option
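The sampling strategy above can be sketched as a deterministic, hash-based filter. Hashing a stable key such as the request or trace ID keeps all lines of one request together, so a sampled request stays complete. The function name `should_keep`, the choice of key, and the rule that WARN and above are never dropped are assumptions for illustration.

```python
import hashlib

# Levels that are always kept, regardless of the sampling rate (assumed rule).
_ALWAYS_KEEP = {"WARN", "ERROR", "FATAL"}


def should_keep(level: str, key: str, sample_rate: float = 0.10) -> bool:
    """Decide whether to keep a log entry under hash-based sampling."""
    if level.upper() in _ALWAYS_KEEP:
        return True
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    # Map the first 8 bytes of the hash to a number in [0, 1).
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < sample_rate


if __name__ == "__main__":
    kept = sum(should_keep("DEBUG", f"req-{i}") for i in range(10_000))
    print(f"kept {kept} of 10000 DEBUG entries")
    print(should_keep("ERROR", "req-1"))
```

Because the decision depends only on the key, every Lambda invocation makes the same choice for the same request without any shared state.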
Performance Metrics
| Metric | Before | After | Improvement |
|---|---|---|---|
| Search Latency | 30 seconds | 200ms | 150x faster |
| Ingestion Rate | 10K/min | 1M/min | 100x faster |
| Storage Cost | 100/TB | | 80% cheaper |
| Alert Latency | 15 minutes | 1 minute | 15x faster |
Interview Talking Points
Architecture Decisions
ℹ️
Best Practice: Highlight these log analytics concepts in interviews:
- Why Elasticsearch over CloudWatch Insights?
  - Better full-text search capabilities
  - More flexible query language
  - Custom visualizations and dashboards
  - Cross-index correlation
- Why Kinesis over direct CloudWatch subscription?
  - Better buffering and replay capability
  - More flexible processing with Lambda
  - Cost-efficient for high volume
  - Custom partitioning strategies
- Why hot-warm-cold architecture?
  - Cost optimization for tiered storage
  - Performance optimization for recent data
  - Compliance requirements for retention
  - Automatic data lifecycle management
Common Interview Questions
Q: "How do you handle log rotation and retention?"
retention_strategy = {
"Hot": "0-7 days, full replicas, SSD storage",
"Warm": "7-30 days, reduced replicas, forced merge",
"Cold": "30-90 days, single replica, minimal resources",
"Delete": "90+ days, auto-delete via ILM"
}
Q: "How do you ensure log data quality?"
data_quality_checks = {
"Schema Validation": "Verify log format matches expected pattern",
"Completeness": "Check for missing required fields",
"Freshness": "Monitor ingestion lag and alert on delays",
"Deduplication": "Remove duplicate logs using request_id"
}
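Two of these checks, completeness and deduplication, can be sketched in a few lines. The required-field list and helper names are assumptions. Note that a single request usually emits several log lines, so this sketch treats an entry as a duplicate only when `request_id`, timestamp, and message all match, rather than keying on `request_id` alone.

```python
from typing import Dict, Iterable, Iterator, List

# Assumed minimum set of fields for an indexed application log document.
REQUIRED_FIELDS = ("@timestamp", "log_level", "service", "message")


def missing_fields(doc: Dict) -> List[str]:
    """Return the required fields that are absent or empty in a document."""
    return [name for name in REQUIRED_FIELDS if doc.get(name) in (None, "")]


def deduplicate(docs: Iterable[Dict]) -> Iterator[Dict]:
    """Drop repeated deliveries of the same log entry, preserving order."""
    seen = set()
    for doc in docs:
        key = (doc.get("request_id"), doc.get("@timestamp"), doc.get("message"))
        # Entries without a request_id cannot be matched safely, so keep them.
        if doc.get("request_id") is not None and key in seen:
            continue
        seen.add(key)
        yield doc


if __name__ == "__main__":
    entry = {"@timestamp": "2024-01-15T10:30:45", "log_level": "ERROR",
             "service": "user-service", "message": "Connection failed",
             "request_id": "req-123"}
    batch = [entry, dict(entry), {**entry, "message": "Retrying"}]
    print(len(list(deduplicate(batch))))
    print(missing_fields({"@timestamp": "2024-01-15T10:30:45", "message": "x"}))
```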
Q: "How do you scale for peak loads?"
scaling_strategies = {
"Kinesis": "Auto-scaling shards based on throughput",
"Lambda": "Concurrent execution limits",
"Elasticsearch": "Auto-scaling with warm nodes",
"Buffering": "S3 dead-letter queue for overflow"
}
Deployment Checklist
- Set up VPC and security groups
- Deploy Elasticsearch cluster
- Configure Kinesis streams
- Deploy Lambda processor
- Set up CloudWatch log groups
- Configure subscription filters
- Create index templates and ILM policies
- Set up Kibana dashboards
- Configure alerting rules
- Test end-to-end log flow
⚠️
Warning: Always test log parsing patterns thoroughly before production. Incorrect parsing can lead to data loss or incorrect analytics.
This project demonstrates log analytics platform skills and is highly relevant for DevOps and data engineering interviews at companies with large-scale logging requirements.