Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Input files packaging #510

Merged
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3ca4cef
package input files and folders (backend)
andrii-i Mar 14, 2024
3b5f74e
package input files and folders (frontend)
andrii-i Mar 14, 2024
eaed794
remove "input_dir" from staging_paths dict
andrii-i Mar 14, 2024
292b445
ensure execution context matches the notebook directory
andrii-i Mar 18, 2024
324f043
update snapshots
andrii-i Mar 18, 2024
92f01b9
copy staging folder to output folder after job runs (SUCCESS or FAILURE)
andrii-i Mar 27, 2024
e3cf95a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 27, 2024
ee89d9e
copy staging folder and side effects to output after job runs, track …
andrii-i Apr 2, 2024
f0fd372
remove staging to output copying logic from executor
andrii-i Apr 3, 2024
dcc096b
refactor output files creation logic into a separate function for cla…
andrii-i Apr 10, 2024
3a9ac88
Fix job definition data model
andrii-i Apr 10, 2024
2a6da0e
add packaged_files to JobDefinition and DescribeJobDefinition model
andrii-i Apr 11, 2024
6ec35f8
fix existing pytests
andrii-i Apr 24, 2024
d9936f7
clarify FilesDirectoryLink title
andrii-i Apr 23, 2024
ff7cc25
Dynamically display input folder in the checkbox text
andrii-i Apr 23, 2024
e2596eb
display packageInputFolder parameter as 'Files included'
andrii-i Apr 23, 2024
51a0e55
use helper text with input directory for 'include files' checkbox
andrii-i Apr 24, 2024
fb65360
Update Playwright Snapshots
github-actions[bot] Apr 24, 2024
6720e84
add side effects accountability test for execution manager
andrii-i Apr 25, 2024
a56a740
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 25, 2024
2a83057
Use "Run job with input folder" for packageInputFolder checkbox text
andrii-i Apr 25, 2024
f2318b1
Update Playwright Snapshots
github-actions[bot] Apr 25, 2024
7ff7e50
Use "Ran with input folder" in detail page
andrii-i Apr 25, 2024
ad43bde
Update src/components/input-folder-checkbox.tsx
andrii-i Apr 25, 2024
5da9b26
fix lint error
andrii-i Apr 25, 2024
1452f1f
Update Playwright Snapshots
github-actions[bot] Apr 25, 2024
343f403
Update existing screenshots
andrii-i Apr 26, 2024
22483a6
Update "Submit the Create Job" section mentioning “Run job with input…
andrii-i Apr 26, 2024
5497f08
Update docs/users/index.md
andrii-i Apr 26, 2024
9ef28ee
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 26, 2024
abc5085
Update src/components/input-folder-checkbox.tsx
andrii-i Apr 26, 2024
247e44d
Update Playwright Snapshots
github-actions[bot] Apr 26, 2024
106bdc8
Describe side effects behavior better
andrii-i Apr 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified docs/users/images/create_job_form.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions docs/users/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ To create a _job_ or _job definition_ from an open notebook, click on a “Creat

