🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Servicesℹ️ About✉️ ContactView Pricing Plansfrom $10

Geospatial Data Processing in PySpark

🟢 Free Lesson

Advertisement

🌍 Geospatial Data Processing in PySpark

Spatial Indexing & Join Strategies

Spatial Indexing SystemsH3HexagonalS2Spherical cellsGeohashRectangular gridEach system maps lat/lng to discrete cell IDs for efficient spatial queriesSpatial Join StrategiesNested LoopO(n×m) brute forceBroadcast IndexSmall side broadcastSort-MergeRange-based mergePartition PruneCell-based filterRecommended: H3/S2 + Partition PruningReduces join from O(n×m) to O(n+k) where k = matching cells

Architecture Diagram

Detailed Explanation

Fundamental Challenge

Spatial data has geometric properties (points, lines, polygons) that cannot be directly compared using standard relational operators.

Key Takeaway: Spatial operations require specialized indexing structures and join strategies for the two-dimensional nature of geographic data.


Coordinate Reference Systems (CRS)

SystemCodeUse Case
WGS84EPSG:4326Latitude/longitude coordinates
UTMVariousProjected coordinates in meters

Important: Converting between systems is essential for accurate distance calculations—using lat/lng directly introduces errors due to Earth's curvature.


Spatial Indexing Structures

IndexCreatorMethod
H3UberHierarchical hexagonal cells
S2GoogleSphere-to-cube projection with quadtree
GeohashSpace-filling curve (Morton order)

Benefit: Reduces spatial join complexity from O(n×m) to near-linear.


Spatial Join Strategies

StrategyWhen to UseLimitation
Broadcast JoinSmall dataset fits in memoryRequires small dataset
KD-Tree JoinMedium datasetsBuild index on each partition
Grid-based JoinUniform distributionFails with skewed data

Point-in-Polygon Operations

Most common spatial query—determines if a point lies within a polygon.

  • Uses ray-casting or winding number algorithm
  • O(n) complexity where n = number of polygon vertices
  • Spatial indexing reduces search space via bounding box pre-filtering

Trajectory Analysis

Processes sequences of location points to extract movement patterns.

Components:

  1. Temporal ordering — Sort by timestamp
  2. Stay-point detection — Identify stationary locations (distance + time thresholds)
  3. Trajectory segmentation — Divide movement into meaningful trips

Stay Point Definition: Entity remains within distance threshold for at least minimum duration.

Mathematical Foundations

Definition: Haversine Distance

The great-circle distance between two points (ϕ1,λ1)(\phi_1, \lambda_1) and (ϕ2,λ2)(\phi_2, \lambda_2) on a sphere of radius RR is:

d=2Rarcsin(sin2(ϕ2ϕ12)+cosϕ1cosϕ2sin2(λ2λ12))d = 2R \cdot \arcsin\left(\sqrt{\sin^2\left(\frac{\phi_2 - \phi_1}{2}\right) + \cos\phi_1 \cos\phi_2 \sin^2\left(\frac{\lambda_2 - \lambda_1}{2}\right)}\right)

Spatial Index Resolution

For H3 index resolution rr (0-15), the average hexagon area is:

ArAearth2×3r+10×3r1+20×3r2+A_r \approx \frac{A_{\text{earth}}}{2 \times 3^{r} + 10 \times 3^{r-1} + 20 \times 3^{r-2} + \cdots}

Resolution 8 gives \sim464m edge length; resolution 9 gives \sim174m.

Spatial Join Complexity

Without indexing, a spatial join between datasets of sizes mm and nn has complexity O(m×n)O(m \times n). With grid-based spatial indexing dividing the space into kk cells:

O(m×nk+(m+n)logk)O\left(\frac{m \times n}{k} + (m + n) \cdot \log k\right)

when data is uniformly distributed across cells.

Convex Hull Area

For polygon PP with vertices (x1,y1),,(xn,yn)(x_1, y_1), \ldots, (x_n, y_n), the shoelace formula gives area:

A=12i=1n(xiyi+1xi+1yi)A = \frac{1}{2} \left| \sum_{i=1}^{n} (x_i y_{i+1} - x_{i+1} y_i) \right|

