PySpark Advanced Interview Series
Module 04: Spark SQL - The Power of SQL on Distributed Data
Interview Question
"At Google, BigQuery and Spark SQL share many concepts. Walk us through how Spark SQL processes a complex query with subqueries, CTEs, and window functions. How does the Catalyst optimizer transform this query, and what control do you have over execution plans?" β Google Data Engineer Interview
"At Microsoft, we use Spark SQL extensively for Azure Synapse Analytics. Explain the difference between global temporary views, local temporary views, and managed tables. How do you register a UDF that works in both DataFrame API and SQL context?" β Microsoft Senior Data Engineer Interview
Spark SQL Architecture
Spark SQL is Spark's module for structured data processing. It provides:
- SQL Interface: Execute SQL queries on DataFrames and tables
- Catalyst Optimizer: Automatic query optimization
- Tungsten Execution Engine: Efficient code generation
- Catalog API: Metadata management
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("SparkSQLInterview") \
.enableHiveSupport() \
.config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
.getOrCreate()
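The Catalyst stages behind the components listed above can be inspected directly, which is also the main way to see what control you have over a plan. The sketch below assumes an existing `spark` session and relies on `DataFrame.explain` with its `mode` argument (Spark 3.0 and later). The helper names `explain_stages` and `explain_formatted` are illustrative and not part of Spark.

```python
def explain_stages(spark, query: str) -> None:
    """Print every Catalyst stage for a SQL query on an existing SparkSession."""
    # spark.sql() parses and analyzes the query; no job runs at this point.
    df = spark.sql(query)
    # "extended" prints the parsed, analyzed, and optimized logical plans,
    # followed by the physical plan.
    df.explain(mode="extended")


def explain_formatted(spark, query: str) -> None:
    """Print the physical plan as an operator outline plus per-operator details."""
    spark.sql(query).explain(mode="formatted")
```

A typical call would be `explain_stages(spark, "SELECT department, COUNT(*) FROM employees GROUP BY department")`, assuming a view named `employees` has been registered.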
Temporary Views
Local Temporary View
Scoped to the current SparkSession and lost when that session ends.
# Create DataFrame
df = spark.read.parquet("s3a://bucket/employees/")
# Register as local temporary view
df.createOrReplaceTempView("employees")
# Also works with createTempView (throws error if exists)
df.createTempView("employees_v2")
# Query using SQL
result = spark.sql("SELECT * FROM employees WHERE age > 30")
result.show()
Global Temporary View
Shared across SparkSessions within the same Spark application. Stored in the global_temp database.
# Register as global temporary view
df.createOrReplaceGlobalTempView("global_employees")
# Query using global_temp database prefix
spark.sql("SELECT * FROM global_temp.global_employees WHERE age > 30").show()
# Access from a different SparkSession in the same application
# (builder.getOrCreate() would return the existing session, so use newSession())
spark2 = spark.newSession()
spark2.sql("SELECT * FROM global_temp.global_employees").show()
⚠️ Common Pitfall
Global temporary views are NOT persistent. They're stored in the in-memory catalog and lost when the Spark application stops. For persistent storage, use managed or external tables.
Managed vs External Tables
# Managed table: Spark manages both metadata and data
spark.sql("CREATE TABLE managed_employees (name STRING, age INT)")
# Data stored in spark.sql.warehouse.dir
# External table: Spark manages metadata, you manage data
spark.sql("""
CREATE EXTERNAL TABLE external_employees (
name STRING,
age INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 's3a://bucket/external-employees/'
""")
# Insert data (select columns explicitly so they match the two-column table schema)
spark.sql("INSERT INTO managed_employees SELECT name, age FROM employees")
SQL Query Fundamentals
Basic Queries
# Simple SELECT
spark.sql("SELECT name, age FROM employees").show()
# WHERE clause
spark.sql("""
SELECT name, age, department
FROM employees
WHERE age > 30 AND department = 'Engineering'
""").show()
# ORDER BY
spark.sql("""
SELECT name, age
FROM employees
ORDER BY age DESC
""").show()
# GROUP BY
spark.sql("""
SELECT department, COUNT(*) as count, AVG(age) as avg_age
FROM employees
GROUP BY department
ORDER BY count DESC
""").show()
Subqueries
# Scalar subquery
spark.sql("""
SELECT name, age,
(SELECT AVG(age) FROM employees) as company_avg_age
FROM employees
""").show()
# IN subquery
spark.sql("""
SELECT name, department
FROM employees
WHERE department IN (
SELECT department FROM employees WHERE age > 35
)
""").show()
# Correlated subquery
spark.sql("""
SELECT e.name, e.age, e.department
FROM employees e
WHERE e.age > (
SELECT AVG(e2.age)
FROM employees e2
WHERE e2.department = e.department
)
""").show()
Common Table Expressions (CTEs)
# Single CTE
spark.sql("""
WITH department_stats AS (
SELECT department, COUNT(*) as count, AVG(age) as avg_age
FROM employees
GROUP BY department
)
SELECT d.name, d.age, d.department, ds.avg_age
FROM employees d
JOIN department_stats ds ON d.department = ds.department
WHERE d.age > ds.avg_age
""").show()
# Multiple CTEs
spark.sql("""
WITH
senior_employees AS (
SELECT * FROM employees WHERE age >= 35
),
dept_counts AS (
SELECT department, COUNT(*) as senior_count
FROM senior_employees
GROUP BY department
)
SELECT e.name, e.age, e.department, dc.senior_count
FROM employees e
JOIN dept_counts dc ON e.department = dc.department
ORDER BY dc.senior_count DESC
""").show()
Window Functions in SQL
spark.sql("""
SELECT
name,
age,
department,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY age DESC) as row_num,
LAG(age, 1) OVER (PARTITION BY department ORDER BY age) as prev_age,
LEAD(age, 1) OVER (PARTITION BY department ORDER BY age) as next_age,
AVG(age) OVER (PARTITION BY department) as dept_avg_age,
SUM(age) OVER (PARTITION BY department ORDER BY age ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total
FROM employees
""").show()
Registering UDFs for SQL
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import StringType, IntegerType
import pandas as pd
# Simple UDF for SQL
@udf(returnType=StringType())
def categorize_age(age):
    if age is None:
        return "Unknown"
    elif age < 25:
        return "Junior"
    elif age < 35:
        return "Mid-Level"
    else:
        return "Senior"
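# Register UDF for SQL (the decorated function also stays usable in the DataFrame API,
# e.g. df.select(categorize_age("age")))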
spark.udf.register("categorize_age", categorize_age)
# Use in SQL
spark.sql("""
SELECT name, age, categorize_age(age) as category
FROM employees
""").show()
# Pandas UDF for SQL (Spark 3.0+)
@pandas_udf(StringType())
def upper_name(name: pd.Series) -> pd.Series:
    return name.str.upper()
spark.udf.register("upper_name", upper_name)
spark.sql("SELECT upper_name(name) FROM employees").show()
ℹ️ Google Interview Insight
At Google, interviewers expect you to understand that Python UDFs are opaque to Catalyst. The optimizer cannot inspect a UDF's body, so it cannot push predicates through it or optimize its execution, and every row is serialized between the JVM and a Python worker. Always prefer built-in SQL functions when possible.
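To make the "prefer built-ins" advice concrete, the age categorization from the UDF example can be written as a plain `CASE` expression, which Catalyst can see through and optimize. This is a minimal sketch that assumes the `employees` view from earlier; the constant and function names are illustrative.

```python
# Same buckets as the categorize_age UDF, written with built-in SQL only.
CATEGORIZE_AGE_SQL = """
SELECT name, age,
       CASE
           WHEN age IS NULL THEN 'Unknown'
           WHEN age < 25 THEN 'Junior'
           WHEN age < 35 THEN 'Mid-Level'
           ELSE 'Senior'
       END AS category
FROM employees
"""


def categorize_with_builtins(spark):
    """Run the CASE-based categorization on an existing SparkSession."""
    return spark.sql(CATEGORIZE_AGE_SQL)
```

Because no Python worker is involved, this version avoids the serialization overhead of the UDF and keeps the expression visible to the optimizer.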
Catalog Operations
# List databases
spark.sql("SHOW DATABASES").show()
# Use database
spark.sql("USE my_database")
# List tables
spark.sql("SHOW TABLES").show()
# Describe table
spark.sql("DESCRIBE employees").show()
# Detailed table info
spark.sql("DESCRIBE EXTENDED employees").show()
# Show create table statement (needs a catalog table; temporary views are not supported)
spark.sql("SHOW CREATE TABLE managed_employees").show(truncate=False)
# Drop table
spark.sql("DROP TABLE IF EXISTS temp_employees")
# Show partitions
spark.sql("SHOW PARTITIONS logs").show()
Real-World Scenario: Google Analytics Pipeline
Problem Statement
Build a complex SQL pipeline that analyzes user behavior across multiple sessions, computes cohort retention, and generates marketing insights using Spark SQL.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("GoogleAnalyticsPipeline") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# Register raw data as temporary views
page_views = spark.read.parquet("s3a://analytics/page-views/")
page_views.createOrReplaceTempView("page_views")
sessions = spark.read.parquet("s3a://analytics/sessions/")
sessions.createOrReplaceTempView("sessions")
users = spark.read.parquet("s3a://analytics/users/")
users.createOrReplaceTempView("users")
# Complex CTE-based analytics
cohort_analysis = spark.sql("""
WITH
-- Define cohorts by signup month
user_cohorts AS (
SELECT
user_id,
DATE_FORMAT(signup_date, 'yyyy-MM') as cohort_month,
signup_date
FROM users
),
-- Monthly activity per user
monthly_activity AS (
SELECT
pv.user_id,
DATE_FORMAT(pv.event_date, 'yyyy-MM') as activity_month,
COUNT(*) as page_views,
COUNT(DISTINCT pv.session_id) as sessions
FROM page_views pv
GROUP BY pv.user_id, DATE_FORMAT(pv.event_date, 'yyyy-MM')
),
-- Cohort retention matrix
cohort_retention AS (
SELECT
uc.cohort_month,
ma.activity_month,
COUNT(DISTINCT ma.user_id) as active_users,
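-- Whole calendar months; a day difference divided by 30 would give fractional values
CAST(MONTHS_BETWEEN(
TO_DATE(ma.activity_month, 'yyyy-MM'),
TO_DATE(uc.cohort_month, 'yyyy-MM')
) AS INT) as months_since_signup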
FROM user_cohorts uc
JOIN monthly_activity ma ON uc.user_id = ma.user_id
GROUP BY uc.cohort_month, ma.activity_month
),
-- Cohort sizes
cohort_sizes AS (
SELECT
cohort_month,
COUNT(DISTINCT user_id) as cohort_size
FROM user_cohorts
GROUP BY cohort_month
)
-- Final retention calculation
SELECT
cr.cohort_month,
cr.activity_month,
cr.months_since_signup,
cr.active_users,
cs.cohort_size,
ROUND(cr.active_users / cs.cohort_size * 100, 2) as retention_rate
FROM cohort_retention cr
JOIN cohort_sizes cs ON cr.cohort_month = cs.cohort_month
WHERE cr.months_since_signup >= 0
ORDER BY cr.cohort_month, cr.months_since_signup
""")
cohort_analysis.show(50, truncate=False)
# Session-level funnel analysis
funnel_analysis = spark.sql("""
WITH
sessions_with_events AS (
SELECT
s.session_id,
s.user_id,
s.start_time,
MAX(CASE WHEN pv.page = '/home' THEN 1 ELSE 0 END) as visited_home,
MAX(CASE WHEN pv.page = '/products' THEN 1 ELSE 0 END) as visited_products,
MAX(CASE WHEN pv.page = '/cart' THEN 1 ELSE 0 END) as visited_cart,
MAX(CASE WHEN pv.page = '/checkout' THEN 1 ELSE 0 END) as visited_checkout,
MAX(CASE WHEN pv.page = '/confirmation' THEN 1 ELSE 0 END) as completed_purchase
FROM sessions s
LEFT JOIN page_views pv ON s.session_id = pv.session_id
GROUP BY s.session_id, s.user_id, s.start_time
)
SELECT
COUNT(DISTINCT session_id) as total_sessions,
SUM(visited_home) as home_visits,
SUM(visited_products) as product_views,
SUM(visited_cart) as cart_adds,
SUM(visited_checkout) as checkout_starts,
SUM(completed_purchase) as purchases,
ROUND(SUM(completed_purchase) / COUNT(DISTINCT session_id) * 100, 2) as conversion_rate,
ROUND(SUM(visited_cart) / NULLIF(SUM(visited_products), 0) * 100, 2) as product_to_cart_rate,
ROUND(SUM(completed_purchase) / NULLIF(SUM(visited_cart), 0) * 100, 2) as cart_to_purchase_rate
FROM sessions_with_events
""")
funnel_analysis.show(truncate=False)
spark.stop()
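The month index and the percentage columns in this pipeline are plain arithmetic, so a small pure-Python reference is handy for checking query output against fixture data. The helpers below are hypothetical and not part of the pipeline. `months_since` treats both inputs as `yyyy-MM` strings, and `rate` returns `None` for a zero denominator, mirroring what `NULLIF(..., 0)` does in the funnel query.

```python
from typing import Optional


def months_since(cohort_month: str, activity_month: str) -> int:
    """Whole calendar months from cohort_month to activity_month ('yyyy-MM')."""
    cohort_year, cohort_mon = (int(part) for part in cohort_month.split("-"))
    activity_year, activity_mon = (int(part) for part in activity_month.split("-"))
    return (activity_year - cohort_year) * 12 + (activity_mon - cohort_mon)


def rate(numerator: int, denominator: int) -> Optional[float]:
    """Percentage rounded to two decimals, or None when the denominator is zero."""
    if denominator == 0:
        return None
    return round(numerator / denominator * 100, 2)
```

One caveat when comparing results: Spark's `ROUND` rounds halves up, while Python's `round` can differ on exact .5 cases, so compare with a small tolerance, not strict equality.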
SQL Performance Optimization
Predicate Pushdown
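# Nested form: Catalyst still pushes the filter through the trivial subquery,
# so this compiles to the same physical plan as the direct form below
spark.sql("""
SELECT * FROM (
SELECT * FROM logs
) filtered
WHERE date = '2024-01-01'
""")
# Direct form: same plan, easier to read
# (pushdown is blocked by predicates Catalyst cannot see through, such as a Python UDF)
spark.sql("""
SELECT * FROM logs
WHERE date = '2024-01-01'
""")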
# Verify pushdown with EXPLAIN
spark.sql("EXPLAIN SELECT * FROM logs WHERE date = '2024-01-01'").show(truncate=False)
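Reading a full plan for one detail gets tedious, so a small helper that pulls out only the scan lines mentioning pushed or partition filters can help. This is a sketch under the assumption that file-based scans print `PushedFilters` and `PartitionFilters` entries in the plan text, as Parquet scans do in recent Spark versions. The function names are illustrative.

```python
from typing import List


def plan_text(spark, query: str) -> str:
    """Return the plan that EXPLAIN prints for a trusted, developer-written query."""
    # EXPLAIN yields a single row with a single string column.
    return spark.sql(f"EXPLAIN {query}").first()[0]


def scan_filter_lines(plan: str) -> List[str]:
    """Keep only the plan lines that mention pushed or partition filters."""
    markers = ("PartitionFilters", "PushedFilters")
    return [
        line.strip()
        for line in plan.splitlines()
        if any(marker in line for marker in markers)
    ]
```

For example, `scan_filter_lines(plan_text(spark, "SELECT * FROM logs WHERE date = '2024-01-01'"))` should show the `date` predicate inside one of those lists when pushdown or pruning applies. An empty result suggests the filter is being evaluated after the scan.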
Join Optimization Hints
# Hints go immediately after SELECT and name the table as it appears in the query (here, its alias)
# Broadcast hint
spark.sql("""
SELECT /*+ BROADCAST(d) */ l.*, d.category
FROM large_fact l
JOIN small_dim d ON l.dim_id = d.id
""")
# Shuffle hash join hint (Spark 3.0+)
spark.sql("""
SELECT /*+ SHUFFLE_HASH(b) */ *
FROM a JOIN b ON a.id = b.id
""")
# Sort-merge join hint (Spark 3.0+; SHUFFLE_MERGE is an alias of MERGE)
spark.sql("""
SELECT /*+ SHUFFLE_MERGE(a, b) */ *
FROM a JOIN b ON a.id = b.id
""")
AQE Integration
spark.conf.set("spark.sql.adaptive.enabled", "true")
# AQE (enabled by default since Spark 3.2) uses runtime shuffle statistics to:
# 1. Coalesce small partitions after a shuffle
# 2. Convert a sort-merge join to a broadcast join when one side turns out to be small
# 3. Split skewed partitions in sort-merge joins
# View AQE decisions
spark.sql("EXPLAIN FORMATTED SELECT ...").show(truncate=False)
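💡 Microsoft Pro Tip
In Azure Synapse Analytics, Spark tables are exposed to serverless SQL pools through shared metadata. Use CTAS (CREATE TABLE AS SELECT) to materialize query results. The ROUND_ROBIN, HASH, and REPLICATE distribution options belong to dedicated SQL pool tables rather than to Spark SQL; on the Spark side, the comparable controls are hints such as REPARTITION and BROADCAST.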
Dynamic SQL and Parameterized Queries
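# Parameterized queries
table_name = "employees"
min_age = 30
# Using an f-string (NOT recommended for production: SQL injection risk)
spark.sql(f"SELECT * FROM {table_name} WHERE age > {min_age}")
# str.format() is the same text interpolation and carries the same injection risk
spark.sql("SELECT * FROM {} WHERE age > {}".format(table_name, min_age))
# Named parameter markers (Spark 3.4+): the value is bound as a literal, never spliced into the SQL text
spark.sql("SELECT * FROM employees WHERE age > :min_age", args={"min_age": min_age})
# Variable substitution (spark.sql.variable.substitute, on by default) expands ${...}
# from Spark configuration values set with SET, not from Python variables
spark.sql("SET min_age=30")
spark.sql("SELECT * FROM employees WHERE age > ${min_age}")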
# Using Column objects (most Pythonic)
from pyspark.sql.functions import col
df = spark.table(table_name).filter(col("age") > min_age)
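When a table name really does have to come from a variable, one cautious option is to check it against an allow-list and coerce numeric values before building any SQL text. The sketch below is pure Python and all names in it are hypothetical. It illustrates the idea and does not replace bound parameters for values.

```python
import re
from typing import Iterable

# Conservative pattern for an unquoted identifier: letters, digits, underscore.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def checked_table_name(name: str, allowed: Iterable[str]) -> str:
    """Return name unchanged if it is a plain identifier on the allow-list."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"not a plain identifier: {name!r}")
    if name not in set(allowed):
        raise ValueError(f"table not on the allow-list: {name!r}")
    return name


def employees_older_than_sql(table_name, min_age, allowed_tables=("employees",)) -> str:
    """Build the query text only from validated pieces."""
    table = checked_table_name(table_name, allowed_tables)
    age = int(min_age)  # raises ValueError for input such as "30 OR 1=1"
    return f"SELECT * FROM {table} WHERE age > {age}"
```

With these checks, `employees_older_than_sql("employees", "30 OR 1=1")` raises `ValueError` before any text can reach `spark.sql`.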
Edge Cases
1. Case Sensitivity
# Spark SQL is case-insensitive for table/column names by default
spark.sql("SELECT Name FROM EMPLOYEES") # Works
# Make it case-sensitive
spark.conf.set("spark.sql.caseSensitive", "true")
# Quote identifiers for case-sensitive names
spark.sql('SELECT `Name` FROM `Employees` WHERE `Name` = "Alice"')
2. Complex Types in SQL
# Array functions
spark.sql("""
SELECT
name,
skills[0] as first_skill,
SIZE(skills) as skill_count,
ARRAY_CONTAINS(skills, 'Python') as knows_python
FROM employees
""")
# Map functions
spark.sql("""
SELECT
name,
attributes['department'] as dept,
MAP_KEYS(attributes) as attr_keys
FROM employees
""")
# Struct functions
spark.sql("""
SELECT
name,
address.city,
address.zipcode
FROM employees
""")
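3. Partition Pruning (Static and Dynamic)
# Static pruning: a literal filter on the partition column lets Spark skip
# non-matching partitions at planning time
# Reference the partition column directly in the predicate
# Dynamic partition pruning (Spark 3.0+) is the join-time variant: partitions of a fact
# table are pruned at runtime using a filter on the joined dimension table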
spark.sql("""
SELECT * FROM logs
WHERE date = '2024-01-01' -- Prunes to single partition
""")
# Verify pruning in explain plan
spark.sql("EXPLAIN SELECT * FROM logs WHERE date = '2024-01-01'").show(truncate=False)
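Dynamic partition pruning, in the Spark 3.0+ sense, applies to joins and not to a literal `WHERE` filter. The sketch below shows the query shape that can trigger it. The tables `sales` (assumed to be partitioned by `sale_date`) and `dim_date`, along with their columns, are hypothetical. Whether pruning actually happens depends on table sizes and statistics, so treat the plan output as the source of truth.

```python
# Config key that controls the feature (on by default in Spark 3.x).
DPP_CONF_KEY = "spark.sql.optimizer.dynamicPartitionPruning.enabled"

# The filter is on the dimension table; the fact table is only joined.
# With pruning, Spark can skip sales partitions whose sale_date values
# do not appear in the filtered dimension rows.
DPP_CANDIDATE_SQL = """
SELECT s.order_id, s.amount, d.fiscal_quarter
FROM sales s
JOIN dim_date d ON s.sale_date = d.calendar_date
WHERE d.fiscal_quarter = '2024-Q1'
"""


def explain_dpp_candidate(spark) -> None:
    """Print the formatted plan for the candidate query on an existing session."""
    spark.conf.set(DPP_CONF_KEY, "true")
    spark.sql(DPP_CANDIDATE_SQL).explain(mode="formatted")
```

In the printed plan, a dynamic pruning expression inside the fact table scan's `PartitionFilters` is the usual sign that the optimization was applied.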
DataFrame API vs SQL
| Feature | DataFrame API | Spark SQL |
|---|---|---|
| Type Safety | Partial (runtime) | None |
| Readability | Code-based | Declarative |
| Optimization | Same (Catalyst) | Same (Catalyst) |
| UDF Support | Yes | Yes (registered) |
| IDE Support | Better | Limited |
| Complex Logic | More verbose | More concise |
| Team Collaboration | Better for devs | Better for analysts |
ℹ️ Interview Tip
At Google and Microsoft, interviewers often ask you to solve the same problem using both DataFrame API and SQL. Practice doing both fluently. SQL is often more concise for complex aggregations; DataFrame API is better for programmatic transformations.
Best Practices
💡 Production SQL Best Practices
- Use CTEs for readability instead of nested subqueries
- Always alias tables in joins to avoid ambiguity
- Use EXPLAIN to verify query plans before production deployment
- Register UDFs only when built-in functions don't suffice
- Prefer parameterized queries to prevent SQL injection
- Use broadcast hints for small dimension tables
- Enable AQE for automatic optimization
- Cache results for repeated queries
Summary
Spark SQL provides a powerful SQL interface to Spark's distributed computing engine. Mastering temporary views, CTEs, window functions, and UDF registration is essential for Google and Microsoft data engineering roles. Understanding how Catalyst optimizes SQL queries helps you write performant code and debug slow queries effectively.