Give your notebook job or job definition a name, choose an environment to run it in, select its output formats, and provide parameters that are set as local variables when your notebook gets executed. This parameterized execution is similar to the one used in [Papermill](https://papermill.readthedocs.io/en/latest/).

Select “Run job with input folder” to ensure the scheduled job will have access to all files within the same folder as the input file.
andrii-i marked this conversation as resolved.
Show resolved Hide resolved

To create a _job_ that runs once, select "Run now" in the "Schedule" section, and click "Create".
!["Create Job Form"](./images/create_job_form.png)

Expand Down
42 changes: 34 additions & 8 deletions jupyter_scheduler/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,47 @@ def execute(self):
if job.parameters:
nb = add_parameters(nb, job.parameters)

staging_dir = os.path.dirname(self.staging_paths["input"])
ep = ExecutePreprocessor(
kernel_name=nb.metadata.kernelspec["name"],
store_widget_state=True,
kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=staging_dir
)

try:
ep.preprocess(nb)
ep.preprocess(nb, {"metadata": {"path": staging_dir}})
except CellExecutionError as e:
raise e
finally:
for output_format in job.output_formats:
cls = nbconvert.get_exporter(output_format)
output, resources = cls().from_notebook_node(nb)
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
f.write(output)
self.add_side_effects_files(staging_dir)
self.create_output_files(job, nb)

def add_side_effects_files(self, staging_dir: str):
    """Scan ``staging_dir`` for side-effect files created during input file
    execution and merge them into the job's ``packaged_files`` column.

    Parameters
    ----------
    staging_dir : str
        Directory holding the staged input notebook; every other file found
        beneath it is treated as a side effect of execution.
    """
    # Compute the input notebook's path relative to staging_dir so it is
    # comparable with file_rel_path below. (Calling os.path.relpath with no
    # base resolves against the process CWD, which is not necessarily
    # staging_dir, and would let the input notebook itself slip into the
    # side-effect list.)
    input_notebook = os.path.relpath(self.staging_paths["input"], staging_dir)
    new_files_set = set()
    for root, _, files in os.walk(staging_dir):
        for file in files:
            file_rel_path = os.path.relpath(os.path.join(root, file), staging_dir)
            if file_rel_path != input_notebook:
                new_files_set.add(file_rel_path)

    if new_files_set:
        # Union with any files already recorded (e.g. packaged input files)
        # rather than overwriting them.
        with self.db_session() as session:
            current_packaged_files_set = set(
                session.query(Job.packaged_files).filter(Job.job_id == self.job_id).scalar()
                or []
            )
            updated_packaged_files = list(current_packaged_files_set.union(new_files_set))
            session.query(Job).filter(Job.job_id == self.job_id).update(
                {"packaged_files": updated_packaged_files}
            )
            session.commit()

def create_output_files(self, job: DescribeJob, notebook_node):
    """Render the executed notebook into every requested output format and
    write each result to the corresponding staging path."""
    for fmt in job.output_formats:
        exporter_cls = nbconvert.get_exporter(fmt)
        rendered, _ = exporter_cls().from_notebook_node(notebook_node)
        with fsspec.open(self.staging_paths[fmt], "w", encoding="utf-8") as out:
            out.write(rendered)

def supported_features(cls) -> Dict[JobFeature, bool]:
return {
Expand Down
13 changes: 11 additions & 2 deletions jupyter_scheduler/job_files_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
job = await ensure_async(self.scheduler.get_job(job_id, False))
staging_paths = await ensure_async(self.scheduler.get_staging_paths(job))
output_filenames = self.scheduler.get_job_filenames(job)
output_dir = self.scheduler.get_local_output_path(job)
output_dir = self.scheduler.get_local_output_path(model=job, root_dir_relative=True)

p = Process(
target=Downloader(
Expand All @@ -30,6 +30,7 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
staging_paths=staging_paths,
output_dir=output_dir,
redownload=redownload,
include_staging_files=job.package_input_folder,
).download
)
p.start()
Expand All @@ -43,22 +44,30 @@ def __init__(
staging_paths: Dict[str, str],
output_dir: str,
redownload: bool,
include_staging_files: bool = False,
):
self.output_formats = output_formats
self.output_filenames = output_filenames
self.staging_paths = staging_paths
self.output_dir = output_dir
self.redownload = redownload
self.include_staging_files = include_staging_files

def generate_filepaths(self):
    """A generator that produces (source, destination) filepath pairs.

    Yields one pair per output format (plus the input file), and — when
    staging files are included — one pair per packaged file, skipping
    destinations that already exist unless a redownload was requested.
    """
    for fmt in self.output_formats + ["input"]:
        src = self.staging_paths[fmt]
        dst = os.path.join(self.output_dir, self.output_filenames[fmt])
        if self.redownload or not os.path.exists(dst):
            yield src, dst

    if self.include_staging_files:
        staging_dir = os.path.dirname(self.staging_paths["input"])
        for rel_path in self.output_filenames["files"]:
            src = os.path.join(staging_dir, rel_path)
            dst = os.path.join(self.output_dir, rel_path)
            if self.redownload or not os.path.exists(dst):
                yield src, dst

def download_tar(self, archive_format: str = "tar"):
archive_filepath = self.staging_paths[archive_format]
Expand Down
6 changes: 6 additions & 0 deletions jupyter_scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class CreateJob(BaseModel):
name: str
output_filename_template: Optional[str] = OUTPUT_FILENAME_TEMPLATE
compute_type: Optional[str] = None
package_input_folder: Optional[bool] = None

@root_validator
def compute_input_filename(cls, values) -> Dict:
Expand Down Expand Up @@ -145,6 +146,8 @@ class DescribeJob(BaseModel):
status: Status = Status.CREATED
status_message: Optional[str] = None
downloaded: bool = False
package_input_folder: Optional[bool] = None
packaged_files: Optional[List[str]] = []

class Config:
orm_mode = True
Expand Down Expand Up @@ -209,6 +212,7 @@ class CreateJobDefinition(BaseModel):
compute_type: Optional[str] = None
schedule: Optional[str] = None
timezone: Optional[str] = None
package_input_folder: Optional[bool] = None

@root_validator
def compute_input_filename(cls, values) -> Dict:
Expand All @@ -234,6 +238,8 @@ class DescribeJobDefinition(BaseModel):
create_time: int
update_time: int
active: bool
package_input_folder: Optional[bool] = None
packaged_files: Optional[List[str]] = []

class Config:
orm_mode = True
Expand Down
2 changes: 2 additions & 0 deletions jupyter_scheduler/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class CommonColumns:
output_filename_template = Column(String(256))
update_time = Column(Integer, default=get_utc_timestamp, onupdate=get_utc_timestamp)
create_time = Column(Integer, default=get_utc_timestamp)
package_input_folder = Column(Boolean)
packaged_files = Column(JsonType, default=[])


class Job(CommonColumns, Base):
Expand Down
107 changes: 96 additions & 11 deletions jupyter_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import random
import shutil
from typing import Dict, Optional, Type, Union
from typing import Dict, List, Optional, Type, Union

import fsspec
import psutil
Expand Down Expand Up @@ -39,7 +39,11 @@
UpdateJobDefinition,
)
from jupyter_scheduler.orm import Job, JobDefinition, create_session
from jupyter_scheduler.utils import create_output_directory, create_output_filename
from jupyter_scheduler.utils import (
copy_directory,
create_output_directory,
create_output_filename,
)


class BaseScheduler(LoggingConfigurable):
Expand Down Expand Up @@ -248,7 +252,29 @@ def file_exists(self, path: str):
else:
return os.path.isfile(os_path)

def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:
def dir_exists(self, path: str):
    """Returns True if the directory exists, else returns False.

    API-style wrapper for os.path.isdir

    Parameters
    ----------
    path : string
        The relative path to the directory (with '/' as separator)

    Returns
    -------
    exists : bool
        Whether the directory exists.
    """
    root = os.path.abspath(self.root_dir)
    os_path = to_os_path(path, root)
    abs_path = os.path.abspath(os_path)
    # Containment check: compare against root plus a trailing separator so
    # a sibling whose name merely starts with root (e.g. "/srv/root2" when
    # root is "/srv/root") is rejected. A bare prefix test on root would
    # accept such paths when `path` contains "..". os.path.join(root, "")
    # yields root with exactly one trailing separator (and works for "/").
    if abs_path != root and not abs_path.startswith(os.path.join(root, "")):
        return False
    return os.path.isdir(abs_path)

def get_job_filenames(self, model: DescribeJob) -> Dict[str, Union[str, List[str]]]:
"""Returns dictionary mapping output formats to
the job filenames in the JupyterLab workspace.

Expand All @@ -265,7 +291,8 @@ def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:
{
'ipynb': 'helloworld-2022-10-10.ipynb',
'html': 'helloworld-2022-10-10.html',
'input': 'helloworld.ipynb'
'input': 'helloworld.ipynb',
'files': ['data/helloworld.csv', 'images/helloworld.png']
}

"""
Expand All @@ -278,6 +305,9 @@ def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:

filenames["input"] = model.input_filename

if model.package_input_folder and model.packaged_files:
filenames["files"] = [relative_path for relative_path in model.packaged_files]

return filenames

def add_job_files(self, model: DescribeJob):
Expand All @@ -289,7 +319,8 @@ def add_job_files(self, model: DescribeJob):
mapping = self.environments_manager.output_formats_mapping()
job_files = []
output_filenames = self.get_job_filenames(model)
output_dir = os.path.relpath(self.get_local_output_path(model), self.root_dir)
output_dir = self.get_local_output_path(model, root_dir_relative=True)

for output_format in model.output_formats:
filename = output_filenames[output_format]
output_path = os.path.join(output_dir, filename)
Expand All @@ -313,16 +344,42 @@ def add_job_files(self, model: DescribeJob):
)
)

