πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Enterprise Sentiment Analysis System (BERT + Spark NLP)

AI/ML ProjectsNLP & Sentiment⭐ Premium

Advertisement

Enterprise Sentiment Analysis System

BERT + Spark NLP | Scalable Real-Time Analysis

Advanced14+ HoursProduction-Ready

Project Overview

Problem Statement

Enterprises receive millions of customer interactions daily across channels. Manual sentiment analysis cannot scale. This system uses fine-tuned BERT models with Spark NLP for distributed processing to analyze sentiment across emails, reviews, social media, and support tickets in real-time.

Objectives

  • Fine-tune BERT for multi-class sentiment (positive, negative, neutral, mixed)
  • Build scalable processing pipeline with Spark NLP
  • Implement aspect-based sentiment analysis
  • Deploy with real-time and batch inference modes
  • Create dashboards for sentiment trend monitoring
ComponentTechnology
Sentiment ModelBERT / RoBERTa (fine-tuned)
Scalable ProcessingSpark NLP
API FrameworkFastAPI + Triton Inference Server
Stream ProcessingApache Kafka + Flink
Data StorageElasticsearch + PostgreSQL
DashboardGrafana + Kibana
MonitoringPrometheus + ELK Stack

Architecture Diagram

Architecture Diagram
+-------------------------------------------------------------------+
|           Enterprise Sentiment Analysis Architecture              |
+-------------------------------------------------------------------+
|  +--------------+    +--------------+    +------------------+     |
|  | Data Sources  |--->| Kafka        |--->| Spark NLP        |     |
|  | (Multi-chan)  |    | Stream       |    | Pre-processing   |     |
|  +--------------+    +--------------+    +--------+---------+     |
|                                                  |               |
|                                                  v               |
|  +--------------+    +--------------+    +------------------+     |
|  |  Sentiment   |<---|  Triton      |<---|  BERT Model      |     |
|  |  Results     |    |  Inference   |    |  (Fine-tuned)    |     |
+--------------+    +--------------+    +------------------+     |
|        |                                                   |     |
|        v                                                   v     |
|  +--------------+    +--------------+    +------------------+     |
|  |  Real-time   |    |  Aspect      |    |  Trend           |     |
|  |  Dashboard   |    |  Analysis    |    |  Monitoring      |     |
|  +--------------+    +--------------+    +------------------+     |
+-------------------------------------------------------------------+

Step-by-Step Implementation

Step 1: Environment Setup

mkdir sentiment-system && cd sentiment-system
pip install transformers datasets torch spark-nlp
pip install fastapi uvicorn kafka-python elasticsearch
pip install scikit-learn mlflow prometheus-client
pip install pyspark

Step 2: BERT Fine-Tuning for Sentiment

Fine-tune BERT for multi-class sentiment classification on enterprise data.

# src/models/train_sentiment.py
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import (
    BertTokenizer, BertForSequenceClassification,
    AdamW, get_linear_schedule_with_warmup
)
from sklearn.metrics import classification_report, confusion_matrix
from typing import List, Dict, Tuple
import mlflow
import numpy as np


