Add helpers for launching MPI jobs on various platforms #37

Status: Open. Wants to merge 1 commit into base: main.
pytools/mpi.py (183 additions, 0 deletions)
@@ -1,5 +1,7 @@
from __future__ import absolute_import

import abc


def check_for_mpi_relaunch(argv):
    if argv[1] != "--mpi-relaunch":
@@ -29,3 +31,184 @@ def run_with_mpi_ranks(py_script, ranks, callable_, args=(), kwargs=None):
check_call(["mpirun", "-np", str(ranks),
sys.executable, py_script, "--mpi-relaunch", callable_and_args],
env=newenv)


# {{{ MPI executors
Owner: Could you link this into the documentation? (There might not be an MPI section yet, please create one. :)


class MPIExecutorParams:
Owner: Dataclass maybe?
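
A possible shape for that suggestion, sketched here only for discussion (the field names come from the current __init__; the param_dict property and everything else is an assumption, not part of this diff):

# Hypothetical dataclass variant of MPIExecutorParams (sketch only).
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class MPIExecutorParams:
    """Collection of parameters to pass to the MPI launcher."""
    num_tasks: Optional[int] = None
    num_nodes: Optional[int] = None
    tasks_per_node: Optional[int] = None
    gpus_per_task: Optional[int] = None

    @property
    def param_dict(self):
        # Preserve the existing "only explicitly set parameters" view that
        # the executors iterate over.
        return {name: value for name, value in asdict(self).items()
                if value is not None}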

"""Collection of parameters to pass to the MPI launcher."""
def __init__(self, num_tasks=None, num_nodes=None, tasks_per_node=None,
gpus_per_task=None):
"""
Possible arguments are:

:arg num_tasks: The number of MPI ranks to launch.
:arg num_nodes: The number of nodes on which to run.
:arg tasks_per_nodes: The number of MPI ranks to launch per node.
:arg gpus_per_task: The number of GPUs to assign per task.

Note: A given executor may not support all of these arguments. If it is
passed an unsupported argument, it will raise an instance of
:class:`MPIExecutorParamError`.
"""
self._create_param_dict(num_tasks=num_tasks, num_nodes=num_nodes,
tasks_per_node=tasks_per_node, gpus_per_task=gpus_per_task)

def _create_param_dict(self, **kwargs):
self.param_dict = {}
for name, value in kwargs.items():
if value is not None:
self.param_dict[name] = value


class MPIExecutorParamError(RuntimeError):
    def __init__(self, param_name):
        self.param_name = param_name
        super().__init__("MPI executor does not support parameter "
            + f"'{self.param_name}'.")


class MPIExecError(RuntimeError):
    def __init__(self, exit_code):
        self.exit_code = exit_code
        super().__init__(f"MPI execution failed with exit code {exit_code}.")


class MPIExecutor(metaclass=abc.ABCMeta):
"""Base class for a general MPI launcher."""
Owner: Add

.. automethod:: xxxy

to the class docstring for each method.
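
For instance, the class docstring above might become something like this (a sketch; the directive list simply follows the methods defined in this class):

    """Base class for a general MPI launcher.

    .. automethod:: get_mpi_command
    .. automethod:: __call__
    .. automethod:: execute
    .. automethod:: check_execute
    .. automethod:: call
    .. automethod:: check_call
    """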

    @abc.abstractmethod
    def get_mpi_command(self, command, exec_params=None):
        """
        Returns a list of strings representing the MPI command that will be
        executed to launch *command*.
        """
        pass

    @abc.abstractmethod
    def __call__(self, command, exec_params=None):
        """Executes *command* with MPI."""
        pass

    def execute(self, code_string, exec_params=None):
        """Executes Python code stored in *code_string* with MPI."""
        import sys
        return self.__call__([sys.executable, "-m", "mpi4py", "-c", "\'"
            + code_string + "\'"], exec_params)

    def check_execute(self, code_string, exec_params=None):
        """
        Executes Python code stored in *code_string* with MPI and raises an
        instance of :class:`MPIExecError` if the execution fails.
        """
        exit_code = self.execute(code_string, exec_params)
        if exit_code != 0:
            raise MPIExecError(exit_code)

    def call(self, func, exec_params=None):
Owner: It might be useful to allow arguments here. Not every user might know about functools.partial.

Author: Is there a convention for passing in the argument list? As a dict? Or something else?
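
One possible shape, mirroring the args=()/kwargs=None convention that run_with_mpi_ranks above already uses (a sketch for discussion, not part of this diff):

    # Sketch: accept positional and keyword arguments alongside the callable.
    def call(self, func, args=(), kwargs=None, exec_params=None):
        """Calls *func* with *args* and *kwargs* under MPI. All of them must
        be picklable."""
        import pickle
        payload = pickle.dumps((func, args, kwargs or {}))
        calling_code = (
            'import pickle; func, args, kwargs = pickle.loads(bytes.fromhex("'
            + payload.hex() + '")); func(*args, **kwargs)')
        return self.execute(calling_code, exec_params)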

"""Calls *func* with MPI. Note: *func* must be picklable."""
Owner: Document exec_params. (Here and elsewhere, a reference like "See :meth:`...` for exec_params" is OK.)

        import pickle
        calling_code = ('import sys; import pickle; pickle.loads(bytes.fromhex("'
            + pickle.dumps(func).hex() + '"))()')
Owner: .hex() is not an efficient encoding. Base64 is better.
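
A sketch of the base64 variant being suggested; it would replace the two hex-encoding lines above, leaving the rest of the method unchanged:

        # Sketch only: base64 instead of .hex() for a more compact encoding.
        import base64
        import pickle
        calling_code = ('import base64; import pickle; '
            'pickle.loads(base64.b64decode("'
            + base64.b64encode(pickle.dumps(func)).decode("ascii") + '"))()')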

        return self.execute(calling_code, exec_params)

    def check_call(self, func, exec_params=None):
        """
        Calls *func* with MPI and raises an instance of :class:`MPIExecError` if
        the execution fails. Note: *func* must be picklable.
        """
        exit_code = self.call(func, exec_params)
        if exit_code != 0:
            raise MPIExecError(exit_code)


class BasicMPIExecutor(MPIExecutor):
"""Simple `mpiexec` executor."""
def get_mpi_command(self, command, exec_params=None):
mpi_command = ["mpiexec"]
param_dict = {}
if exec_params is not None:
param_dict = exec_params.param_dict
for name, value in param_dict.items():
if name == "num_tasks":
mpi_command += ["-n", str(value)]
else:
raise MPIExecutorParamError(name)
mpi_command += command
return mpi_command

def __call__(self, command, exec_params=None):
mpi_command = self.get_mpi_command(command, exec_params)
import subprocess
return subprocess.call(" ".join(mpi_command), shell=True)


class SlurmMPIExecutor(MPIExecutor):
"""Executor for Slurm-based platforms."""
def get_mpi_command(self, command, exec_params=None):
mpi_command = ["srun"]
param_dict = {}
if exec_params is not None:
param_dict = exec_params.param_dict
for name, value in param_dict.items():
if name == "num_tasks":
mpi_command += ["-n", str(value)]
elif name == "num_nodes":
mpi_command += ["-N", str(value)]
elif name == "tasks_per_node":
mpi_command += [f"--ntasks-per-node={value}"]
else:
raise MPIExecutorParamError(name)
mpi_command += command
return mpi_command

def __call__(self, command, exec_params=None):
mpi_command = self.get_mpi_command(command, exec_params)
import subprocess
return subprocess.call(" ".join(mpi_command), shell=True)


class LCLSFMPIExecutor(MPIExecutor):
"""Executor for Livermore wrapper around IBM LSF."""
def get_mpi_command(self, command, exec_params=None):
mpi_command = ["lrun"]
param_dict = {}
if exec_params is not None:
param_dict = exec_params.param_dict
for name, value in param_dict.items():
if name == "num_tasks":
mpi_command += ["-n", str(value)]
elif name == "num_nodes":
mpi_command += ["-N", str(value)]
elif name == "tasks_per_node":
mpi_command += ["-T", str(value)]
elif name == "gpus_per_task":
mpi_command += ["-g", str(value)]
else:
raise MPIExecutorParamError(name)
mpi_command += command
return mpi_command

def __call__(self, command, exec_params=None):
mpi_command = self.get_mpi_command(command, exec_params)
import subprocess
return subprocess.call(" ".join(mpi_command), shell=True)


def make_mpi_executor(executor_type_name):
"""
Returns an instance of a class derived from :class:`MPIExecutor` given an
executor type name as input.

:arg executor_type_name: The executor type name. Can be one of `'basic'`,
`'slurm'`, or `'lclsf'`.
"""
type_name_map = {
"basic": BasicMPIExecutor,
"slurm": SlurmMPIExecutor,
"lclsf": LCLSFMPIExecutor
}
return type_name_map[executor_type_name]()
Owner: Would it make sense to create some rudimentary auto-detection facility (which can be overridden by a documented env var)?
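
A minimal sketch of what such a facility could look like (the PYTOOLS_MPI_EXECUTOR variable name and the which()-based detection order are assumptions for illustration, not part of this diff):

def guess_mpi_executor():
    """Guess a suitable :class:`MPIExecutor`, honoring an override env var.

    Sketch only: the environment variable name and the detection heuristics
    are placeholders.
    """
    import os
    from shutil import which

    override = os.environ.get("PYTOOLS_MPI_EXECUTOR")
    if override is not None:
        return make_mpi_executor(override)
    if which("srun"):
        return make_mpi_executor("slurm")
    if which("lrun"):
        return make_mpi_executor("lclsf")
    return make_mpi_executor("basic")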


# }}}

# vim: foldmethod=marker
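
For orientation, the pieces above would be used roughly like this (a usage sketch based only on the code in this diff; the "slurm" choice and the parameter values are illustrative):

# Usage sketch (executor type and parameter values are illustrative).
from pytools.mpi import make_mpi_executor, MPIExecutorParams

mpi_exec = make_mpi_executor("slurm")
params = MPIExecutorParams(num_tasks=4, tasks_per_node=2)

# Run an arbitrary command under MPI; the exit code is returned.
exit_code = mpi_exec(["hostname"], exec_params=params)

# Run a snippet of Python under mpi4py, raising MPIExecError on failure.
mpi_exec.check_execute(
    "from mpi4py import MPI; print(MPI.COMM_WORLD.rank)",
    exec_params=params)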
test/test_mpi.py (116 additions, 0 deletions)
@@ -0,0 +1,116 @@
from __future__ import division
from __future__ import absolute_import

from pytools.mpi import ( # noqa
    make_mpi_executor,
    MPIExecError,
    MPIExecutorParams,
    MPIExecutorParamError)

import pytest
import os
from functools import partial


def get_test_mpi_executor():
Owner: Do these tests run as part of CI? If not I'd like them to.

if "MPI_EXECUTOR_TYPE" not in os.environ:
pytest.skip("No MPI executor specified.")
return make_mpi_executor(os.environ["MPI_EXECUTOR_TYPE"])


@pytest.mark.parametrize("num_tasks", [1, 2])
def test_mpi_launch(num_tasks):
    pytest.importorskip("mpi4py")

    mpi_exec = get_test_mpi_executor()

    exit_code = mpi_exec(["true"], exec_params=MPIExecutorParams(
        num_tasks=num_tasks))
    assert exit_code == 0

    exit_code = mpi_exec(["false"], exec_params=MPIExecutorParams(
        num_tasks=num_tasks))
    assert exit_code != 0


@pytest.mark.parametrize("num_tasks", [1, 2])
def test_mpi_execute(num_tasks):
    pytest.importorskip("mpi4py")

    mpi_exec = get_test_mpi_executor()

    exit_code = mpi_exec.execute("from mpi4py import MPI; "
        + "MPI.COMM_WORLD.Barrier(); assert True",
        exec_params=MPIExecutorParams(num_tasks=num_tasks))
    assert exit_code == 0

    exit_code = mpi_exec.execute("from mpi4py import MPI; "
        + "MPI.COMM_WORLD.Barrier(); assert False",
        exec_params=MPIExecutorParams(num_tasks=num_tasks))
    assert exit_code != 0


@pytest.mark.parametrize("num_tasks", [1, 2])
def test_mpi_check_execute(num_tasks):
    pytest.importorskip("mpi4py")

    mpi_exec = get_test_mpi_executor()

    mpi_exec.check_execute("from mpi4py import MPI; "
        + "MPI.COMM_WORLD.Barrier(); assert True",
        exec_params=MPIExecutorParams(num_tasks=num_tasks))

    with pytest.raises(MPIExecError):
        mpi_exec.check_execute("from mpi4py import MPI; "
            + "MPI.COMM_WORLD.Barrier(); assert False",
            exec_params=MPIExecutorParams(num_tasks=num_tasks))


def _test_mpi_func(arg):
    from mpi4py import MPI
    MPI.COMM_WORLD.Barrier()
    assert arg == "hello"


@pytest.mark.parametrize("num_tasks", [1, 2])
def test_mpi_call(num_tasks):
    pytest.importorskip("mpi4py")

    mpi_exec = get_test_mpi_executor()

    exit_code = mpi_exec.call(partial(_test_mpi_func, "hello"),
        exec_params=MPIExecutorParams(num_tasks=num_tasks))
    assert exit_code == 0

    exit_code = mpi_exec.call(partial(_test_mpi_func, "goodbye"),
        exec_params=MPIExecutorParams(num_tasks=num_tasks))
    assert exit_code != 0


@pytest.mark.parametrize("num_tasks", [1, 2])
def test_mpi_check_call(num_tasks):
    pytest.importorskip("mpi4py")

    mpi_exec = get_test_mpi_executor()

    mpi_exec.check_call(partial(_test_mpi_func, "hello"),
        exec_params=MPIExecutorParams(num_tasks=num_tasks))

    with pytest.raises(MPIExecError):
        mpi_exec.check_call(partial(_test_mpi_func, "goodbye"),
            exec_params=MPIExecutorParams(num_tasks=num_tasks))


def test_mpi_unsupported_param():
    pytest.importorskip("mpi4py")

    mpi_exec = get_test_mpi_executor()

    try:
        mpi_exec.call(partial(_test_mpi_func, "hello"),
            exec_params=MPIExecutorParams(num_tasks=2, gpus_per_task=1))
        pytest.skip("Oops. Unsupported param is actually supported.")
    except MPIExecutorParamError as e:
        assert e.param_name == "gpus_per_task"

# }}}