# Add link to output folder with packaged input files and side effects
if model.package_input_folder and model.packaged_files:
job_files.append(
JobFile(
display_name="Files",
file_format="files",
file_path=output_dir if self.dir_exists(output_dir) else None,
)
)

model.job_files = job_files
model.downloaded = all(job_file.file_path for job_file in job_files)

def get_local_output_path(self, model: DescribeJob) -> str:
packaged_files = []
if model.package_input_folder and model.packaged_files:
packaged_files = [
os.path.join(output_dir, packaged_file_rel_path)
for packaged_file_rel_path in model.packaged_files
]
model.downloaded = all(job_file.file_path for job_file in job_files) and all(
self.file_exists(file_path) for file_path in packaged_files
)

def get_local_output_path(
    self, model: DescribeJob, root_dir_relative: Optional[bool] = False
) -> str:
    """Returns the local output directory path
    where all the job files will be downloaded
    from the staging location.

    When ``root_dir_relative`` is True the path is returned relative to
    ``self.root_dir``; otherwise the absolute path is returned.
    """
    output_dir_name = create_output_directory(model.input_filename, model.job_id)
    absolute_path = os.path.join(self.root_dir, self.output_directory, output_dir_name)
    if not root_dir_relative:
        return absolute_path
    return os.path.relpath(absolute_path, self.root_dir)


