🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Servicesℹ️ About✉️ ContactView Pricing Plansfrom $10

Log Analytics Platform (CloudWatch + Lambda + Elasticsearch)

Data Engineering ProjectsLog Analytics⭐ Premium

Advertisement

Log Analytics Platform

CloudWatch + Lambda + Elasticsearch + Kibana

ℹ️

Project Difficulty: Advanced | Duration: 2-3 weeks | Cloud: AWS Build a production-grade log analytics platform processing millions of log entries daily with real-time search, visualization, and alerting capabilities.

Project Overview

Problem Statement

Applications generate massive volumes of logs that are difficult to search, analyze, and correlate. Without centralized log analytics, troubleshooting production issues takes hours, security incidents go undetected, and operational insights are lost.

Objectives

  1. Ingest 1M+ log entries per minute from multiple sources
  2. Provide sub-second search across all log data
  3. Enable real-time alerting on error patterns
  4. Support correlation across distributed systems
  5. Maintain 30-day log retention with cost optimization

Tech Stack

ComponentTechnologyPurpose
Log CollectionCloudWatch + FluentdLog aggregation
ProcessingLambda + KinesisStream processing
Search & AnalyticsElasticsearchFull-text search
VisualizationKibanaDashboards and alerts
StorageS3 + GlacierLong-term retention

Architecture Diagram

LOG SOURCESEC2/ECS LogsLambda LogsAPI Gateway LogsRDS/CloudWatch LogsCLOUDWATCH LOGS/app/errors Log Group/app/access Log Group/system/metrics Log GroupKINESIS DATA STREAMSRaw Logs StreamParsed Logs StreamEnriched Logs StreamLAMBDA PROCESSORSLog Parser (Regex/Grok)Log Enricher (GeoIP)Alert Detector (Patterns)ELASTICSEARCH CLUSTERlogs-YYYY.MM.DD (Daily)metrics-YYYY.MM (Daily)alerts-index (Alerts)KIBANADiscover (Search)Dashboards (Monitor)Visualizations (Charts)Alerts (Notify)

Data Source Setup and Schema

Log Schema Definition

# schemas/log_schema.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Dict, List
from enum import Enum
import json