Nearest Neighbor Search

For kk-NN with spatial index, the search prunes cells whose minimum bounding rectangle distance exceeds the current kk-th nearest distance dkd_k:

Prune(C)    minpC.MBRdist(q,p)>dk\text{Prune}(C) \iff \min_{p \in C.\text{MBR}} \text{dist}(q, p) > d_k

Key Insight

Spark's ST_* functions from Sedona compute exact geometry operations, while H3/S2 provide approximate but indexable representations. Use exact geometry for correctness-critical joins, approximate indexing for exploratory analysis at scale.

Summary

Geospatial processing on Spark combines exact geometric operations (Haversine, polygon intersection) with hierarchical spatial indices (H3, S2). Spatial indexing reduces join complexity from O(mn)O(mn) to near-linear. Coordinate reference system transformations must precede any distance calculations.

Key Concepts Table

ConceptDescriptionUse Case
H3 IndexHexagonal hierarchical spatial indexLocation analytics, ride-sharing
S2 CellQuadtree cell on sphereMaps, street view, global coverage
GeohashPrefix-based 1D encodingProximity search, caching
Bounding BoxMinimum enclosing rectangleIndex filtering, quick rejection
Point-in-PolygonContainment test for pointsStore finder, zone assignment
KNN QueryK nearest neighborsNearest facility, POI search
BufferPoints within distanceService area, proximity analysis
Convex HullSmallest convex polygonArea of coverage, boundary
Spatial JoinJoin based on geometric predicatesRegion-based aggregation
TrajectoryOrdered sequence of locationsMovement analysis, route optimization

Code Examples

Coordinate Operations and Distance Calculations

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import math

spark = SparkSession.builder \
    .appName("GeospatialData") \
    .getOrCreate()

# Define UDFs for spatial calculations
@udf(returnType=DoubleType())
def haversine_distance(lat1, lon1, lat2, lon2):
    """Calculate great-circle distance between two points using Haversine formula."""
    R = 6371000  # Earth's radius in meters
    
    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    delta_lat = math.radians(lat2 - lat1)
    delta_lon = math.radians(lon2 - lon1)
    
    a = (math.sin(delta_lat / 2) ** 2 +
         math.cos(lat1_rad) * math.cos(lat2_rad) *
         math.sin(delta_lon / 2) ** 2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    
    return R * c

@udf(returnType=StringType())
def geohash_encode(lat, lon, precision=12):
    """Encode coordinates to geohash."""
    BASE32 = '0123456789bcdefghjkmnpqrstuvwxyz'
    
    lat_range = [-90.0, 90.0]
    lon_range = [-180.0, 180.0]
    geohash = []
    bit = 0
    ch = 0
    is_lon = True
    
    while len(geohash) < precision:
        if is_lon:
            mid = (lon_range[0] + lon_range[1]) / 2
            if lon >= mid:
                ch |= (1 << (4 - bit))
                lon_range[0] = mid
            else:
                lon_range[1] = mid
        else:
            mid = (lat_range[0] + lat_range[1]) / 2
            if lat >= mid:
                ch |= (1 << (4 - bit))
                lat_range[0] = mid
            else:
                lat_range[1] = mid
        
        is_lon = not is_lon
        bit += 1
        
        if bit == 5:
            geohash.append(BASE32[ch])
            bit = 0
            ch = 0
    
    return ''.join(geohash)

# Create point data (restaurants)
restaurants = [
    ("R001", "Pizza Palace", 40.7128, -74.0060, "Italian"),
    ("R002", "Burger Barn", 40.7580, -73.9855, "American"),
    ("R003", "Sushi Spot", 40.7484, -73.9857, "Japanese"),
    ("R004", "Taco Town", 40.7614, -73.9776, "Mexican"),
    ("R005", "Noodle Nest", 40.7831, -73.9712, "Chinese"),
]
restaurants_df = spark.createDataFrame(restaurants, [
    "id", "name", "latitude", "longitude", "cuisine"
])

# Create polygon data (neighborhoods)
neighborhoods = [
    ("N001", "Manhattan", "POLYGON((-74.01 40.70, -73.97 40.70, -73.97 40.78, -74.01 40.78, -74.01 40.70))"),
    ("N002", "Brooklyn", "POLYGON((-73.99 40.57, -73.94 40.57, -73.94 40.70, -73.99 40.70, -73.99 40.57))"),
]
neighborhoods_df = spark.createDataFrame(neighborhoods, [
    "id", "name", "geometry"
])

# Add geohash to restaurants
restaurants_with_geohash = restaurants_df \
    .withColumn("geohash", geohash_encode(col("latitude"), col("longitude"), lit(8)))

# Find restaurants within 1km of a reference point
reference_lat = 40.7580
reference_lon = -73.9855

nearby_restaurants = restaurants_df \
    .withColumn("distance_meters", 
        haversine_distance(
            lit(reference_lat), lit(reference_lon),
            col("latitude"), col("longitude")
        )
    ) \
    .filter(col("distance_meters") <= 1000) \
    .orderBy("distance_meters")

print("Restaurants within 1km:")
nearby_restaurants.show(truncate=False)

Spatial Joins and Indexing

# Point-in-polygon using ray-casting algorithm
@udf(returnType=BooleanType())
def point_in_polygon(lat, lon, polygon_wkt):
    """Check if point is inside polygon using ray-casting."""
    # Parse WKT polygon (simplified)
    coords_str = polygon_wkt.replace("POLYGON((", "").replace("))", "")
    coords = []
    for pair in coords_str.split(","):
        lng, lt = pair.strip().split()
        coords.append((float(lng), float(lt)))
    
    # Ray-casting algorithm
    n = len(coords)
    inside = False
    j = n - 1
    
    for i in range(n):
        xi, yi = coords[i]
        xj, yj = coords[j]
        
        if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi):
            inside = not inside
        j = i
    
    return inside

