Skip to content

Commit

Permalink
hod.prepare_sim: detect and report when a prepare_slab subprocess…
Browse files Browse the repository at this point in the history
… fails (#151)

* hod.prepare_sim: detect and report when a prepare_slab subprocess fails, e.g. due to OOM

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* changelog

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
lgarrison and pre-commit-ci[bot] authored Sep 21, 2024
1 parent acde333 commit 229fe9a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 28 deletions.
7 changes: 7 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

2.0.2 (upcoming)
----------------

Enhancements
~~~~~~~~~~~~
- ``hod.prepare_sim``: detect and report when a ``prepare_slab`` subprocess fails [#151]

2.0.1 (2024-03-01)
------------------
This is a bugfix release primarily to add support for ASDF 3.1.0.
Expand Down
67 changes: 39 additions & 28 deletions abacusnbody/hod/prepare_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
"""

import argparse
import concurrent.futures
import gc
import glob
import itertools
import multiprocessing
import os
from itertools import repeat
from pathlib import Path
import time
from pathlib import Path

import h5py
import numba
import numpy as np
Expand All @@ -26,7 +27,7 @@
from abacusnbody.data.compaso_halo_catalog import CompaSOHaloCatalog
from abacusnbody.data.read_abacus import read_asdf

from ..analysis.shear import smooth_density, get_shear
from ..analysis.shear import get_shear, smooth_density
from ..analysis.tsc import tsc_parallel

DEFAULTS = {}
Expand Down Expand Up @@ -1103,32 +1104,42 @@ def main(
else:
nthread = int(nthread)

p = multiprocessing.Pool(config['prepare_sim']['Nparallel_load'])
p.starmap(
prepare_slab,
zip(
range(numslabs),
repeat(savedir),
repeat(simdir),
repeat(simname),
repeat(z_mock),
repeat(ztype),
repeat(tracer_flags),
repeat(MT),
repeat(want_ranks),
repeat(want_AB),
repeat(want_shear),
repeat(shearmark),
repeat(cleaning),
repeat(newseed),
repeat(halo_lc),
repeat(nthread),
repeat(overwrite),
),
)
p.close()
p.join()
with concurrent.futures.ProcessPoolExecutor(
max_workers=config['prepare_sim']['Nparallel_load'],
mp_context=multiprocessing.get_context('spawn'),
) as pool:
futures = [
pool.submit(
prepare_slab,
i,
savedir=savedir,
simdir=simdir,
simname=simname,
z_mock=z_mock,
z_type=ztype,
tracer_flags=tracer_flags,
MT=MT,
want_ranks=want_ranks,
want_AB=want_AB,
want_shear=want_shear,
shearmark=shearmark,
cleaning=cleaning,
newseed=newseed,
halo_lc=halo_lc,
nthread=nthread,
overwrite=overwrite,
)
for i in range(numslabs)
]

# check that all futures succeeded
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except concurrent.futures.process.BrokenProcessPool as bpp:
raise RuntimeError(
'A subprocess died in prepare_sim. Did prepare_slab() run out of memory?'
) from bpp
# print("done, took time ", time.time() - start)


Expand Down

0 comments on commit 229fe9a

Please sign in to comment.