πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Large-Scale Patterns: Window Functions, Approx Algorithms

Apache SparkAdvanced Patterns⭐ Premium

Advertisement

Large-Scale Patterns: Window Functions, Approx Algorithms

Difficulty: Expert | Companies: Meta, Google, Netflix, Uber, LinkedIn

ℹ️Interview Context

Large-scale patterns are essential for processing petabyte-scale datasets. Interviewers expect knowledge of window functions, approximate algorithms, and when to use each pattern for optimal performance.

Question

Explain window functions in Spark and their performance implications. What are approximate algorithms and when should you use them? Describe patterns for handling large-scale data processing: pagination, deduplication, and hierarchical data. Provide mathematical analysis of each pattern.


Detailed Answer

1. Window Functions β€” Complete Guide

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("LargeScalePatterns") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Window function types:
# 1. Ranking: ROW_NUMBER, RANK, DENSE_RANK, NTILE
# 2. Analytic: LAG, LEAD, FIRST_VALUE, LAST_VALUE
# 3. Aggregate: SUM, AVG, COUNT over window

# Example data
df = spark.createDataFrame([
    ("A", "2024-01-01", 100),
    ("A", "2024-01-02", 200),
    ("A", "2024-01-03", 150),
    ("B", "2024-01-01", 300),
    ("B", "2024-01-02", 250),
], ["group", "date", "value"])

# Define window
window_spec = Window.partitionBy("group").orderBy("date")

# Ranking functions
df_ranked = df.withColumn("row_number", F.row_number().over(window_spec)) \
    .withColumn("rank", F.rank().over(window_spec)) \
    .withColumn("dense_rank", F.dense_rank().over(window_spec)) \
    .withColumn("ntile", F.ntile(2).over(window_spec))

# Analytic functions
df_lagged = df.withColumn(
    "prev_value", F.lag("value", 1).over(window_spec)
).withColumn(
    "next_value", F.lead("value", 1).over(window_spec)
).withColumn(
    "first_value", F.first("value").over(window_spec)
).withColumn(
    "last_value", F.last("value").over(window_spec)
)

# Aggregate over window
window_agg = Window.partitionBy("group")
df_agg = df.withColumn(
    "running_sum", F.sum("value").over(window_spec)
).withColumn(
    "group_total", F.sum("value").over(window_agg)
).withColumn(
    "pct_of_group", F.col("value") / F.col("group_total")
)

# Window function performance analysis:
# Let N = total rows, P = partitions, W = window size
#
# Without window function:
#   Sort: O(N log N)
#   Process: O(N)
#   Total: O(N log N)
#
# With window function:
#   Partition by key: O(N) shuffle
#   Sort within partition: O(N/P Γ— log(N/P)) per partition
#   Compute window: O(N/P Γ— W) per partition
#   Total: O(N + N/P Γ— (log(N/P) + W))
#
# Memory per partition: O(N/P Γ— W) for window buffer

2. Approximate Algorithms

# Approximate algorithms: Trade accuracy for performance
# Useful when exact answers aren't required

# 1. Approximate Count Distinct (HyperLogLog)
df = spark.range(10000000).withColumn(
    "category", (F.rand() * 100).cast("int")
)

# Exact count distinct
exact_count = df.select(F.countDistinct("category")).collect()[0][0]

# Approximate count distinct (1% error)
approx_count = df.select(
    F.approx_count_distinct("category", 0.01)
).collect()[0][0]

print(f"Exact: {exact_count}, Approx: {approx_count}")

# 2. Approximate Quantiles (t-Digest)
df_quantiles = spark.range(1000000).withColumn(
    "value", F.randn()
)

# Exact quantiles (expensive)
exact_p50 = df_quantiles.approxQuantile("value", [0.5], 0.0)[0]

# Approximate quantiles (fast)
approx_p50 = df_quantiles.approxQuantile("value", [0.5], 0.01)[0]

# 3. Bloom Filter
from pyspark.sql.types import BooleanType

# Create Bloom filter
bloom_df = df.select(
    F.bloom_filter("category", 1000000, 0.03).alias("bloom")
)

# Check membership
@F.udf(returnType=BooleanType())
def bloom_check(bloom, value):
    return bloom.mightContain(value)

# 4. Count-Min Sketch
# Used for frequency estimation
df_cms = df.select(
    F.count_min_sketch("category", 0.01, 0.95).alias("cms")
)

