seo網(wǎng)站分析報(bào)告百度置頂廣告多少錢
1 基本概念
-
rank:進(jìn)程號(hào),在多進(jìn)程上下文中,我們通常假定rank 0是第一個(gè)進(jìn)程或者主進(jìn)程,其它進(jìn)程分別具有1,2,3不同rank號(hào),這樣總共具有4個(gè)進(jìn)程
-
node:物理節(jié)點(diǎn),可以是一個(gè)容器也可以是一臺(tái)機(jī)器,節(jié)點(diǎn)內(nèi)部可以有多個(gè)GPU;nnodes指物理節(jié)點(diǎn)數(shù)量, nproc_per_node指每個(gè)物理節(jié)點(diǎn)上面進(jìn)程的數(shù)量
-
local_rank:指在一個(gè)node上進(jìn)程的相對(duì)序號(hào),local_rank在node之間相互獨(dú)立
-
WORLD_SIZE:全局進(jìn)程總個(gè)數(shù),即在一個(gè)分布式任務(wù)中rank的數(shù)量
-
Group:進(jìn)程組,一個(gè)分布式任務(wù)對(duì)應(yīng)了一個(gè)進(jìn)程組。只有用戶需要?jiǎng)?chuàng)立多個(gè)進(jìn)程組時(shí)才會(huì)用到group來管理,默認(rèn)情況下只有一個(gè)group
如下圖所示,共有3個(gè)節(jié)點(diǎn)(機(jī)器),每個(gè)節(jié)點(diǎn)上有4個(gè)GPU,每臺(tái)機(jī)器上起4個(gè)進(jìn)程,每個(gè)進(jìn)程占一塊GPU,那么圖中一共有12個(gè)rank,nproc_per_node=4,nnodes=3,每個(gè)節(jié)點(diǎn)都一個(gè)對(duì)應(yīng)的node_rank。
「注意」 rank與GPU之間沒有必然的對(duì)應(yīng)關(guān)系,一個(gè)rank可以包含多個(gè)GPU;一個(gè)GPU也可以為多個(gè)rank服務(wù)(多進(jìn)程共享GPU),在torch的分布式訓(xùn)練中習(xí)慣默認(rèn)一個(gè)rank對(duì)應(yīng)著一個(gè)GPU,因此local_rank可以當(dāng)作GPU號(hào)
-
backend 通信后端,可選的包括:nccl(NVIDIA推出)、gloo(Facebook推出)、mpi(OpenMPI)。一般建議GPU訓(xùn)練選擇nccl,CPU訓(xùn)練選擇gloo
-
master_addr與master_port 主節(jié)點(diǎn)的地址以及端口,供init_method 的tcp方式使用。 因?yàn)閜ytorch中網(wǎng)絡(luò)通信建立是從機(jī)去連接主機(jī),運(yùn)行ddp只需要指定主節(jié)點(diǎn)的IP與端口,其它節(jié)點(diǎn)的IP不需要填寫。 這個(gè)兩個(gè)參數(shù)可以通過環(huán)境變量或者init_method傳入
# 方式1:
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group("nccl", rank=rank, world_size=world_size)
# 方式2:
dist.init_process_group("nccl", init_method="tcp://localhost:12355",rank=rank, world_size=world_size)
2. 使用分布式訓(xùn)練模型
使用DDP分布式訓(xùn)練,一共就如下個(gè)步驟:
-
初始化進(jìn)程組
dist.init_process_group
-
設(shè)置分布式采樣器
DistributedSampler
-
使用
DistributedDataParallel
封裝模型 -
使用
torchrun
或者mp.spawn
啟動(dòng)分布式訓(xùn)練
2.1 初始化進(jìn)程組
進(jìn)程組初始化如下:
torch.distributed.init_process_group(backend, init_method=None, world_size=-1, rank=-1, store=None,...)
backend
: 指定分布式的后端,torch提供了NCCL, GLOO,MPI
三種可用的后端,通常CPU的分布式訓(xùn)練選擇GLOO, GPU的分布式訓(xùn)練就用NCCL即可init_method
:初始化方法,可以是TCP連接、File共享文件系統(tǒng)、ENV環(huán)境變量三種方式?init_method='tcp://ip:port'
: 通過指定rank 0(即:MASTER進(jìn)程)的IP和端口,各個(gè)進(jìn)程進(jìn)行信息交換。 需指定 rank 和 world_size 這兩個(gè)參數(shù)init_method='file://path'
:通過所有進(jìn)程都可以訪問共享文件系統(tǒng)來進(jìn)行信息共享。需要指定rank和world_size參數(shù)init_method=env://
:從環(huán)境變量中讀取分布式的信息(os.environ),主要包括 MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE。 其中,rank和world_size可以選擇手動(dòng)指定,否則從環(huán)境變量讀取?
tcp和env兩種方式比較類似(其實(shí)env就是對(duì)tcp的一層封裝),都是通過網(wǎng)絡(luò)地址的方式進(jìn)行通信,也是最常用的初始化方法
「case 1」
import os, argparse
import torch
import torch.distributed as distparse = argparse.ArgumentParser()
parse.add_argument('--init_method', type=str)
parse.add_argument('--rank', type=int)
parse.add_argument('--ws', type=int)
args = parse.parse_args()if args.init_method == 'TCP':dist.init_process_group('nccl', init_method='tcp://127.0.0.1:28765', rank=args.rank, world_size=args.ws)
elif args.init_method == 'ENV':dist.init_process_group('nccl', init_method='env://')rank = dist.get_rank()
print(f"rank = {rank} is initialized")
# 單機(jī)多卡情況下,localrank = rank. 嚴(yán)謹(jǐn)應(yīng)該是local_rank來設(shè)置device
torch.cuda.set_device(rank)
tensor = torch.tensor([1, 2, 3, 4]).cuda()
print(tensor)
?假設(shè)單機(jī)雙卡的機(jī)器上運(yùn)行,則「開兩個(gè)終端」,同時(shí)運(yùn)行下面的命令
# TCP方法
python3 test_ddp.py --init_method=TCP --rank=0 --ws=2
python3 test_ddp.py --init_method=TCP --rank=1 --ws=2
# ENV方法
MASTER_ADDR='localhost' MASTER_PORT=28765 RANK=0 WORLD_SIZE=2 python3 test_gpu.py --init_method=ENV
MASTER_ADDR='localhost' MASTER_PORT=28765 RANK=1 WORLD_SIZE=2 python3 test_gpu.py --init_method=ENV
如果開啟的進(jìn)程未達(dá)到 word_size 的數(shù)量,則所有進(jìn)程會(huì)一直等待,直到都開始運(yùn)行,可以得到輸出如下:
# rank0 的終端:
rank 0 is initialized
tensor([1, 2, 3, 4], device='cuda:0')
# rank1的終端
rank 1 is initialized
tensor([1, 2, 3, 4], device='cuda:1')
在初始化DDP的時(shí)候,能夠給后端提供主進(jìn)程的地址端口、本身的RANK,以及進(jìn)程數(shù)量即可。初始化完成后,就可以執(zhí)行很多分布式的函數(shù)了,比如dist.get_rank
, dist.all_gather
等等
2.2 分布式訓(xùn)練數(shù)據(jù)加載
DistributedSampler
把所有數(shù)據(jù)分成N份(N為worldsize), 并能正確的分發(fā)到不同的進(jìn)程中,每個(gè)進(jìn)程可以拿到一個(gè)數(shù)據(jù)的子集,不重疊,不交叉
torch.utils.data.distributed.DistributedSampler(dataset,num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False)
-
dataset
: 需要加載的完整數(shù)據(jù)集 -
num_replicas
: 把數(shù)據(jù)集分成多少份,默認(rèn)是當(dāng)前dist的world_size -
rank
: 當(dāng)前進(jìn)程的id,默認(rèn)dist的rank -
shuffle
:是否打亂 -
drop_last
: 如果數(shù)據(jù)長(zhǎng)度不能被world_size整除,可以考慮是否將剩下的扔掉 -
seed
:隨機(jī)數(shù)種子。這里需要注意,從源碼中可以看出,真正的種子其實(shí)是self.seed+self.epoch
這樣的好處是,不同的epoch每個(gè)進(jìn)程拿到的數(shù)據(jù)是不一樣,因此需要在每個(gè)epoch開始前設(shè)置下:sampler.set_epoch(epoch)
「case 2」
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, sampler=sampler)
for epoch in range(start_epoch, n_epochs):sampler.set_epoch(epoch) # 設(shè)置epoch 更新種子train(loader)
2.3 模型分布式封裝
將單機(jī)模型使用torch.nn.parallel.DistributedDataParallel
進(jìn)行封裝
torch.cuda.set_device(local_rank)
model = Model().cuda()
model = DistributedDataParallel(model, device_ids=[local_rank])
「注意」 要調(diào)用model內(nèi)的函數(shù)或者屬性,使用model.module.xxxx
這樣在多卡訓(xùn)練時(shí),每個(gè)進(jìn)程有一個(gè)model副本和optimizer,使用自己的數(shù)據(jù)進(jìn)行訓(xùn)練,之后反向傳播計(jì)算完梯度的時(shí)候,所有進(jìn)程的梯度會(huì)進(jìn)行all-reduce操作進(jìn)行同步,進(jìn)而保證每個(gè)卡上的模型更新梯度是一樣的,模型參數(shù)也是一致的。
在save和load模型時(shí)候,為了減小所有進(jìn)程同時(shí)讀寫磁盤,一般處理方法是以主進(jìn)程為主
「case 3」
model = DistributedDataParallel(model, device_ids=[local_rank])
CHECKPOINT_PATH ="./model.checkpoint"
if rank == 0:torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# barrier()其他保證rank 0保存完成
dist.barrier()
map_location = {"cuda:0": f"cuda:{local_rank}"}
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=map_location))
# 后面正常訓(xùn)練代碼
optimizer = xxx
for epoch:for data in Dataloader:model(data)xxx# 訓(xùn)練完成 只需要保存rank 0上的即可# 不需要dist.barrior(), all_reduce 操作保證了同步性if rank == 0:torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
2.4 啟動(dòng)分布式訓(xùn)練
如case1所示我們手動(dòng)運(yùn)行多個(gè)程序,相對(duì)繁瑣。實(shí)際上本身DDP就是一個(gè)python 的多進(jìn)程,因此完全可以直接通過多進(jìn)程的方式來啟動(dòng)分布式程序。 torch提供了以下兩種啟動(dòng)工具來更加方便的運(yùn)行torch的DDP程序。
2.4.1 mp.spawn
使用torch.multiprocessing
(python的multiprocessing
的封裝類) 來自動(dòng)生成多個(gè)進(jìn)程
mp.spawn(fn, args=(), nprocs=1, join=True, daemon=False)
-
fn
: 進(jìn)程的入口函數(shù),該函數(shù)的第一個(gè)參數(shù)會(huì)被默認(rèn)自動(dòng)加入當(dāng)前進(jìn)*程的rank, 即實(shí)際調(diào)用:fn(rank, *args)
-
nprocs
: 進(jìn)程數(shù)量,即:world_size -
args
: 函數(shù)fn的其他常規(guī)參數(shù)以tuple的形式傳遞
「case 4」
import torch
import torch.distributed as dist
import torch.multiprocessing as mpdef fn(rank, ws, nums):dist.init_process_group('nccl', init_method='tcp://127.0.0.1:28765',rank=rank, world_size=ws)rank = dist.get_rank()print(f"rank = {rank} is initialized")torch.cuda.set_device(rank)tensor = torch.tensor(nums).cuda()print(tensor)if __name__ == "__main__":ws = 2mp.spawn(fn, nprocs=ws, args=(ws, [1, 2, 3, 4]))
直接執(zhí)行一次命令 python3 test_ddp.py 即可,輸出如下:
rank = 0 is initialized
rank = 1 is initialized
tensor([1, 2, 3, 4], device='cuda:1')
tensor([1, 2, 3, 4], device='cuda:0')
這種方式同時(shí)適用于TCP和ENV初始化
2.4.2 launch/run
使用torch提供的 torch.distributed.launch
工具,可以以模塊的形式直接執(zhí)行
python3 -m torch.distributed.launch --配置 train.py --args參數(shù)
常用配置有:
-
--nnodes
: 使用的機(jī)器數(shù)量,單機(jī)的話,就默認(rèn)是1了 -
--nproc_per_node
: 單機(jī)的進(jìn)程數(shù),即單機(jī)的worldsize -
--master_addr/port
: 使用的主進(jìn)程rank0的地址和端口 -
--node_rank
: 當(dāng)前的進(jìn)程rank
在單機(jī)情況下, 只有--nproc_per_node
是必須指定的,--master_addr/port
和node_rank
都是可以由launch通過環(huán)境自動(dòng)配置
「case5 test_dist.py」
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import osdist.init_process_group('nccl', init_method='env://')rank = dist.get_rank()
local_rank = os.environ['LOCAL_RANK']
master_addr = os.environ['MASTER_ADDR']
master_port = os.environ['MASTER_PORT']
print(f"rank = {rank} is initialized in {master_addr}:{master_port}; local_rank = {local_rank}")
torch.cuda.set_device(rank)
tensor = torch.tensor([1, 2, 3, 4]).cuda()
print(tensor)
輸入如下命令
python3 -m torch.distribued.launch --nproc_per_node=2 test_dist.py
得到如下輸出
rank = 0 is initialized in 127.0.0.1:29500; local_rank = 0
rank = 1 is initialized in 127.0.0.1:29500; local_rank = 1
tensor([1, 2, 3, 4], device='cuda:1')
tensor([1, 2, 3, 4], device='cuda:0')
注意:torch1.10開始用終端命令torchrun來代替torch.distributed.launch
,具體來說,torchrun實(shí)現(xiàn)了launch的一個(gè)超集,不同的地方在于:
-
完全使用環(huán)境變量配置各類參數(shù),如RANK,LOCAL_RANK, WORLD_SIZE等,尤其是local_rank不再支持用命令行隱式傳遞的方式
-
能夠更加優(yōu)雅的處理某個(gè)worker失敗的情況,重啟worker。需要代碼中有l(wèi)oad_checkpoint(path)和save_checkpoint(path) 這樣有worker失敗的話,可以通過load最新的模型,重啟所有的worker接著訓(xùn)練。具體參考 imagenet-torchrun
-
訓(xùn)練的節(jié)點(diǎn)數(shù)目可以彈性變化
上面的命令可以寫成如下
torchrun --nproc_per_node=2 test_dist.py
torchrun或者launch對(duì)上面ENV的初始化方法支持最完善,TCP初始化方法的可能會(huì)出現(xiàn)問題,因此盡量使用env來初始化dist
3. 分布式做evaluation
分布式做evaluation的時(shí)候,一般需要先所有進(jìn)程的輸出結(jié)果進(jìn)行g(shù)ather,再進(jìn)行指標(biāo)的計(jì)算,兩個(gè)常用的函數(shù):
-
dist.all_gather(tensor_list, tensor)
: 將所有進(jìn)程的tensor進(jìn)行收集并拼接成新的tensorlist返回 -
dist.all_reduce(tensor, op)
這是對(duì)tensor的in-place的操作, 對(duì)所有進(jìn)程的某個(gè)tensor進(jìn)行合并操作,op可以是求和等
「case 6 test_ddp.py」
import torch
import torch.distributed as distdist.init_process_group('nccl', init_method='env://')
rank = dist.get_rank()
torch.cuda.set_device(rank)tensor = torch.arange(2) + 1 + 2 * rank
tensor = tensor.cuda()
print(f"rank {rank}: {tensor}")tensor_list = [torch.zeros_like(tensor).cuda() for _ in range(2)]
dist.all_gather(tensor_list, tensor)
print(f"after gather, rank {rank}: tensor_list: {tensor_list}")dist.barrier()
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
print(f"after reduce, rank {rank}: tensor: {tensor}")
通過torchrun --nproc_per_node=2 test_ddp.py
輸出結(jié)果如下:?
rank 1: tensor([3, 4], device='cuda:1')
rank 0: tensor([1, 2], device='cuda:0')
after gather, rank 1: tensor_list: [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')]
after gather, rank 0: tensor_list: [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')]
after reduce, rank 0: tensor: tensor([4, 6], device='cuda:0')
after reduce, rank 1: tensor: tensor([4, 6], device='cuda:1')
在evaluation的時(shí)候,可以拿到所有進(jìn)程中模型的輸出,最后統(tǒng)一計(jì)算指標(biāo),基本流程如下
pred_list = []
for data in Dataloader:pred = model(data)batch_pred = [torch.zeros_like(label) for _ in range(world_size)]dist.all_gather(batch_pred, pred)pred_list.extend(batch_pred)
pred_list = torch.cat(pred_list, 1)
# 所有進(jìn)程pred_list是一致的,保存所有數(shù)據(jù)模型預(yù)測(cè)的值
4. 常用函數(shù)
-
torch.distributed.get_rank(group=None)
獲取當(dāng)前進(jìn)程的rank -
torch.distributed.get_backend(group=None)
獲取當(dāng)前任務(wù)(或者指定group)的后端 -
data_loader_train = torch.utils.data.DataLoader(dataset=data_set, batch_size=32,num_workers=16,pin_memory=True)
num_workers
: 加載數(shù)據(jù)的進(jìn)程數(shù)量,默認(rèn)只有1個(gè),增加該數(shù)量能夠提升數(shù)據(jù)的讀入速度。(注意:該參數(shù)>1,在低版本的pytorch可能會(huì)觸發(fā)python的內(nèi)存溢出)pin_memory
: 鎖頁(yè)內(nèi)存,加快數(shù)據(jù)在內(nèi)存上的傳遞速度。 若數(shù)據(jù)加載成為訓(xùn)練速度的瓶頸,可以考慮將這兩個(gè)參數(shù)加上
進(jìn)程內(nèi)指定顯卡,很多場(chǎng)景下使用分布式都是默認(rèn)一張卡對(duì)應(yīng)一個(gè)進(jìn)程,所以通常,我們會(huì)設(shè)置進(jìn)程能夠看到卡數(shù)
# 方式1:在進(jìn)程內(nèi)部設(shè)置可見的device
torch.cuda.set_device(args.local_rank)
# 方式2:通過ddp里面的device_ids指定
ddp_model = DDP(model, device_ids=[rank])
# 方式3:通過在進(jìn)程內(nèi)修改環(huán)境變量
os.environ['CUDA_VISIBLE_DEVICES'] = loac_rank