class LogLevel(Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARN = "WARN"
    ERROR = "ERROR"
    FATAL = "FATAL"

class LogSource(Enum):
    APPLICATION = "application"
    ACCESS = "access"
    SYSTEM = "system"
    SECURITY = "security"
    AUDIT = "audit"

@dataclass
class LogEntry:
    timestamp: datetime
    level: LogLevel
    source: LogSource
    message: str
    service: str
    environment: str
    instance_id: Optional[str] = None
    request_id: Optional[str] = None
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    trace_id: Optional[str] = None
    span_id: Optional[str] = None
    metadata: Dict = field(default_factory=dict)
    stack_trace: Optional[str] = None
    
    def to_dict(self) -> Dict:
        return {
            'timestamp': self.timestamp.isoformat(),
            'level': self.level.value,
            'source': self.source.value,
            'message': self.message,
            'service': self.service,
            'environment': self.environment,
            'instance_id': self.instance_id,
            'request_id': self.request_id,
            'user_id': self.user_id,
            'session_id': self.session_id,
            'trace_id': self.trace_id,
            'span_id': self.span_id,
            'metadata': self.metadata,
            'stack_trace': self.stack_trace
        }
    
    def to_elasticsearch_doc(self) -> Dict:
        """Convert to Elasticsearch document format."""
        doc = self.to_dict()
        doc['@timestamp'] = doc.pop('timestamp')
        doc['log_level'] = doc.pop('level')
        return doc

@dataclass
class AccessLogEntry:
    timestamp: datetime
    method: str
    path: str
    status_code: int
    response_time_ms: float
    request_size_bytes: int
    response_size_bytes: int
    client_ip: str
    user_agent: str
    referer: Optional[str] = None
    user_id: Optional[str] = None
    request_id: Optional[str] = None
    metadata: Dict = field(default_factory=dict)
    
    def to_elasticsearch_doc(self) -> Dict:
        return {
            '@timestamp': self.timestamp.isoformat(),
            'http_method': self.method,
            'http_path': self.path,
            'http_status_code': self.status_code,
            'response_time_ms': self.response_time_ms,
            'request_size_bytes': self.request_size_bytes,
            'response_size_bytes': self.response_size_bytes,
            'client_ip': self.client_ip,
            'user_agent': self.user_agent,
            'referer': self.referer,
            'user_id': self.user_id,
            'request_id': self.request_id,
            'metadata': self.metadata,
            'log_type': 'access'
        }

class LogSchemaManager:
    """Manage Elasticsearch index mappings for log data."""
    
    LOG_MAPPING = {
        "mappings": {
            "properties": {
                "@timestamp": {"type": "date"},
                "log_level": {
                    "type": "keyword"
                },
                "source": {
                    "type": "keyword"
                },
                "message": {
                    "type": "text",
                    "analyzer": "standard"
                },
                "service": {
                    "type": "keyword"
                },
                "environment": {
                    "type": "keyword"
                },
                "instance_id": {
                    "type": "keyword"
                },
                "request_id": {
                    "type": "keyword"
                },
                "user_id": {
                    "type": "keyword"
                },
                "session_id": {
                    "type": "keyword"
                },
                "trace_id": {
                    "type": "keyword"
                },
                "span_id": {
                    "type": "keyword"
                },
                "stack_trace": {
                    "type": "text",
                    "analyzer": "standard"
                },
                "metadata": {
                    "type": "object",
                    "dynamic": True
                },
                "host": {
                    "properties": {
                        "name": {"type": "keyword"},
                        "ip": {"type": "ip"},
                        "geo": {
                            "properties": {
                                "location": {"type": "geo_point"},
                                "country": {"type": "keyword"},
                                "city": {"type": "keyword"}
                            }
                        }
                    }
                }
            }
        },
        "settings": {
            "number_of_shards": 3,
            "number_of_replicas": 1,
            "refresh_interval": "5s",
            "index.lifecycle.name": "log-policy",
            "index.lifecycle.rollover_alias": "logs"
        }
    }
    
    ACCESS_LOG_MAPPING = {
        "mappings": {
            "properties": {
                "@timestamp": {"type": "date"},
                "http_method": {"type": "keyword"},
                "http_path": {
                    "type": "text",
                    "fields": {
                        "keyword": {"type": "keyword"}
                    }
                },
                "http_status_code": {"type": "integer"},
                "response_time_ms": {"type": "float"},
                "request_size_bytes": {"type": "integer"},
                "response_size_bytes": {"type": "integer"},
                "client_ip": {"type": "ip"},
                "user_agent": {
                    "type": "text",
                    "fields": {
                        "keyword": {"type": "keyword"}
                    }
                },
                "referer": {"type": "keyword"},
                "user_id": {"type": "keyword"},
                "request_id": {"type": "keyword"},
                "metadata": {"type": "object", "dynamic": True},
                "log_type": {"type": "keyword"}
            }
        },
        "settings": {
            "number_of_shards": 5,
            "number_of_replicas": 1,
            "refresh_interval": "1s",
            "index.lifecycle.name": "access-log-policy"
        }
    }
    
    @classmethod
    def get_ilm_policy(cls) -> Dict:
        """Get Index Lifecycle Management policy."""
        return {
            "policy": {
                "phases": {
                    "hot": {
                        "min_age": "0ms",
                        "actions": {
                            "rollover": {
                                "max_primary_shard_size": "30gb",
                                "max_age": "1d"
                            },
                            "set_priority": {
                                "priority": 100
                            }
                        }
                    },
                    "warm": {
                        "min_age": "7d",
                        "actions": {
                            "shrink": {
                                "number_of_shards": 1
                            },
                            "forcemerge": {
                                "max_num_segments": 1
                            },
                            "set_priority": {
                                "priority": 50
                            }
                        }
                    },
                    "cold": {
                        "min_age": "30d",
                        "actions": {
                            "set_priority": {
                                "priority": 0
                            }
                        }
                    },
                    "delete": {
                        "min_age": "90d",
                        "actions": {
                            "delete": {}
                        }
                    }
                }
            }
        }

Step-by-Step Implementation Guide

Step 1: CloudWatch Log Subscription

# cloudwatch/log_subscription.py
import boto3
import json
import re
from datetime import datetime
from typing import Dict, List, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CloudWatchLogSubscription:
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.cloudwatch = boto3.client('logs', region_name=region)
        self.kinesis = boto3.client('kinesis', region_name=region)
    
    def create_log_group(self, log_group_name: str, retention_days: int = 30):
        """Create CloudWatch log group with retention policy."""
        try:
            self.cloudwatch.create_log_group(
                logGroupName=log_group_name,
                tags={
                    'Environment': 'production',
                    'Project': 'log-analytics'
                }
            )
            
            # Set retention policy
            self.cloudwatch.put_retention_policy(
                logGroupName=log_group_name,
                retentionInDays=retention_days
            )
            
            logger.info(f"Created log group: {log_group_name}")
            
        except self.cloudwatch.exceptions.ResourceAlreadyExistsException:
            logger.info(f"Log group {log_group_name} already exists")
    
    def create_subscription_filter(self, log_group_name: str, 
                                  destination_arn: str,
                                  filter_pattern: str = ''):
        """Create subscription filter to forward logs to Kinesis."""
        filter_name = f"{log_group_name.replace('/', '-')}-subscription"
        
        try:
            self.cloudwatch.put_subscription_filter(
                logGroupName=log_group_name,
                filterName=filter_name,
                filterPattern=filter_pattern,
                destinationArn=destination_arn,
                distribution='ByLogStream'
            )
            
            logger.info(f"Created subscription filter: {filter_name}")
            
        except Exception as e:
            logger.error(f"Failed to create subscription filter: {e}")
            raise
    
    def setup_log_groups(self, config: Dict):
        """Set up all log groups and subscriptions."""
        log_groups = [
            '/app/errors',
            '/app/access',
            '/app/application',
            '/system/metrics',
            '/security/audit'
        ]
        
        for log_group in log_groups:
            full_path = f"{config['prefix']}{log_group}"
            self.create_log_group(full_path, config.get('retention_days', 30))
            self.create_subscription_filter(
                full_path,
                config['kinesis_stream_arn'],
                config.get('filter_patterns', {}).get(log_group, '')
            )
    
    def create_metric_filter(self, log_group_name: str, 
                            filter_pattern: str,
                            metric_namespace: str,
                            metric_name: str):
        """Create CloudWatch metric filter for monitoring."""
        self.cloudwatch.put_metric_filter(
            logGroupName=log_group_name,
            filterName=f"{metric_name}-filter",
            filterPattern=filter_pattern,
            metricTransformations=[
                {
                    'metricNamespace': metric_namespace,
                    'metricName': metric_name,
                    'metricValue': '1',
                    'defaultValue': 0
                }
            ]
        )
        
        logger.info(f"Created metric filter: {metric_name}")
    
    def create_log_anomaly_detector(self, log_group_name: str):
        """Create anomaly detection for log patterns."""
        self.cloudwatch.put_anomaly_detector(
            logGroupName=log_group_name,
            metricTransformations=[
                {
                    'metricNamespace': 'LogAnalytics/Anomalies',
                    'metricName': f'{log_group_name.split("/")[-1]}-anomaly',
                    'metricValue': '1'
                }
            ]
        )

Step 2: Lambda Log Processor

# lambda/log_processor.py
import json
import base64
import boto3
import re
from datetime import datetime
from typing import Dict, List, Optional
import gzip
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LogProcessor:
    def __init__(self):
        self.es_client = None
        self.s3_client = boto3.client('s3')
        
        # Grok patterns for log parsing
        self.grok_patterns = {
            'access_log': r'(?<client_ip>\S+) \S+ \S+ \[(?<timestamp>[^\]]+)\] "(?<method>\S+) (?<path>\S+) (?<protocol>\S+)" (?<status_code>\d+) (?<response_size>\d+)',
            'error_log': r'(?<timestamp>\S+ \S+) \[(?<level>\S+)\] \[(?<module>\S+)\] (?<message>.+)',
            'application_log': r'(?<timestamp>\S+ \S+) (?<level>\S+) (?<service>\S+) \[(?<request_id>\S+)\] (?<message>.+)'
        }
        
        # Compile patterns
        self.compiled_patterns = {
            name: re.compile(pattern) 
            for name, pattern in self.grok_patterns.items()
        }
    
    def initialize_elasticsearch(self, endpoint: str, 
                                username: str, password: str):
        """Initialize Elasticsearch client."""
        from elasticsearch import Elasticsearch
        
        self.es_client = Elasticsearch(
            [endpoint],
            http_auth=(username, password),
            use_ssl=True,
            verify_certs=True
        )
        
        logger.info(f"Connected to Elasticsearch: {endpoint}")
    
    def parse_log_line(self, log_line: str, log_type: str) -> Optional[Dict]:
        """Parse a single log line using grok patterns."""
        pattern = self.compiled_patterns.get(log_type)
        if not pattern:
            return None
        
        match = pattern.match(log_line)
        if not match:
            return None
        
        parsed = match.groupdict()
        
        # Add metadata
        parsed['raw_message'] = log_line
        parsed['log_type'] = log_type
        parsed['parsed_at'] = datetime.utcnow().isoformat()
        
        return parsed
    
    def enrich_log_entry(self, parsed_log: Dict) -> Dict:
        """Enrich log entry with additional context."""
        enriched = parsed_log.copy()
        
        # Add geo information from IP
        if 'client_ip' in enriched:
            enriched['geo'] = self._get_geo_from_ip(enriched['client_ip'])
        
        # Add service name extraction
        if 'path' in enriched:
            enriched['service'] = self._extract_service_from_path(enriched['path'])
        
        # Add severity scoring
        if 'level' in enriched:
            enriched['severity_score'] = self._calculate_severity(enriched['level'])
        
        # Add timestamp parsing
        if 'timestamp' in enriched:
            enriched['parsed_timestamp'] = self._parse_timestamp(enriched['timestamp'])
        
        return enriched
    
    def _get_geo_from_ip(self, ip_address: str) -> Dict:
        """Get geographical information from IP address."""
        # Simplified geo lookup - in production use MaxMind GeoIP
        return {
            'country': 'Unknown',
            'city': 'Unknown',
            'location': {'lat': 0.0, 'lon': 0.0}
        }
    
    def _extract_service_from_path(self, path: str) -> str:
        """Extract service name from URL path."""
        parts = path.strip('/').split('/')
        if len(parts) > 0:
            return parts[0]
        return 'unknown'
    
    def _calculate_severity(self, level: str) -> int:
        """Calculate severity score for log level."""
        severity_map = {
            'DEBUG': 0,
            'INFO': 1,
            'WARN': 2,
            'ERROR': 3,
            'FATAL': 4
        }
        return severity_map.get(level.upper(), 0)
    
    def _parse_timestamp(self, timestamp_str: str) -> str:
        """Parse various timestamp formats."""
        formats = [
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%dT%H:%M:%S.%fZ',
            '%d/%b/%Y:%H:%M:%S %z'
        ]
        
        for fmt in formats:
            try:
                dt = datetime.strptime(timestamp_str, fmt)
                return dt.isoformat()
            except ValueError:
                continue
        
        return timestamp_str
    
    def batch_index_to_elasticsearch(self, logs: List[Dict], 
                                    index_name: str):
        """Batch index logs to Elasticsearch."""
        if not self.es_client or not logs:
            return
        
        # Prepare bulk request
        actions = []
        for log in logs:
            action = {
                "index": {
                    "_index": index_name,
                    "_type": "_doc"
                }
            }
            actions.append(action)
            actions.append(log)
        
        # Execute bulk request
        try:
            response = self.es_client.bulk(operations=actions)
            
            if response['errors']:
                error_count = sum(1 for item in response['items'] 
                                if item['index']['status'] >= 400)
                logger.warning(f"Bulk indexing had {error_count} errors")
            else:
                logger.info(f"Successfully indexed {len(logs)} documents")
                
        except Exception as e:
            logger.error(f"Bulk indexing failed: {e}")
            raise
    
    def process_kinesis_records(self, event: Dict) -> Dict:
        """Main Lambda handler for processing Kinesis records."""
        processed_count = 0
        error_count = 0
        batch_logs = []
        
        for record in event['Records']:
            try:
                # Decode Kinesis record
                payload = base64.b64decode(record['kinesis']['data'])
                
                # Handle compressed data
                if record['kinesis'].get('compression') == 'gzip':
                    payload = gzip.decompress(payload)
                
                log_data = json.loads(payload)
                
                # Parse log line
                log_line = log_data.get('message', '')
                log_type = log_data.get('log_type', 'application_log')
                
                parsed = self.parse_log_line(log_line, log_type)
                if parsed:
                    # Enrich the log
                    enriched = self.enrich_log_entry(parsed)
                    
                    # Add source metadata
                    enriched['source'] = {
                        'log_group': log_data.get('logGroup', ''),
                        'log_stream': log_data.get('logStream', ''),
                        'account_id': log_data.get('owner', '')
                    }
                    
                    batch_logs.append(enriched)
                    processed_count += 1
                    
            except Exception as e:
                error_count += 1
                logger.error(f"Error processing record: {e}")
        
        # Batch index to Elasticsearch
        if batch_logs:
            index_name = f"logs-{datetime.utcnow().strftime('%Y.%m.%d')}"
            self.batch_index_to_elasticsearch(batch_logs, index_name)
        
        return {
            'statusCode': 200,
            'body': {
                'processed': processed_count,
                'errors': error_count
            }
        }

Step 3: Elasticsearch Cluster Setup

# elasticsearch/cluster_setup.py
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient, ClusterClient
import json
from typing import Dict, List
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ElasticsearchClusterManager:
    def __init__(self, endpoints: List[str], 
                 username: str, password: str):
        self.es = Elasticsearch(
            endpoints,
            http_auth=(username, password),
            use_ssl=True,
            verify_certs=True,
            connection_class='RequestsHttpConnection'
        )
        
        self.indices = IndicesClient(self.es)
        self.cluster = ClusterClient(self.es)
    
    def create_log_indices(self):
        """Create index templates for log data."""
        # Application logs template
        app_logs_template = {
            "index_patterns": ["logs-*"],
            "template": {
                "settings": {
                    "number_of_shards": 3,
                    "number_of_replicas": 1,
                    "refresh_interval": "5s",
                    "index.lifecycle.name": "log-policy",
                    "index.lifecycle.rollover_alias": "logs"
                },
                "mappings": {
                    "properties": {
                        "@timestamp": {"type": "date"},
                        "log_level": {"type": "keyword"},
                        "source": {"type": "keyword"},
                        "message": {"type": "text"},
                        "service": {"type": "keyword"},
                        "environment": {"type": "keyword"},
                        "host": {"type": "keyword"},
                        "trace_id": {"type": "keyword"},
                        "span_id": {"type": "keyword"},
                        "metadata": {"type": "object", "dynamic": True}
                    }
                }
            },
            "priority": 100
        }
        
        self.indices.put_index_template(
            name="logs-template",
            body=app_logs_template
        )
        
        logger.info("Created logs index template")
        
        # Access logs template
        access_logs_template = {
            "index_patterns": ["access-logs-*"],
            "template": {
                "settings": {
                    "number_of_shards": 5,
                    "number_of_replicas": 1,
                    "refresh_interval": "1s"
                },
                "mappings": {
                    "properties": {
                        "@timestamp": {"type": "date"},
                        "client_ip": {"type": "ip"},
                        "http_method": {"type": "keyword"},
                        "http_path": {"type": "text"},
                        "http_status_code": {"type": "integer"},
                        "response_time_ms": {"type": "float"},
                        "user_agent": {"type": "text"}
                    }
                }
            },
            "priority": 200
        }
        
        self.indices.put_index_template(
            name="access-logs-template",
            body=access_logs_template
        )
    
    def setup_ilm_policy(self):
        """Set up Index Lifecycle Management policy."""
        ilm_policy = {
            "policy": {
                "phases": {
                    "hot": {
                        "min_age": "0ms",
                        "actions": {
                            "rollover": {
                                "max_primary_shard_size": "30gb",
                                "max_age": "1d"
                            },
                            "set_priority": {"priority": 100}
                        }
                    },
                    "warm": {
                        "min_age": "7d",
                        "actions": {
                            "shrink": {"number_of_shards": 1},
                            "forcemerge": {"max_num_segments": 1},
                            "set_priority": {"priority": 50}
                        }
                    },
                    "cold": {
                        "min_age": "30d",
                        "actions": {
                            "set_priority": {"priority": 0}
                        }
                    },
                    "delete": {
                        "min_age": "90d",
                        "actions": {
                            "delete": {}
                        }
                    }
                }
            }
        }
        
        self.es.ilm.put_lifecycle_policy(
            name="log-policy",
            body=ilm_policy
        )
        
        logger.info("Created ILM policy: log-policy")
    
    def create_dashboards_and_visualizations(self):
        """Create Kibana saved objects."""
        # This would typically be done via Kibana API or saved objects export
        visualizations = [
            {
                "name": "Log Volume Over Time",
                "type": "line",
                "index": "logs-*",
                "aggs": {
                    "time_buckets": {
                        "date_histogram": {
                            "field": "@timestamp",
                            "fixed_interval": "5m"
                        }
                    }
                }
            },
            {
                "name": "Error Rate by Service",
                "type": "pie",
                "index": "logs-*",
                "filters": {
                    "log_level": "ERROR"
                },
                "aggs": {
                    "by_service": {
                        "terms": {
                            "field": "service",
                            "size": 10
                        }
                    }
                }
            },
            {
                "name": "Response Time Distribution",
                "type": "histogram",
                "index": "access-logs-*",
                "aggs": {
                    "response_time": {
                        "histogram": {
                            "field": "response_time_ms",
                            "interval": 100
                        }
                    }
                }
            }
        ]
        
        return visualizations
    
    def setup_alerting_rules(self):
        """Set up Elasticsearch alerting rules."""
        alert_rules = [
            {
                "name": "High Error Rate",
                "trigger": {
                    "condition": {
                        "script": {
                            "source": "ctx.payload.hits.total > 100"
                        }
                    },
                    "schedule": {
                        "interval": "5m"
                    }
                },
                "actions": [
                    {
                        "name": "Slack Notification",
                        "throttle_period": "15m",
                        "webhook": {
                            "scheme": "https",
                            "host": "hooks.slack.com",
                            "port": 443,
                            "path": "/services/xxx",
                            "body": '{"text": "High error rate detected: {{ctx.payload.hits.total}} errors"}'
                        }
                    }
                ]
            },
            {
                "name": "Slow Response Times",
                "trigger": {
                    "condition": {
                        "script": {
                            "source": "ctx.payload.aggregations.avg_response.value > 1000"
                        }
                    },
                    "schedule": {
                        "interval": "10m"
                    }
                },
                "actions": [
                    {
                        "name": "Email Alert",
                        "email": {
                            "to": ["ops@company.com"],
                            "subject": "Slow Response Times Detected",
                            "body": "Average response time: {{ctx.payload.aggregations.avg_response.value}}ms"
                        }
                    }
                ]
            }
        ]
        
        return alert_rules
    
    def create_saved_queries(self):
        """Create saved queries for common log analysis."""
        saved_queries = [
            {
                "name": "Errors in Last Hour",
                "query": {
                    "bool": {
                        "filter": [
                            {"range": {"@timestamp": {"gte": "now-1h"}}},
                            {"term": {"log_level": "ERROR"}}
                        ]
                    }
                }
            },
            {
                "name": "Slow API Calls",
                "query": {
                    "bool": {
                        "filter": [
                            {"range": {"response_time_ms": {"gte": 1000}}},
                            {"term": {"log_type": "access"}}
                        ]
                    }
                }
            },
            {
                "name": "Failed Login Attempts",
                "query": {
                    "bool": {
                        "filter": [
                            {"match_phrase": {"message": "failed login"}},
                            {"range": {"@timestamp": {"gte": "now-24h"}}}
                        ]
                    }
                }
            }
        ]
        
        return saved_queries
    
    def health_check(self) -> Dict:
        """Perform comprehensive cluster health check."""
        health = self.cluster.health()
        
        indices_stats = self.indices.stats(index="logs-*")
        
        return {
            'cluster_status': health['status'],
            'node_count': health['number_of_nodes'],
            'active_shards': health['active_primary_shards'],
            'relocating_shards': health['relocating_shards'],
            'pending_tasks': health['number_of_pending_tasks'],
            'indices_count': len(indices_stats['indices']),
            'total_docs': sum(
                stats['primaries']['docs']['count']
                for stats in indices_stats['indices'].values()
            ),
            'total_size_bytes': sum(
                stats['primaries']['store']['size_in_bytes']
                for stats in indices_stats['indices'].values()
            )
        }

Infrastructure Setup (Terraform)

# infrastructure/log_analytics.tf
terraform {
  required_version = ">= 1.5.0"
  
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    elastic = {
      source  = "elastic/ec"
      version = "~> 0.5.0"
    }
  }
}

