Filtering large catalogs#

Large astronomical surveys contain a massive volume of data. Billion-object, multi-terabyte catalogs are challenging to store and manipulate because they demand state-of-the-art hardware. Processing them is expensive, both in terms of runtime and memory consumption, and doing so on a single machine has become impractical. LSDB is a solution that enables scalable algorithm execution. It handles loading, querying, filtering, and crossmatching astronomical data (in the HATS format) in a distributed environment.

In this tutorial, we will demonstrate how to:

  1. Set up a Dask client for distributed processing

  2. Load an object catalog with a set of desired columns

  3. Select data from regions of the sky

  4. Filter data by column values

  5. Preview catalog data

[1]:
import lsdb

Creating a Dask client#

Dask is a framework that allows us to take advantage of distributed computing capabilities.

With Dask, the operations defined in a workflow (e.g. this notebook) are added to a task graph, which optimizes their order of execution. The operations are not computed immediately; we decide when to trigger them. As soon as we do, Dask distributes the workload across its workers and runs the tasks in parallel. Usually, the later we kick off the computations, the more of the workflow Dask can optimize at once, so deferring them pays off.

Dask creates a default client if we do not instantiate one ourselves. If we do instantiate one, we can, for example:

  • Specify the number of workers and the memory limit for each of them.

  • Adjust the address for the dashboard that profiles the operations while they run (by default, it serves on port 8787).

For additional information, please refer to the official Dask documentation and our Dask cluster configuration page for LSDB-specific tips.

[2]:
from dask.distributed import Client

client = Client(n_workers=4, memory_limit="auto")
client
[2]:

Client
Client-0ff2c56b-1b8d-11f0-8497-9e5630079e2b

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Loading a catalog#

We will work with the ZTF DR14 object catalog, from which we will later select a small 5-degree-radius cone.

Catalogs represent tabular data and are internally subdivided into partitions based on their positions in the sky. When processing a catalog, each worker is expected to be able to load a single partition at a time into memory for processing. Therefore, when loading a catalog, it’s crucial to specify the subset of columns we need for our science pipeline. Failure to specify these columns results in loading the entire partition table, which not only increases the usage of worker memory, but also impacts runtime performance significantly.

[3]:
surveys_path = "https://data.lsdb.io/hats"
[4]:
ztf_object_path = f"{surveys_path}/ztf_dr14/ztf_object"
ztf_object = lsdb.read_hats(ztf_object_path, columns=["ps1_objid", "ra", "dec", "mean_mag_r"])
ztf_object
[4]:
lsdb Catalog ztf_dr14:
ps1_objid ra dec mean_mag_r
npartitions=2352
Order: 3, Pixel: 0 int64[pyarrow] double[pyarrow] double[pyarrow] double[pyarrow]
Order: 3, Pixel: 1 ... ... ... ...
... ... ... ... ...
Order: 4, Pixel: 3070 ... ... ... ...
Order: 4, Pixel: 3071 ... ... ... ...
The catalog has been loaded lazily, meaning no data has been read, only the catalog schema

The catalog has been loaded lazily: we can see its metadata, but no actual data is there yet. We will be defining more operations in this notebook. Only when we call compute() on the resulting catalog are operations executed; i.e., data is loaded from disk into memory for processing.

Selecting a region of the sky#

We may use 3 types of spatial filters (cone, polygon and box) to select a portion of the sky.

Filtering consists of two main steps:

  • A coarse stage, in which we find which pixels cover the desired region of the sky. These pixels may only partially overlap the region, which means that some of their data points may fall outside the region boundaries.

  • A fine stage, where we filter the data points from each pixel to make sure they fall within the specified region.

The fine parameter allows us to specify, for each search, whether to run the fine stage. The fine stage adds some overhead, so if you only need a rough estimate of the data points in a region, you may disable it. It is enabled by default.

catalog.box_search(..., fine=False)
catalog.cone_search(..., fine=False)
catalog.polygon_search(..., fine=False)
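
For example, here is a minimal cone-search sketch. It assumes the cone_search signature that takes the center coordinates in degrees and the radius in arcseconds (radius_arcsec); the center values below are illustrative, chosen to roughly match the sky region previewed later in this notebook, and the result stays lazy until we compute or preview it.

ztf_object_cone = ztf_object.cone_search(
    ra=300.0,  # center RA in degrees (illustrative value)
    dec=19.5,  # center Dec in degrees (illustrative value)
    radius_arcsec=5 * 3600,  # 5 degree radius, expressed in arcseconds
)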

Throughout this notebook, we will use the Catalog’s plot_pixels method to display the HEALPix pixels of each resulting catalog as filters are applied.

[5]:
ztf_object.plot_pixels(plot_title="ZTF_DR14 - pixel map")
[5]:
(<Figure size 1000x500 with 2 Axes>,
 <WCSAxes: title={'center': 'ZTF_DR14 - pixel map'}>)
[Figure: ZTF_DR14 - pixel map]

Selecting data by column query#

We may also filter by columns via query().

The expression inside query() follows the same syntax accepted by Pandas .query(), which supports a subset of Python expressions for filtering DataFrames.

Column names that are not valid Python variable names should be wrapped in backticks, and variable values can be injected using f-strings. The use of ‘@’ to reference variables is not supported.

More information about Pandas query strings is available here.
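
For instance, here is a small sketch of injecting a runtime threshold with an f-string. The mag_limit variable is purely illustrative, and the backticked column name in the comment is hypothetical, shown only to demonstrate the syntax.

mag_limit = 16
ztf_object.query(f"mean_mag_r < {mag_limit}")
# A column whose name is not a valid Python identifier would be backtick-wrapped, e.g.
# ztf_object.query(f"`mean mag r` < {mag_limit}")  # hypothetical column name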

[12]:
ztf_object.query("mean_mag_r < 16")
[12]:
lsdb Catalog ztf_dr14:
ps1_objid ra dec mean_mag_r
npartitions=2352
Order: 3, Pixel: 0 int64[pyarrow] double[pyarrow] double[pyarrow] double[pyarrow]
Order: 3, Pixel: 1 ... ... ... ...
... ... ... ... ...
Order: 4, Pixel: 3070 ... ... ... ...
Order: 4, Pixel: 3071 ... ... ... ...
The catalog has been loaded lazily, meaning no data has been read, only the catalog schema

Previewing part of the data#

Computing an entire catalog requires loading all of its resulting data into memory, which is expensive and may lead to out-of-memory issues.

Often, our goal is to have a peek at a slice of data to make sure the workflow output is reasonable (e.g., to assess that newly created columns are present and their values have been properly processed). head() is a pandas-like method which allows us to preview part of the data for this purpose. It iterates over the existing catalog partitions, in sequence, until it collects up to n rows.

Notice that this method implicitly calls compute().

[13]:
ztf_object_cone.head()
[13]:
ps1_objid ra dec mean_mag_r
_healpix_29
911529154909966282 131432999494754032 299.949481 19.527901 19.694005
911529155126223541 131432999482554346 299.948273 19.52819 20.303564
911529155242127522 131432999472704717 299.94726 19.528454 19.030125
911529155485125949 131432999480965958 299.948077 19.529494 20.368013
911529187674910393 131442999840264659 299.984096 19.536824 20.365271

5 rows × 4 columns

By default, the first 5 rows of data are shown, but we can specify a different number if we need.

[14]:
ztf_object_cone.head(n=10)
[14]:
ps1_objid ra dec mean_mag_r
_healpix_29
911529154909966282 131432999494754032 299.949481 19.527901 19.694005
911529155126223541 131432999482554346 299.948273 19.52819 20.303564
911529155242127522 131432999472704717 299.94726 19.528454 19.030125
911529155485125949 131432999480965958 299.948077 19.529494 20.368013
911529187674910393 131442999840264659 299.984096 19.536824 20.365271
911529188229036449 131442999792923350 299.979209 19.535608 19.88469
911529188263890495 131442999803743832 299.980368 19.536026 19.775237
911529188455674744 131442999770212613 299.977055 19.535172 20.007389
911529188571457488 131442999775374201 299.977557 19.536359 20.096558
911529188843237482 131442999790335818 299.97904 19.537692 20.824773

10 rows × 4 columns

Closing the Dask client#

[15]:
client.close()