Importing catalogs to HATS format#
This notebook presents two modes of importing catalogs to HATS format:

1. The lsdb.from_dataframe() method is useful for loading smaller catalogs from a single DataFrame. The data should have fewer than 1-2 million rows, and the pandas DataFrame should occupy less than 1-2 GB in memory (see the quick size check after this list). If your data is larger, has a complex format, requires greater flexibility, or if you encounter performance issues with this method, consider the next mode instead.

2. The hats-import package is designed for large datasets (from 1 GB to hundreds of terabytes). It is a purpose-built map-reduce pipeline for creating HATS catalogs from a variety of datasets. In this notebook, we use a very basic dataset and simple import options; please see the full package documentation if you need to do anything more complicated.
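If you are unsure which mode applies to your data, you can check the row count and approximate in-memory footprint of a pandas DataFrame before deciding. The snippet below is a minimal sketch; the file name my_catalog.csv is a placeholder for your own data, not a file shipped with lsdb.

```python
import pandas as pd

# Placeholder path: substitute your own catalog file
df = pd.read_csv("my_catalog.csv")

# Row count and approximate in-memory size in GB
n_rows = len(df)
size_gb = df.memory_usage(deep=True).sum() / 1e9
print(f"{n_rows} rows, ~{size_gb:.2f} GB in memory")
```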
[1]:
import lsdb
import pandas as pd
import tempfile
from pathlib import Path
We will be importing small_sky from a single CSV file. (If you did not install lsdb from source, you can find the file here and modify the paths accordingly.)

Let’s define the input and output paths:
[2]:
# Input paths
test_data_dir = Path.cwd() / ".." / ".." / "tests" / "data"
catalog_csv_path = test_data_dir / "raw" / "small_sky" / "small_sky.csv"
# Temporary directory for the intermediate/output files
tmp_dir = tempfile.TemporaryDirectory()
tmp_path = Path(tmp_dir.name)
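If you are running this notebook outside the lsdb source tree, the test data directory above will not exist; in that case, point catalog_csv_path at your own downloaded copy of the file. The path below is purely illustrative.

```python
# Illustrative override: replace with the location of your local copy of small_sky.csv
catalog_csv_path = Path("/path/to/small_sky.csv")
```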
lsdb.from_dataframe#
[3]:
%%time

# Read simple catalog from its CSV file
catalog = lsdb.from_dataframe(
    pd.read_csv(catalog_csv_path),
    catalog_name="from_dataframe",
    catalog_type="object",
    lowest_order=2,
    highest_order=5,
    threshold=100,
)

# Save it to disk in HATS format
catalog.to_hats(tmp_path / "from_dataframe")
CPU times: user 5.4 s, sys: 342 ms, total: 5.74 s
Wall time: 5.74 s
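The lowest_order, highest_order, and threshold arguments control how finely the catalog is partitioned. As a quick sanity check, you can inspect the HEALPix pixels of the in-memory catalog object from the cell above; a small sketch:

```python
# Partitions produced with lowest_order=2, highest_order=5
# and a threshold of 100 rows per pixel
pixels = catalog.get_healpix_pixels()
print(f"{len(pixels)} partitions")
print(pixels[:5])
```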
HATS import pipeline#
Please uncomment the next line to install the latest version of hats-import from its main branch:
[4]:
#!pip install git+https://github.com/astronomy-commons/hats-import.git@main --quiet
[5]:
from dask.distributed import Client
from hats_import.catalog.arguments import ImportArguments
from hats_import.pipeline import pipeline_with_client
[6]:
%%time

args = ImportArguments(
    ra_column="ra",
    dec_column="dec",
    lowest_healpix_order=2,
    highest_healpix_order=5,
    pixel_threshold=100,
    file_reader="csv",
    input_file_list=[catalog_csv_path],
    output_artifact_name="from_import_pipeline",
    output_path=tmp_path,
    resume=False,
)

with Client(n_workers=1) as client:
    pipeline_with_client(args, client)
CPU times: user 562 ms, sys: 43.9 ms, total: 605 ms
Wall time: 2.68 s
See the Dask cluster configuration page for Dask configuration tips.
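For larger imports you will generally want more than one worker. The configuration below is an illustrative sketch only; the worker count, thread count, and memory limit are assumptions to adapt to your own machine, not recommended values.

```python
from dask.distributed import Client

# Illustrative settings; tune to your hardware and dataset size
with Client(n_workers=4, threads_per_worker=1, memory_limit="8GB") as client:
    pipeline_with_client(args, client)
```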
Let’s read both catalogs (from disk) and check that the two methods produced the same output:
[7]:
from_dataframe_catalog = lsdb.read_hats(tmp_path / "from_dataframe")
from_dataframe_catalog
[7]:
|  | id | ra | dec | ra_error | dec_error |
|---|---|---|---|---|---|
| npartitions=14 |  |  |  |  |  |
| Order: 3, Pixel: 707 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | int64[pyarrow] | int64[pyarrow] |
| Order: 2, Pixel: 177 | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... |
| Order: 2, Pixel: 188 | ... | ... | ... | ... | ... |
| Order: 5, Pixel: 12165 | ... | ... | ... | ... | ... |
[8]:
from_import_pipeline_catalog = lsdb.read_hats(tmp_path / "from_import_pipeline")
from_import_pipeline_catalog
[8]:
|  | id | ra | dec | ra_error | dec_error |
|---|---|---|---|---|---|
| npartitions=14 |  |  |  |  |  |
| Order: 3, Pixel: 707 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | int64[pyarrow] | int64[pyarrow] |
| Order: 2, Pixel: 177 | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... |
| Order: 2, Pixel: 188 | ... | ... | ... | ... | ... |
| Order: 5, Pixel: 12165 | ... | ... | ... | ... | ... |
[9]:
# Verify that the pixels they contain are similar
assert from_dataframe_catalog.get_healpix_pixels() == from_import_pipeline_catalog.get_healpix_pixels()
# Verify that resulting dataframes contain the same data
sorted_from_dataframe = from_dataframe_catalog.compute().sort_index()
sorted_from_import_pipeline = from_import_pipeline_catalog.compute().sort_index()
pd.testing.assert_frame_equal(sorted_from_dataframe, sorted_from_import_pipeline)
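Both catalogs live in a temporary directory, so if you want to keep either of them, copy it somewhere permanent before the cleanup step below. A sketch; the destination path is illustrative.

```python
import shutil

# Illustrative destination; adjust to wherever you keep catalogs
shutil.copytree(tmp_path / "from_import_pipeline", "my_catalogs/from_import_pipeline")
```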
Finally, tear down the directory used for the intermediate/output files:
[10]:
tmp_dir.cleanup()