# Variables
variable "environment" {
  description = "Environment name"
  type        = string
  default     = "production"
}

variable "project_name" {
  description = "Project name"
  type        = string
  default     = "log-analytics"
}

# VPC for Elasticsearch
module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "5.0.0"

  name = "${var.project_name}-vpc"
  cidr = "10.0.0.0/16"

  azs             = ["us-east-1a", "us-east-1b", "us-east-1c"]
  private_subnets = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
  public_subnets  = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]

  enable_nat_gateway   = true
  single_nat_gateway   = var.environment != "production"
  enable_dns_hostnames = true
  enable_dns_support   = true

  tags = {
    Project = var.project_name
  }
}

# Elasticsearch Domain
resource "aws_elasticsearch_domain" "logs" {
  domain_name           = "${var.project_name}-logs"
  elasticsearch_version = "7.17"
  
  cluster_config {
    instance_type            = "m6g.large.elasticsearch"
    instance_count           = var.environment == "production" ? 6 : 3
    dedicated_master_enabled = var.environment == "production"
    master_instance_type     = "m6g.large.elasticsearch"
    master_instance_count    = 3
    zone_awareness_enabled   = true
  }
  
  ebs_options {
    ebs_enabled = true
    volume_type = "gp3"
    volume_size = 100
  }
  
  vpc_options {
    subnet_ids         = module.vpc.private_subnets
    security_group_ids = [aws_security_group.elasticsearch.id]
  }
  
  encrypt_at_rest {
    enabled = true
  }
  
  node_to_node_encryption {
    enabled = true
  }
  
  domain_endpoint_options {
    enforce_https       = true
    tls_security_policy = "Policy-Min-TLS-1-2-2019-07"
  }
  
  access_policies = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          AWS = aws_iam_role.elasticsearch.arn
        }
        Action = "es:*"
        Resource = "${aws_elasticsearch_domain.logs.arn}/*"
      }
    ]
  })
  
  tags = {
    Environment = var.environment
    Project     = var.project_name
  }
}

