Ghost corner cells #161

Merged (5 commits) on Jan 5, 2023
6 changes: 5 additions & 1 deletion sopht_mpi/simulator/flow/flow_simulators_mpi_2d.py
@@ -99,8 +99,12 @@ def init_mpi(self):
real_t=self.real_t,
rank_distribution=self.rank_distribution,
)
# Unbounded (i.e. non-periodic) 2D flow solver with mpi4py-fft (only slab
# decomposition) does not need full exchange in the ghost communicator.
self.mpi_ghost_exchange_communicator = MPIGhostCommunicator2D(
ghost_size=self.ghost_size, mpi_construct=self.mpi_construct
ghost_size=self.ghost_size,
mpi_construct=self.mpi_construct,
full_exchange=False,
)

def init_domain(self):
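As an illustrative aside (not part of the diff): `full_exchange=False` suffices here because mpi4py-fft only provides slab decompositions, so the rank topology is aligned along one axis and every diagonal (corner) neighbour of a rank falls outside a non-periodic domain. A minimal sketch of that argument, using an assumed slab topology and mirroring the bounds test of `_get_diagonally_shifted_coord_rank` further down in this diff:

import itertools
import numpy as np

# assumed slab topology for illustration: 4 ranks along y, 1 rank along x
grid_topology = np.array([4, 1])

for rank_coord in itertools.product(range(grid_topology[0]), range(grid_topology[1])):
    for coord_shift in ([1, 1], [1, -1], [-1, 1], [-1, -1]):
        shifted_coord = np.array(rank_coord) + np.array(coord_shift)
        # same out-of-domain test as _get_diagonally_shifted_coord_rank (non-periodic case)
        out_of_domain = np.any(shifted_coord >= grid_topology) or np.any(shifted_coord < 0)
        assert out_of_domain  # every diagonal neighbour would resolve to MPI.PROC_NULL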
249 changes: 219 additions & 30 deletions sopht_mpi/utils/mpi_utils_2d.py
@@ -17,7 +17,7 @@ def __init__(
self,
grid_size_y,
grid_size_x,
periodic_flag=False,
periodic_domain=False,
real_t=np.float64,
rank_distribution=None,
):
@@ -27,6 +27,7 @@ def __init__(
# Set the MPI dtype generator based on precision
self.dtype_generator = MPI.FLOAT if real_t == np.float32 else MPI.DOUBLE
# Setup MPI environment
self.periodic_domain = periodic_domain
self.world = MPI.COMM_WORLD
# Automatically create topologies
if rank_distribution is None:
@@ -37,8 +38,10 @@ def __init__(
else:
self.rank_distribution = rank_distribution
if 1 not in self.rank_distribution:
raise ValueError(
f"Rank distribution {self.rank_distribution} needs to be"
# Log a warning here for more generic use.
# mpi4py-fft will take care of raising errors if fft is invoked later.
logger.warning(
f"Rank distribution {self.rank_distribution} needs to be "
"aligned in at least one direction for fft"
)
self.grid_topology = MPI.Compute_dims(
@@ -60,7 +63,7 @@ def __init__(

# Create Cartesian grid communicator
self.grid = self.world.Create_cart(
self.grid_topology, periods=periodic_flag, reorder=False
self.grid_topology, periods=self.periodic_domain, reorder=False
)
# Determine neighbours in all directions
self.previous_grid_along = np.zeros(self.grid_dim).astype(int)
@@ -84,13 +87,36 @@ def __init__(
class MPIGhostCommunicator2D:
"""
Class exclusive for ghost communication across ranks, initialises data types
that will be used for comm. in both blocking and non-blocking styles. Builds
dtypes based on ghost_size (determined from stencil width of the kernel)
This class wont be seen by the user, rather based on stencils we determine
the properties here.
for communication based on ghost_size (determined from stencil width of the kernel).

Here we refer to the side and corner ghost cells following the terminology for
polygons (https://en.wikipedia.org/wiki/Polygon) as below:
"The segments of a polygonal circuit are called its edges or sides. The points where
two edges meet are the polygon's vertices (singular: vertex) or corners."

v e e e e e v <- vertex (corner)
e x x x x x e
e x x x x x e <- edge (side)
e x x x x x e
v e e e e e v

(v) vertex (corner) ghost cell
(e) edge (side) ghost cell
(x) inner non-ghost cell

Note
---
`full_exchange` allows exchanging both the edge (side) and the vertex (corner)
ghost cells of the local domain (see illustration above). The full exchange mode is
required when Eulerian-to-Lagrangian (structured-to-unstructured) grid interpolation
is performed. In our current unbounded 2D flow solver, we do not need the
full exchange mode even when the interpolation is performed, since mpi4py-fft only
allows for slab decomposition, and the corner cells then correspond to the
corners of the unbounded domain. The full exchange mode may become useful if a
periodic domain flow solver is employed.
"""

def __init__(self, ghost_size, mpi_construct):
def __init__(self, ghost_size, mpi_construct, full_exchange=True):
# extra width needed for kernel computation
if ghost_size <= 0 and not isinstance(ghost_size, int):
raise ValueError(
@@ -99,38 +125,83 @@ def __init__(self, ghost_size, mpi_construct):
)
self.ghost_size = ghost_size
self.mpi_construct = mpi_construct
# define field_size variable for local field size (which includes ghost)
self.field_size = mpi_construct.local_grid_size + 2 * self.ghost_size
# exchange mode to include corners (forming full halo ring) or not
self.full_exchange = full_exchange
self.grid_coord = np.array(self.mpi_construct.grid.coords)

# Initialize data types
self.init_datatypes()

# Initialize requests list for non-blocking comm
self.comm_requests = []

# Set datatypes for ghost communication
# For row we use contiguous type
self.row_type = mpi_construct.dtype_generator.Create_contiguous(
count=self.ghost_size * self.field_size[1]
if self.full_exchange:
self.exchange_scalar_field_init = self.exchange_scalar_field_full_init
else:
self.exchange_scalar_field_init = self.exchange_scalar_field_edges_init

def init_datatypes(self):
"""
Set datatypes for ghost communication
Note: this can be done more cleanly using Create_subarray, but we use the vector
approach here for illustration in a simpler 2D setting
"""
# define field_size variable for local field size (without ghost)
self.field_size = self.mpi_construct.local_grid_size

# (1) For row data
self.row_type = self.mpi_construct.dtype_generator.Create_vector(
count=self.ghost_size,
blocklength=self.field_size[1],
stride=self.field_size[1] + 2 * self.ghost_size,
)
self.row_type.Commit()
# For column we use strided vector
self.column_type = mpi_construct.dtype_generator.Create_vector(

# (2) For column data
self.column_type = self.mpi_construct.dtype_generator.Create_vector(
count=self.field_size[0],
blocklength=self.ghost_size,
stride=self.field_size[1],
stride=self.field_size[1] + 2 * self.ghost_size,
)
self.column_type.Commit()

# Initialize requests list for non-blocking comm
self.comm_requests = []
# (3) For corner data (only if needed)
if self.full_exchange:
self.corner_type = self.mpi_construct.dtype_generator.Create_vector(
count=self.ghost_size,
blocklength=self.ghost_size,
stride=self.field_size[1] + 2 * self.ghost_size,
)
self.corner_type.Commit()
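(Illustrative aside, not part of this diff.) The strided vector types above describe sub-blocks of the ghost-padded local field directly in its flattened memory. A NumPy sketch with assumed sizes, checking that row_type started at the offset used by the first Isend further down picks exactly the first ghost_size interior rows:

import numpy as np

ny, nx, g = 4, 5, 2  # assumed local interior size (ny, nx) and ghost width g
field = np.arange((ny + 2 * g) * (nx + 2 * g), dtype=float).reshape(ny + 2 * g, nx + 2 * g)
flat = field.ravel()

# row_type = Create_vector(count=g, blocklength=nx, stride=nx + 2 * g); starting it at
# flat offset g * (nx + 2 * g) + g (ghost_rows_offset + ghost_size in the exchange code)
# selects field[g:2*g, g:-g], i.e. the first g interior rows without the side ghosts
start = g * (nx + 2 * g) + g
picked = np.concatenate(
    [flat[start + k * (nx + 2 * g) : start + k * (nx + 2 * g) + nx] for k in range(g)]
)
assert np.array_equal(picked, field[g : 2 * g, g : -g].ravel())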

def _get_diagonally_shifted_coord_rank(self, coord_shift):
"""Helper function to get diagonally shifted coords"""
shifted_coord = self.grid_coord + np.array(coord_shift)
if not self.mpi_construct.periodic_domain and (
np.any(shifted_coord >= self.mpi_construct.grid_topology)
or np.any(shifted_coord < 0)
):
# The shifted coord is out of non-periodic domain
rank = MPI.PROC_NULL
else:
# Periodic domain is automatically taken care of in mpi cartesian grid
rank = self.mpi_construct.grid.Get_cart_rank(shifted_coord)
return rank
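(Illustrative aside, not part of this diff.) Returning MPI.PROC_NULL for out-of-domain neighbours works because MPI defines communication with PROC_NULL as an immediate no-op, so the corner exchange below needs no special-casing at domain boundaries. A minimal mpi4py sketch (runnable under mpirun with any number of ranks):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
send_buf = np.zeros(4)
recv_buf = np.ones(4)

# Isend/Irecv involving MPI.PROC_NULL complete immediately; a receive from PROC_NULL
# leaves the receive buffer untouched
requests = [
    comm.Isend(send_buf, dest=MPI.PROC_NULL),
    comm.Irecv(recv_buf, source=MPI.PROC_NULL),
]
MPI.Request.Waitall(requests)
assert np.all(recv_buf == 1.0)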

def exchange_scalar_field_init(self, local_field):
def exchange_scalar_field_edges_init(self, local_field):
"""
Exchange scalar field ghost data between neighbors.
Exchange scalar field ghost data on edges (sides) between neighbors.
"""
# Define axis indices below to make the code more literal
y_axis = 0
x_axis = 1
ghost_rows_offset = self.ghost_size * local_field.shape[1]

# Along Y: send to previous block, receive from next block
self.comm_requests.append(
self.mpi_construct.grid.Isend(
(
local_field[self.ghost_size : 2 * self.ghost_size, :],
local_field.ravel()[ghost_rows_offset + self.ghost_size :],
1,
self.row_type,
),
@@ -140,7 +211,7 @@ def exchange_scalar_field_init(self, local_field):
self.comm_requests.append(
self.mpi_construct.grid.Irecv(
(
local_field[-self.ghost_size : local_field.shape[0], :],
local_field.ravel()[-ghost_rows_offset + self.ghost_size :],
1,
self.row_type,
),
@@ -152,7 +223,7 @@ def exchange_scalar_field_init(self, local_field):
self.comm_requests.append(
self.mpi_construct.grid.Isend(
(
local_field[-2 * self.ghost_size : -self.ghost_size, :],
local_field.ravel()[-2 * ghost_rows_offset + self.ghost_size :],
1,
self.row_type,
),
@@ -162,7 +233,7 @@ def exchange_scalar_field_init(self, local_field):
self.comm_requests.append(
self.mpi_construct.grid.Irecv(
(
local_field[0 : self.ghost_size, :],
local_field.ravel()[self.ghost_size :],
1,
self.row_type,
),
@@ -174,7 +245,7 @@ def exchange_scalar_field_init(self, local_field):
self.comm_requests.append(
self.mpi_construct.grid.Isend(
(
local_field.ravel()[self.ghost_size :],
local_field.ravel()[ghost_rows_offset + self.ghost_size :],
1,
self.column_type,
),
@@ -184,7 +255,9 @@ def exchange_scalar_field_init(self, local_field):
self.comm_requests.append(
self.mpi_construct.grid.Irecv(
(
local_field.ravel()[local_field.shape[1] - self.ghost_size :],
local_field.ravel()[
ghost_rows_offset + local_field.shape[1] - self.ghost_size :
],
1,
self.column_type,
),
@@ -196,7 +269,9 @@ def exchange_scalar_field_init(self, local_field):
self.comm_requests.append(
self.mpi_construct.grid.Isend(
(
local_field.ravel()[local_field.shape[1] - 2 * self.ghost_size :],
local_field.ravel()[
ghost_rows_offset + local_field.shape[1] - 2 * self.ghost_size :
],
1,
self.column_type,
),
@@ -206,14 +281,128 @@ def exchange_scalar_field_init(self, local_field):
self.comm_requests.append(
self.mpi_construct.grid.Irecv(
(
local_field.ravel()[0:],
local_field.ravel()[ghost_rows_offset:],
1,
self.column_type,
),
source=self.mpi_construct.previous_grid_along[x_axis],
)
)

def exchange_scalar_field_vertices_init(self, local_field):
"""
Exchange scalar field ghost data on vertices (corners) between neighbors.
"""
ghost_rows_offset = self.ghost_size * local_field.shape[1]
# Exchange corner ghost cells (four corners, so 4 exchanges here)
# (1) Update top right corner
# send bottom left corner to corresponding diagonal block (-1, -1),
# receive as top right corner from corresponding diagonal block (+1, +1)
self.comm_requests.append(
self.mpi_construct.grid.Isend(
(
local_field.ravel()[ghost_rows_offset + self.ghost_size :],
1,
self.corner_type,
),
dest=self._get_diagonally_shifted_coord_rank(coord_shift=[-1, -1]),
)
)
self.comm_requests.append(
self.mpi_construct.grid.Irecv(
(
local_field.ravel()[
-(self.ghost_size - 1) * local_field.shape[1]
- self.ghost_size :
],
1,
self.corner_type,
),
source=self._get_diagonally_shifted_coord_rank(coord_shift=[1, 1]),
)
)
# (2) Update bottom left corner
# send top right corner to corresponding diagonal block (+1, +1),
# receive as bottom left corner from corresponding diagonal block (-1, -1)
self.comm_requests.append(
self.mpi_construct.grid.Isend(
(
local_field.ravel()[
-(2 * self.ghost_size - 1) * local_field.shape[1]
- 2 * self.ghost_size :
],
1,
self.corner_type,
),
dest=self._get_diagonally_shifted_coord_rank(coord_shift=[1, 1]),
)
)
self.comm_requests.append(
self.mpi_construct.grid.Irecv(
(
local_field.ravel()[0:],
1,
self.corner_type,
),
source=self._get_diagonally_shifted_coord_rank(coord_shift=[-1, -1]),
)
)
# (3) Update top left corner
# send bottom right corner to corresponding diagonal block (-1, +1),
# receive as top left corner from corresponding diagonal block (+1, -1)
self.comm_requests.append(
self.mpi_construct.grid.Isend(
(
local_field.ravel()[
ghost_rows_offset + local_field.shape[1] - 2 * self.ghost_size :
],
1,
self.corner_type,
),
dest=self._get_diagonally_shifted_coord_rank(coord_shift=[-1, 1]),
)
)
self.comm_requests.append(
self.mpi_construct.grid.Irecv(
(
local_field.ravel()[-ghost_rows_offset:],
1,
self.corner_type,
),
source=self._get_diagonally_shifted_coord_rank(coord_shift=[1, -1]),
)
)
# (4) Update bottom right corner
# send top left corner to corresponding diagonal block (+1, -1),
# receive as bottom right corner from corresponding diagonal block (-1, +1)
self.comm_requests.append(
self.mpi_construct.grid.Isend(
(
local_field.ravel()[-2 * ghost_rows_offset + self.ghost_size :],
1,
self.corner_type,
),
dest=self._get_diagonally_shifted_coord_rank(coord_shift=[1, -1]),
)
)
self.comm_requests.append(
self.mpi_construct.grid.Irecv(
(
local_field.ravel()[local_field.shape[1] - self.ghost_size :],
1,
self.corner_type,
),
source=self._get_diagonally_shifted_coord_rank(coord_shift=[-1, 1]),
)
)
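(Illustrative aside, not part of this diff.) The corner offsets above can be checked the same way as the row offsets. A NumPy sketch with assumed sizes, verifying that corner_type started at the receive offset used for the top right corner in (1) picks exactly the g-by-g ghost block field[-g:, -g:]:

import numpy as np

ny, nx, g = 4, 5, 2  # assumed local interior size (ny, nx) and ghost width g
field = np.arange((ny + 2 * g) * (nx + 2 * g), dtype=float).reshape(ny + 2 * g, nx + 2 * g)
flat = field.ravel()

# corner_type = Create_vector(count=g, blocklength=g, stride=nx + 2 * g); the receive
# offset -(g - 1) * (nx + 2 * g) - g from the end lands on the first element of the
# corner ghost block, so the vector type covers field[-g:, -g:]
start = flat.size - (g - 1) * (nx + 2 * g) - g
picked = np.concatenate(
    [flat[start + k * (nx + 2 * g) : start + k * (nx + 2 * g) + g] for k in range(g)]
)
assert np.array_equal(picked, field[-g:, -g:].ravel())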

def exchange_scalar_field_full_init(self, local_field):
"""
Exchange scalar field ghost data (a full halo ring) between neighbors.
"""
self.exchange_scalar_field_edges_init(local_field)
self.exchange_scalar_field_vertices_init(local_field)

def exchange_vector_field_init(self, local_vector_field):
self.exchange_scalar_field_init(
local_field=local_vector_field[VectorField.x_axis_idx()]
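As a closing illustrative aside (not part of this diff), a hedged usage sketch of the full exchange mode, meant to be run under mpirun. The construct class name MPIConstruct2D is assumed for illustration, and waiting on comm_requests with MPI.Request.Waitall directly stands in for whatever finalise helper the library provides outside this diff:

import numpy as np
from mpi4py import MPI
from sopht_mpi.utils.mpi_utils_2d import MPIConstruct2D, MPIGhostCommunicator2D  # class name assumed

ghost_size = 2
mpi_construct = MPIConstruct2D(grid_size_y=64, grid_size_x=64, real_t=np.float64)
ghost_comm = MPIGhostCommunicator2D(
    ghost_size=ghost_size, mpi_construct=mpi_construct, full_exchange=True
)

# local field padded with ghost cells on all sides, interior filled with the rank id
local_field = np.zeros(tuple(mpi_construct.local_grid_size + 2 * ghost_size), dtype=np.float64)
local_field[ghost_size:-ghost_size, ghost_size:-ghost_size] = mpi_construct.grid.Get_rank()

ghost_comm.exchange_scalar_field_init(local_field)  # posts the non-blocking sends/recvs
MPI.Request.Waitall(ghost_comm.comm_requests)       # assumption: wait on the requests directly
ghost_comm.comm_requests.clear()                    # and reset them for the next exchange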