Parallel Spatial Joins with Dask-GeoPandas

A spatial join between two out-of-memory datasets is the operation people reach for Dask-GeoPandas to do — and the one that disappoints if you skip the spatial shuffle. This guide runs a points-in-polygons join across partitioned data correctly and in parallel. It is for analysts joining millions of features that won't fit in a single GeoDataFrame. It sits under Scaling with Dask-GeoPandas in Spatial Analysis & Advanced Query Techniques.

Why This Approach / What Goes Wrong

dask_geopandas.sjoin mirrors geopandas.sjoin, but parallel correctness depends on partition alignment. If the two inputs are partitioned arbitrarily, every partition of the left must be tested against every partition of the right — an all-pairs comparison that is slower than just doing it on one machine. spatial_shuffle repartitions both inputs along a Hilbert space-filling curve so spatially close features share a partition; then each left partition only needs the right partition covering the same region. Forgetting the shuffle is the difference between a join that scales and one that crawls.

The other classic mistake is a CRS mismatch between the two inputs, which produces an empty or wrong result with no error.

Prerequisites

conda install -c conda-forge "dask-geopandas=0.4.*" "geopandas=0.14.*" "pyarrow=15.*"

Step-by-Step Implementation

1. Read both partitioned inputs and align their CRS.

import dask_geopandas as dgpd

# ~12M GPS pings and ~50k administrative zones, both as partitioned GeoParquet
pings = dgpd.read_parquet("gps_pings/")
zones = dgpd.read_parquet("admin_zones/")

# Joins require a shared CRS — reproject both to the same metric system
pings = pings.to_crs(epsg=25832)
zones = zones.to_crs(epsg=25832)

2. Spatially shuffle so partitions align by location.

pings = pings.spatial_shuffle(npartitions=128)
zones = zones.spatial_shuffle(npartitions=128)

3. Run the spatial join lazily.

# Assign each ping the zone that contains it
joined = dgpd.sjoin(pings, zones, how="inner", predicate="within")

4. Aggregate and materialize. Stream the result to disk rather than into RAM.

counts = (
    joined.groupby("zone_id")
    .size()
    .rename("ping_count")
    .reset_index()
)
counts.to_parquet("ping_counts/", write_index=False)

5. (Optional) Use a distributed client for a real cluster.

# from dask.distributed import Client
# client = Client(n_workers=8, threads_per_worker=2, memory_limit="4GB")
# ...the same code above now runs across workers...

Verification

Confirm the join matched features and the totals reconcile.

import dask_geopandas as dgpd

result = dgpd.read_parquet("ping_counts/").compute()
print("Zones with pings:", len(result))         # Zones with pings: 47213
print("Total joined pings:", int(result["ping_count"].sum()))  # Total joined pings: 11984502

# Joined total should not exceed the input ping count (within = each ping once)
total_pings = dgpd.read_parquet("gps_pings/").shape[0].compute()
assert result["ping_count"].sum() <= total_pings

A joined total of zero almost always means the two inputs were in different CRSs.

Edge Cases & Debugging