# Security Group for Elasticsearch
resource "aws_security_group" "elasticsearch" {
  name_prefix = "${var.project_name}-es-"
  vpc_id      = module.vpc.vpc_id
  
  ingress {
    from_port       = 443
    to_port         = 443
    protocol        = "tcp"
    cidr_blocks     = [module.vpc.vpc_cidr_block]
    description     = "HTTPS"
  }
  
  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
  
  tags = {
    Name = "${var.project_name}-es-sg"
  }
}

# IAM Role for Elasticsearch
resource "aws_iam_role" "elasticsearch" {
  name = "${var.project_name}-elasticsearch-role"
  
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "es.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "elasticsearch" {
  name = "${var.project_name}-elasticsearch-policy"
  role = aws_iam_role.elasticsearch.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "es:ESHttp*"
        ]
        Resource = "${aws_elasticsearch_domain.logs.arn}/*"
      }
    ]
  })
}

# Kinesis Data Streams for log ingestion
resource "aws_kinesis_stream" "log_stream" {
  name             = "${var.project_name}-log-stream"
  shard_count      = var.environment == "production" ? 10 : 2
  retention_period = 48
  
  stream_mode_details {
    stream_mode = "PROVISIONED"
  }
  
  tags = {
    Environment = var.environment
    Project     = var.project_name
  }
}

