Archiving all-files scheduler #388

Merged: 17 commits, Aug 11, 2023
66 changes: 65 additions & 1 deletion jupyter_scheduler/executors.py
@@ -1,4 +1,5 @@
import io
import os
import tarfile
import traceback
from abc import ABC, abstractmethod
@@ -179,7 +180,7 @@ class ArchivingExecutionManager(DefaultExecutionManager):

Notes
-----
Should be used along with :class:`~jupyter_scheduler.scheduler.ArchiveDownloadingScheduler`
Should be used along with :class:`~jupyter_scheduler.scheduler.ArchivingScheduler`
as the `scheduler_class` during jupyter server start.
"""

@@ -221,3 +222,66 @@ def execute(self):
archive_filepath = self.staging_paths["tar.gz"]
with fsspec.open(archive_filepath, "wb") as f:
f.write(fh.getvalue())


class AllFilesArchivingExecutionManager(DefaultExecutionManager):
"""Execution manager that archives all output files in and under the
output directory into a single archive file

Notes
-----
Should be used along with :class:`~jupyter_scheduler.scheduler.AllFilesArchivingScheduler`
as the `scheduler_class` during jupyter server start.
"""

def execute(self):
job = self.model

with open(self.staging_paths["input"], encoding="utf-8") as f:
nb = nbformat.read(f, as_version=4)

if job.parameters:
nb = add_parameters(nb, job.parameters)

ep = ExecutePreprocessor(
kernel_name=nb.metadata.kernelspec["name"],
store_widget_state=True,
)

# Get the directory of the input file
working_dir = os.path.dirname(os.path.abspath(self.staging_paths["input"]))

try:
    ep.preprocess(nb, {"metadata": {"path": working_dir}})
except CellExecutionError:
    # Swallow cell execution errors so that output files and the archive
    # are still produced for the partially executed notebook
    pass
finally:
    # Create all desired output files, other than "input" and "tar.gz"
    for output_format in job.output_formats:
        if output_format in ("input", "tar.gz"):
            continue
        cls = nbconvert.get_exporter(output_format)
        output, resources = cls().from_notebook_node(nb)
        with open(self.staging_paths[output_format], "wb") as f:
            f.write(bytes(output, "utf-8"))

# Create an archive file of the staging directory for this run
# and everything under it
fh = io.BytesIO()
with tarfile.open(fileobj=fh, mode="w:gz") as tar:
    for root, dirs, files in os.walk(working_dir):
        for file in files:
            # Read as bytes so that binary outputs survive archiving intact
            with open(os.path.join(root, file), "rb") as f:
                data = f.read()
            source_f = io.BytesIO(initial_bytes=data)
            # TODO: Include relative path?
            info = tarfile.TarInfo(file)
            info.size = len(data)
            tar.addfile(info, source_f)

archive_filepath = self.staging_paths["tar.gz"]
with fsspec.open(archive_filepath, "wb") as f:
f.write(fh.getvalue())
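For context on the docstring's note about `scheduler_class`, a minimal configuration sketch follows; it assumes the extension exposes a `SchedulerApp.scheduler_class` trait, and the exact config key may differ by version:

# jupyter_server_config.py -- a sketch; `SchedulerApp.scheduler_class` is an
# assumed trait name, adjust to what your jupyter_scheduler version exposes
c.SchedulerApp.scheduler_class = (
    "jupyter_scheduler.scheduler.AllFilesArchivingScheduler"
)
# The new scheduler's `execution_manager_class` trait already defaults to
# AllFilesArchivingExecutionManager, so no extra setting should be needed.

With this in place, each job run produces the per-format output files plus a tar.gz of everything in and under the staging directory for that run.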
27 changes: 26 additions & 1 deletion jupyter_scheduler/job_files_manager.py
@@ -65,19 +65,44 @@ def generate_filepaths(self):
if not os.path.exists(output_filepath) or self.redownload:
yield input_filepath, output_filepath

def exclude_output_filepaths(self, members, filepaths):
output_filepaths = [filepath[1] for filepath in filepaths]
for tarinfo in members:
if tarinfo.name not in output_filepaths:
yield tarinfo

def download_tar(self, archive_format: str = "tar"):
archive_filepath = self.staging_paths[archive_format]
read_mode = "r:gz" if archive_format == "tar.gz" else "tar"

# Use the notebook's output filepath for side effect files
side_effect_file_directory = None
output_formats = self.output_formats

filepaths = self.generate_filepaths()

# Take the first output filepath
for _, output_filepath in filepaths:
side_effect_file_directory = os.path.dirname(os.path.abspath(output_filepath))
break

with fsspec.open(archive_filepath) as f:
with tarfile.open(fileobj=f, mode=read_mode) as tar:
filepaths = self.generate_filepaths()
for input_filepath, output_filepath in filepaths:
try:
    input_file = tar.extractfile(member=input_filepath)
    with fsspec.open(output_filepath, mode="wb") as output_file:
        output_file.write(input_file.read())
except Exception:
    # Skip any declared output file that is missing from the archive
    pass
if side_effect_file_directory is not None:
# Extract all files in the tar.gz file,
# other than the output formats listed above
tar.extractall(
side_effect_file_directory,
members=self.exclude_output_filepaths(tar, filepaths),
filter="data",
)

def download(self):
if not self.staging_paths:
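To make the member filtering in `download_tar` and `exclude_output_filepaths` concrete, here is a small, self-contained sketch of the same idea; the archive contents and file names are hypothetical, for illustration only:

import io
import tarfile

def exclude_output_filepaths(members, filepaths):
    # filepaths is a list of (input_filepath, output_filepath) pairs,
    # mirroring what generate_filepaths() yields
    output_filepaths = [filepath[1] for filepath in filepaths]
    for tarinfo in members:
        if tarinfo.name not in output_filepaths:
            yield tarinfo

# Build a tiny in-memory archive with one declared output and one side-effect file
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
    for name in ("job.html", "side_effect.csv"):
        data = b"example"
        info = tarfile.TarInfo(name)
        info.size = len(data)
        tar.addfile(info, io.BytesIO(data))

buf.seek(0)
with tarfile.open(fileobj=buf, mode="r:gz") as tar:
    filepaths = [("job.html", "job.html")]  # pretend "job.html" is a declared output
    kept = [m.name for m in exclude_output_filepaths(tar, filepaths)]

print(kept)  # ['side_effect.csv'] -- only the side-effect file remains

The real `download_tar` applies the same filter but passes the open TarFile itself as `members` and extracts the surviving entries into the side-effect file directory with `filter="data"`.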
38 changes: 36 additions & 2 deletions jupyter_scheduler/scheduler.py
@@ -709,8 +709,42 @@ def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) ->

output_format = "tar.gz"
filename = create_output_filename(model.input_filename, model.create_time, output_format)
staging_paths[output_format] = os.path.join(self.staging_path, model.job_id, filename)
staging_paths["input"] = os.path.join(self.staging_path, model.job_id, model.input_filename)
staging_paths[output_format] = os.path.join(self.staging_path, id, filename)
staging_paths["input"] = os.path.join(self.staging_path, id, model.input_filename)

return staging_paths


class AllFilesArchivingScheduler(Scheduler):
"""Scheduler that captures all files in output directory in an archive."""

execution_manager_class = TType(
klass="jupyter_scheduler.executors.ExecutionManager",
default_value="jupyter_scheduler.executors.AllFilesArchivingExecutionManager",
config=True,
)

def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) -> Dict[str, str]:
staging_paths = {}
if not model:
return staging_paths

id = model.job_id if isinstance(model, DescribeJob) else model.job_definition_id

for output_format in model.output_formats:
filename = create_output_filename(
model.input_filename, model.create_time, output_format
)
# Use the staging directory to capture output files
staging_paths[output_format] = os.path.join(self.staging_path, id, filename)

# Create an output archive file
staging_paths["tar.gz"] = os.path.join(
self.staging_path,
id,
create_output_filename(model.input_filename, model.create_time, "tar.gz"),
)
staging_paths["input"] = os.path.join(self.staging_path, id, model.input_filename)

return staging_paths
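For orientation, the sketch below shows the kind of mapping `get_staging_paths` produces for a hypothetical job; the job ID, staging root, and filenames are made up, and the real names come from `create_output_filename`:

# Hypothetical result for a job with id "abc123", input "report.ipynb",
# output_formats ["ipynb", "html"], and a staging root of "/staging"
staging_paths = {
    "ipynb": "/staging/abc123/report-2023-08-11.ipynb",
    "html": "/staging/abc123/report-2023-08-11.html",
    "tar.gz": "/staging/abc123/report-2023-08-11.tar.gz",
    "input": "/staging/abc123/report.ipynb",
}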
