Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disk/memory modes of PersHelper #25

Merged
merged 8 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 16 additions & 40 deletions src/scorep_jupyter/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,15 @@ def switch_serializer(self, code):
"""
Switch serializer backend used for persistence in kernel.
"""
# clean pipes before switching
# Clean files/pipes before switching
self.pershelper.postprocess()

serializer = code.split('\n')[1]
if serializer == 'dill':
self.pershelper = PersHelper('dill')
elif serializer == 'cloudpickle':
self.pershelper = PersHelper('cloudpickle')

self.cell_output(f'Serializer backend switched to {serializer}, persistence was reset.')
if serializer in ['dill', 'cloudpickle']:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have something non-static here, e.g. a config file, so that we could support user-defined serializers as well? In the future, we don't want to have to change the code to plug in an alternative serializer.

self.pershelper = PersHelper(serializer)
self.cell_output(f'Serializer backend switched to {serializer}, persistence was reset.')
else:
self.cell_output(f'KernelError: {serializer} serializer backend is not supported.', 'stderr')
return self.standard_reply()

def set_scorep_env(self, code):
Expand Down Expand Up @@ -223,14 +222,15 @@ async def scorep_execute(self, code, silent, store_history=True, user_expression
"""
Execute given code with Score-P Python bindings instrumentation.
"""
# set up pipes for the communication
# Set up files/pipes for persistence communication
if not self.pershelper.preprocess():
self.cell_output("Error setting up the communication. Please try again. Aborting.", "stderr")
self.pershelper.postprocess()
self.cell_output("KernelError: Failed to set up the persistence communication files/pipes.", "stderr")
return self.standard_reply()

# Prepare code for the Score-P instrumented execution as subprocess
# Transmit user persistence and updated sys.path from Jupyter notebook to subprocess
# After running code, transmit subprocess persistence back to Jupyter notebook
# After running the code, transmit subprocess persistence back to Jupyter notebook
with open(scorep_script_name, 'w') as file:
file.write(self.pershelper.subprocess_wrapper(code))

Expand All @@ -239,45 +239,29 @@ async def scorep_execute(self, code, silent, store_history=True, user_expression
self.scorep_binding_args + [scorep_script_name]
proc_env = self.scorep_env.copy()
proc_env.update({'PATH': os.environ['PATH'], 'PYTHONUNBUFFERED': 'x'}) # scorep path, subprocess observation

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=proc_env)

# Ghost cell - dump current Jupyter session for subprocess
# Run in a "silent" way to not increase cells counter
#self.cell_output("Subprocess launched")
#self.cell_output(self.pershelper.jupyter_dump())


reply_status_dump = await super().do_execute(self.pershelper.jupyter_dump(), silent, store_history=False,
user_expressions=user_expressions, allow_stdin=allow_stdin, cell_id=cell_id)
if reply_status_dump['status'] != 'ok':
self.shell.execution_count += 1
reply_status_dump['execution_count'] = self.shell.execution_count - 1
self.pershelper.postprocess()
self.cell_output("KernelError: Failed to pickle previous notebook's persistence and variables.",
'stderr')
self.cell_output("KernelError: Failed to pickle notebook's persistence.", 'stderr')
return reply_status_dump

# Redirect process stderr to stdout and observe the latter
# Observing two streams with two threads causes interference in cell_output in the Jupyter notebook
# stdout is read in chunks, which are split into lines using \r or \n as delimiter
# Last element in the list might be "incomplete line", not ending with \n or \r, it is saved
# and merged with the first line in the next chunk

if os.environ['SCOREP_KERNEL_PERSISTENCE_MODE'] == 'MEMORY':
# connect to the communication pipe in case of in-memory communication for persistence
flags = os.O_RDONLY | os.O_NONBLOCK
comm_pipe =os.open(self.pershelper.paths['subprocess']['comm'], flags=flags)

incomplete_line = ''
endline_pattern = re.compile(r'(.*?[\r\n]|.+$)')
# Empty cell output, required for interactive output e.g. tqdm for-loop progress bar
self.cell_output('\0')
while True:
if os.environ['SCOREP_KERNEL_PERSISTENCE_MODE'] == 'MEMORY':
comm_chunk = os.read(comm_pipe, 1)
if comm_chunk != b'':
# for pipe communication, we break as soon as we get notified by the subprocess via the comm pipe
break
chunk = b'' + proc.stdout.read(READ_CHUNK_SIZE)
if chunk == b'':
break
Expand All @@ -293,31 +277,24 @@ async def scorep_execute(self, code, silent, store_history=True, user_expression
self.cell_output(line)

# os_environ_.clear()

# sys_path_.clear()

# Ghost cell - load subprocess definitions and persistence back to Jupyter notebook
# Ghost cell - load subprocess persistence back to Jupyter notebook
# Run in a "silent" way to not increase cells counter
reply_status_update = await super().do_execute(self.pershelper.jupyter_update(code), silent, store_history=False,
user_expressions=user_expressions, allow_stdin=allow_stdin, cell_id=cell_id)

if reply_status_update['status'] != 'ok':
# tidy up
self.pershelper.postprocess()
self.shell.execution_count += 1
reply_status_update['execution_count'] = self.shell.execution_count - 1
self.cell_output("KernelError: Failed to load cell's persistence and variables to the notebook.",
'stderr')
self.cell_output("KernelError: Failed to load cell's persistence to the notebook.", 'stderr')
return reply_status_update

if proc.returncode:
# tidy up
self.pershelper.postprocess()
self.cell_output(
'KernelError: Cell execution failed, cell persistence and variables are not recorded.',
'stderr')
self.cell_output('KernelError: Cell execution failed, cell persistence was not recorded.', 'stderr')
return self.standard_reply()

# Determine directory to which trace files were saved by Score-P
if 'SCOREP_EXPERIMENT_DIRECTORY' in self.scorep_env:
scorep_folder = self.scorep_env['SCOREP_EXPERIMENT_DIRECTORY']
self.cell_output(
Expand All @@ -333,7 +310,6 @@ async def scorep_execute(self, code, silent, store_history=True, user_expression
else:
self.cell_output("KernelWarning: Instrumentation results were not saved locally.", 'stderr')

# tidy up
self.pershelper.postprocess()
return self.standard_reply()

Expand Down
Loading