# Lambda function for log processing
resource "aws_lambda_function" "log_processor" {
  function_name = "${var.project_name}-processor"
  role          = aws_iam_role.lambda_processor.arn
  
  handler = "log_processor.process_kinesis_records"
  runtime = "python3.9"
  
  filename         = data.archive_file.lambda_processor.output_path
  source_code_hash = data.archive_file.lambda_processor.output_base64sha256
  
  memory_size = 1024
  timeout     = 300
  
  vpc_config {
    subnet_ids         = module.vpc.private_subnets
    security_group_ids = [aws_security_group.lambda.id]
  }
  
  environment {
    variables = {
      ELASTICSEARCH_ENDPOINT = aws_elasticsearch_domain.logs.endpoint
      ELASTICSEARCH_USERNAME = "master"
      ELASTICSEARCH_PASSWORD = var.elasticsearch_password
    }
  }
  
  tags = {
    Environment = var.environment
    Project     = var.project_name
  }
}

# Lambda event source mapping
resource "aws_lambda_event_source_mapping" "kinesis_trigger" {
  event_source_arn = aws_kinesis_stream.log_stream.arn
  function_name    = aws_lambda_function.log_processor.arn
  
  starting_position = "LATEST"
  batch_size        = 100
  
  maximum_batching_window_in_seconds = 5
  
  enabled = true
}

