Make MPI an optional dependency (#147)
* Make MPI an optional dependency

* Fix list->List

* missed one

* Add MPI test

* Update tests, README and CLI tutorial

* Remove duplicate line

Co-authored-by: Aniket Fadia <[email protected]>

* fix spacing

* Support using MPI for CC (#148)

* Support using MPI for CC

* Fix download retry

---------

Co-authored-by: Aniket Fadia <[email protected]>
carlosgjs and aniketfadia96 authored Jul 6, 2023
1 parent e0c94a0 commit ee84cef
Showing 11 changed files with 229 additions and 58 deletions.
20 changes: 16 additions & 4 deletions .github/actions/setup/action.yaml
@@ -4,18 +4,30 @@ inputs:
python-version:
required: true
description: Python version to install
mpi:
required: false
description: Whether to install with MPI or not
default: 'false'
runs:
using: composite
steps:
- name: Setup Python
uses: actions/[email protected]
with:
python-version: ${{inputs.python-version}}
- name: Setup MPI
uses: mpi4py/setup-mpi@v1
- name: Install project
- name: Setup pip
shell: sh
run: |
python3 -m ensurepip
pip3 install --upgrade pip
pip install ".[dev]"
- name: Setup MPI
if: ${{ inputs.mpi == 'true' }}
uses: mpi4py/setup-mpi@v1
- name: Install project no MPI
if: ${{ inputs.mpi == 'false' }}
shell: sh
run: pip install ".[dev]"
- name: Install project MPI
if: ${{ inputs.mpi == 'true' }}
shell: sh
run: pip install ".[dev,mpi]"
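The composite action now takes an `mpi` input (default `'false'`). Callers opt in per job with `mpi: 'true'`, as the new `s1_s2_mpi` workflow job below does; only then is the `mpi4py/setup-mpi@v1` step run and the `[dev,mpi]` extra installed.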
36 changes: 35 additions & 1 deletion .github/workflows/test.yaml
@@ -31,7 +31,7 @@ jobs:
python-version: ${{env.python_version}}
- name: Test Download (S0)
working-directory: ./src
run: noisepy download --start_date 2019-02-01T00:00:00 --end_date 2019-02-01T01:00:00 --stations ARV,BAK --inc_hours 1 --raw_data_path $RUNNER_TEMP/RAW_DATA
run: noisepy download --start_date 2019-02-01T00:00:00 --end_date 2019-02-01T02:00:00 --stations ARV,BAK --inc_hours 1 --raw_data_path $RUNNER_TEMP/RAW_DATA
- name: Cache data
uses: actions/cache/save@v3
with:
@@ -70,6 +70,40 @@ jobs:
working-directory: ./src
run: |
noisepy stack --raw_data_path $RUNNER_TEMP/RAW_DATA --ccf_path $RUNNER_TEMP/CCF --stack_path $RUNNER_TEMP/STACK --stack_method ${{matrix.method}}
s1_s2_mpi:
strategy:
fail-fast: true
matrix:
python_version: ['3.10']
method: [linear]
freq_norm: [rma]
runs-on: ubuntu-22.04
needs: s0_download
steps:
- name: Checkout Repo
uses: actions/[email protected]
- name: Setup NoisePy
uses: ./.github/actions/setup
with:
python-version: ${{matrix.python_version}}
mpi: 'true'
- name: Cache data
id: cache
uses: actions/cache/restore@v3
with:
key: download_data-${{ github.sha }}
path: ${{runner.temp}}
- name: Check cache hit
if: steps.cache.outputs.cache-hit != 'true'
run: exit 1
- name: Test Cross-Correlation (S1)
working-directory: ./src
run: |
mpiexec -n 2 noisepy cross_correlate --mpi --raw_data_path $RUNNER_TEMP/RAW_DATA --ccf_path $RUNNER_TEMP/CCF --freq_norm ${{matrix.freq_norm}}
- name: Test Stacking (S2)
working-directory: ./src
run: |
mpiexec -n 3 noisepy stack --mpi --raw_data_path $RUNNER_TEMP/RAW_DATA --ccf_path $RUNNER_TEMP/CCF --stack_path $RUNNER_TEMP/STACK --stack_method ${{matrix.method}}
s3_dates:
strategy:
fail-fast: true
17 changes: 15 additions & 2 deletions README.md
@@ -23,11 +23,24 @@ The nature of NoisePy being composed of python scripts allows flexible package i
```bash
conda create -n noisepy python=3.8 pip
conda activate noisepy
conda install -c conda-forge openmpi
pip install noisepy-seis
```

## With Conda and pip and MPI support:
```bash
conda create -n noisepy python=3.8 pip
conda activate noisepy
conda install -c conda-forge openmpi
pip install noisepy-seis[mpi]
```

## With virtual environment:
```bash
python -m venv noisepy
source noisepy/bin/activate
pip install noisepy-seis
```
## With virtual environment and MPI support:
An MPI installation is required, e.g. for macOS using [brew](https://brew.sh/):
```bash
brew install open-mpi
```
@@ -36,7 +49,7 @@
```bash
python -m venv noisepy
source noisepy/bin/activate
pip install noisepy-seis
pip install noisepy-seis[mpi]
```
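Note that some shells (e.g. zsh) expand square brackets as glob patterns, so the extras syntax may need quoting: `pip install "noisepy-seis[mpi]"`.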


4 changes: 3 additions & 1 deletion pyproject.toml
@@ -35,7 +35,6 @@ classifiers = [
dependencies = [
"DateTimeRange==2.0.0",
"h5py>=3.8.0",
"mpi4py>=3.1.4",
"numba>=0.57.0",
"numpy>=1.22.0",
"pandas>=1.5.3",
@@ -79,6 +78,9 @@ dev = [
sql = [
"SQLite3-0611",
]
mpi = [
"mpi4py>=3.1.4",
]

[project.scripts]
noisepy = "noisepy.seis:main.main_cli"
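With mpi4py moved to the optional `mpi` extra, code paths that touch MPI have to tolerate its absence. A minimal sketch of the usual guard pattern, assuming illustrative names (`HAS_MPI` and `get_rank_and_size` are not NoisePy's actual API):

```python
# Minimal sketch of an optional-dependency guard; HAS_MPI and
# get_rank_and_size are illustrative names, not NoisePy's actual API.
from typing import Tuple

try:
    from mpi4py import MPI  # present only when installed as noisepy-seis[mpi]
    HAS_MPI = True
except ImportError:
    MPI = None
    HAS_MPI = False


def get_rank_and_size() -> Tuple[int, int]:
    """Return (rank, size), defaulting to a single process without MPI."""
    if not HAS_MPI:
        return 0, 1
    comm = MPI.COMM_WORLD
    return comm.Get_rank(), comm.Get_size()
```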
9 changes: 8 additions & 1 deletion src/noisepy/seis/S0A_download_ASDF_MPI.py
@@ -19,6 +19,7 @@
from noisepy.seis.utils import TimeLogger
from . import noise_module
from obspy.clients.fdsn import Client
from obspy.clients.fdsn.header import FDSNNoDataException

logger = logging.getLogger(__name__)
if not sys.warnoptions:
@@ -281,6 +282,7 @@ def download_stream(
client = Client(prepro_para.client_url_key, timeout=15)
retries = 5
while retries > 0:
retries -= 1
try:
tr = client.get_waveforms(
network=net,
@@ -290,8 +292,12 @@
starttime=starttime,
endtime=endtime,
)
except FDSNNoDataException:
logger.warning(f"No data available for {starttime}-{endtime}/{sta}.{chan}")
return -1, None
except Exception as e:
logger.warning(f"{e} for get_waveforms({sta}.{chan})")
logger.warning(f"{type(e)}/{e} for get_waveforms({sta}.{chan})")
continue

logger.debug(f"Got waveforms for {sta}.{chan}")

@@ -304,6 +310,7 @@
endtime,
)
return index, tr
logger.warning(f"Could not download data for {starttime}-{endtime}/{sta}.{chan}")
return -1, None
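Two behaviors change here: the retry counter is decremented at the top of the loop, so a persistently failing request exhausts its budget instead of looping indefinitely, and `FDSNNoDataException` is treated as a definitive "no data" answer rather than a retryable error. A condensed sketch of the resulting control flow, with the hypothetical `fetch` standing in for the `client.get_waveforms(...)` call:

```python
# Condensed sketch of the fixed retry loop; fetch() stands in for the
# client.get_waveforms(...) call and the early returns mirror download_stream.
from obspy.clients.fdsn.header import FDSNNoDataException


def download_with_retries(fetch, retries: int = 5):
    while retries > 0:
        retries -= 1  # decrement up front so repeated failures exhaust the budget
        try:
            return fetch()
        except FDSNNoDataException:
            return None  # "no data" is a definitive answer: do not retry
        except Exception:
            continue  # transient error: try again while retries remain
    return None  # all retries exhausted
```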


40 changes: 19 additions & 21 deletions src/noisepy/seis/S1_fft_cc_MPI.py
@@ -10,9 +10,9 @@
from datetimerange import DateTimeRange
from scipy.fftpack.helper import next_fast_len

from noisepy.seis.datatypes import Channel, ChannelData, ConfigParameters, NoiseFFT

from . import noise_module
from .datatypes import Channel, ChannelData, ConfigParameters, NoiseFFT
from .scheduler import Scheduler, SingleNodeScheduler
from .stores import CrossCorrelationDataStore, RawDataStore
from .utils import TimeLogger, error_if

@@ -35,19 +35,13 @@
Marine Denolle ([email protected])
NOTE:
0. MOST occasions you just need to change parameters followed with detailed
explanations to run the script.
1. To read SAC/mseed files, we assume the users have sorted the data
by the time chunk they prefer (e.g., 1day)
and store them in folders named after the time chunk (e.g, 2010_10_1).
modify L135 to find your local data;
2. A script of S0B_to_ASDF.py is provided to help clean messy SAC/MSEED data and
1. A script of S0B_to_ASDF.py is provided to help clean messy SAC/MSEED data and
convert them into ASDF format.
the script takes minor time compared to that for cross-correlation.
so we recommend to use S0B script for
better NoisePy performance. the downside is that it duplicates the
continuous noise data on your machine;
3. When "coherency" is preferred, please set "freq_norm" to "rma" and "time_norm" to "no"
2. When "coherency" is preferred, please set "freq_norm" to "rma" and "time_norm" to "no"
for better performance.
"""

@@ -56,28 +50,32 @@ def cross_correlate(
raw_store: RawDataStore,
fft_params: ConfigParameters,
cc_store: CrossCorrelationDataStore,
scheduler: Scheduler = SingleNodeScheduler(),
):
"""
Perform the cross-correlation analysis
Parameters:
raw_store: Store to load data from
fft_params: Parameters for the FFT calculations
cc_store: Store for saving cross correlations
Args:
raw_store: Store to load data from
fft_params: Parameters for the FFT calculations
cc_store: Store for saving cross correlations
"""

executor = ThreadPoolExecutor()
tlog = TimeLogger(logger, logging.INFO)
t_s1_total = tlog.reset()

# set variables to broadcast
timespans = raw_store.get_timespans()
splits = len(timespans)
if splits == 0:
raise IOError("Abort! no available seismic files for FFT")
def init() -> List:
# set variables to broadcast
timespans = raw_store.get_timespans()
if len(timespans) == 0:
raise IOError("Abort! no available seismic files for FFT")
return [timespans]

[timespans] = scheduler.initialize(init, 1)

for ts in timespans:
for its in scheduler.get_indices(timespans):
ts = timespans[its]
if cc_store.is_done(ts):
continue

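The per-process work splitting now goes through a `Scheduler` abstraction instead of direct MPI calls: `initialize` runs a setup function once and shares its results with every worker, `get_indices` tells the current worker which items to process, and `synchronize` (used in S2 below) acts as a barrier. A sketch of the protocol implied by these call sites; the actual definitions in `noisepy.seis.scheduler` may differ in detail:

```python
# Sketch of the Scheduler protocol implied by the call sites above; the real
# interface in noisepy.seis.scheduler may differ in detail.
from abc import ABC, abstractmethod
from typing import Any, Callable, List


class Scheduler(ABC):
    @abstractmethod
    def initialize(self, initializer: Callable[[], List[Any]], count: int) -> List[Any]:
        """Run `initializer` once and share its `count` results with all workers."""

    @abstractmethod
    def get_indices(self, items: List[Any]) -> List[int]:
        """Return the indices of `items` this worker should process."""

    @abstractmethod
    def synchronize(self) -> None:
        """Wait until all workers reach this point."""


class SingleNodeScheduler(Scheduler):
    """Trivial scheduler: one process does all the work."""

    def initialize(self, initializer: Callable[[], List[Any]], count: int) -> List[Any]:
        return initializer()

    def get_indices(self, items: List[Any]) -> List[int]:
        return list(range(len(items)))

    def synchronize(self) -> None:
        pass
```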
35 changes: 15 additions & 20 deletions src/noisepy/seis/S2_stacking.py
@@ -5,9 +5,9 @@
import numpy as np
import pandas as pd
from datetimerange import DateTimeRange
from mpi4py import MPI

from noisepy.seis.datatypes import ConfigParameters, StackMethod
from noisepy.seis.scheduler import Scheduler, SingleNodeScheduler
from noisepy.seis.stores import CrossCorrelationDataStore, StackStore
from noisepy.seis.utils import TimeLogger

@@ -42,7 +42,12 @@
MAX_MEM = 4.0


def stack(cc_store: CrossCorrelationDataStore, stack_store: StackStore, fft_params: ConfigParameters):
def stack(
cc_store: CrossCorrelationDataStore,
stack_store: StackStore,
fft_params: ConfigParameters,
scheduler: Scheduler = SingleNodeScheduler(),
):
tlog = TimeLogger(logger=logger)
t_tot = tlog.reset()
if fft_params.rotation and fft_params.correction:
@@ -64,33 +69,23 @@ def stack(
#######################################
# #########PROCESSING SECTION##########
#######################################

# --------MPI---------
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
def initializer():
timespans = cc_store.get_timespans()
pairs_all = list(set(pair for ts in timespans for pair in cc_store.get_station_pairs(ts)))
logger.info(f"Station pairs: {pairs_all}")

splits = len(pairs_all)
if len(timespans) == 0 or splits == 0:
if len(timespans) == 0 or len(pairs_all) == 0:
raise IOError("Abort! no available CCF data for stacking")
else:
splits, timespans, pairs_all = [None for _ in range(3)]
return timespans, pairs_all

timespans, pairs_all = scheduler.initialize(initializer, 2)

# broadcast the variables
splits = comm.bcast(splits, root=0)
timespans = comm.bcast(timespans, root=0)
pairs_all = comm.bcast(pairs_all, root=0)
nccomp = fft_params.ncomp * fft_params.ncomp
num_chunk = len(timespans) * nccomp
num_segmts = 1

# MPI loop: loop through each user-defined time chunck
for ipair in range(rank, splits, size):
# loop through each pair assigned to this process by the scheduler
for ipair in scheduler.get_indices(pairs_all):
tlog.reset()

logger.debug("%dth path for station-pair %s" % (ipair, pairs_all[ipair]))
@@ -259,7 +254,7 @@ def write_stacks(comp: str, tparameters: Dict[str, Any], stacks: List[Tuple[Stac
stack_store.mark_done(src_sta, rec_sta)

tlog.log("step 2 in total", t_tot)
comm.barrier()
scheduler.synchronize()


def validate_pairs(ncomp: int, sta_pair: str, fauto: int, ts: DateTimeRange, n_pairs: int) -> bool:
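The MPI code removed above also suggests what `MPIScheduler` plausibly does: run the initializer on a root rank and broadcast the results (the old `comm.bcast(..., root=0)` calls), hand each rank a round-robin slice of the work (the old `range(rank, splits, size)` loop), and make `synchronize` a barrier. A hedged reconstruction; the real class in `noisepy.seis.scheduler` may differ:

```python
# Hedged reconstruction of MPIScheduler from the MPI code this diff removes
# (rank-0 bcast, round-robin indices, barrier); the real class may differ.
from typing import Any, Callable, List

from mpi4py import MPI


class MPIScheduler:
    def __init__(self, root: int = 0):
        self.comm = MPI.COMM_WORLD
        self.root = root

    def initialize(self, initializer: Callable[[], List[Any]], count: int) -> List[Any]:
        # Run the initializer only on the root rank, then broadcast each
        # resulting variable, mirroring the removed comm.bcast(..., root=0) calls.
        if self.comm.Get_rank() == self.root:
            values = initializer()
        else:
            values = [None] * count
        return [self.comm.bcast(v, root=self.root) for v in values]

    def get_indices(self, items: List[Any]) -> List[int]:
        # Round-robin split, matching the removed range(rank, splits, size) loop.
        rank, size = self.comm.Get_rank(), self.comm.Get_size()
        return list(range(rank, len(items), size))

    def synchronize(self) -> None:
        self.comm.barrier()  # replaces the removed comm.barrier() call
```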
20 changes: 14 additions & 6 deletions src/noisepy/seis/main.py
@@ -21,6 +21,7 @@
from .S1_fft_cc_MPI import cross_correlate
from .S2_stacking import stack
from .scedc_s3store import SCEDCS3DataStore
from .scheduler import MPIScheduler, SingleNodeScheduler
from .utils import fs_join, get_filesystem

logger = logging.getLogger(__name__)
@@ -156,14 +157,16 @@ def run_cross_correlation():
cc_store = ASDFCCStore(ccf_dir)
params = initialize_params(args, args.raw_data_path)
raw_store = create_raw_store(args, params)
cross_correlate(raw_store, params, cc_store)
scheduler = MPIScheduler(0) if args.mpi else SingleNodeScheduler()
cross_correlate(raw_store, params, cc_store, scheduler)
params.save_yaml(fs_join(ccf_dir, CONFIG_FILE))

def run_stack():
cc_store = ASDFCCStore(args.ccf_path, "r")
stack_store = ASDFStackStore(args.stack_path)
params = initialize_params(args, args.ccf_path)
stack(cc_store, stack_store, params)
scheduler = MPIScheduler(0) if args.mpi else SingleNodeScheduler()
stack(cc_store, stack_store, params, scheduler)
params.save_yaml(fs_join(args.stack_path, CONFIG_FILE))

if args.cmd == Command.DOWNLOAD:
@@ -192,7 +195,7 @@ def add_paths(parser, types: List[str]):
add_path(parser, t)


def make_step_parser(subparsers: Any, cmd: Command, paths: List[str]):
def make_step_parser(subparsers: Any, cmd: Command, paths: List[str]) -> Any:
parser = subparsers.add_parser(
cmd.name.lower(),
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
Expand All @@ -209,6 +212,11 @@ def make_step_parser(subparsers: Any, cmd: Command, paths: List[str]):
"-c", "--config", type=lambda f: _valid_config_file(parser, f), required=False, help="Configuration YAML file"
)
add_model(parser, ConfigParameters())
return parser


def add_mpi(parser: Any):
parser.add_argument("-m", "--mpi", action="store_true")


def main_cli():
Expand All @@ -220,9 +228,9 @@ def parse_args(arguments: Iterable[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="cmd", required=True)
make_step_parser(subparsers, Command.DOWNLOAD, ["raw_data"])
make_step_parser(subparsers, Command.CROSS_CORRELATE, ["raw_data", "ccf", "xml"])
make_step_parser(subparsers, Command.STACK, ["raw_data", "stack", "ccf"])
make_step_parser(subparsers, Command.ALL, ["raw_data", "ccf", "stack", "xml"])
add_mpi(make_step_parser(subparsers, Command.CROSS_CORRELATE, ["raw_data", "ccf", "xml"]))
add_mpi(make_step_parser(subparsers, Command.STACK, ["raw_data", "stack", "ccf"]))
add_mpi(make_step_parser(subparsers, Command.ALL, ["raw_data", "ccf", "stack", "xml"]))

args = parser.parse_args(arguments)
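Note that `--mpi` only selects the scheduler; it does not launch processes. An MPI run still wraps the CLI in a launcher, e.g. `mpiexec -n 2 noisepy cross_correlate --mpi ...` as in the workflow job above, while omitting the flag keeps the single-process `SingleNodeScheduler` behavior.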