# Spatial join: assign restaurants to neighborhoods
spatial_join_df = restaurants_df.crossJoin(neighborhoods_df) \
    .filter(point_in_polygon(col("latitude"), col("longitude"), col("geometry"))) \
    .select(
        restaurants_df.id,
        restaurants_df.name,
        neighborhoods_df.name.alias("neighborhood"),
        restaurants_df.cuisine
    )

print("Restaurants by neighborhood:")
spatial_join_df.show(truncate=False)

# KNN query: find 3 nearest restaurants to a point
from pyspark.sql.window import Window

knn_df = restaurants_df \
    .withColumn("distance", 
        haversine_distance(
            lit(40.7580), lit(-73.9855),
            col("latitude"), col("longitude")
        )
    ) \
    .withColumn("rank", rank().over(Window.orderBy("distance"))) \
    .filter(col("rank") <= 3)

print("3 nearest restaurants:")
knn_df.show(truncate=False)

H3 Index Operations

# H3 index functions (simplified implementation)
@udf(returnType=LongType())
def h3_latlng_to_cell(lat, lng, resolution):
    """Convert lat/lng to H3 cell index."""
    # This is a simplified version - use h3 library in production
    # H3 uses icosahedron projection and hexagonal tessellation
    return int(f"8{resolution}{abs(int(lat*1000))}{abs(int(lng*1000))}")

