Skip to content

Commit

Permalink
WIP: Removes zip type, incremental work for archiving work dir
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonWeill committed Aug 7, 2023
1 parent 58eba46 commit 5ac1f74
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 20 deletions.
4 changes: 2 additions & 2 deletions jupyter_scheduler/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def manage_environments_command(self) -> str:
return ""

def output_formats_mapping(self) -> Dict[str, str]:
return {"ipynb": "Notebook", "html": "HTML", "zip": "Zip"}
return {"ipynb": "Notebook", "html": "HTML"}


class StaticEnvironmentManager(EnvironmentManager):
Expand All @@ -90,7 +90,7 @@ def manage_environments_command(self) -> str:
return ""

def output_formats_mapping(self) -> Dict[str, str]:
return {"ipynb": "Notebook", "html": "HTML", "zip": "Zip"}
return {"ipynb": "Notebook", "html": "HTML"}


class EnvironmentRetrievalError(Exception):
Expand Down
37 changes: 24 additions & 13 deletions jupyter_scheduler/executors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import io
import os
import shutil
import tarfile
import traceback
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -226,8 +225,8 @@ def execute(self):


class AllFilesArchivingExecutionManager(DefaultExecutionManager):
"""Execution manager that, for automated runs, archives all output files
in and under the output directory into a single zip file
"""Execution manager that archives all output files in and under the
output directory into a single archive file
Notes
-----
Expand All @@ -254,9 +253,9 @@ def execute(self):
except CellExecutionError as e:
pass
finally:
# Create all desired output files, other than "input" and "zip"
# Create all desired output files, other than "input" and "tar.gz"
for output_format in job.output_formats:
if output_format == "input" or output_format == "zip":
if output_format == "input" or output_format == "tar.gz":
pass
else:
cls = nbconvert.get_exporter(output_format)
Expand All @@ -266,12 +265,24 @@ def execute(self):
f.close()
print(f"Wrote file {self.staging_paths[output_format]}\n")

if "zip" in job.output_formats:
# For automated runs, create a zip file of the current directory
# and everything under it
staging_dir = "."
# Create an archive file of the staging directory for this run
# and everything under it
fh = io.BytesIO()
with tarfile.open(fileobj=fh, mode="w:gz") as tar:
# Get the directory of the input file
working_dir = os.path.dirname(os.path.abspath(self.staging_paths["input"]))

for root, dirs, files in os.walk(working_dir):
for file in files:
with open(os.path.join(root, file)) as f:
output = f.read()
data = bytes(output, "utf-8")
source_f = io.BytesIO(initial_bytes=data)
# TODO: Include relative path?
info = tarfile.TarInfo(file)
info.size = len(data)
tar.addfile(info, source_f)

# Truncate '.zip' off the end of the filename
basename = self.staging_paths["zip"][:-4]
shutil.make_archive(basename, "zip", staging_dir)
print(f"Wrote zip file {basename}.zip\n")
archive_filepath = self.staging_paths["tar.gz"]
with fsspec.open(archive_filepath, "wb") as f:
f.write(fh.getvalue())
15 changes: 10 additions & 5 deletions jupyter_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) ->


class AllFilesArchivingScheduler(Scheduler):
"""Scheduler that, for scheduled runs, captures all files in output directory in a zip file."""
"""Scheduler that captures all files in output directory in an archive."""

execution_manager_class = TType(
klass="jupyter_scheduler.executors.ExecutionManager",
Expand All @@ -735,12 +735,17 @@ def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) ->
filename = create_output_filename(
model.input_filename, model.create_time, output_format
)
staging_paths[output_format] = filename
# Use the staging directory to capture output files
staging_paths[output_format] = os.path.join(
self.staging_path,
id,
filename
)

# Create an output zip file for automated runs
# Create an output archive file for automated runs
if isinstance(model, DescribeJob) and model.job_definition_id is not None:
staging_paths["zip"] = create_output_filename(
model.input_filename, model.create_time, "zip"
staging_paths["tar.gz"] = create_output_filename(
model.input_filename, model.create_time, "tar.gz"
)
staging_paths["input"] = os.path.join(self.staging_path, id, model.input_filename)

Expand Down

0 comments on commit 5ac1f74

Please sign in to comment.