Get light-curves from ZTF and PS1 for SNAD catalog#

This notebook demonstrates how to get epoch photometry for a custom pointing catalog, in this case the SNAD catalog. We will load the SNAD catalog as a pandas dataframe, cross-match it with the PS1 DR2 object (OTMO) catalog, and join it with the PS1 DR2 detection catalog and the ZTF DR16 Zubercal catalog.

Install and import required packages#

[1]:
# Install lsdb and matplotlib
%pip install --quiet lsdb matplotlib
Note: you may need to restart the kernel to use updated packages.
[2]:
import dask.distributed
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from lsdb import read_hats, from_dataframe
from upath import UPath

Define paths to PS1 DR2 catalogs#

We access all the data over the web, so we don’t need to download it in advance. However, we will fetch much more data than we actually use, so it will take some time.

[3]:
# Define paths to PS1 DR2 catalogs
PS1_PATH = UPath("s3://stpubdata/panstarrs/ps1/public/hats/", anon=True)
PS1_OBJECT = PS1_PATH / "otmo"
PS1_DETECTION = PS1_PATH / "detection"

# Define paths to ZTF DR16 Zubercal catalog
ZUBERCAL_PATH = "https://data.lsdb.io/hats/ztf_dr16/zubercal/"
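
Because nothing is read until we ask for it, you can also peek at a remote catalog’s available columns before deciding which ones to load. This is a minimal sketch, not part of the executed pipeline, and assumes the lsdb Catalog object exposes a columns property:

# Hedged example: inspect available columns of the PS1 object catalog
# without loading any data (metadata only).
ps1_preview = read_hats(PS1_OBJECT)
print(list(ps1_preview.columns))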

Download and convert the SNAD catalog to HATS format, in memory#

The SNAD catalog is only ~100 rows, so we download it with pandas and convert it to LSDB’s Catalog object with lsdb.from_dataframe.

[4]:
# Load SNAD catalog, remove rows with missing values and rename columns to more friendly names
snad_df = pd.read_csv(
    "https://snad.space/catalog/snad_catalog.csv",
    dtype_backend="pyarrow",
).dropna()
display(snad_df)

# Convert to LSDB's Catalog object
snad_catalog = from_dataframe(
    snad_df,
    # Drop empty sibling partitions to optimize the partitioning
    drop_empty_siblings=True,
    # Keep partitions spatially small by requiring at least HEALPix order 5
    lowest_order=5,
    # Specify columns with coordinates
    ra_column="R.A.",
    dec_column="Dec.",
)
display(snad_catalog)
Name R.A. Dec. OID Discovery date (UT) mag er_down er_up ref er_ref TNS Type Comments
0 SNAD101 247.45543 24.77282 633207400004730 2018-04-08 09:45:49 21.11 0.27 0.36 20.84 0.06 AT 2018lwh PSN ZTF18abqkqdm
1 SNAD102 245.05375 28.3822 633216300024691 2018-03-21 11:08:19 21.18 0.28 0.39 20.26 0.07 AT 2018lwi PSN ZTF18abdgwos
... ... ... ... ... ... ... ... ... ... ... ... ... ...
155 SNAD257 275.96866 27.24837 637112100033906 2018-07-09 09:11:44 21.36 0.28 0.37 20.56 0.04 AT 2018mok PSN ZTF18abhxidx
156 SNAD258 43.61864 35.95618 653113400006264 2018-08-06 11:56:08 19.93 0.12 0.13 20.67* 0.02 AT 2018mol PSN ZTF18abtslir

110 rows × 13 columns

lsdb Catalog from_lsdb_dataframe:
Name R.A. Dec. OID Discovery date (UT) mag er_down er_up ref er_ref TNS Type Comments Norder Dir Npix
npartitions=104
Order: 7, Pixel: 3996 string[pyarrow] double[pyarrow] double[pyarrow] int64[pyarrow] string[pyarrow] double[pyarrow] double[pyarrow] double[pyarrow] string[pyarrow] double[pyarrow] string[pyarrow] string[pyarrow] string[pyarrow] uint8[pyarrow] uint64[pyarrow] uint64[pyarrow]
Order: 7, Pixel: 22298 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Order: 7, Pixel: 130876 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Order: 5, Pixel: 8184 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
The catalog has been loaded lazily, meaning no data has been read, only the catalog schema

Load PS1 catalog structure and metadata#

lsdb.read_hats doesn’t load the data immediately; it just reads the metadata and structure of the catalog. Here we use it to load the PS1 DR2 object and detection catalogs, selecting only a few columns to speed up the pipeline.

[5]:
# Load PS1 catalogs metadata
ps1_object = read_hats(
    PS1_OBJECT,
    columns=[
        "objID",  # PS1 ID
        "raMean",
        "decMean",  # coordinates to use for cross-matching
        "nStackDetections",  # some other data to use
    ],
)
display(ps1_object)

ps1_detection = read_hats(
    PS1_DETECTION,
    columns=[
        "objID",  # PS1 object ID
        "detectID",  # PS1 detection ID
        # we won't use these coordinates here, but they allow cross-matching directly with the detection table
        "ra",
        "dec",
        # light-curve stuff
        "obsTime",
        "filterID",
        "psfFlux",
        "psfFluxErr",
    ],
)
display(ps1_detection)
lsdb Catalog otmo:
objID raMean decMean nStackDetections
npartitions=27161
Order: 5, Pixel: 0 int64[pyarrow] double[pyarrow] double[pyarrow] int16[pyarrow]
Order: 5, Pixel: 1 ... ... ... ...
... ... ... ... ...
Order: 6, Pixel: 49150 ... ... ... ...
Order: 6, Pixel: 49151 ... ... ... ...
The catalog has been loaded lazily, meaning no data has been read, only the catalog schema
lsdb Catalog detection:
objID detectID ra dec obsTime filterID psfFlux psfFluxErr
npartitions=83004
Order: 6, Pixel: 0 int64[pyarrow] int64[pyarrow] double[pyarrow] double[pyarrow] double[pyarrow] int16[pyarrow] double[pyarrow] double[pyarrow]
Order: 6, Pixel: 1 ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ...
Order: 6, Pixel: 49150 ... ... ... ... ... ... ... ...
Order: 6, Pixel: 49151 ... ... ... ... ... ... ... ...
The catalog has been loaded lazily, meaning no data has been read, only the catalog schema

Load ZTF DR16 Zubercal catalog#

Zubercal is an uber-calibrated light-curve catalog for ZTF DR16. It matches ZTF detections to PS1 objects and provides calibrated magnitudes.

[6]:
# Load Zubercal metadata
zubercal = read_hats(
    ZUBERCAL_PATH,
    columns=[
        "objectid",  # matches to PS1 objID
        "mjd",
        "band",
        "mag",
        "magerr",  # integer, units are 1e-4 mag
    ],
)
display(zubercal)
lsdb Catalog zubercal:
objectid mjd band mag magerr
npartitions=70853
Order: 5, Pixel: 0 int64[pyarrow] double[pyarrow] string[pyarrow] float[pyarrow] uint16[pyarrow]
Order: 5, Pixel: 1 ... ... ... ... ...
... ... ... ... ... ...
Order: 6, Pixel: 49150 ... ... ... ... ...
Order: 6, Pixel: 49151 ... ... ... ... ...
The catalog has been loaded lazily, meaning no data has been read, only the catalog schema

Plan cross-matching and joining#

LSDB doesn’t do any work until you call the compute method, so here we just plan the pipeline:

  1. Cross-match to find PS1 objects within 0.2 arcsec of the SNAD objects.

  2. Join with the Zubercal catalog to collect light curves. Each light curve will be represented by a “nested” dataframe, where each row is a detection.

  3. Do the same for PS1 detections.

[7]:
# Planning cross-matching with objects, no work happens here
snad_ps1 = snad_catalog.crossmatch(
    ps1_object,
    radius_arcsec=0.2,
    suffixes=["", ""],
    # Ignore the missing margin cache.
    # Theoretically it may cause some data loss, but not in this pipeline
    require_right_margin=False,
)

# Join with Zubercal detections to get one more set of light-curves
snad_ztf_lc = snad_ps1.join_nested(
    zubercal,
    left_on="objID",
    right_on="objectid",
    # light-curve will live in "ztf_lc" column
    nested_column_name="ztf_lc",
    output_catalog_name="snad_ztf_lc",
)

# Join with PS1 detections to get light-curves
snad_ps1_ztf_lc = snad_ztf_lc.join_nested(
    ps1_detection,
    left_on="objID",
    right_on="objID",
    # light-curve will live in "ps1_lc" column
    nested_column_name="ps1_lc",
    output_catalog_name="snad_ps1_ztf_lc",
)

display(snad_ps1_ztf_lc)
/Users/hombit/projects/lincc-frameworks/lsdb/src/lsdb/dask/crossmatch_catalog_data.py:108: RuntimeWarning: Right catalog does not have a margin cache. Results may be incomplete and/or inaccurate.
  warnings.warn(
/Users/hombit/projects/lincc-frameworks/lsdb/src/lsdb/dask/join_catalog_data.py:332: RuntimeWarning: Right catalog does not have a margin cache. Results may be incomplete and/or inaccurate.
  warnings.warn(
/Users/hombit/projects/lincc-frameworks/lsdb/src/lsdb/dask/join_catalog_data.py:332: RuntimeWarning: Right catalog does not have a margin cache. Results may be incomplete and/or inaccurate.
  warnings.warn(
lsdb Catalog snad_ps1_ztf_lc:
Name R.A. Dec. OID Discovery date (UT) mag er_down er_up ref er_ref TNS Type Comments Norder Dir Npix objID raMean decMean nStackDetections _dist_arcsec ztf_lc ps1_lc
npartitions=110
Order: 7, Pixel: 3996 string[pyarrow] double[pyarrow] double[pyarrow] int64[pyarrow] string[pyarrow] double[pyarrow] double[pyarrow] double[pyarrow] string[pyarrow] double[pyarrow] string[pyarrow] string[pyarrow] string[pyarrow] uint8[pyarrow] uint64[pyarrow] uint64[pyarrow] int64[pyarrow] double[pyarrow] double[pyarrow] int16[pyarrow] double[pyarrow] nested<mjd: [double], band: [string], mag: [fl... nested<detectID: [int64], ra: [double], dec: [...
Order: 7, Pixel: 22298 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Order: 7, Pixel: 130947 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Order: 7, Pixel: 130954 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
The catalog has been loaded lazily, meaning no data has been read, only the catalog schema

Run the pipeline!#

Here we finally run the pipeline and get the light-curves. This requires Dask client for parallel execution, and you probably want to adjust the parameters to your hardware. See Dask documentation for details: Dask deployment, Dask deployment with Python. Also, see this page for LSDB specific Dask configuration tips.
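
For example, a local cluster can be sized explicitly instead of relying on the defaults used below. This is a sketch only; the worker counts and memory limits are placeholders to adapt to your machine:

# Hedged example: an explicitly sized local Dask cluster.
# The numbers below are placeholders, not a recommendation.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=4,             # number of worker processes
    threads_per_worker=2,    # threads per worker process
    memory_limit="8GB",      # memory limit per worker
)
client = Client(cluster)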

The output ndf is a nested dataframe, a NestedFrame from the nested-pandas package. We will see later how to access the light curves from it.
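
Before launching the full computation, which takes tens of minutes over the network, it can be useful to compute a tiny spatial subset first. The sketch below is not part of the original run; it reuses the coordinates of SNAD101 from the table above and assumes lsdb’s cone_search method:

# Hedged sanity check: compute only a small cone around one SNAD object.
small = snad_ps1_ztf_lc.cone_search(ra=247.45543, dec=24.77282, radius_arcsec=5.0)
display(small.compute())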

[8]:
%%time

# It will take a while to fetch all the data over the Internet
with dask.distributed.Client() as client:
    display(client)
    ndf = snad_ps1_ztf_lc.compute()

display(ndf)

Client: Client-dceea3ee-fa06-11ef-84d2-1ecfd5b80ac1
Connection method: Cluster object; Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

/Users/hombit/.virtualenvs/lsdb/lib/python3.12/site-packages/distributed/client.py:3370: UserWarning: Sending large graph of size 11.52 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
  warnings.warn(
Name R.A. Dec. OID Discovery date (UT) mag er_down er_up ref er_ref TNS Type Comments Norder Dir Npix objID raMean decMean nStackDetections _dist_arcsec ztf_lc ps1_lc
_healpix_29
392922938507087610 SNAD195 173.238910 47.548460 754108100010282 2019-05-14 05:00:23 20.62 0.20 0.25 20.84 0.04 AT 2019aapu PSN ZTF19aavkyng 7 20000 22335 165051732389348889 173.238928 47.548471 5 0.058477 [873 rows × 4 columns] [92 rows × 7 columns]
446151138901733801 SNAD243 113.919200 31.779590 662105400007673 2018-03-30 05:34:18 20.81 0.28 0.39 19.8 0.07 AT 2018mdt PSN ZTF18aceqval 7 20000 25360 146131139191046084 113.919172 31.779571 10 0.110847 [2979 rows × 4 columns] [94 rows × 7 columns]
636762943118841180 SNAD199 232.002550 29.723200 631115200000919 2019-05-29 08:21:26 20.09 0.15 0.17 19.99 0.07 AT 2019aapy PSN ZTF19aavovik 5 0 2262 143662320025368377 232.002534 29.723172 5 0.112213 [2873 rows × 4 columns] [74 rows × 7 columns]
636879053204760495 SNAD225 231.084920 29.975240 677101400006297 2018-04-02 08:45:19 19.70 0.12 0.14 19.6 0.04 AT 2018mdb PSN ZTF18aajzqbq 5 0 2262 143972310848550829 231.084883 29.975228 5 0.121680 [1671 rows × 4 columns] [70 rows × 7 columns]
644648029047416404 SNAD184 223.601150 32.778720 676205100003608 2018-06-23 06:08:59 21.06 0.31 0.43 20.26 0.04 AT 2018mby PSN ZTF18abdgucl 7 30000 36643 147332236011775015 223.601160 32.778696 20 0.090933 [2152 rows × 4 columns] [80 rows × 7 columns]
... ... ... ... ... ... ...

48 rows × 23 columns
CPU times: user 1min 55s, sys: 42.1 s, total: 2min 37s
Wall time: 46min 36s

Plot first five light curves#

Here we plot the light curves; you can change the plot type by setting MAG_OR_FLUX to mag or flux.
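
For reference, the conversions in the plotting code use the AB magnitude zero point of 8.9 mag for a flux of 1 Jy, i.e. m = 8.9 - 2.5 log10(F / 1 Jy). Below is a small standalone sketch of the same conversion; the function names are ours, not from the notebook:

import numpy as np

def jy_to_ab_mag(flux_jy):
    # AB magnitude from flux in Jy: m = 8.9 - 2.5 * log10(F / 1 Jy)
    return 8.9 - 2.5 * np.log10(flux_jy)

def ab_mag_to_ujy(mag):
    # Flux in micro-Jy from AB magnitude
    return np.power(10.0, 0.4 * (8.9 - mag)) * 1e6

print(jy_to_ab_mag(3.631e-6))  # ~22.5 mag for 3.631 micro-Jy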

NestedFrame from nested-pandas (docs) is just a wrapper around a pandas DataFrame, so everything you know about pandas applies here. Every light curve is packed into an item of the ps1_lc and ztf_lc columns. When you get a single element from a nested column, you get a pandas DataFrame!
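
A minimal access sketch, assuming the ndf computed above: taking a single element of a nested column yields a regular pandas DataFrame, and iterating over a nested column yields one DataFrame per object.

# Hedged example: accessing nested light curves from the computed NestedFrame.
first_ztf_lc = ndf["ztf_lc"].iloc[0]  # a plain pandas DataFrame
print(type(first_ztf_lc), list(first_ztf_lc.columns))

# Iterating over a nested column gives one light-curve DataFrame per object
for name, lc in zip(ndf["Name"], ndf["ztf_lc"]):
    print(name, len(lc), "ZTF detections")
    break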

[9]:
MAG_OR_FLUX = "mag"

ps1_filter_id_to_name = {1: "g", 2: "r", 3: "i", 4: "z", 5: "y"}
get_ps1_name_from_filter_id = np.vectorize(ps1_filter_id_to_name.get)
filter_colors = {"g": "green", "r": "red", "i": "black", "z": "purple", "y": "cyan"}

ndf_sorted = ndf.sort_values("Name")

for i in range(5):
    # snad_name is a string, ps1_lc and ztf_lc are pandas dataframes
    snad_name, ps1_lc, ztf_lc = ndf_sorted.iloc[i][["Name", "ps1_lc", "ztf_lc"]]

    # one figure per SNAD object
    plt.figure(figsize=(12, 6))

    # PS1
    ps1_bands = get_ps1_name_from_filter_id(ps1_lc["filterID"])
    for band in "grizy":
        color = filter_colors[band]
        band_idx = ps1_bands == band
        t = ps1_lc["obsTime"][band_idx]
        flux = ps1_lc["psfFlux"][band_idx] * 1e6  # micro Jy
        err = ps1_lc["psfFluxErr"][band_idx] * 1e6
        mag = 8.9 - 2.5 * np.log10(flux / 1e6)
        mag_plus = 8.9 - 2.5 * np.log10((flux - err) / 1e6)
        mag_minus = 8.9 - 2.5 * np.log10((flux + err) / 1e6)
        if MAG_OR_FLUX == "flux":
            plt.scatter(
                t,
                flux,
                marker="*",
                color=color,
                label=f"PS1 {band}",
                alpha=0.3,
            )
            plt.errorbar(
                t,
                flux,
                err,
                ls="",
                color=color,
                alpha=0.15,
            )
        elif MAG_OR_FLUX == "mag":
            plt.scatter(
                t,
                mag,
                marker="*",
                color=color,
                label=f"PS1 {band}",
                alpha=0.3,
            )
            plt.errorbar(
                t,
                mag,
                [mag - mag_minus, mag_plus - mag],
                ls="",
                color=color,
                alpha=0.15,
            )
        else:
            raise ValueError("MAG_OR_FLUX should be 'mag' or 'flux'")

    # ZTF
    ztf_bands = ztf_lc["band"]
    for band in "gri":
        band_idx = ztf_bands == band
        color = filter_colors[band]
        t = ztf_lc["mjd"][band_idx]
        mag = ztf_lc["mag"][band_idx]
        magerr = np.asarray(ztf_lc["magerr"][band_idx], dtype=float) * 1e-4
        flux = np.power(10, 0.4 * (8.9 - mag)) * 1e6  # micro Jy
        err = 0.4 * np.log(10) * flux * magerr
        if MAG_OR_FLUX == "flux":
            plt.scatter(
                t,
                flux,
                marker="x",
                color=color,
                label=f"ZTF {band}",
                alpha=0.3,
            )
            plt.errorbar(
                t,
                flux,
                err,
                ls="",
                color=color,
                alpha=0.15,
            )
        else:
            plt.scatter(
                t,
                mag,
                marker="x",
                color=color,
                label=f"ZTF {band}",
                alpha=0.3,
            )
            plt.errorbar(
                t,
                mag,
                magerr,
                ls="",
                color=color,
                alpha=0.15,
            )

    plt.title(snad_name)
    plt.xlabel("MJD")
    plt.ylabel("Flux, μJy" if MAG_OR_FLUX == "flux" else "mag")
    if MAG_OR_FLUX == "mag":
        plt.gca().invert_yaxis()
    plt.legend(loc="upper left")
/Users/hombit/.virtualenvs/lsdb/lib/python3.12/site-packages/matplotlib/cbook.py:1699: FutureWarning: Calling float on a single element Series is deprecated and will raise a TypeError in the future. Use float(ser.iloc[0]) instead
  return math.isfinite(val)
[Output: five figures, one per SNAD object, showing PS1 and ZTF light curves (magnitude vs. MJD, titled by SNAD name).]

We observe that some objects are missing: Pan-STARRS has no counterpart for these transients, because the host center is either too faint or too far from the transient position.

None of these objects show transient activity in the Pan-STARRS light curves, but they do in ZTF, confirming that they are robust supernova candidates.
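
To see exactly which SNAD objects dropped out of the cross-match, one can compare the input table with the computed result; a minimal sketch, reusing snad_df and ndf from the cells above:

# Hedged check: SNAD objects without a PS1 counterpart within the match radius.
missing = sorted(set(snad_df["Name"]) - set(ndf["Name"]))
print(len(missing), "SNAD objects have no PS1 match")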