# Mathematical analysis:
# HyperLogLog:
#   Space: O(log(log(N))) bytes (typically 1-10 KB)
#   Accuracy: 1.04 / sqrt(m) where m = number of registers
#   For 1% accuracy: m = 10816 registers β‰ˆ 10 KB
#
# t-Digest:
#   Space: O(1/Ξ΄) where Ξ΄ = accuracy parameter
#   Accuracy: Β±Ξ΄N for quantile queries
#   For Ξ΄=0.01: ~100 centroids β‰ˆ 1 KB
#
# Bloom Filter:
#   Space: O(-ln(p) Γ— N / (ln(2))^2) where p = false positive rate
#   For p=0.01, N=1M: ~1.14 MB
#   Accuracy: No false negatives, p false positives
#
# Count-Min Sketch:
#   Space: O(1/Ξ΅ Γ— log(1/Ξ΄)) where Ξ΅ = error, Ξ΄ = confidence
#   For Ξ΅=0.01, Ξ΄=0.05: ~60 counters β‰ˆ 480 bytes
# When to use approximate algorithms:
# 1. Exact answer not required (dashboards, monitoring)
# 2. Dataset too large for exact computation
# 3. Latency requirements strict
# 4. Memory constraints

# Example: Real-time dashboard
# Exact count distinct: O(N) memory, 100% accurate
# Approximate count distinct: O(1) memory, 99% accurate
# Use approximate for dashboards where 1% error is acceptable

3. Pagination Pattern

# Pagination: Process large datasets in batches
# Useful for: API integration, memory-constrained processing

def paginate_dataframe(df, page_size=1000):
    """Process DataFrame in pages."""
    total_rows = df.count()
    total_pages = (total_rows + page_size - 1) // page_size
    
    results = []
    for page in range(total_pages):
        offset = page * page_size
        page_df = df.limit(page_size).offset(offset)
        
        # Process page
        processed = process_page(page_df)
        results.append(processed)
    
    return reduce(unionByName, results)

# Window-based pagination (more efficient):
def paginate_with_window(df, page_size=1000):
    """Pagination using window functions."""
    window = Window.orderBy(F.monotonically_increasing_id())
    
    return df.withColumn(
        "row_num", F.row_number().over(window)
    ).withColumn(
        "page", F.floor((F.col("row_num") - 1) / page_size)
    )

# Mathematical analysis:
# Let N = total rows, P = page_size
# Sequential pagination: O(N Γ— P) β€” reads P rows N times
# Window pagination: O(N) β€” single pass
# Offset pagination: O(N^2) β€” scans N rows for each page
#
# Recommendation: Use window-based pagination

4. Deduplication Patterns

# Deduplication: Remove duplicate rows
# Critical for: idempotent processing, data quality

# Pattern 1: Drop duplicates by all columns
df_deduped = df.dropDuplicates()

# Pattern 2: Drop duplicates by specific columns
df_deduped = df.dropDuplicates(["user_id", "event_type"])

# Pattern 3: Keep latest record (by timestamp)
window = Window.partitionBy("user_id").orderBy(F.desc("timestamp"))
df_deduped = df.withColumn(
    "row_num", F.row_number().over(window)
).filter(F.col("row_num") == 1).drop("row_num")

# Pattern 4: Keep first non-null value
from pyspark.sql import Window

window = Window.partitionBy("user_id").orderBy(F.desc("timestamp"))
df_deduped = df.withColumn(
    "row_num", F.row_number().over(window)
).filter(F.col("row_num") == 1).drop("row_num")

# Deduplication performance:
# Let N = rows, D = duplicates, K = dedup keys
#
# dropDuplicates(): O(N Γ— K) β€” hash all columns
# dropDuplicates(["key"]): O(N Γ— K) β€” hash key column
# Window-based: O(N log N) β€” sort within partition
#
# For large datasets with many duplicates:
# 1. Filter obvious duplicates first: df.filter(F.col("is_duplicate") == False)
# 2. Use approximate algorithms for probabilistic dedup
# 3. Use Delta Lake MERGE for idempotent upserts

5. Hierarchical Data Patterns

# Hierarchical data: Tree/graph structures
# Examples: org charts, product categories, social networks

# Pattern 1: Flatten hierarchy with explode
df = spark.createDataFrame([
    (1, ["A", "B", "C"]),
    (2, ["D", "E"]),
], ["id", "categories"])

df_flat = df.withColumn("category", F.explode("categories"))

