# Source code for lsdb.io.to_lance
from __future__ import annotations
from datetime import timedelta
from pathlib import Path
import pyarrow as pa
from tqdm import tqdm
from upath import UPath
from lsdb.catalog.dataset.healpix_dataset import HealpixDataset
# Default Lance table name; used as the default value of ``table_name`` in ``to_lance``.
_TABLE_NAME = "data"
# [docs]
def to_lance(
    catalog: HealpixDataset,
    *,
    base_catalog_path: str | Path | UPath,
    table_name: str = _TABLE_NAME,
    overwrite: bool = False,
    progress_bar: bool = True,
    optimize_dataset: bool = True,
) -> None:
    """Export a catalog to a single, flat Lance table.

    The partitions of the primary catalog are computed one at a time and
    appended to one Lance table named ``table_name`` inside the database at
    ``base_catalog_path``. The index of each partition is converted into a
    regular column before writing, so the HEALPix spatial index is stored
    alongside every other column. Empty partitions are skipped, and the
    margin catalog (if present) is not exported.

    The result can be opened afterwards with
    ``lancedb.connect(base_catalog_path).open_table(table_name)``.

    Parameters
    ----------
    catalog : HealpixDataset
        The catalog to export.
    base_catalog_path : str | Path | UPath
        Location of the Lance database the table is written into.
    table_name : str, default "data"
        Name of the table to create. Use the same name to open the table
        later with lancedb.
    overwrite : bool, default False
        If True, a table that already exists under this name is replaced.
        If False, an existing table causes an error.
    progress_bar : bool, default True
        If True, a progress bar is displayed while partitions are written.
    optimize_dataset : bool, default True
        If True, the Lance table is optimized once all partitions have been
        written. This improves query performance but increases the total
        time required to write the dataset.

    Raises
    ------
    ImportError
        If the ``lancedb`` package is not installed.
    ValueError
        If a non-empty ``<table_name>.lance`` directory already exists under
        ``base_catalog_path`` and ``overwrite=False``.
    RuntimeError
        If every partition is empty, so no table was created.

    Examples
    --------
    Export a catalog and open it with lancedb:

    >>> import lsdb
    >>> catalog = lsdb.read_hats("path/to/small_sky") # doctest: +SKIP
    >>> catalog.to_lance("/tmp/my_catalog") # doctest: +SKIP

    Open the result:

    >>> import lancedb # doctest: +SKIP
    >>> db = lancedb.connect("/tmp/my_catalog") # doctest: +SKIP
    >>> tbl = db.open_table("data") # doctest: +SKIP
    """
    # lancedb is an optional dependency, so it is only imported on demand.
    try:
        # pylint: disable=import-outside-toplevel
        import lancedb
    except ImportError as err:
        raise ImportError(
            "to_lance requires the `lancedb` package. Install it with `pip install lancedb`."
        ) from err

    root = UPath(base_catalog_path)
    table_dir = root / f"{table_name}.lance"

    # The target directory is always probed, whatever the value of ``overwrite``;
    # only the decision to raise depends on the flag.
    table_already_written = table_dir.exists() and any(table_dir.iterdir())
    if table_already_written and not overwrite:
        raise ValueError(
            f"A Lance table already exists at '{table_dir}'."
            " Choose a different path or set overwrite=True to overwrite the existing dataset."
        )

    db_uri = str(root)
    # pylint: disable=protected-access
    partitions = catalog._ddf.to_delayed()
    partition_indices = list(catalog._ddf_pixel_map.values())
    # Only the first non-empty partition creates the table; later ones append.
    create_mode = "overwrite" if overwrite else "create"

    connection = lancedb.connect(db_uri)
    lance_table: lancedb.Table | None = None
    for partition_index in tqdm(partition_indices, desc="Writing to Lance", disable=not progress_bar):
        partition_df = partitions[partition_index].compute()
        if len(partition_df) == 0:
            continue

        # Move the index into a column so it is written with the rest of the data.
        arrow_table = pa.Table.from_pandas(partition_df.reset_index(), preserve_index=False)
        if lance_table is not None:
            lance_table.add(arrow_table)
        else:
            lance_table = connection.create_table(table_name, arrow_table, mode=create_mode)

    if lance_table is None:
        raise RuntimeError("The output catalog is empty. No data was written to Lance.")

    if optimize_dataset:
        # TODO: Replace with appropriate logging message and level
        print("Optimizing Lance dataset...")
        lance_table.optimize(cleanup_older_than=timedelta(0), delete_unverified=True)

    print(f"Finished writing output to lance. Path: {db_uri}, Table name: {table_name}")