from __future__ import annotations
import re
from copy import copy
from pathlib import Path
from typing import Any
import dask
import hats as hc
import nested_pandas as npd
import numpy as np
import pyarrow.parquet as pq
from hats.catalog import CatalogType, PartitionInfo
from hats.catalog.healpix_dataset.healpix_dataset import HealpixDataset as HCHealpixDataset
from hats.io import file_io
from hats.io.skymap import write_skymap
from hats.pixel_math import HealpixPixel, spatial_index_to_healpix
from hats.pixel_math.sparse_histogram import HistogramAggregator, SparseHistogram
from tqdm.dask import TqdmCallback
from upath import UPath
from lsdb.catalog.dataset.healpix_dataset import HealpixDataset
from lsdb.io.common import new_provenance_properties, set_default_write_table_kwargs
DONE_DIR_NAME = "done"
HISTOGRAM_DIR_NAME = "hists"
@dask.delayed
def perform_write(
df: npd.NestedFrame,
hp_pixel: HealpixPixel,
base_catalog_dir: UPath,
histogram_order: int,
npix_suffix: str = ".parquet",
npix_parquet_name: str | None = None,
**kwargs,
) -> tuple[int, SparseHistogram]:
"""Writes a pandas dataframe to a single parquet file and returns the total count
for the partition as well as a count histogram at the specified order.
Parameters
----------
df : npd.NestedFrame
Dataframe to write to file
hp_pixel : HealpixPixel
HEALPix pixel of file to be written
base_catalog_dir : path-like
Location of the base catalog directory to write to
histogram_order : int
Order of the count histogram
npix_suffix : str, default '.parquet'
Extension for the parquet file (or `/` if a directory)
npix_parquet_name : str | None, default None
Name of the pixel parquet file to be used when npix_suffix=/.
By default, it will be named after the pixel with a .parquet
extension (e.g. 'Npix=10.parquet').
**kwargs
Other kwargs to pass to pq.write_table method
Returns
-------
tuple[int, SparseHistogram]
The total number of points on the partition and the sparse count histogram
at the specified order.
"""
if len(df) == 0:
return 0, SparseHistogram([], [], histogram_order)
pixel_path = hc.io.paths.new_pixel_catalog_file(
base_catalog_dir,
hp_pixel,
create_dirs=True,
npix_suffix=npix_suffix,
npix_parquet_name=npix_parquet_name,
)
df.to_parquet(pixel_path.path, filesystem=pixel_path.fs, **kwargs)
histogram = calculate_histogram(df, histogram_order)
write_histogram(histogram, base_catalog_dir, hp_pixel)
write_done_pixel(base_catalog_dir, hp_pixel)
return len(df), histogram
def calculate_histogram(df: npd.NestedFrame, histogram_order: int) -> SparseHistogram:
"""Splits a partition into pixels at a specified order and computes
the sparse histogram with the respective counts.
Parameters
----------
df : npd.NestedFrame
Partition data frame
histogram_order : int
Order of the count histogram
Returns
-------
SparseHistogram
The sparse count histogram for the partition, at the specified order.
"""
order_pixels = spatial_index_to_healpix(df.index.to_numpy(), target_order=histogram_order)
gb = df.groupby(order_pixels, sort=False).apply(len)
indexes, counts_at_indexes = gb.index.to_numpy(), gb.to_numpy(na_value=0)
return SparseHistogram(indexes, counts_at_indexes, histogram_order)
def _read_pixels_from_files(path: UPath, glob_pattern: str, regex_pattern: str) -> list[HealpixPixel]:
"""Helper function to read HEALPix pixels from files in a directory from a glob pattern and regex pattern.
Parameters
----------
path : path-like
Location of the directory to read from
glob_pattern : str
Glob pattern to find relevant files in the directory
regex_pattern : str
Regex pattern with two capture groups for order and pixel, to extract the HEALPix pixels from the
file names
Returns
-------
list[HealpixPixel]
List of HEALPix pixels extracted from the file names in the directory that match the glob pattern.
"""
file_pattern = re.compile(regex_pattern)
def get_matches(path: UPath):
match = file_pattern.match(path.name)
if match is None:
raise ValueError(f"File {path} does not match the expected pattern {regex_pattern}")
return match.group(1, 2)
pixel_tuples = [get_matches(path) for path in (path.glob(glob_pattern))]
return [HealpixPixel(int(match[0]), int(match[1])) for match in pixel_tuples]
def write_histogram(histogram: SparseHistogram, base_catalog_path: UPath, pixel: HealpixPixel):
"""Writes the sparse histogram for a partition to a file in the respective pixel directory.
Parameters
----------
histogram : SparseHistogram
The sparse count histogram for the partition, at the specified order.
base_catalog_path : path-like
Location of the base catalog directory to write to
pixel : HealpixPixel
HEALPix pixel of file to be written
"""
histogram_path = base_catalog_path / HISTOGRAM_DIR_NAME
histogram_path.mkdir(exist_ok=True)
hist_file = histogram_path / f"{pixel.order}_{pixel.pixel}_histogram.npz"
histogram.to_file(hist_file)
def read_histograms(base_catalog_path: UPath):
"""Reads the sparse histograms for all partitions from the respective files in the histogram directory.
Parameters
----------
base_catalog_path : path-like
Location of the base catalog directory to read from
Returns
-------
tuple[list[HealpixPixel], list[SparseHistogram], list[int]]
A tuple with the list of pixels for which histograms were read, the list of respective
sparse histograms, and the list of total counts for each partition
(i.e. the sum of the histogram counts).
"""
histogram_path = base_catalog_path / HISTOGRAM_DIR_NAME
pixels = _read_pixels_from_files(histogram_path, "*_histogram.npz", r"(\d+)_(\d+)_histogram.npz")
histograms = [
SparseHistogram.from_file(histogram_path / f"{pixel.order}_{pixel.pixel}_histogram.npz")
for pixel in pixels
]
lens = [sum(hist.counts) for hist in histograms]
return pixels, histograms, lens
def remove_histogram_files(base_catalog_path: UPath):
"""Removes the histogram files from the histogram directory.
Parameters
----------
base_catalog_path : path-like
Location of the base catalog directory to remove histogram files from
"""
histogram_path = base_catalog_path / HISTOGRAM_DIR_NAME
file_io.remove_directory(histogram_path, ignore_errors=True)
def write_done_pixel(base_catalog_path: UPath, pixel: HealpixPixel):
"""Writes an empty file in the respective pixel directory to indicate that the partition has been written.
Parameters
----------
base_catalog_path : path-like
Location of the base catalog directory to write to
pixel : HealpixPixel
HEALPix pixel of file to be written
"""
done_path = base_catalog_path / DONE_DIR_NAME
done_path.mkdir(exist_ok=True)
done_file = done_path / f"{pixel.order}_{pixel.pixel}_done"
done_file.touch()
def read_done_pixels(base_catalog_path: UPath):
"""Reads the done files in the done directory to get the list of pixels that partitions have been written.
Parameters
----------
base_catalog_path : path-like
Location of the base catalog directory to read from
Returns
-------
list[HealpixPixel]
List of HEALPix pixels for which done files were found in the done directory, indicating
that the respective partitions have been written.
"""
done_path = base_catalog_path / DONE_DIR_NAME
return _read_pixels_from_files(done_path, "*_done", r"(\d+)_(\d+)_done")
def remove_done_files(base_catalog_path: UPath):
"""Removes the done files from the done directory.
Parameters
----------
base_catalog_path : path-like
Location of the base catalog directory to remove done files from
"""
done_path = base_catalog_path / DONE_DIR_NAME
file_io.remove_directory(done_path, ignore_errors=True)
# pylint: disable=protected-access,too-many-locals
[docs]
def to_hats(
catalog: HealpixDataset,
*,
base_catalog_path: str | Path | UPath,
catalog_name: str | None = None,
default_columns: list[str] | None = None,
histogram_order: int | None = None,
overwrite: bool = False,
resume: bool = False,
progress_bar: bool = True,
tqdm_kwargs=None,
create_thumbnail: bool = False,
skymap_alt_orders: list[int] | None = None,
npix_suffix: str = ".parquet",
npix_parquet_name: str | None = None,
addl_hats_properties: dict | None = None,
error_if_empty: bool = True,
**kwargs,
):
"""Writes a catalog to disk, in HATS format.
The output catalog comprises partitioned parquet files and respective metadata,
as well as text and CSV files detailing partition, catalog and provenance info.
This will only write a SINGLE catalog, so if this catalog contains margins,
you should use ``to_collection`` to write all parts of the catalog together.
Parameters
----------
catalog : HealpixDataset
A catalog to export
base_catalog_path : path-like
Location where catalog is saved to
catalog_name : str or None, default None
The name of the output catalog
default_columns : list[str] or None, default None
A metadata property with the list of the columns in the
catalog to be loaded by default. Uses the default columns from the original hats
catalog if they exist.
histogram_order : int or None, default None
The default order for the count histogram. Defaults to the same
skymap order as original catalog, or the highest order healpix of the current
catalog data partitions.
overwrite : bool, default False
If True existing catalog is overwritten
resume : bool, default False
If True, will attempt to resume a previously interrupted write to the same directory.
progress_bar : bool, default True
If True, shows a progress bar for the export process. Defaults to True.
tqdm_kwargs : dict, default None
Additional kwargs to pass to the tqdm progress bar.
create_thumbnail : bool, default False
If True, create a data thumbnail of the catalog for
previewing purposes. Defaults to False.
skymap_alt_orders : list[int] or None, default None
We will write a skymap file at the ``histogram_order``,
but can also write down-sampled skymaps, for easier previewing of the data.
npix_suffix : str, default '.parquet'
Extension for the parquet file (or `/` if a directory)
npix_parquet_name : str | None, default None
Name of the pixel parquet file to be used when npix_suffix=/.
By default, it will be named after the pixel with a .parquet
extension (e.g. 'Npix=10.parquet').
addl_hats_properties : dict or None, default None
key-value pairs of additional properties to write in the
``hats.properties`` file.
error_if_empty : bool, default True
If True, raises an error if the output catalog is empty
**kwargs :
Arguments to pass to the parquet write operations
"""
if overwrite and resume:
raise ValueError("overwrite and resume cannot both be True")
# Create the output directory for the catalog
base_catalog_path = hc.io.file_io.get_upath(base_catalog_path)
existing_pixels: list[HealpixPixel] = []
histograms: list[SparseHistogram] = []
counts: list[int] = []
if hc.io.file_io.directory_has_contents(base_catalog_path):
if not overwrite and not resume:
raise ValueError(
f"base_catalog_path ({str(base_catalog_path)}) contains files."
" choose a different directory or set overwrite or resume to True."
)
if overwrite:
hc.io.file_io.remove_directory(base_catalog_path)
if resume:
existing_pixels, histograms, counts = _read_existing_progress(base_catalog_path, catalog)
hc.io.file_io.make_directory(base_catalog_path, exist_ok=True)
if histogram_order is None:
if catalog.hc_structure.catalog_info.skymap_order is not None:
histogram_order = catalog.hc_structure.catalog_info.skymap_order
else:
max_catalog_depth = (
catalog.hc_structure.pixel_tree.get_max_depth()
if len(catalog.get_healpix_pixels()) > 0
else 0
)
histogram_order = max(max_catalog_depth, 8)
# Save partition parquet files
kwargs = set_default_write_table_kwargs(kwargs)
desc = tqdm_kwargs.pop("desc", "Writing Catalog") if tqdm_kwargs else "Writing Catalog"
with TqdmCallback(desc=desc, disable=not progress_bar, **(tqdm_kwargs or {})):
new_pixels, new_counts, new_histograms = write_partitions(
catalog,
base_catalog_dir_fp=base_catalog_path,
histogram_order=histogram_order,
existing_pixels=existing_pixels,
npix_suffix=npix_suffix,
npix_parquet_name=npix_parquet_name,
**kwargs,
)
pixels = existing_pixels + new_pixels
histograms = histograms + new_histograms
counts = counts + new_counts
# Check that the catalog is not empty
if error_if_empty and len(pixels) == 0:
raise RuntimeError("The output catalog is empty")
# Save parquet metadata and create a data thumbnail if needed
hats_max_rows = int(max(counts)) if counts else 0
_write_parquet_metadata(base_catalog_path, catalog, create_thumbnail, hats_max_rows, pixels)
# Save partition info
PartitionInfo(pixels).write_to_file(base_catalog_path / "partition_info.csv")
# Save catalog info
if default_columns:
_validate_default_columns(catalog, default_columns)
else:
default_columns = None
if not addl_hats_properties:
addl_hats_properties = {}
if catalog.hc_structure.catalog_info.catalog_type in (CatalogType.OBJECT, CatalogType.SOURCE):
addl_hats_properties = addl_hats_properties | {
"skymap_order": int(histogram_order),
"skymap_alt_orders": skymap_alt_orders,
}
_write_skymaps(base_catalog_path, histogram_order, histograms, skymap_alt_orders)
addl_hats_properties = addl_hats_properties | new_provenance_properties(base_catalog_path)
new_hc_structure = create_modified_catalog_structure(
catalog.hc_structure,
base_catalog_path,
catalog_name if catalog_name else catalog.hc_structure.catalog_name,
total_rows=int(np.sum(counts)),
default_columns=default_columns,
hats_max_rows=hats_max_rows,
hats_npix_suffix=npix_suffix,
**addl_hats_properties,
)
new_hc_structure.catalog_info.to_properties_file(base_catalog_path)
remove_histogram_files(base_catalog_path)
remove_done_files(base_catalog_path)
def _read_existing_progress(
base_catalog_path: UPath, catalog: HealpixDataset
) -> tuple[list[HealpixPixel], Any, Any]:
"""Read the done and histogram files from a previous write attempt to the same directory"""
existing_pixels = read_done_pixels(base_catalog_path)
hist_pixels, histograms, counts = read_histograms(base_catalog_path)
for pixel in existing_pixels:
if pixel not in catalog.hc_structure.pixel_tree:
raise ValueError(
f"Pixel {pixel} from existing catalog files is not present in the provided catalog."
" Cannot resume write. Please check that the directory contains files from this "
" catalog, or set resume to False and overwrite to True to overwrite to this"
" directory instead of resuming."
)
if len(hist_pixels) != len(existing_pixels) or set(hist_pixels) != set(existing_pixels):
raise ValueError(
"The existing histogram files in the directory do not match the done pixels."
" Cannot resume write. Please check that the directory contains files from this catalog,"
" or set resume to False and overwrite to True to overwrite to this directory instead of"
" resuming."
)
return existing_pixels, histograms, counts
def _write_parquet_metadata(
base_catalog_path: UPath,
catalog: HealpixDataset,
create_thumbnail: bool,
hats_max_rows: int,
pixels: list[Any],
):
"""Writes the parquet metadata for the catalog. If there are no pixels,
writes empty metadata files with the correct schema."""
if len(pixels) > 0:
hc.io.write_parquet_metadata(
base_catalog_path, create_thumbnail=create_thumbnail, thumbnail_threshold=hats_max_rows
)
else:
metadata_path = hc.io.paths.get_parquet_metadata_pointer(base_catalog_path)
common_metadata_path = hc.io.paths.get_common_metadata_pointer(base_catalog_path)
file_io.make_directory(metadata_path.parent, exist_ok=True)
pq.write_metadata(catalog.hc_structure.schema, metadata_path.path, filesystem=metadata_path.fs)
pq.write_metadata(
catalog.hc_structure.schema, common_metadata_path.path, filesystem=common_metadata_path.fs
)
def _write_skymaps(
base_catalog_path: UPath,
histogram_order: int,
histograms: list[Any] | Any,
skymap_alt_orders: list[int] | None,
):
"""Writes the skymap files for the catalog, based on the histograms for each partition."""
# Save the point distribution map
total_histogram = HistogramAggregator(histogram_order)
for partition_hist in histograms:
total_histogram.add(partition_hist)
point_map_path = hc.io.paths.get_point_map_file_pointer(base_catalog_path)
full_histogram = total_histogram.full_histogram
hc.io.file_io.write_fits_image(full_histogram, point_map_path)
write_skymap(histogram=full_histogram, catalog_dir=base_catalog_path, orders=skymap_alt_orders)
def _validate_default_columns(catalog: HealpixDataset, default_columns: list[str]):
"""Checks that the provided default columns are valid"""
# Check if any of the default columns is missing
missing_columns = set(default_columns) - set(catalog._ddf.exploded_columns)
if missing_columns:
raise ValueError(f"Default columns `{missing_columns}` not found in catalog")
# Check for full and partial load of the same column and error
all_subcolumns = catalog._ddf._meta.get_subcolumns()
for col in default_columns:
if col in all_subcolumns:
nested_col = col.split(".")[0]
if nested_col in default_columns:
raise ValueError(
f"The provided default column list contains both a "
f"full and partial load of the column '{nested_col}'. "
f"Please either remove the partial load or the full load."
)
def write_partitions(
catalog: HealpixDataset,
base_catalog_dir_fp: str | Path | UPath,
histogram_order: int,
existing_pixels: list[HealpixPixel] | None = None,
npix_suffix: str = ".parquet",
npix_parquet_name: str | None = None,
**kwargs,
) -> tuple[list[HealpixPixel], list[int], list[SparseHistogram]]:
"""Saves catalog partitions as parquet to disk and computes the sparse
count histogram for each partition. The histogram is either of order 8
or the maximum pixel order in the catalog, whichever is greater.
Parameters
----------
catalog : HealpixDataset
A catalog to export
base_catalog_dir_fp : path-like
Path to the base directory of the catalog
histogram_order : int
The order of the count histogram to generate
existing_pixels : list[HealpixPixel] | None, default None
The pixels that already exist and should be skipped.
npix_suffix : str, default '.parquet'
Extension for the parquet file (or `/` if a directory)
npix_parquet_name : str | None, default None
Name of the pixel parquet file to be used when npix_suffix=/.
By default, it will be named after the pixel with a .parquet
extension (e.g. 'Npix=10.parquet').
**kwargs
Arguments to pass to the parquet write operations
Returns
-------
tuple[list[HealpixPixel], list[int], list[SparseHistogram]]
A tuple with the array of non-empty pixels, the array with the total counts
as well as the array with the sparse count histograms.
"""
base_catalog_dir_fp = hc.io.file_io.get_upath(base_catalog_dir_fp)
results, pixels = [], []
partitions = catalog._ddf.to_delayed()
existing_pixels_set = set(existing_pixels) if existing_pixels is not None else set()
for pixel, partition_index in catalog._ddf_pixel_map.items():
if pixel in existing_pixels_set:
continue
results.append(
perform_write(
partitions[partition_index],
pixel,
base_catalog_dir_fp,
histogram_order,
npix_suffix,
npix_parquet_name,
**kwargs,
)
)
pixels.append(pixel)
if len(results) > 0:
results = dask.compute(*results)
counts, histograms = list(zip(*results))
else:
counts, histograms = (), ()
non_empty_indices = np.nonzero(counts)
non_empty_pixels = np.array(pixels)[non_empty_indices]
non_empty_counts = np.array(counts)[non_empty_indices]
non_empty_hists = np.array(histograms)[non_empty_indices]
return list(non_empty_pixels), list(non_empty_counts), list(non_empty_hists)
def create_modified_catalog_structure(
catalog_structure: HCHealpixDataset, catalog_base_dir: str | Path | UPath, catalog_name: str, **kwargs
) -> HCHealpixDataset:
"""Creates a modified version of the HATS catalog structure
Parameters
----------
catalog_structure : HCHealpixDataset
HATS catalog structure
catalog_base_dir : path-like
Base location for the catalog
catalog_name : str
The name of the catalog to be saved
**kwargs
The remaining parameters to be updated in the catalog info object
Returns
-------
HCHealpixDataset
A HATS structure, modified with the parameters provided.
"""
new_hc_structure = copy(catalog_structure)
new_hc_structure.catalog_name = catalog_name
new_hc_structure.catalog_path = catalog_base_dir
new_hc_structure.catalog_base_dir = hc.io.file_io.get_upath(catalog_base_dir)
new_hc_structure.catalog_info = new_hc_structure.catalog_info.copy_and_update(**kwargs)
new_hc_structure.catalog_info.catalog_name = catalog_name
return new_hc_structure