# Pattern 2: Recursive query with broadcast
# For small hierarchies, broadcast the lookup table
hierarchy = spark.createDataFrame([
    ("A", None),
    ("B", "A"),
    ("C", "B"),
    ("D", "A"),
], ["id", "parent_id"])

# Recursive traversal (limited depth)
def traverse_hierarchy(hierarchy, max_depth=10):
    """Traverse hierarchy using iterative broadcast joins."""
    result = hierarchy.filter(F.col("parent_id").isnull())
    
    for depth in range(max_depth):
        # Join with hierarchy to find children
        children = result.join(
            F.broadcast(hierarchy),
            result["id"] == hierarchy["parent_id"],
            "inner"
        ).select(hierarchy["id"], hierarchy["parent_id"])
        
        if children.count() == 0:
            break
        
        result = result.union(children)
    
    return result

# Pattern 3: Graph processing with GraphX (Scala)
# For complex graph algorithms, use GraphX
# Python equivalent: NetworkX for small graphs, custom for large

# Hierarchical data performance:
# Let D = depth, B = branching factor, N = total nodes
#
# Flatten: O(N) β€” single pass
# Recursive: O(N Γ— D) β€” D iterations, N nodes per iteration
# Broadcast join: O(N Γ— B) β€” B children per parent
#
# For deep hierarchies (D > 10):
# Use iterative approach with checkpointing
# Or use GraphX for complex traversals

6. Pivot and Unpivot Patterns

# Pivot: Convert rows to columns
df = spark.createDataFrame([
    ("Alice", "Math", 90),
    ("Alice", "Science", 85),
    ("Bob", "Math", 75),
    ("Bob", "Science", 80),
], ["student", "subject", "score"])

# Pivot
df_pivot = df.groupBy("student").pivot("subject").agg(F.first("score"))
# +-------+----+-------+
# |student|Math|Science|
# +-------+----+-------+
# |  Alice|  90|     85|
# |    Bob|  75|     80|
# +-------+----+-------+

# Unpivot (melt): Convert columns to rows
df_unpivoted = df_pivot.select(
    "student",
    F.expr("stack(2, 'Math', Math, 'Science', Science)").alias(
        "subject", "score"
    )
)

# Pivot performance:
# Let N = rows, K = pivot columns
# Pivot: O(N Γ— K) β€” group and aggregate
# Unpivot: O(N Γ— K) β€” stack operation
#
# For high cardinality pivot columns:
# Use mapReduce pattern or custom UDF

7. Sampling and Approximation Patterns

# Pattern 1: Stratified sampling
df_sampled = df.sampleBy(
    "category",
    fractions={"A": 0.1, "B": 0.2, "C": 0.3},
    seed=42
)

# Pattern 2: reservoir sampling
def reservoir_sample(df, k, seed=42):
    """Reservoir sampling for streaming data."""
    window = Window.orderBy(F.rand(seed))
    return df.withColumn(
        "row_num", F.row_number().over(window)
    ).filter(F.col("row_num") <= k).drop("row_num")

# Pattern 3: Weighted sampling
df_weighted = df.sampleBy(
    "weight",
    fractions={"high": 0.5, "medium": 0.3, "low": 0.1},
    seed=42
)

# Sampling performance:
# Let N = population, n = sample size
# Simple sampling: O(N) β€” full scan
# Stratified sampling: O(N) β€” full scan with strata
# Reservoir sampling: O(N) β€” single pass
#
# For approximate queries:
# Use approx_count_distinct, approxQuantile, etc.
# Trade accuracy for 10-100x speedup

⚠️Common Pitfall

Window functions without ORDER BY produce undefined results. Always specify ORDER BY for deterministic ranking and analytic functions. Without it, Spark may produce different results on different runs.

πŸ’‘Interview Tip

When discussing large-scale patterns, always mention the accuracy-performance tradeoff: exact algorithms guarantee correctness but may be slow; approximate algorithms are fast but may have errors. Choose based on use case requirements.


Summary

PatternPurposePerformanceAccuracy
Window FunctionsRanking, analytics over partitionsO(N/P Γ— W)100%
Approximate AlgorithmsFast estimationO(1) space95-99%
PaginationBatch processingO(N)100%
DeduplicationRemove duplicatesO(N Γ— K)100%
HierarchicalTree traversalO(N Γ— D)100%
Pivot/UnpivotReshape dataO(N Γ— K)100%

The key to large-scale patterns is understanding the space-time-accuracy tradeoff and choosing the right pattern for your specific requirements.

Advertisement