class Scheduler(BaseScheduler):
Expand Down Expand Up @@ -371,6 +428,15 @@ def copy_input_file(self, input_uri: str, copy_to_path: str):
with fsspec.open(copy_to_path, "wb") as output_file:
output_file.write(input_file.read())

def copy_input_folder(self, input_uri: str, nb_copy_to_path: str) -> List[str]:
    """Copies the input file along with the input directory to the staging
    directory, returns the list of copied files relative to the staging
    directory."""
    # Source is the folder containing the input file, resolved under root_dir;
    # destination is the folder that will hold the staged notebook copy.
    source_dir = os.path.dirname(os.path.join(self.root_dir, input_uri))
    destination_dir = os.path.dirname(nb_copy_to_path)
    return copy_directory(
        source_dir=source_dir,
        destination_dir=destination_dir,
    )

def create_job(self, model: CreateJob) -> str:
if not model.job_definition_id and not self.file_exists(model.input_uri):
raise InputUriError(model.input_uri)
Expand All @@ -397,11 +463,20 @@ def create_job(self, model: CreateJob) -> str:
model.output_formats = []

job = Job(**model.dict(exclude_none=True, exclude={"input_uri"}))

session.add(job)
session.commit()

staging_paths = self.get_staging_paths(DescribeJob.from_orm(job))
self.copy_input_file(model.input_uri, staging_paths["input"])
if model.package_input_folder:
copied_files = self.copy_input_folder(model.input_uri, staging_paths["input"])
input_notebook_filename = os.path.basename(model.input_uri)
job.packaged_files = [
file for file in copied_files if file != input_notebook_filename
]
session.commit()
else:
self.copy_input_file(model.input_uri, staging_paths["input"])