# IAM Role for Lambda
resource "aws_iam_role" "lambda_processor" {
  name = "${var.project_name}-lambda-role"
  
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "lambda_processor" {
  name = "${var.project_name}-lambda-policy"
  role = aws_iam_role.lambda_processor.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "kinesis:GetRecords",
          "kinesis:GetShardIterator",
          "kinesis:DescribeStream",
          "kinesis:ListStreams"
        ]
        Resource = aws_kinesis_stream.log_stream.arn
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect = "Allow"
        Action = [
          "ec2:CreateNetworkInterface",
          "ec2:DescribeNetworkInterfaces",
          "ec2:DeleteNetworkInterface"
        ]
        Resource = "*"
      }
    ]
  })
}

# CloudWatch Log Groups
resource "aws_cloudwatch_log_group" "app_errors" {
  name              = "/app/errors"
  retention_in_days = 30
  
  tags = {
    Environment = var.environment
    Project     = var.project_name
  }
}

resource "aws_cloudwatch_log_group" "app_access" {
  name              = "/app/access"
  retention_in_days = 30
  
  tags = {
    Environment = var.environment
    Project     = var.project_name
  }
}

# Subscription filters
resource "aws_cloudwatch_log_subscription_filter" "errors_to_kinesis" {
  name            = "errors-to-kinesis"
  log_group_name  = aws_cloudwatch_log_group.app_errors.name
  filter_pattern  = ""
  destination_arn = aws_kinesis_stream.log_stream.arn
}

resource "aws_cloudwatch_log_subscription_filter" "access_to_kinesis" {
  name            = "access-to-kinesis"
  log_group_name  = aws_cloudwatch_log_group.app_access.name
  filter_pattern  = ""
  destination_arn = aws_kinesis_stream.log_stream.arn
}

