Filtering large catalogs#
Large astronomical surveys contain a massive volume of data. Billion-object, multi-terabyte catalogs are challenging to store and manipulate because they demand state-of-the-art hardware. Processing them is expensive, both in terms of runtime and memory consumption, and doing so on a single machine has become impractical. LSDB is a solution that enables scalable algorithm execution. It handles loading, querying, filtering, and crossmatching astronomical data (in the HATS format) in a distributed environment.
In this tutorial, we will demonstrate how to:
Set up a Dask client for distributed processing
Load an object catalog with a set of desired columns
Select data from regions of the sky
Filter data by column values
Preview catalog data
[1]:
import lsdb
Creating a Dask client#
Dask is a framework that allows us to take advantage of distributed computing capabilities.
With Dask, the operations defined in a workflow (e.g. this notebook) are added to a task graph, which optimizes their order of execution. The operations are not computed immediately; we decide when to trigger them. As soon as we do, Dask distributes the workload across its workers and the tasks run in parallel. Usually, the later we kick off the computations, the better, since that gives Dask the whole graph to optimize at once.
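As a toy illustration of this laziness, using plain Dask (not LSDB):

import dask.array as da

# Nothing is computed here: these lines only build the task graph.
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
total = (x**2).sum()

# Only this call executes the graph, in parallel across the workers.
print(total.compute())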
Dask creates a client by default if we do not instantiate one. Instantiating it ourselves allows us to, for example:
Specify the number of workers and the memory limit for each of them.
Adjust the address of the dashboard that profiles the operations while they run (by default, it is served on port 8787).
For additional information, please refer to the official Dask documentation and our Dask cluster configuration page for LSDB-specific tips.
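For instance, a minimal sketch of a customized client (the 2 GiB limit and port 8788 are arbitrary values, chosen only to show the options):

from dask.distributed import Client

# Four workers with 2 GiB of memory each, and the dashboard moved off
# the default port 8787.
client = Client(n_workers=4, memory_limit="2GiB", dashboard_address=":8788")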
[2]:
from dask.distributed import Client
client = Client(n_workers=4, memory_limit="auto")
client
[2]:
Client: Client-0ff2c56b-1b8d-11f0-8497-9e5630079e2b
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
LocalCluster 09e0cae5: 4 workers | 4 threads total | 13.09 GiB total memory | status: running
Loading a catalog#
We will load a small 5 degree radius cone from the ZTF DR14 object catalog.
Catalogs represent tabular data and are internally subdivided into partitions according to their position in the sky. When processing a catalog, each worker is expected to load a single partition into memory at a time. Therefore, when loading a catalog, it is crucial to specify the subset of columns our science pipeline needs. Failing to do so loads the entire partition table, which not only increases worker memory usage but also significantly degrades runtime performance.
[3]:
surveys_path = "https://data.lsdb.io/hats"
[4]:
ztf_object_path = f"{surveys_path}/ztf_dr14/ztf_object"
ztf_object = lsdb.read_hats(ztf_object_path, columns=["ps1_objid", "ra", "dec", "mean_mag_r"])
ztf_object
[4]:
|  | ps1_objid | ra | dec | mean_mag_r |
| --- | --- | --- | --- | --- |
| npartitions=2352 |  |  |  |  |
| Order: 3, Pixel: 0 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | double[pyarrow] |
| Order: 3, Pixel: 1 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| Order: 4, Pixel: 3070 | ... | ... | ... | ... |
| Order: 4, Pixel: 3071 | ... | ... | ... | ... |
The catalog has been loaded lazily: we can see its metadata, but no actual data is there yet. We will keep defining operations throughout this notebook; only when we call compute() on the resulting catalog are they executed, i.e., the data is loaded from disk into memory for processing.
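Because only metadata has been read so far, it is also cheap to check which columns a catalog offers before committing to a subset. A minimal sketch, reusing ztf_object_path from above (we assume the lazy catalog exposes a pandas-like columns property):

# Metadata-only read: list the available columns without loading any data.
all_columns = lsdb.read_hats(ztf_object_path).columns
print(list(all_columns))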
Selecting a region of the sky#
We may use 3 types of spatial filters (cone, polygon and box) to select a portion of the sky.
Filtering consists of two main steps:
A coarse stage, in which we find which pixels cover the desired region of the sky. These pixels may only partially overlap the region, which means that some of their data points may fall outside of it.
A fine stage, where we filter the data points from each pixel to make sure they fall within the specified region.
The fine parameter allows us to specify, for each search, whether we want to run the fine stage. It adds some overhead, so if your intention is only to get a rough estimate of the data points in a region, you may disable it. It is always executed by default.
catalog.box_search(..., fine=False)
catalog.cone_search(..., fine=False)
catalog.polygon_search(..., fine=False)
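A sketch of the difference in practice (reusing the cone parameters from the next section; the inequality is the general guarantee, not a measured result):

# The coarse-only search keeps every row of each overlapping pixel, so it
# returns at least as many rows as the exact (fine) search:
coarse = ztf_object.cone_search(ra=-60.3, dec=20.5, radius_arcsec=3600, fine=False)
exact = ztf_object.cone_search(ra=-60.3, dec=20.5, radius_arcsec=3600)
# len(coarse.compute()) >= len(exact.compute())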
Throughout this notebook, we will use the Catalog's plot_pixels method to display the HEALPix pixel map of each resulting catalog as filters are applied.
[5]:
ztf_object.plot_pixels(plot_title="ZTF_DR14 - pixel map")
[5]:
(<Figure size 1000x500 with 2 Axes>,
<WCSAxes: title={'center': 'ZTF_DR14 - pixel map'}>)

Cone search#
A cone search is defined by a center (ra, dec), in degrees, and a radius r, in arcseconds.
[6]:
ztf_object_cone = ztf_object.cone_search(ra=-60.3, dec=20.5, radius_arcsec=1 * 3600)  # 1 degree radius, in arcseconds
ztf_object_cone
[6]:
|  | ps1_objid | ra | dec | mean_mag_r |
| --- | --- | --- | --- | --- |
| npartitions=4 |  |  |  |  |
| Order: 5, Pixel: 3238 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | double[pyarrow] |
| Order: 5, Pixel: 3239 | ... | ... | ... | ... |
| Order: 5, Pixel: 3244 | ... | ... | ... | ... |
| Order: 5, Pixel: 3245 | ... | ... | ... | ... |
[7]:
ztf_object_cone.plot_pixels(plot_title="ZTF_DR14 - cone pixel map")
[7]:
(<Figure size 1000x500 with 2 Axes>,
<WCSAxes: title={'center': 'ZTF_DR14 - cone pixel map'}>)

Polygon search#
A polygon search is defined by a convex polygon with vertices [(ra1, dec1), (ra2, dec2), ...], in degrees.
[8]:
vertices = [(-60.5, 15.1), (-62.5, 18.5), (-65.2, 15.3), (-64.2, 12.1)]
ztf_object_polygon = ztf_object.polygon_search(vertices)
ztf_object_polygon
[8]:
|  | ps1_objid | ra | dec | mean_mag_r |
| --- | --- | --- | --- | --- |
| npartitions=14 |  |  |  |  |
| Order: 5, Pixel: 3210 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | double[pyarrow] |
| Order: 5, Pixel: 3232 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| Order: 6, Pixel: 30686 | ... | ... | ... | ... |
| Order: 6, Pixel: 30687 | ... | ... | ... | ... |
[9]:
ztf_object_polygon.plot_pixels(plot_title="ZTF_DR14 - polygon pixel map")
[9]:
(<Figure size 1000x500 with 2 Axes>,
<WCSAxes: title={'center': 'ZTF_DR14 - polygon pixel map'}>)

Box search#
A box search is defined by right ascension and declination bands [(ra1, ra2), (dec1, dec2)], in degrees.
[10]:
ztf_object_box = ztf_object.box_search(ra=[-65, -60], dec=[12, 15])
ztf_object_box
[10]:
|  | ps1_objid | ra | dec | mean_mag_r |
| --- | --- | --- | --- | --- |
| npartitions=13 |  |  |  |  |
| Order: 5, Pixel: 3208 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | double[pyarrow] |
| Order: 5, Pixel: 3210 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| Order: 5, Pixel: 7670 | ... | ... | ... | ... |
| Order: 6, Pixel: 30684 | ... | ... | ... | ... |
[11]:
ztf_object_box.plot_pixels(plot_title="ZTF_DR14 - box pixel map")
[11]:
(<Figure size 1000x500 with 2 Axes>,
<WCSAxes: title={'center': 'ZTF_DR14 - box pixel map'}>)

We can stack several filters, which are applied in sequence. For example, catalog.box_search(...).polygon_search(...) should result in a perfectly valid HATS catalog containing the objects that match both filters.
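A minimal sketch, reusing the box bounds and polygon vertices defined above:

# Box filter first, then a polygon filter on its result; only objects
# satisfying both spatial constraints remain.
ztf_object_stacked = ztf_object.box_search(ra=[-65, -60], dec=[12, 15]).polygon_search(vertices)
ztf_object_stacked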
Selecting data by column query#
We may also filter by column values via query().
The expression inside the parentheses follows the same syntax accepted by Pandas .query(), which supports a subset of Python expressions for filtering DataFrames.
Column names that are not valid Python variable names should be wrapped in backticks, and variable values can be injected using f-strings. The use of '@' to reference variables is not supported.
More information about Pandas query strings is available here.
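For example, a short sketch of both conventions (the backticked column name is hypothetical, shown only for the syntax):

# Inject a variable value with an f-string ('@' references are not supported).
mag_limit = 16
bright = ztf_object.query(f"mean_mag_r < {mag_limit}")

# A column whose name is not a valid Python identifier needs backticks:
# ztf_object.query("`mean mag r` < 16")  # hypothetical column name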
[12]:
ztf_object.query("mean_mag_r < 16")
[12]:
|  | ps1_objid | ra | dec | mean_mag_r |
| --- | --- | --- | --- | --- |
| npartitions=2352 |  |  |  |  |
| Order: 3, Pixel: 0 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | double[pyarrow] |
| Order: 3, Pixel: 1 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| Order: 4, Pixel: 3070 | ... | ... | ... | ... |
| Order: 4, Pixel: 3071 | ... | ... | ... | ... |
Previewing part of the data#
Computing an entire catalog requires loading all of its resulting data into memory, which is expensive and may lead to out-of-memory issues.
Often, our goal is to peek at a slice of the data to make sure the workflow output is reasonable (e.g., to check that newly created columns are present and that their values have been properly processed). head() is a pandas-like method that allows us to preview part of the data for this purpose. It iterates over the existing catalog partitions, in sequence, until it finds up to n rows.
Notice that this method implicitly calls compute().
[13]:
ztf_object_cone.head()
[13]:
| _healpix_29 | ps1_objid | ra | dec | mean_mag_r |
| --- | --- | --- | --- | --- |
| 911529154909966282 | 131432999494754032 | 299.949481 | 19.527901 | 19.694005 |
| 911529155126223541 | 131432999482554346 | 299.948273 | 19.52819 | 20.303564 |
| 911529155242127522 | 131432999472704717 | 299.94726 | 19.528454 | 19.030125 |
| 911529155485125949 | 131432999480965958 | 299.948077 | 19.529494 | 20.368013 |
| 911529187674910393 | 131442999840264659 | 299.984096 | 19.536824 | 20.365271 |

5 rows × 4 columns
By default, the first 5 rows of data are shown, but we can specify a higher number if we need to.
[14]:
ztf_object_cone.head(n=10)
[14]:
| _healpix_29 | ps1_objid | ra | dec | mean_mag_r |
| --- | --- | --- | --- | --- |
| 911529154909966282 | 131432999494754032 | 299.949481 | 19.527901 | 19.694005 |
| 911529155126223541 | 131432999482554346 | 299.948273 | 19.52819 | 20.303564 |
| 911529155242127522 | 131432999472704717 | 299.94726 | 19.528454 | 19.030125 |
| 911529155485125949 | 131432999480965958 | 299.948077 | 19.529494 | 20.368013 |
| 911529187674910393 | 131442999840264659 | 299.984096 | 19.536824 | 20.365271 |
| 911529188229036449 | 131442999792923350 | 299.979209 | 19.535608 | 19.88469 |
| 911529188263890495 | 131442999803743832 | 299.980368 | 19.536026 | 19.775237 |
| 911529188455674744 | 131442999770212613 | 299.977055 | 19.535172 | 20.007389 |
| 911529188571457488 | 131442999775374201 | 299.977557 | 19.536359 | 20.096558 |
| 911529188843237482 | 131442999790335818 | 299.97904 | 19.537692 | 20.824773 |

10 rows × 4 columns
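Putting the pieces together, a hedged sketch of a small end-to-end pipeline (spatial filter, then column query, then preview) before we shut the client down:

# Cone region + column filter, both defined lazily, previewed with head().
bright_cone = ztf_object.cone_search(ra=-60.3, dec=20.5, radius_arcsec=3600).query("mean_mag_r < 16")
bright_cone.head()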
Closing the Dask client#
[15]:
client.close()