add test for TensorFlow #2

Closed · boegel opened this issue Nov 23, 2022 · 12 comments

Comments
@boegel
Contributor

boegel commented Nov 23, 2022

For example based on https://github.com/EESSI/eessi-demo/tree/main/TensorFlow

@casparvl
Collaborator

casparvl commented Apr 7, 2023

I've been trying to make a TensorFlow test that supports multinode runs (essentially: this tutorial). Our previous test used Horovod for that, but that's not a very nice approach: we want to test TensorFlow functionality with this test, so it doesn't make sense to pull in another framework just for multinode testing. In fact, we may want to verify that TensorFlow's native multinode support works.

The TensorFlow code is pretty straightforward:

# mnist_setup.py
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
# Actual tf test
import json
import os
import sys

import tensorflow as tf

# Make sure we can import mnist_setup from the current dir
if '.' not in sys.path:
  sys.path.insert(0, '.')
import mnist_setup

per_worker_batch_size = 64
# TF_CONFIG is expected to be set in the environment; how to do that is the
# challenge discussed below
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
    multi_worker_model = mnist_setup.build_and_compile_cnn_model()

multi_worker_model.fit(multi_worker_dataset, epochs=10, steps_per_epoch=70)

The challenge that I have been struggling with for the better part of today is how to set TF_CONFIG and CUDA_VISIBLE_DEVICES. TensorFlow has no notion of SLURM or MPI: it expects you to hardcode in TF_CONFIG which nodes (and ports) participate, as well as what your own global rank is. Furthermore, it expects you to set CUDA_VISIBLE_DEVICES in such a way that multiple ranks don't try to use the same GPUs.
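
For reference, TF_CONFIG is a JSON document along these lines; a rough example for a two-worker run (hostnames and ports are made up):

import json
import os

# Example TF_CONFIG for a 2-worker run; each process gets the same 'cluster'
# dict, but its own 'task' index (0 on the first worker, 1 on the second).
# Hostnames/ports are placeholders.
tf_config = {
    'cluster': {
        'worker': ['node1:12345', 'node2:23456'],
    },
    'task': {'type': 'worker', 'index': 0},
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)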

My solution was to use mpi4py (which is in the dependency list of TensorFlow anyway, through SciPy-bundle) to let each process query its hostname, pick a free port, communicate that information, and then figure out its local rank based on how often the same hostname occurs, what its own rank is, etc.

That worked, except... if I set CUDA_VISIBLE_DEVICES after from mpi4py import MPI, it seems that CUDA is already initialized. Thus, setting CUDA_VISIBLE_DEVICES after that import has no effect.

I thought this could be resolved by running the parallel launcher multiple times in the test, as is done in this ReFrame tutorial. If we keep the same default arguments to the parallel launcher, the number of processes and their distribution over the nodes should be consistent. That enables us to create a mapping from global ranks to local ranks. Based on that, we can then set CUDA_VISIBLE_DEVICES (in a wrapper bash script) and then call a Python script that imports mpi4py to figure out the global rank. However, there is one problem with this: to index the mapping, we need the global rank. And the only way to (portably) get the global rank is to use mpi4py. That puts us back in a catch-22: I need mpi4py to get my global rank, but I need to set CUDA_VISIBLE_DEVICES before invoking mpi4py.

One alternative is just to say "this test only supports SLURM" (we can maybe implement a 'skip' if it's not run with SLURM), and use SLURM environment variables (SLURM_PROCID and SLURM_LOCALID) to get the global and local rank. That bypasses the need for mpi4py altogether... However, I would also like to be able to use the mpirun launcher in a SLURM environment, as that is also quite a common use case. We can use OMPI_COMM_WORLD_RANK and its equivalents when Open MPI is used - but that might not always be the case. Ugh...
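
For reference, a minimal sketch of that scheme, falling back from SLURM to Open MPI variables (purely illustrative; the function name is made up):

import os

def get_ranks_from_env():
    # Launched with srun: SLURM sets these per task
    if 'SLURM_PROCID' in os.environ:
        return int(os.environ['SLURM_PROCID']), int(os.environ['SLURM_LOCALID'])
    # Launched with mpirun (Open MPI): use its equivalents
    if 'OMPI_COMM_WORLD_RANK' in os.environ:
        return (int(os.environ['OMPI_COMM_WORLD_RANK']),
                int(os.environ['OMPI_COMM_WORLD_LOCAL_RANK']))
    raise RuntimeError('no SLURM or Open MPI rank variables found')

global_rank, local_rank = get_ranks_from_env()
# Must happen before anything initializes CUDA (e.g. importing mpi4py or TensorFlow)
os.environ['CUDA_VISIBLE_DEVICES'] = str(local_rank)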

@smoors
Collaborator

smoors commented Apr 9, 2023

could tf.config.set_visible_devices be used in the python script as an alternative to CUDA_VISIBLE_DEVICES?
https://www.tensorflow.org/api_docs/python/tf/config/set_visible_devices

@casparvl
Collaborator

Hm, it would be interesting to see if that still does something after mpi4py has run the CUDA initialization. I also thought of another solution:

  • Create a wrapper.sh script; that is the one that gets mpirun-ed.
  • In wrapper.sh, first launch an mpi4py-based Python script that gets what I need: global rank, local rank, etc. It could even already set CUDA_VISIBLE_DEVICES. Then, launch my actual TensorFlow test as a separate Python script. Since this runs after the first, the CUDA initialization should now be based on the CUDA_VISIBLE_DEVICES set in the first Python script.

Unless I'm overlooking something, that should also work. But, if I can solve it without a wrapper script, and simply through the TensorFlow API, that might be less messy / more elegant. I'll give it a go.
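
For what it's worth, a rough sketch of that two-step idea, collapsed into a single Python launcher that mpirun would start ('tf_test.py' is just a placeholder name for the actual TensorFlow script):

import os
import socket
import subprocess
import sys

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
hostname = socket.gethostname()

# Local rank = number of lower-ranked processes on the same host
all_hostnames = comm.allgather(hostname)
local_rank = all_hostnames[:rank].count(hostname)

# Done with MPI; shut it down before starting the real test
MPI.Finalize()

# Launch the actual TensorFlow script ('tf_test.py' is a placeholder) in a
# fresh process, which inherits CUDA_VISIBLE_DEVICES and only then initializes CUDA
env = dict(os.environ, CUDA_VISIBLE_DEVICES=str(local_rank))
subprocess.run([sys.executable, 'tf_test.py'], env=env, check=True)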

@casparvl
Collaborator

Ok, that works. I'm now using mpi4py to first set TF_CONFIG, as well as to determine my local rank:

import json
import os
import socket
import sys
from contextlib import closing

from mpi4py import MPI

# Make sure we can import mnist_setup from current dir
if '.' not in sys.path:
  sys.path.insert(0, '.')
import mnist_setup

device = 'gpu'

# Start from a clean slate: drop any TF_CONFIG inherited from the environment
os.environ.pop('TF_CONFIG', None)

def find_free_port():
    with closing(socket.socket()) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('', 0))
        return s.getsockname()[1]

# Multi-worker config
# We'll use mpi4py to figure out our rank, have each process select a socket and hostname,
# and allreduce that information to create a TF_CONFIG
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
hostname = socket.gethostname()
port = find_free_port()

rank_info = {
    'rank': rank,
    'hostname': hostname,
    'port': port,
}

rank_info_vector = comm.allgather(rank_info)
# print(f"Rank_info_vector: {rank_info_vector}")

# Get the local rank
def get_local_rank(rank_info_vector):
    # Note that rank_info_vector is sorted by rank, by definition of the MPI allgather routine.
    # Thus, if our current rank is the n-th item in rank_info_vector for which the hostname matches,
    # our local rank is n
    local_rank = 0
    for item in rank_info_vector:
        if item['hostname'] == hostname:
            if item['rank'] == rank:
                return local_rank
            else:
                local_rank += 1

local_rank = get_local_rank(rank_info_vector)
print(f"Rank {rank} has local_rank {local_rank}, hostname {hostname} and port {port}")

worker_list = ['%s:%s' % (item['hostname'], item['port']) for item in rank_info_vector]

tf_config = {
    'cluster': {
        'worker': worker_list,
    },
    'task': {'type': 'worker', 'index': rank}
}
os.environ["TF_CONFIG"] = json.dumps(tf_config)

...

if device == 'gpu':
    # Limit each local rank to its own GPU.
    # Note that we need to create the MultiWorkerMirroredStrategy before calling tf.config.list_logical_devices
    # To avoid running into this error https://github.com/tensorflow/tensorflow/issues/34568
    physical_devices = tf.config.list_physical_devices('GPU')
    try:
        tf.config.set_visible_devices(physical_devices[local_rank], 'GPU')
...

I only get an error (this one) on process teardown. Annoying and a bit ugly, but not a dealbreaker: the test has run completely by then.

Still have to test it multi-node; I only did 2 GPUs on a single node for now. Currently waiting in the queue for a multi-node test...

@casparvl
Collaborator

The 2-node, 8-GPU test also completed successfully (though it produces the same error on process teardown, of course).

@smoors
Collaborator

smoors commented Apr 16, 2023

maybe try calling MPI.Finalize() to properly shut down the workers?

@casparvl
Collaborator

As far as I know, tf.distribute doesn't use MPI; it has its own process/task management and its own teardown. I used MPI to figure out the local rank, but that's a different story. But who knows, it might be that it gets confused by the combination of me using MPI and tf.distribute using its own process management... I could give it a try :)

@casparvl
Collaborator

Tried calling MPI.Finalize(), doesn't solve the error on process destruction.

@casparvl
Collaborator

Oh, also, the mpi4py documentation says you don't need to call MPI.Finalize(), as this is done automatically (as long as mpi4py was the one to initialize the MPI environment): https://mpi4py.readthedocs.io/en/stable/overview.html

@smoors
Collaborator

smoors commented Apr 17, 2023

ChatGPT suggests the following solution :)

import psutil
import signal

for proc in psutil.process_iter():
    if "worker" in proc.name():
        proc.send_signal(signal.SIGTERM)

you might have to change the string "worker" into something more unique for TensorFlow

EDIT: this will probably not work for workers on other nodes...

@casparvl
Collaborator

boegel pushed a commit to boegel/EESSI-test-suite that referenced this issue Jun 3, 2023
use separate partitions for 40GB and 80GB GPU nodes on Hortense + also add CPU-only partitions
@boegel
Contributor Author

boegel commented Aug 9, 2023

closing since #38 got merged

@boegel boegel closed this as completed Aug 9, 2023
@boegel boegel added this to the 0.1 milestone Aug 9, 2023
casparvl pushed a commit that referenced this issue May 29, 2024
smoors pushed a commit that referenced this issue Oct 9, 2024