Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify filter_observations to accept a path to a parquet file #153

Merged
merged 2 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 8 additions & 23 deletions thor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pathlib
import time
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Literal, Optional, Tuple
from typing import Iterable, Iterator, List, Literal, Optional, Tuple, Union

import quivr as qv
import ray
Expand Down Expand Up @@ -62,7 +62,7 @@ class LinkTestOrbitStageResult:

def link_test_orbit(
test_orbit: TestOrbits,
observations: Observations,
observations: Union[str, Observations],
moeyensj marked this conversation as resolved.
Show resolved Hide resolved
working_dir: Optional[str] = None,
filters: Optional[List[ObservationFilter]] = None,
config: Optional[Config] = None,
Expand All @@ -81,8 +81,11 @@ def link_test_orbit(
----------
test_orbit : `~thor.orbit.TestOrbit`
Test orbit to use to gather and transform observations.
observations : `~thor.observations.observations.Observations`
Observations from which range and transform the detections.
observations : `~thor.observations.observations.Observations` or str
Observations to search for moving objects. These observations can
be an in-memory Observations object or a path to a parquet file containing the
observations. If a path is provided, the observations will be loaded in chunks for
filtering.
working_dir : str, optional
Directory with persisted config and checkpointed results.
filters : list of `~thor.observations.filters.ObservationFilter`, optional
Expand Down Expand Up @@ -125,14 +128,6 @@ def link_test_orbit(
)

refs_to_free = []
if (
use_ray
and observations is not None
and not isinstance(observations, ray.ObjectRef)
):
observations = ray.put(observations)
refs_to_free.append(observations)
logger.info("Placed observations in the object store.")

checkpoint = load_initial_checkpoint_values(test_orbit_directory)
logger.info(f"Starting at stage: {checkpoint.stage}")
Expand All @@ -152,12 +147,6 @@ def link_test_orbit(
)

if checkpoint.stage == "filter_observations":
if use_ray:
if not isinstance(observations, ray.ObjectRef):
observations = ray.put(observations)
refs_to_free.append(observations)
logger.info("Placed observations in the object store.")

filtered_observations = filter_observations(
observations, test_orbit, config, filters
)
Expand Down Expand Up @@ -186,11 +175,7 @@ def link_test_orbit(
filtered_observations=filtered_observations,
)

# Observations are no longer needed. If we are using ray
# lets explicitly free the memory.
if use_ray and isinstance(observations, ray.ObjectRef):
ray.internal.free([observations])
logger.info("Removed observations from the object store.")
# Observations are no longer needed
del observations

if checkpoint.stage == "range_and_transform":
Expand Down
Loading
Loading