# The MP context forces new processes to not be forked on Linux.
# This is necessary because `asyncio.get_event_loop()` is bugged in
Expand Down Expand Up @@ -538,12 +613,22 @@ def create_job_definition(self, model: CreateJobDefinition) -> str:
session.add(job_definition)
session.commit()

# copy values for use after session is closed to avoid DetachedInstanceError
job_definition_id = job_definition.job_definition_id
job_definition_schedule = job_definition.schedule

staging_paths = self.get_staging_paths(DescribeJobDefinition.from_orm(job_definition))
self.copy_input_file(model.input_uri, staging_paths["input"])
if model.package_input_folder:
copied_files = self.copy_input_folder(model.input_uri, staging_paths["input"])
input_notebook_filename = os.path.basename(model.input_uri)
job_definition.packaged_files = [
file for file in copied_files if file != input_notebook_filename
]
session.commit()
else:
self.copy_input_file(model.input_uri, staging_paths["input"])

if self.task_runner and job_definition.schedule:
if self.task_runner and job_definition_schedule:
self.task_runner.add_job_definition(job_definition_id)

return job_definition_id
Expand Down
50 changes: 50 additions & 0 deletions jupyter_scheduler/tests/test_execution_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import os
from contextlib import contextmanager
from pathlib import Path
from unittest.mock import PropertyMock, patch

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from jupyter_scheduler.executors import DefaultExecutionManager
from jupyter_scheduler.orm import Base, Job

# Fixture locations: a staged job directory containing a notebook that, when
# executed, writes "output_side_effect.txt" next to itself.
NOTEBOOK_DIR = Path(__file__).resolve().parent / "test_staging_dir" / "job-3"
NOTEBOOK_NAME = "side_effects.ipynb"
NOTEBOOK_PATH = NOTEBOOK_DIR / NOTEBOOK_NAME
SIDE_EFFECT_FILE = NOTEBOOK_DIR / "output_side_effect.txt"


def test_execution_manager_with_side_effects():
    """Verify that add_side_effects_files records files found in the staging
    directory (other than the input notebook) on the job's packaged_files."""
    # In-memory SQLite keeps the test hermetic; schema is created fresh.
    db_url = "sqlite://"
    engine = create_engine(db_url, echo=False)
    Base.metadata.create_all(engine)
    db_session = sessionmaker(bind=engine)
    with db_session() as session:
        job = Job(
            runtime_environment_name="abc",
            input_filename=NOTEBOOK_NAME,
            job_id="123",
        )
        session.add(job)
        session.commit()

        manager = DefaultExecutionManager(
            job_id="123",
            root_dir=str(NOTEBOOK_DIR),
            db_url=db_url,
            staging_paths={"input": str(NOTEBOOK_PATH)},
        )

        # Patch db_session so the manager writes through the same in-memory
        # engine this test reads from (its own engine would be a separate DB).
        with patch.object(
            DefaultExecutionManager,
            "db_session",
            new_callable=PropertyMock,
        ) as mock_db_session:
            mock_db_session.return_value = db_session
            manager.add_side_effects_files(str(NOTEBOOK_DIR))

            # NOTE(review): this assumes the staging fixture directory already
            # contains output_side_effect.txt (the notebook is not executed
            # here) — confirm against the checked-in test_staging_dir tree.
            assert (
                "output_side_effect.txt" in job.packaged_files
            ), "Side effect file was not added to packaged_files"
Loading
Loading