# torch_horovod.py
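"""Minimal PyTorch + Horovod data-parallel training example.

Trains a Wide-ResNet-50-2 on a synthetic random dataset across several
worker processes launched with the `horovod.run` API. Lines marked
"Specific hvd" are the Horovod-specific additions to a plain PyTorch loop.
"""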
import argparse

import torch
import horovod.torch as hvd
from horovod import run
from torch.nn import NLLLoss
from torch.optim import SGD
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
from torchvision.models import wide_resnet50_2

class RndDataset(Dataset):
    def __init__(self, nb_samples=128):
        self._nb_samples = nb_samples

    def __len__(self):
        return self._nb_samples

    def __getitem__(self, index):
        x = torch.randn((3, 32, 32))
        y = torch.randint(0, 100, (1,)).item()
        return x, y
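
# NOTE: the dataset above returns random images and random labels, so the loss
# is not expected to decrease; the script only exercises the distributed
# training mechanics.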

def training(world_size, cuda, config):
    # Specific hvd
    hvd.init()

    print(hvd.local_rank(), ": run with config:", config, " - cuda:", cuda)

    if cuda:
        # Specific hvd
        # Pin this process to one GPU, keyed by its local rank (one GPU per process)
        torch.cuda.set_device(hvd.local_rank())

    # Data preparation
    dataset = RndDataset(nb_samples=config["nb_samples"])

    # Specific hvd
    # Shard the dataset so every worker draws a distinct subset of samples
    train_sampler = DistributedSampler(
        dataset, num_replicas=hvd.size(), rank=hvd.rank()
    )

    # Split the global batch size evenly across the workers
    train_loader = DataLoader(
        dataset,
        batch_size=config["batch_size"] // hvd.size(),
        num_workers=1,
        sampler=train_sampler,
    )
    # Model, criterion, optimizer setup
    model = wide_resnet50_2(num_classes=100)
    if cuda:
        model.cuda()
    criterion = NLLLoss()
    optimizer = SGD(model.parameters(), lr=0.001)

    # Specific hvd
    # Wrap the optimizer so each step allreduces gradients across workers
    optimizer = hvd.DistributedOptimizer(
        optimizer, named_parameters=model.named_parameters()
    )

    # Specific hvd
    # Broadcast parameters from rank 0 so all workers start from identical weights
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)

    # Training loop log param
    log_interval = config["log_interval"]
    # `epoch` is resolved from the enclosing training loop below
    def _train_step(batch_idx, data, target):
        if cuda:
            data, target = data.cuda(), target.cuda()

        optimizer.zero_grad()
        output = model(data)
        # NLLLoss expects log-probabilities, so apply log_softmax over the
        # class dimension (dim=1), not a plain softmax over the batch dimension
        log_probs = torch.nn.functional.log_softmax(output, dim=1)
        loss_val = criterion(log_probs, target)
        loss_val.backward()
        optimizer.step()

        if (batch_idx + 1) % log_interval == 0:
            print(
                "Process {}/{} Train Epoch: {} [{}/{}]\tLoss: {}".format(
                    hvd.local_rank(),
                    hvd.size(),
                    epoch,
                    (batch_idx + 1) * len(data),
                    len(train_sampler),
                    loss_val.item(),
                )
            )
        return loss_val
    # Run _train_step for n_epochs
    n_epochs = 1
    for epoch in range(n_epochs):
        for batch_idx, (data, target) in enumerate(train_loader):
            _train_step(batch_idx, data, target)

    # Specific hvd
    hvd.shutdown()

if __name__ == "__main__":
    parser = argparse.ArgumentParser("Torch Native - Horovod")
    parser.add_argument("--no-cuda", action="store_true", default=False,
                        help="disables CUDA training")
    parser.add_argument("--nproc_per_node", type=int, default=2)
    parser.add_argument("--log_interval", type=int, default=4)
    parser.add_argument("--nb_samples", type=int, default=128)
    parser.add_argument("--batch_size", type=int, default=16)
    args_parsed = parser.parse_args()
    args_parsed.cuda = not args_parsed.no_cuda and torch.cuda.is_available()

    config = {
        "log_interval": args_parsed.log_interval,
        "batch_size": args_parsed.batch_size,
        "nb_samples": args_parsed.nb_samples,
    }

    args = (args_parsed.nproc_per_node, args_parsed.cuda, config)

    # Specific hvd
    # Launch nproc_per_node copies of `training` via the gloo controller
    run(training, args=args, use_gloo=True, np=args_parsed.nproc_per_node)
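
# Example launch (2 CPU worker processes), using the flags defined above:
#
#   python torch_horovod.py --no-cuda --nproc_per_node 2
#
# This should not need the `horovodrun` CLI launcher, since `horovod.run`
# spawns the worker processes itself.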