@udf(returnType=ArrayType(DoubleType()))
def h3_cell_to_boundary(cell_index):
    """Get polygon boundary of H3 cell."""
    # Simplified - returns approximate boundary
    lat = (cell_index % 10000) / 1000.0
    lng = ((cell_index // 10000) % 10000) / 1000.0
    return [lat - 0.001, lng - 0.001, lat + 0.001, lng + 0.001]

# Add H3 index at different resolutions
h3_restaurants = restaurants_df \
    .withColumn("h3_res8", h3_latlng_to_cell(col("latitude"), col("longitude"), lit(8))) \
    .withColumn("h3_res10", h3_latlng_to_cell(col("latitude"), col("longitude"), lit(10))) \
    .withColumn("h3_res12", h3_latlng_to_cell(col("latitude"), col("longitude"), lit(12)))

# Aggregate by H3 cell (spatial clustering)
h3_clusters = h3_restaurants \
    .groupBy("h3_res8") \
    .agg(
        count("*").alias("restaurant_count"),
        avg("latitude").alias("center_lat"),
        avg("longitude").alias("center_lng"),
        collect_list("cuisine").alias("cuisines")
    )

print("H3 clusters (resolution 8):")
h3_clusters.show(truncate=False)

# Find neighboring cells
@udf(returnType=ArrayType(LongType()))
def h3_grid_disk(cell_index, k):
    """Get cells within k steps of given cell."""
    # Simplified - returns grid disk neighbors
    return [cell_index + i for i in range(-k, k+1) if i != 0]

# Find all restaurants within 2 H3 cells of a reference
reference_cell = h3_latlng_to_cell(lit(40.7580), lit(-73.9855), lit(8))
neighbor_cells = h3_grid_disk(reference_cell, lit(2))

nearby_h3 = h3_restaurants.filter(
    col("h3_res8").isin(neighbor_cells)
)

Performance Metrics

Operation1K Points100K Points10M Points100M Points
Haversine Distance (all pairs)&lt; 1 sec5-10 sec5-10 min5-10 hours
Point-in-Polygon (100 polys)&lt; 1 sec2-5 sec30-60 sec5-10 min
Spatial Join (broadcast)&lt; 1 sec1-3 sec10-20 sec2-5 min
KNN Query (k=10)&lt; 1 sec1-2 sec5-10 sec1-2 min
H3 Index Build&lt; 1 sec1-2 sec10-20 sec2-5 min
Geohash Encode&lt; 1 sec1-2 sec10-20 sec2-5 min
Buffer Operation&lt; 1 sec3-8 sec60-120 sec10-20 min
Convex Hull&lt; 1 sec1-3 sec10-20 sec2-5 min
Trajectory Segmentation&lt; 1 sec2-5 sec20-40 sec3-8 min
Spatial Clustering (DBSCAN)&lt; 1 sec5-15 sec2-5 min30-60 min

Best Practices

  1. Use spatial indexing (H3, S2, Geohash) before spatial joins to reduce the number of exact geometry checks from O(n×m) to O(n×k) where k << m
  2. Broadcast small datasets when performing spatial joins—the broadcast threshold should be based on the polygon complexity, not just file size
  3. Convert to projected CRS (UTM) for accurate distance calculations—do not use latitude/longitude directly for Euclidean distance
  4. Use bounding box pre-filtering before expensive geometric operations to quickly reject non-candidate pairs
  5. Partition spatial data by geohash prefix to ensure nearby points land in the same partition, reducing shuffle during joins
  6. Cache H3 cell assignments when performing multiple spatial queries on the same dataset to avoid recomputing index values
  7. Implement spatial partitioning for large polygon datasets using R-tree or quadtree to enable efficient point-in-polygon queries
  8. Use st_point and st_contains from Sedona/Mosaic libraries for production spatial operations instead of custom UDFs
  9. Monitor spatial skew when using grid-based partitioning—urban areas have much higher point density than rural areas
  10. Implement trajectory compression (Douglas-Peucker algorithm) to reduce the number of points in trajectories while preserving shape
  11. Use geohash prefix matching for proximity search—two points within 1km will share a geohash prefix of length 5-6
  12. Avoid cross-join for spatial operations when possible—use broadcast or indexed joins to prevent quadratic explosion

Mathematical Foundation Summary

Geospatial indexing uses hierarchical space-filling curves to map 2D coordinates to 1D indices. H3 uses a hexagonal grid with resolution levels where each level subdivides cells by factor 7, achieving cell_area ≈ 0.108 × d² km² at resolution d. The Haversine formula computes great-circle distance: d = 2r × arcsin(√(sin²(Δφ/2) + cos(φ₁)cos(φ₂)sin²(Δλ/2))). Spatial join complexity reduces from O(n×m) brute force to O(n log m) using R-tree indexing where query performance follows cost = O(log n + k) for k nearest neighbors.

See also: Graph Processing (25), JSON/XML Parsing (28), ML Feature Engineering (32)

See Also

Premium Content

Geospatial Data Processing in PySpark

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
💼Interview Prep
📜Certificates
🤝Community Access

Already a member? Log in

Need Expert PySpark Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement