Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disk/memory modes of PersHelper #25

Merged
merged 8 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ Set the Score-P Python bindings arguments. For a documentation of arguments, see

![](doc/pythonBindings_setup.png)

`%%serializer_settings`

Set the serializer used for persistence and the mode used to communicate persistence between the notebook and the subprocess. Currently available serializers: `dill`, `cloudpickle`; available communication modes: `disk`, `memory`. If no arguments are provided, the current configuration is printed. Usage:
```
%%serializer_settings
SERIALIZER=[dill,cloudpickle]
MODE=[disk,memory]
```

When using persistence in `disk` mode, the user can also set the directory to which the serializer output is saved via the `SCOREP_KERNEL_PERSISTENCE_DIR` environment variable.

## Executing Cells

Expand Down
175 changes: 106 additions & 69 deletions src/scorep_jupyter/kernel.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ def __init__(self, **kwargs):
self.bash_script = None
self.python_script = None

self.pershelper = PersHelper('dill')

os.environ['SCOREP_KERNEL_PERSISTENCE_DIR'] = './'
self.pershelper = PersHelper('dill', 'memory')

def cell_output(self, string, stream='stdout'):
"""
Display string as cell output.
Expand All @@ -58,16 +59,27 @@ def standard_reply(self):
'user_expressions': {},
}

def serializer_settings(self, code):
    """
    Set or report the serializer backend and communication mode used for persistence.

    `code` is the full cell text: the `%%serializer_settings` magic line,
    optionally followed by `SERIALIZER=<name>` and/or `MODE=<name>` lines.
    When neither setting is present (including a cell that consists of the
    magic line only), the current configuration is printed unchanged.

    Returns the kernel's standard reply dictionary.
    """
    # Clean files/pipes before switching
    self.pershelper.postprocess()

    # Everything after the magic line. partition() yields an empty string when
    # the cell has no further lines, whereas split('\n', 1)[1] would raise
    # IndexError for a bare `%%serializer_settings` cell.
    _, _, settings = code.partition('\n')

    serializer_match = re.search(r'SERIALIZER=(\w+)', settings)
    mode_match = re.search(r'MODE=(\w+)', settings)
    serializer = serializer_match.group(1) if serializer_match else None
    mode = mode_match.group(1) if mode_match else None

    if serializer:
        if not self.pershelper.set_serializer(serializer):
            # Rejected serializer: report it and stop without touching the mode.
            self.cell_output(f"Serializer '{serializer}' is not recognized, kernel will use '{self.pershelper.serializer}'.", 'stderr')
            return self.standard_reply()
    if mode:
        if not self.pershelper.set_mode(mode):
            # Rejected mode: report it, then still print the effective configuration below.
            self.cell_output(f"Serialization mode '{mode}' is not recognized, kernel will use '{self.pershelper.mode}'.", 'stderr')

    self.cell_output(f"Kernel uses '{self.pershelper.serializer}' serializer in '{self.pershelper.mode}' mode.")
    return self.standard_reply()

def set_scorep_env(self, code):
Expand Down Expand Up @@ -210,87 +222,106 @@ def append_writefile(self, code):
'Python commands without instrumentation recorded.')
return self.standard_reply()

def ghost_cell_error(self, reply_status, error_message):
    """
    Handle a failed ghost cell: fix up the reply, clean up and report the error.

    `reply_status` is the reply dictionary of the failed ghost cell; it is
    modified in place. `error_message` is shown to the user on stderr.
    """
    # Advance the shell's execution counter by hand and report the
    # pre-increment value in the reply.
    # NOTE(review): presumably needed because ghost cells are executed
    # without advancing the counter themselves - confirm.
    self.shell.execution_count += 1
    reply_status['execution_count'] = self.shell.execution_count - 1
    # Release persistence resources before reporting the failure.
    self.pershelper.postprocess()
    self.cell_output(error_message, 'stderr')

async def scorep_execute(self, code, silent, store_history=True, user_expressions=None,
allow_stdin=False, *, cell_id=None):
"""
Execute given code with Score-P Python bindings instrumentation.
"""
# Ghost cell - dump current Jupyter session for subprocess
# Run in a "silent" way to not increase cells counter
reply_status_dump = await super().do_execute(self.pershelper.jupyter_dump(), silent, store_history=False,
user_expressions=user_expressions, allow_stdin=allow_stdin, cell_id=cell_id)

if reply_status_dump['status'] != 'ok':
self.shell.execution_count += 1
reply_status_dump['execution_count'] = self.shell.execution_count - 1
self.pershelper.pers_cleanup()
self.cell_output("KernelError: Failed to pickle previous notebook's persistence and variables.",
'stderr')
return reply_status_dump
# Set up files/pipes for persistence communication
if not self.pershelper.preprocess():
self.pershelper.postprocess()
self.cell_output("KernelError: Failed to set up the persistence communication files/pipes.", "stderr")
return self.standard_reply()

# Prepare code for the Score-P instrumented execution as subprocess
# Transmit user persistence and updated sys.path from Jupyter notebook to subprocess
# After running code, transmit subprocess persistence back to Jupyter notebook
with open(scorep_script_name, 'w+') as file:
# After running the code, transmit subprocess persistence back to Jupyter notebook
with open(scorep_script_name, 'w') as file:
file.write(self.pershelper.subprocess_wrapper(code))

# For disk mode use implicit synchronization between kernel and subprocess:
# await jupyter_dump, subprocess.wait(), await jupyter_update
# Ghost cell - dump current Jupyter session for subprocess
# Run in a "silent" way to not increase cells counter
if self.pershelper.mode == 'disk':
reply_status_dump = await super().do_execute(self.pershelper.jupyter_dump(), silent, store_history=False,
user_expressions=user_expressions, allow_stdin=allow_stdin, cell_id=cell_id)
if reply_status_dump['status'] != 'ok':
self.ghost_cell_error(reply_status_dump, "KernelError: Failed to pickle notebook's persistence.")
return reply_status_dump

# Launch subprocess with Jupyter notebook environment
cmd = [PYTHON_EXECUTABLE, "-m", "scorep"] + \
self.scorep_binding_args + [scorep_script_name]
proc_env = self.scorep_env.copy()
proc_env.update({'PATH': os.environ['PATH'], 'PYTHONUNBUFFERED': 'x'}) # scorep path, subprocess observation

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=proc_env)

# For memory mode jupyter_dump and jupyter_update must be awaited
# concurrently to the running subprocess
if self.pershelper.mode == 'memory':
reply_status_dump = await super().do_execute(self.pershelper.jupyter_dump(), silent, store_history=False,
user_expressions=user_expressions, allow_stdin=allow_stdin, cell_id=cell_id)
if reply_status_dump['status'] != 'ok':
self.ghost_cell_error(reply_status_dump, "KernelError: Failed to pickle notebook's persistence.")
return reply_status_dump

# Redirect process stderr to stdout and observe the latter
# Observing two stream with two threads causes interference in cell_output in Jupyter notebook
# stdout is read in chunks, which are split into lines using \r or \n as delimiter
# Last element in the list might be "incomplete line", not ending with \n or \r, it is saved
# and merged with the first line in the next chunk
incomplete_line = ''
endline_pattern = re.compile(r'(.*?[\r\n]|.+$)')
# Empty cell output, required for interactive output e.g. tqdm for-loop progress bar
self.cell_output('\0')
while True:
chunk = b'' + proc.stdout.read(READ_CHUNK_SIZE)
if chunk == b'':
break
chunk = chunk.decode(sys.getdefaultencoding(), errors='ignore')
lines = endline_pattern.findall(chunk)
if len(lines) > 0:
lines[0] = incomplete_line + lines[0]
if lines[-1][-1] not in ['\n', '\r']:
incomplete_line = lines.pop(-1)
else:
incomplete_line = ""
for line in lines:
self.cell_output(line)

# In disk mode, subprocess already terminated after dumping persistence to file
if self.pershelper.mode == 'disk':
if proc.returncode:
self.pershelper.postprocess()
self.cell_output('KernelError: Cell execution failed, cell persistence was not recorded.', 'stderr')
return self.standard_reply()

# os_environ_.clear()
# sys_path_.clear()

with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=proc_env) as proc:
# Redirect process stderr to stdout and observe the latter
# Observing two stream with two threads causes interfence in cell_output in Jupyter notebook
# stdout is read in chunks, which are split into lines using \r or \n as delimeter
# Last element in the list might be "incomplete line", not ending with \n or \r, it is saved
# and merged with the first line in the next chunk

# Empty cell output, required for interactive output e.g. tqdm for-loop progress bar
self.cell_output('\0')
while True:
chunk = b'' + proc.stdout.read(READ_CHUNK_SIZE)
if not chunk:
break
chunk = chunk.decode(sys.getdefaultencoding(), errors='ignore')
lines = endline_pattern.findall(chunk)

if len(lines) > 0:
lines[0] = incomplete_line + lines[0]
if lines[-1][-1] not in ['\n', '\r']:
incomplete_line = lines.pop(-1)
else:
incomplete_line = ""
for line in lines:
self.cell_output(line)

proc.wait()

if proc.returncode:
self.pershelper.pers_cleanup()
self.cell_output(
'KernelError: Cell execution failed, cell persistence and variables are not recorded.',
'stderr')
return self.standard_reply()

# Ghost cell - load subprocess definitions and persistence back to Jupyter notebook
# Ghost cell - load subprocess persistence back to Jupyter notebook
# Run in a "silent" way to not increase cells counter
reply_status_update = await super().do_execute(self.pershelper.jupyter_update(code), silent, store_history=False,
user_expressions=user_expressions, allow_stdin=allow_stdin, cell_id=cell_id)

user_expressions=user_expressions, allow_stdin=allow_stdin, cell_id=cell_id)
if reply_status_update['status'] != 'ok':
self.shell.execution_count += 1
reply_status_update['execution_count'] = self.shell.execution_count - 1
self.pershelper.pers_cleanup()
self.cell_output("KernelError: Failed to load cell's persistence and variables to the notebook.",
'stderr')
self.ghost_cell_error(reply_status_update, "KernelError: Failed to load cell's persistence to the notebook.")
return reply_status_update

# In memory mode, subprocess terminates once jupyter_update is executed and pipe is closed
if self.pershelper.mode == 'memory':
if proc.returncode:
self.pershelper.postprocess()
self.cell_output('KernelError: Cell execution failed, cell persistence was not recorded.', 'stderr')
return self.standard_reply()

self.pershelper.pers_cleanup()
# Determine directory to which trace files were saved by Score-P
if 'SCOREP_EXPERIMENT_DIRECTORY' in self.scorep_env:
scorep_folder = self.scorep_env['SCOREP_EXPERIMENT_DIRECTORY']
self.cell_output(
Expand All @@ -305,6 +336,8 @@ async def scorep_execute(self, code, silent, store_history=True, user_expression
f"Instrumentation results can be found in {os.getcwd()}/{scorep_folder}")
else:
self.cell_output("KernelWarning: Instrumentation results were not saved locally.", 'stderr')

self.pershelper.postprocess()
return self.standard_reply()

async def do_execute(self, code, silent, store_history=False, user_expressions=None,
Expand Down Expand Up @@ -355,8 +388,8 @@ async def do_execute(self, code, silent, store_history=False, user_expressions=N
elif code.startswith('%%enable_multicellmode'):
return self.enable_multicellmode()

elif code.startswith('%%switch_serializer'):
return self.switch_serializer(code)
elif code.startswith('%%serializer_settings'):
return self.serializer_settings(code)
elif code.startswith('%%scorep_env'):
return self.set_scorep_env(code)
elif code.startswith('%%scorep_python_binding_arguments'):
Expand All @@ -369,6 +402,10 @@ async def do_execute(self, code, silent, store_history=False, user_expressions=N
self.pershelper.parse(code, 'jupyter')
return await super().do_execute(code, silent, store_history, user_expressions, allow_stdin, cell_id=cell_id)

def do_shutdown(self, restart):
    """
    Clean up persistence resources, then delegate shutdown to the parent kernel.

    `restart` is forwarded unchanged to the parent implementation, whose
    result is returned.
    """
    # Run the persistence helper's cleanup before the kernel goes away.
    self.pershelper.postprocess()
    return super().do_shutdown(restart)


if __name__ == '__main__':
from ipykernel.kernelapp import IPKernelApp
Expand Down
Loading
Loading