class SentimentDataset(Dataset):
    LABEL_MAP = {"negative": 0, "neutral": 1, "positive": 2, "mixed": 3}

    def __init__(self, texts: List[str], labels: List[str], tokenizer, max_len: int = 128):
        self.texts = texts
        self.labels = [self.LABEL_MAP[l] for l in labels]
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoding = self.tokenizer(
            self.texts[idx],
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        return {
            "input_ids": encoding["input_ids"].squeeze(),
            "attention_mask": encoding["attention_mask"].squeeze(),
            "labels": torch.tensor(self.labels[idx], dtype=torch.long),
        }


class SentimentTrainer:
    def __init__(self, model_name: str = "bert-base-uncased", num_labels: int = 4):
        self.tokenizer = BertTokenizer.from_pretrained(model_name)
        self.model = BertForSequenceClassification.from_pretrained(
            model_name, num_labels=num_labels
        )
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

    def train(
        self, train_texts, train_labels, eval_texts, eval_labels,
        epochs: int = 5, batch_size: int = 32, learning_rate: float = 2e-5
    ):
        train_dataset = SentimentDataset(train_texts, train_labels, self.tokenizer)
        eval_dataset = SentimentDataset(eval_texts, eval_labels, self.tokenizer)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        eval_loader = DataLoader(eval_dataset, batch_size=batch_size)

        optimizer = AdamW(self.model.parameters(), lr=learning_rate)
        total_steps = len(train_loader) * epochs
        scheduler = get_linear_schedule_with_warmup(
            optimizer, num_warmup_steps=total_steps // 10, num_training_steps=total_steps
        )

        mlflow.set_experiment("sentiment-analysis")
        with mlflow.start_run():
            mlflow.log_params({"epochs": epochs, "lr": learning_rate, "batch_size": batch_size})

            for epoch in range(epochs):
                self.model.train()
                total_loss = 0
                for batch in train_loader:
                    optimizer.zero_grad()
                    outputs = self.model(
                        input_ids=batch["input_ids"].to(self.device),
                        attention_mask=batch["attention_mask"].to(self.device),
                        labels=batch["labels"].to(self.device),
                    )
                    loss = outputs.loss
                    loss.backward()
                    torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                    optimizer.step()
                    scheduler.step()
                    total_loss += loss.item()

                avg_loss = total_loss / len(train_loader)
                eval_metrics = self.evaluate(eval_loader)
                mlflow.log_metrics({
                    "train_loss": avg_loss,
                    "eval_accuracy": eval_metrics["accuracy"],
                }, step=epoch)
                print(f"Epoch {epoch+1}: loss={avg_loss:.4f}, acc={eval_metrics['accuracy"]:.4f}")

            mlflow.pytorch.log_model(self.model, "sentiment-model")

    def evaluate(self, eval_loader) -> Dict:
        self.model.eval()
        all_preds, all_labels = [], []
        with torch.no_grad():
            for batch in eval_loader:
                outputs = self.model(
                    input_ids=batch["input_ids"].to(self.device),
                    attention_mask=batch["attention_mask"].to(self.device),
                )
                preds = torch.argmax(outputs.logits, dim=-1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(batch["labels"].numpy())

        accuracy = np.mean(np.array(all_preds) == np.array(all_labels))
        return {"accuracy": accuracy, "report": classification_report(all_labels, all_preds)}

Step 3: Spark NLP Preprocessing Pipeline

Build scalable text preprocessing with Spark NLP for batch analysis of millions of documents.

# src/processing/spark_pipeline.py
import sparknlp
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import (
    Tokenizer, Normalizer, StopWordsCleaner, LemmatizerModel,
    SentenceDetectorDLModel
)
from pyspark.ml import Pipeline
import pyspark.sql.functions as F


class SparkSentimentPipeline:
    def __init__(self):
        self.spark = sparknlp.start()
        self.pipeline = self._build_pipeline()

    def _build_pipeline(self):
        document_assembler = DocumentAssembler() \
            .setInputCol("text") \
            .setOutputCol("document")

        sentence_detector = SentenceDetectorDLModel.pretrained() \
            .setInputCols(["document"]) \
            .setOutputCol("sentences")

        tokenizer = Tokenizer() \
            .setInputCols(["sentences"]) \
            .setOutputCol("tokens")

        normalizer = Normalizer() \
            .setInputCols(["tokens"]) \
            .setOutputCol("normalized") \
            .setLowercase(True)

        stopwords_cleaner = StopWordsCleaner() \
            .setInputCols(["normalized"]) \
            .setOutputCol("clean_tokens")

        lemmatizer = LemmatizerModel.pretrained("lemma_antbnc") \
            .setInputCols(["clean_tokens"]) \
            .setOutputCol("lemmatized")

        finisher = Finisher() \
            .setInputCols(["lemmatized"]) \
            .setOutputCols(["processed_text"]) \
            .setCleanAnnotations(True)

        return Pipeline(stages=[
            document_assembler, sentence_detector, tokenizer,
            normalizer, stopwords_cleaner, lemmatizer, finisher
        ])

    def process_batch(self, df):
        model = self.pipeline.fit(df)
        return model.transform(df)

    def analyze_large_dataset(self, input_path: str, output_path: str, chunk_size: int = 100000):
        df = self.spark.read.parquet(input_path)
        total = df.count()
        print(f"Processing {total} documents...")
        processed = self.process_batch(df)
        processed.write.mode("overwrite").parquet(output_path)
        print(f"Results saved to {output_path}")

Step 4: Aspect-Based Sentiment Analysis

Extract sentiment toward specific aspects (e.g., "battery life", "customer service").

# src/models/aspect_sentiment.py
from transformers import pipeline as hf_pipeline
from typing import List, Dict, Tuple
import re


class AspectSentimentAnalyzer:
    ASPECT_KEYWORDS = {
        "product_quality": ["quality", "build", "material", "durable", "cheap"],
        "customer_service": ["support", "service", "help", "response", "staff"],
        "price": ["price", "cost", "expensive", "cheap", "value", "worth"],
        "delivery": ["shipping", "delivery", "arrived", "package", "fast"],
        "usability": ["easy", "difficult", "intuitive", "confusing", "user-friendly"],
    }

    def __init__(self, sentiment_model: str = "cardiffnlp/twitter-roberta-base-sentiment-latest"):
        self.sentiment_pipeline = hf_pipeline("sentiment-analysis", model=sentiment_model)

    def extract_aspects(self, text: str) -> List[str]:
        text_lower = text.lower()
        found_aspects = []
        for aspect, keywords in self.ASPECT_KEYWORDS.items():
            if any(kw in text_lower for kw in keywords):
                found_aspects.append(aspect)
        return found_aspects if found_aspects else ["general"]

    def analyze(self, text: str) -> Dict:
        aspects = self.extract_aspects(text)
        sentences = re.split(r"[.!?]+", text)

        aspect_results = {}
        for aspect in aspects:
            relevant_sentences = [
                s.strip() for s in sentences
                if any(kw in s.lower() for kw in self.ASPECT_KEYWORDS.get(aspect, []))
            ]
            if relevant_sentences:
                combined = " ".join(relevant_sentences)
                result = self.sentiment_pipeline(combined[:512])[0]
                aspect_results[aspect] = {
                    "sentiment": result["label"],
                    "confidence": result["score"],
                    "text_snippet": combined[:200],
                }
            else:
                overall = self.sentiment_pipeline(text[:512])[0]
                aspect_results[aspect] = {
                    "sentiment": overall["label"],
                    "confidence": overall["score"] * 0.7,
                    "text_snippet": text[:200],
                }

        return {"aspects": aspect_results, "overall": self.sentiment_pipeline(text[:512])[0]}

Step 5: Real-Time Inference API

# src/api/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from contextlib import asynccontextmanager
import torch
from transformers import BertTokenizer, BertForSequenceClassification
import time

LABEL_MAP = {0: "negative", 1: "neutral", 2: "positive", 3: "mixed"}


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.tokenizer = BertTokenizer.from_pretrained("./sentiment_model")
    app.state.model = BertForSequenceClassification.from_pretrained("./sentiment_model")
    app.state.model.eval()
    app.state.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    app.state.model.to(app.state.device)
    yield


app = FastAPI(title="Sentiment API", lifespan=lifespan)


class SentimentRequest(BaseModel):
    text: str = Field(..., min_length=1, max_length=5000)
    return_aspects: bool = False


class SentimentResponse(BaseModel):
    sentiment: str
    confidence: float
    all_scores: Dict[str, float]
    latency_ms: float
    aspects: Optional[Dict] = None


@app.post("/api/v1/sentiment", response_model=SentimentResponse)
async def analyze_sentiment(request: SentimentRequest):
    start = time.time()
    tokenizer = app.state.tokenizer
    model = app.state.model

    encoding = tokenizer(
        request.text, max_length=128, padding="max_length",
        truncation=True, return_tensors="pt"
    )

    with torch.no_grad():
        outputs = model(
            input_ids=encoding["input_ids"].to(app.state.device),
            attention_mask=encoding["attention_mask"].to(app.state.device),
        )

    probs = torch.softmax(outputs.logits, dim=-1)[0].cpu().numpy()
    pred_idx = int(np.argmax(probs))
    latency = (time.time() - start) * 1000

    return SentimentResponse(
        sentiment=LABEL_MAP[pred_idx],
        confidence=float(probs[pred_idx]),
        all_scores={LABEL_MAP[i]: float(p) for i, p in enumerate(probs)},
        latency_ms=latency,
    )


@app.post("/api/v1/sentiment/batch")
async def batch_sentiment(texts: List[str]):
    results = []
    for text in texts[:100]:
        result = await analyze_sentiment(SentimentRequest(text=text))
        results.append(result.dict())
    return {"results": results, "total": len(results)}

ℹ️

Use domain-specific fine-tuning data. Generic sentiment models perform poorly on specialized domains like healthcare or finance. Collect and annotate domain-specific training data.

πŸ’‘

Implement a fallback chain: if the fine-tuned model has low confidence, fall back to a general-purpose sentiment model. This improves coverage for edge cases.

Performance Metrics

MetricTargetDescription
Accuracy> 92%4-class sentiment
F1 Score> 0.90Weighted F1
Inference Latency< 30msSingle text (GPU)
Batch Throughput> 1000 texts/sSpark NLP
Aspect Precision> 0.85Aspect extraction

Interview Talking Points

  1. BERT Fine-Tuning: Adding a classification head to BERT and fine-tuning on domain data typically achieves 5-10% improvement over zero-shot.
  2. Aspect-Based Analysis: Decomposing sentiment by aspect provides actionable insights that overall sentiment misses.
  3. Spark NLP: Enables processing millions of documents with GPU-accelerated NLP pipelines at scale.
  4. Model Selection: RoBERTa often outperforms BERT for sentiment. Consider domain-specific models like FinBERT for finance.
  5. Multi-Label: Some texts express mixed sentiments. Multi-label classification captures this nuance.
  6. Deployment: ONNX export + Triton Inference Server provides optimal serving throughput.

⚠️

Sentiment models can encode biases from training data. Regularly audit model predictions across demographic groups and retrain with balanced datasets.

ℹ️

For production deployment, consider using DistilBERT for 2x faster inference with minimal accuracy loss (typically 1-2% drop).

Advertisement