# S3 bucket for long-term log storage
resource "aws_s3_bucket" "log_archive" {
  bucket = "${var.project_name}-log-archive-${var.environment}"
  
  tags = {
    Environment = var.environment
    Project     = var.project_name
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "log_archive" {
  bucket = aws_s3_bucket.log_archive.id
  
  rule {
    id     = "transition-to-glacier"
    status = "Enabled"
    
    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }
    
    transition {
      days          = 90
      storage_class = "GLACIER"
    }
    
    expiration {
      days = 365
    }
  }
}

# Outputs
output "elasticsearch_endpoint" {
  description = "Elasticsearch endpoint"
  value       = aws_elasticsearch_domain.logs.endpoint
}

output "kinesis_stream_arn" {
  description = "Kinesis stream ARN"
  value       = aws_kinesis_stream.log_stream.arn
}

output "lambda_function_name" {
  description = "Lambda function name"
  value       = aws_lambda_function.log_processor.function_name
}

Testing and Validation

# tests/test_log_analytics.py
import pytest
import json
import time
from datetime import datetime, timedelta
from unittest.mock import Mock, patch, MagicMock

class TestLogAnalytics:
    @pytest.fixture
    def sample_logs(self):
        return [
            {
                "timestamp": "2024-01-15 10:30:45",
                "level": "ERROR",
                "service": "user-service",
                "message": "Failed to connect to database",
                "request_id": "req-123",
                "stack_trace": "java.sql.SQLException: Connection refused"
            },
            {
                "timestamp": "2024-01-15 10:30:46",
                "level": "INFO",
                "service": "api-gateway",
                "message": "GET /api/users 200 45ms",
                "request_id": "req-124",
                "http_method": "GET",
                "http_path": "/api/users",
                "response_time_ms": 45
            }
        ]
    
    @pytest.fixture
    def log_processor(self):
        from lambda.log_processor import LogProcessor
        return LogProcessor()
    
    def test_log_parsing(self, log_processor):
        """Test log line parsing."""
        test_lines = {
            'access_log': '192.168.1.1 - - [15/Jan/2024:10:30:45 +0000] "GET /api/users HTTP/1.1" 200 1234',
            'error_log': '2024-01-15 10:30:45 [ERROR] [database] Connection failed',
            'application_log': '2024-01-15 10:30:45 INFO user-service [req-123] User created successfully'
        }
        
        for log_type, log_line in test_lines.items():
            parsed = log_processor.parse_log_line(log_line, log_type)
            
            assert parsed is not None
            assert 'timestamp' in parsed
            assert 'message' in parsed
    
    def test_log_enrichment(self, log_processor):
        """Test log entry enrichment."""
        parsed_log = {
            'timestamp': '2024-01-15 10:30:45',
            'level': 'ERROR',
            'message': 'Connection failed',
            'client_ip': '192.168.1.1',
            'path': '/api/users'
        }
        
        enriched = log_processor.enrich_log_entry(parsed_log)
        
        assert 'geo' in enriched
        assert 'service' in enriched
        assert 'severity_score' in enriched
        assert enriched['severity_score'] == 3  # ERROR level
    
    def test_batch_indexing(self, log_processor):
        """Test batch indexing to Elasticsearch."""
        mock_es = Mock()
        log_processor.es_client = mock_es
        
        mock_es.bulk.return_value = {'errors': False, 'items': []}
        
        logs = [
            {'@timestamp': datetime.now().isoformat(), 'message': 'Test log 1'},
            {'@timestamp': datetime.now().isoformat(), 'message': 'Test log 2'}
        ]
        
        log_processor.batch_index_to_elasticsearch(logs, 'test-logs')
        
        mock_es.bulk.assert_called_once()
    
    def test_kinesis_processing(self, log_processor):
        """Test Kinesis record processing."""
        import base64
        
        # Create mock Kinesis event
        log_data = json.dumps({
            'message': '2024-01-15 10:30:45 ERROR [app] Test error message',
            'logGroup': '/app/errors',
            'logStream': 'app-stream',
            'owner': '123456789'
        })
        
        event = {
            'Records': [
                {
                    'kinesis': {
                        'data': base64.b64encode(log_data.encode()).decode(),
                        'sequenceNumber': '12345',
                        'partitionKey': 'test'
                    }
                }
            ]
        }
        
        mock_es = Mock()
        log_processor.es_client = mock_es
        mock_es.bulk.return_value = {'errors': False, 'items': []}
        
        result = log_processor.process_kinesis_records(event)
        
        assert result['statusCode'] == 200
        assert result['body']['processed'] == 1
    
    def test_cloudwatch_metric_filter(self):
        """Test CloudWatch metric filter creation."""
        import boto3
        from moto import mock_logs
        
        @mock_logs
        def test_metric_filter():
            client = boto3.client('logs', region_name='us-east-1')
            
            # Create log group
            client.create_log_group(logGroupName='/test/app')
            
            # Create metric filter
            client.put_metric_filter(
                logGroupName='/test/app',
                filterName='error-count',
                filterPattern='[level=ERROR, message]',
                metricTransformations=[{
                    'metricNamespace': 'LogAnalytics',
                    'metricName': 'ErrorCount',
                    'metricValue': '1'
                }]
            )
            
            # Verify filter was created
            filters = client.describe_metric_filters(
                logGroupNamePrefix='/test/app'
            )
            
            assert len(filters['metricFilters']) == 1
        
        test_metric_filter()
    
    def test_elasticsearch_index_lifecycle(self):
        """Test ILM policy configuration."""
        ilm_policy = {
            "policy": {
                "phases": {
                    "hot": {"min_age": "0ms"},
                    "warm": {"min_age": "7d"},
                    "cold": {"min_age": "30d"},
                    "delete": {"min_age": "90d"}
                }
            }
        }
        
        # Validate policy structure
        assert 'policy' in ilm_policy
        assert 'phases' in ilm_policy['policy']
        assert len(ilm_policy['policy']['phases']) == 4
        
        # Validate phase order
        phases = list(ilm_policy['policy']['phases'].keys())
        assert phases == ['hot', 'warm', 'cold', 'delete']
    
    def test_alerting_rule_configuration(self):
        """Test alerting rule configuration."""
        alert_rule = {
            "name": "High Error Rate",
            "condition": {
                "threshold": 100,
                "time_window": "5m"
            },
            "actions": [
                {"type": "slack", "channel": "#alerts"},
                {"type": "email", "recipients": ["ops@company.com"]}
            ]
        }
        
        # Validate rule structure
        assert 'name' in alert_rule
        assert 'condition' in alert_rule
        assert 'actions' in alert_rule
        assert len(alert_rule['actions']) == 2
    
    def test_log_search_query(self):
        """Test Elasticsearch search query construction."""
        search_query = {
            "query": {
                "bool": {
                    "must": [
                        {"match_phrase": {"service": "user-service"}},
                        {"range": {"@timestamp": {"gte": "now-1h"}}}
                    ],
                    "filter": [
                        {"term": {"log_level": "ERROR"}}
                    ]
                }
            },
            "aggs": {
                "errors_over_time": {
                    "date_histogram": {
                        "field": "@timestamp",
                        "fixed_interval": "5m"
                    }
                }
            }
        }
        
        # Validate query structure
        assert 'query' in search_query
        assert 'bool' in search_query['query']
        assert 'must' in search_query['query']['bool']
        assert 'filter' in search_query['query']['bool']
        assert 'aggs' in search_query
    
    def test_log_aggregation_pipeline(self):
        """Test log aggregation pipeline configuration."""
        aggregation_pipeline = [
            {"$match": {"log_level": "ERROR"}},
            {"$group": {
                "_id": "$service",
                "error_count": {"$sum": 1},
                "first_error": {"$min": "$@timestamp"},
                "last_error": {"$max": "$@timestamp"}
            }},
            {"$sort": {"error_count": -1}},
            {"$limit": 10}
        ]
        
        # Validate pipeline stages
        stages = [list(stage.keys())[0] for stage in aggregation_pipeline]
        assert stages == ['$match', '$group', '$sort', '$limit']

Cost Analysis

Monthly Cost Breakdown (Production)

ComponentSpecificationMonthly Cost
Elasticsearch6x m6g.large$1,200
Kinesis10 shards$300
Lambda1M invocations$50
CloudWatch100GB logs$50
S3 Storage5TB archive$115
Data TransferCross-AZ$100
Total$1,815

Cost Optimization Strategies

💡

Tip: Optimize log analytics costs with these strategies:

  1. Log Sampling: Sample verbose logs at 10%
  2. Compression: Use gzip for 90% size reduction
  3. Index Lifecycle: Auto-delete old indices
  4. Hot-Warm-Cold: Tiered storage architecture
  5. Reserved Instances: 40% savings on Elasticsearch

Performance Metrics

MetricBeforeAfterImprovement
Search Latency30 seconds200ms150x faster
Ingestion Rate10K/min1M/min100x faster
Storage Cost500/TB500/TB |100/TB80% cheaper
Alert Latency15 minutes1 minute15x faster

Interview Talking Points

Architecture Decisions

ℹ️

Best Practice: Highlight these log analytics concepts in interviews:

  1. Why Elasticsearch over CloudWatch Insights?

    • Better full-text search capabilities
    • More flexible query language
    • Custom visualizations and dashboards
    • Cross-index correlation
  2. Why Kinesis over direct CloudWatch subscription?

    • Better buffering and replay capability
    • More flexible processing with Lambda
    • Cost-efficient for high volume
    • Custom partitioning strategies
  3. Why hot-warm-cold architecture?

    • Cost optimization for tiered storage
    • Performance optimization for recent data
    • Compliance requirements for retention
    • Automatic data lifecycle management

Common Interview Questions

Q: "How do you handle log rotation and retention?"

retention_strategy = {
    "Hot": "7 days, full replicas, SSD storage",
    "Warm": "30 days, reduced replicas, forced merge",
    "Cold": "90 days, single replica, minimal resources",
    "Delete": "90+ days, auto-delete via ILM"
}

Q: "How do you ensure log data quality?"

data_quality_checks = {
    "Schema Validation": "Verify log format matches expected pattern",
    "Completeness": "Check for missing required fields",
    "Freshness": "Monitor ingestion lag and alert on delays",
    "Deduplication": "Remove duplicate logs using request_id"
}

Q: "How do you scale for peak loads?"

scaling_strategies = {
    "Kinesis": "Auto-scaling shards based on throughput",
    "Lambda": "Concurrent execution limits",
    "Elasticsearch": "Auto-scaling with warm nodes",
    "Buffering": "S3 dead-letter queue for overflow"
}

Deployment Checklist

  • Set up VPC and security groups
  • Deploy Elasticsearch cluster
  • Configure Kinesis streams
  • Deploy Lambda processor
  • Set up CloudWatch log groups
  • Configure subscription filters
  • Create index templates and ILM policies
  • Set up Kibana dashboards
  • Configure alerting rules
  • Test end-to-end log flow

⚠️

Warning: Always test log parsing patterns thoroughly before production. Incorrect parsing can lead to data loss or incorrect analytics.


This project demonstrates log analytics platform skills and is highly relevant for DevOps and data engineering interviews at companies with large-scale logging requirements.

Advertisement