🌍 Geospatial Data Processing in PySpark
Spatial Indexing & Join Strategies
Architecture Diagram
Detailed Explanation
Fundamental Challenge
Spatial data has geometric properties (points, lines, polygons) that cannot be directly compared using standard relational operators.
Key Takeaway: Spatial operations require specialized indexing structures and join strategies for the two-dimensional nature of geographic data.
Coordinate Reference Systems (CRS)
| System | Code | Use Case |
|---|---|---|
| WGS84 | EPSG:4326 | Latitude/longitude coordinates |
| UTM | Various | Projected coordinates in meters |
Important: Converting between systems is essential for accurate distance calculations—using lat/lng directly introduces errors due to Earth's curvature.
Spatial Indexing Structures
| Index | Creator | Method |
|---|---|---|
| H3 | Uber | Hierarchical hexagonal cells |
| S2 | Sphere-to-cube projection with quadtree | |
| Geohash | — | Space-filling curve (Morton order) |
Benefit: Reduces spatial join complexity from O(n×m) to near-linear.
Spatial Join Strategies
| Strategy | When to Use | Limitation |
|---|---|---|
| Broadcast Join | Small dataset fits in memory | Requires small dataset |
| KD-Tree Join | Medium datasets | Build index on each partition |
| Grid-based Join | Uniform distribution | Fails with skewed data |
Point-in-Polygon Operations
Most common spatial query—determines if a point lies within a polygon.
- Uses ray-casting or winding number algorithm
- O(n) complexity where n = number of polygon vertices
- Spatial indexing reduces search space via bounding box pre-filtering
Trajectory Analysis
Processes sequences of location points to extract movement patterns.
Components:
- Temporal ordering — Sort by timestamp
- Stay-point detection — Identify stationary locations (distance + time thresholds)
- Trajectory segmentation — Divide movement into meaningful trips
Stay Point Definition: Entity remains within distance threshold for at least minimum duration.
Mathematical Foundations
Definition: Haversine Distance
The great-circle distance between two points and on a sphere of radius is:
Spatial Index Resolution
For H3 index resolution (0-15), the average hexagon area is:
Resolution 8 gives 464m edge length; resolution 9 gives 174m.
Spatial Join Complexity
Without indexing, a spatial join between datasets of sizes and has complexity . With grid-based spatial indexing dividing the space into cells:
when data is uniformly distributed across cells.
Convex Hull Area
For polygon with vertices , the shoelace formula gives area:
Nearest Neighbor Search
For -NN with spatial index, the search prunes cells whose minimum bounding rectangle distance exceeds the current -th nearest distance :
Key Insight
Spark's ST_* functions from Sedona compute exact geometry operations, while H3/S2 provide approximate but indexable representations. Use exact geometry for correctness-critical joins, approximate indexing for exploratory analysis at scale.
Summary
Geospatial processing on Spark combines exact geometric operations (Haversine, polygon intersection) with hierarchical spatial indices (H3, S2). Spatial indexing reduces join complexity from to near-linear. Coordinate reference system transformations must precede any distance calculations.
Key Concepts Table
| Concept | Description | Use Case |
|---|---|---|
| H3 Index | Hexagonal hierarchical spatial index | Location analytics, ride-sharing |
| S2 Cell | Quadtree cell on sphere | Maps, street view, global coverage |
| Geohash | Prefix-based 1D encoding | Proximity search, caching |
| Bounding Box | Minimum enclosing rectangle | Index filtering, quick rejection |
| Point-in-Polygon | Containment test for points | Store finder, zone assignment |
| KNN Query | K nearest neighbors | Nearest facility, POI search |
| Buffer | Points within distance | Service area, proximity analysis |
| Convex Hull | Smallest convex polygon | Area of coverage, boundary |
| Spatial Join | Join based on geometric predicates | Region-based aggregation |
| Trajectory | Ordered sequence of locations | Movement analysis, route optimization |
Code Examples
Coordinate Operations and Distance Calculations
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import math
spark = SparkSession.builder \
.appName("GeospatialData") \
.getOrCreate()
# Define UDFs for spatial calculations
@udf(returnType=DoubleType())
def haversine_distance(lat1, lon1, lat2, lon2):
"""Calculate great-circle distance between two points using Haversine formula."""
R = 6371000 # Earth's radius in meters
lat1_rad = math.radians(lat1)
lat2_rad = math.radians(lat2)
delta_lat = math.radians(lat2 - lat1)
delta_lon = math.radians(lon2 - lon1)
a = (math.sin(delta_lat / 2) ** 2 +
math.cos(lat1_rad) * math.cos(lat2_rad) *
math.sin(delta_lon / 2) ** 2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return R * c
@udf(returnType=StringType())
def geohash_encode(lat, lon, precision=12):
"""Encode coordinates to geohash."""
BASE32 = '0123456789bcdefghjkmnpqrstuvwxyz'
lat_range = [-90.0, 90.0]
lon_range = [-180.0, 180.0]
geohash = []
bit = 0
ch = 0
is_lon = True
while len(geohash) < precision:
if is_lon:
mid = (lon_range[0] + lon_range[1]) / 2
if lon >= mid:
ch |= (1 << (4 - bit))
lon_range[0] = mid
else:
lon_range[1] = mid
else:
mid = (lat_range[0] + lat_range[1]) / 2
if lat >= mid:
ch |= (1 << (4 - bit))
lat_range[0] = mid
else:
lat_range[1] = mid
is_lon = not is_lon
bit += 1
if bit == 5:
geohash.append(BASE32[ch])
bit = 0
ch = 0
return ''.join(geohash)
# Create point data (restaurants)
restaurants = [
("R001", "Pizza Palace", 40.7128, -74.0060, "Italian"),
("R002", "Burger Barn", 40.7580, -73.9855, "American"),
("R003", "Sushi Spot", 40.7484, -73.9857, "Japanese"),
("R004", "Taco Town", 40.7614, -73.9776, "Mexican"),
("R005", "Noodle Nest", 40.7831, -73.9712, "Chinese"),
]
restaurants_df = spark.createDataFrame(restaurants, [
"id", "name", "latitude", "longitude", "cuisine"
])
# Create polygon data (neighborhoods)
neighborhoods = [
("N001", "Manhattan", "POLYGON((-74.01 40.70, -73.97 40.70, -73.97 40.78, -74.01 40.78, -74.01 40.70))"),
("N002", "Brooklyn", "POLYGON((-73.99 40.57, -73.94 40.57, -73.94 40.70, -73.99 40.70, -73.99 40.57))"),
]
neighborhoods_df = spark.createDataFrame(neighborhoods, [
"id", "name", "geometry"
])
# Add geohash to restaurants
restaurants_with_geohash = restaurants_df \
.withColumn("geohash", geohash_encode(col("latitude"), col("longitude"), lit(8)))
# Find restaurants within 1km of a reference point
reference_lat = 40.7580
reference_lon = -73.9855
nearby_restaurants = restaurants_df \
.withColumn("distance_meters",
haversine_distance(
lit(reference_lat), lit(reference_lon),
col("latitude"), col("longitude")
)
) \
.filter(col("distance_meters") <= 1000) \
.orderBy("distance_meters")
print("Restaurants within 1km:")
nearby_restaurants.show(truncate=False)
Spatial Joins and Indexing
# Point-in-polygon using ray-casting algorithm
@udf(returnType=BooleanType())
def point_in_polygon(lat, lon, polygon_wkt):
"""Check if point is inside polygon using ray-casting."""
# Parse WKT polygon (simplified)
coords_str = polygon_wkt.replace("POLYGON((", "").replace("))", "")
coords = []
for pair in coords_str.split(","):
lng, lt = pair.strip().split()
coords.append((float(lng), float(lt)))
# Ray-casting algorithm
n = len(coords)
inside = False
j = n - 1
for i in range(n):
xi, yi = coords[i]
xj, yj = coords[j]
if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
# Spatial join: assign restaurants to neighborhoods
spatial_join_df = restaurants_df.crossJoin(neighborhoods_df) \
.filter(point_in_polygon(col("latitude"), col("longitude"), col("geometry"))) \
.select(
restaurants_df.id,
restaurants_df.name,
neighborhoods_df.name.alias("neighborhood"),
restaurants_df.cuisine
)
print("Restaurants by neighborhood:")
spatial_join_df.show(truncate=False)
# KNN query: find 3 nearest restaurants to a point
from pyspark.sql.window import Window
knn_df = restaurants_df \
.withColumn("distance",
haversine_distance(
lit(40.7580), lit(-73.9855),
col("latitude"), col("longitude")
)
) \
.withColumn("rank", rank().over(Window.orderBy("distance"))) \
.filter(col("rank") <= 3)
print("3 nearest restaurants:")
knn_df.show(truncate=False)
H3 Index Operations
# H3 index functions (simplified implementation)
@udf(returnType=LongType())
def h3_latlng_to_cell(lat, lng, resolution):
"""Convert lat/lng to H3 cell index."""
# This is a simplified version - use h3 library in production
# H3 uses icosahedron projection and hexagonal tessellation
return int(f"8{resolution}{abs(int(lat*1000))}{abs(int(lng*1000))}")
@udf(returnType=ArrayType(DoubleType()))
def h3_cell_to_boundary(cell_index):
"""Get polygon boundary of H3 cell."""
# Simplified - returns approximate boundary
lat = (cell_index % 10000) / 1000.0
lng = ((cell_index // 10000) % 10000) / 1000.0
return [lat - 0.001, lng - 0.001, lat + 0.001, lng + 0.001]
# Add H3 index at different resolutions
h3_restaurants = restaurants_df \
.withColumn("h3_res8", h3_latlng_to_cell(col("latitude"), col("longitude"), lit(8))) \
.withColumn("h3_res10", h3_latlng_to_cell(col("latitude"), col("longitude"), lit(10))) \
.withColumn("h3_res12", h3_latlng_to_cell(col("latitude"), col("longitude"), lit(12)))
# Aggregate by H3 cell (spatial clustering)
h3_clusters = h3_restaurants \
.groupBy("h3_res8") \
.agg(
count("*").alias("restaurant_count"),
avg("latitude").alias("center_lat"),
avg("longitude").alias("center_lng"),
collect_list("cuisine").alias("cuisines")
)
print("H3 clusters (resolution 8):")
h3_clusters.show(truncate=False)
# Find neighboring cells
@udf(returnType=ArrayType(LongType()))
def h3_grid_disk(cell_index, k):
"""Get cells within k steps of given cell."""
# Simplified - returns grid disk neighbors
return [cell_index + i for i in range(-k, k+1) if i != 0]
# Find all restaurants within 2 H3 cells of a reference
reference_cell = h3_latlng_to_cell(lit(40.7580), lit(-73.9855), lit(8))
neighbor_cells = h3_grid_disk(reference_cell, lit(2))
nearby_h3 = h3_restaurants.filter(
col("h3_res8").isin(neighbor_cells)
)
Performance Metrics
| Operation | 1K Points | 100K Points | 10M Points | 100M Points |
|---|---|---|---|---|
| Haversine Distance (all pairs) | < 1 sec | 5-10 sec | 5-10 min | 5-10 hours |
| Point-in-Polygon (100 polys) | < 1 sec | 2-5 sec | 30-60 sec | 5-10 min |
| Spatial Join (broadcast) | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| KNN Query (k=10) | < 1 sec | 1-2 sec | 5-10 sec | 1-2 min |
| H3 Index Build | < 1 sec | 1-2 sec | 10-20 sec | 2-5 min |
| Geohash Encode | < 1 sec | 1-2 sec | 10-20 sec | 2-5 min |
| Buffer Operation | < 1 sec | 3-8 sec | 60-120 sec | 10-20 min |
| Convex Hull | < 1 sec | 1-3 sec | 10-20 sec | 2-5 min |
| Trajectory Segmentation | < 1 sec | 2-5 sec | 20-40 sec | 3-8 min |
| Spatial Clustering (DBSCAN) | < 1 sec | 5-15 sec | 2-5 min | 30-60 min |
Best Practices
- Use spatial indexing (H3, S2, Geohash) before spatial joins to reduce the number of exact geometry checks from O(n×m) to O(n×k) where k << m
- Broadcast small datasets when performing spatial joins—the broadcast threshold should be based on the polygon complexity, not just file size
- Convert to projected CRS (UTM) for accurate distance calculations—do not use latitude/longitude directly for Euclidean distance
- Use bounding box pre-filtering before expensive geometric operations to quickly reject non-candidate pairs
- Partition spatial data by geohash prefix to ensure nearby points land in the same partition, reducing shuffle during joins
- Cache H3 cell assignments when performing multiple spatial queries on the same dataset to avoid recomputing index values
- Implement spatial partitioning for large polygon datasets using R-tree or quadtree to enable efficient point-in-polygon queries
- Use
st_pointandst_containsfrom Sedona/Mosaic libraries for production spatial operations instead of custom UDFs - Monitor spatial skew when using grid-based partitioning—urban areas have much higher point density than rural areas
- Implement trajectory compression (Douglas-Peucker algorithm) to reduce the number of points in trajectories while preserving shape
- Use geohash prefix matching for proximity search—two points within 1km will share a geohash prefix of length 5-6
- Avoid cross-join for spatial operations when possible—use broadcast or indexed joins to prevent quadratic explosion
Mathematical Foundation Summary
Geospatial indexing uses hierarchical space-filling curves to map 2D coordinates to 1D indices. H3 uses a hexagonal grid with resolution levels where each level subdivides cells by factor 7, achieving cell_area ≈ 0.108 × d² km² at resolution d. The Haversine formula computes great-circle distance: d = 2r × arcsin(√(sin²(Δφ/2) + cos(φ₁)cos(φ₂)sin²(Δλ/2))). Spatial join complexity reduces from O(n×m) brute force to O(n log m) using R-tree indexing where query performance follows cost = O(log n + k) for k nearest neighbors.
See also: Graph Processing (25), JSON/XML Parsing (28), ML Feature Engineering (32)
See Also
- DataFrame Operations — DataFrame transformations for spatial data
- UDF Optimization — Custom UDFs for geospatial operations
- Advanced Aggregations — Spatial aggregation patterns
- JSON XML Parsing — Parsing geospatial data formats