Skip to content

Commit

Permalink
DALI; all_reduce to gather results; fitnet, kd script
Browse files Browse the repository at this point in the history
  • Loading branch information
triomino committed Jul 13, 2020
1 parent f36f6c2 commit dc34553
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 68 deletions.
138 changes: 138 additions & 0 deletions dataset/imagenet_dali.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# https://github.com/NVIDIA/DALI/blob/master/docs/examples/use_cases/pytorch/resnet50/main.py

import argparse
import os
import shutil
import time
import math

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

import numpy as np

try:
from nvidia.dali.plugin.pytorch import DALIClassificationIterator
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
except ImportError:
raise ImportError("Please install DALI from https://www.github.com/NVIDIA/DALI to run this example.")

class HybridTrainPipe(Pipeline):
def __init__(self, batch_size, num_threads, device_id, data_dir, crop,
shard_id, num_shards, dali_cpu=False):
super(HybridTrainPipe, self).__init__(batch_size,
num_threads,
device_id,
seed=12 + device_id)
self.input = ops.FileReader(file_root=data_dir,
shard_id=shard_id,
num_shards=num_shards,
shuffle_after_epoch=True,
pad_last_batch=True)
#let user decide which pipeline works him bets for RN version he runs
dali_device = 'cpu' if dali_cpu else 'gpu'
decoder_device = 'cpu' if dali_cpu else 'mixed'
# This padding sets the size of the internal nvJPEG buffers to be able to handle all images from full-sized ImageNet
# without additional reallocations
device_memory_padding = 211025920 if decoder_device == 'mixed' else 0
host_memory_padding = 140544512 if decoder_device == 'mixed' else 0
self.decode = ops.ImageDecoderRandomCrop(device=decoder_device, output_type=types.RGB,
device_memory_padding=device_memory_padding,
host_memory_padding=host_memory_padding,
random_aspect_ratio=[0.8, 1.25],
random_area=[0.1, 1.0],
num_attempts=100)
self.res = ops.Resize(device=dali_device,
resize_x=crop,
resize_y=crop,
interp_type=types.INTERP_TRIANGULAR)
self.cmnp = ops.CropMirrorNormalize(device="gpu",
output_dtype=types.FLOAT,
output_layout=types.NCHW,
crop=(crop, crop),
mean=[0.485 * 255,0.456 * 255,0.406 * 255],
std=[0.229 * 255,0.224 * 255,0.225 * 255])
self.coin = ops.CoinFlip(probability=0.5)
print('DALI "{0}" variant'.format(dali_device))

def define_graph(self):
rng = self.coin()
self.jpegs, self.labels = self.input(name="Reader")
images = self.decode(self.jpegs)
images = self.res(images)
output = self.cmnp(images.gpu(), mirror=rng)
return [output, self.labels]

class HybridValPipe(Pipeline):
def __init__(self, batch_size, num_threads, device_id, data_dir, crop,
size, shard_id, num_shards):
super(HybridValPipe, self).__init__(batch_size,
num_threads,
device_id,
seed=12 + device_id)
self.input = ops.FileReader(file_root=data_dir,
shard_id=shard_id,
num_shards=num_shards,
random_shuffle=False,
pad_last_batch=True)
self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB)
self.res = ops.Resize(device="gpu",
resize_shorter=size,
interp_type=types.INTERP_TRIANGULAR)
self.cmnp = ops.CropMirrorNormalize(device="gpu",
output_dtype=types.FLOAT,
output_layout=types.NCHW,
crop=(crop, crop),
mean=[0.485 * 255,0.456 * 255,0.406 * 255],
std=[0.229 * 255,0.224 * 255,0.225 * 255])

def define_graph(self):
self.jpegs, self.labels = self.input(name="Reader")
images = self.decode(self.jpegs)
images = self.res(images)
output = self.cmnp(images)
return [output, self.labels]

from dataset.imagenet import get_data_folder
def get_dali_data_loader(args):
crop_size = 224
val_size = 256

data_folder = get_data_folder()
train_folder = os.path.join(data_folder, 'train')
val_folder = os.path.join(data_folder, 'val')

pipe = HybridTrainPipe(batch_size=args.batch_size,
num_threads=args.num_workers,
device_id=args.rank,
data_dir=train_folder,
crop=crop_size,
dali_cpu=args.dali == 'cpu',
shard_id=args.rank,
num_shards=args.world_size)
pipe.build()
train_loader = DALIClassificationIterator(pipe, reader_name="Reader", fill_last_batch=False)

pipe = HybridValPipe(batch_size=args.batch_size,
num_threads=args.num_workers,
device_id=args.rank,
data_dir=val_folder,
crop=crop_size,
size=val_size,
shard_id=args.rank,
num_shards=args.world_size)
pipe.build()
val_loader = DALIClassificationIterator(pipe, reader_name="Reader", fill_last_batch=False)

return train_loader, val_loader
5 changes: 5 additions & 0 deletions fitnet.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
python train_student.py --path-t ./save/models/ResNet34_vanilla/resnet34_transformed.pth \
--batch_size 256 --epochs 90 --dataset imagenet --gpu_id 0,1,2,3 --dist-url tcp:https://127.0.0.1:23334 \
--print-freq 100 --num_workers 16 --distill hint --model_s ResNet18 -r 1 -a 1 -b 100 --trial 0 \
--multiprocessing-distributed --learning_rate 0.1 --lr_decay_epochs 30,60,90 --weight_decay 1e-4 --hint_layer 1 \
--dali gpu
69 changes: 36 additions & 33 deletions helper/loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
import torch

from .util import AverageMeter, accuracy
from .util import AverageMeter, accuracy, reduce_tensor

def train_vanilla(epoch, train_loader, model, criterion, optimizer, opt):
"""vanilla training"""
Expand All @@ -16,8 +16,15 @@ def train_vanilla(epoch, train_loader, model, criterion, optimizer, opt):
top1 = AverageMeter()
top5 = AverageMeter()

n_batch = len(train_loader) if opt.dali is None else (train_loader._size + opt.batch_size - 1) // opt.batch_size

end = time.time()
for idx, (input, target) in enumerate(train_loader):
for idx, batch_data in enumerate(train_loader):
if opt.dali is None:
input, target = batch_data
else:
input, target = batch_data[0]['data'], batch_data[0]['label'].squeeze().long()

data_time.update(time.time() - end)

# input = input.float()
Expand Down Expand Up @@ -52,7 +59,7 @@ def train_vanilla(epoch, train_loader, model, criterion, optimizer, opt):
'Loss {loss.avg:.4f}\t'
'Acc@1 {top1.avg:.3f}\t'
'Acc@5 {top5.avg:.3f}'.format(
epoch, idx, len(train_loader), opt.gpu, batch_time=batch_time,
epoch, idx, n_batch, opt.gpu, batch_time=batch_time,
data_time=data_time, loss=losses, top1=top1, top5=top5))
sys.stdout.flush()

Expand Down Expand Up @@ -84,17 +91,23 @@ def train_distill(epoch, train_loader, module_list, criterion_list, optimizer, o
top1 = AverageMeter()
top5 = AverageMeter()

n_batch = len(train_loader) if opt.dali is None else (train_loader._size + opt.batch_size - 1) // opt.batch_size

end = time.time()
for idx, data in enumerate(train_loader):
data_time.update(time.time() - end)

if opt.distill in ['crd']:
input, target, index, contrast_idx = data
if opt.dali is None:
if opt.distill in ['crd']:
input, target, index, contrast_idx = data
else:
input, target = data
else:
input, target = data
input, target = data[0]['data'], data[0]['label'].squeeze().long()

if target.shape[0] < opt.batch_size:
continue
# TODO: how to deal with the last batch
# if target.shape[0] < opt.batch_size:
# continue

if opt.gpu is not None:
input = input.cuda(opt.gpu, non_blocking=True)
Expand All @@ -103,7 +116,7 @@ def train_distill(epoch, train_loader, module_list, criterion_list, optimizer, o
if opt.distill in ['crd']:
index = index.cuda()
contrast_idx = contrast_idx.cuda()

# ===================forward=====================
feat_s, logit_s = model_s(input, is_feat=True)
with torch.no_grad():
Expand Down Expand Up @@ -167,7 +180,7 @@ def train_distill(epoch, train_loader, module_list, criterion_list, optimizer, o

loss = opt.gamma * loss_cls + opt.alpha * loss_div + opt.beta * loss_kd
losses.update(loss.item(), input.size(0))

metrics = accuracy(logit_s, target, topk=(1, 5))
top1.update(metrics[0].item(), input.size(0))
top5.update(metrics[1].item(), input.size(0))
Expand All @@ -177,7 +190,7 @@ def train_distill(epoch, train_loader, module_list, criterion_list, optimizer, o
# ===================backward=====================
optimizer.zero_grad()
loss.backward()
optimizer.step()
optimizer.step()

# print info
if idx % opt.print_freq == 0:
Expand All @@ -188,14 +201,14 @@ def train_distill(epoch, train_loader, module_list, criterion_list, optimizer, o
'Loss {loss.avg:.4f}\t'
'Acc@1 {top1.avg:.3f}\t'
'Acc@5 {top5.avg:.3f}'.format(
epoch, idx, len(train_loader), opt.gpu, loss=losses, top1=top1, top5=top5,
epoch, idx, n_batch, opt.gpu, loss=losses, top1=top1, top5=top5,
batch_time=batch_time, data_time=data_time))
sys.stdout.flush()

return top1.avg, top5.avg, losses.avg, data_time.avg


def validate(val_loader, model, criterion, opt, meter_queue = None):
def validate(val_loader, model, criterion, opt):
"""validation"""

batch_time = AverageMeter()
Expand All @@ -206,9 +219,16 @@ def validate(val_loader, model, criterion, opt, meter_queue = None):
# switch to evaluate mode
model.eval()

n_batch = len(val_loader) if opt.dali is None else (val_loader._size + opt.batch_size - 1) // opt.batch_size

with torch.no_grad():
end = time.time()
for idx, (input, target) in enumerate(val_loader):
for idx, batch_data in enumerate(val_loader):
if opt.dali is None:
input, target = batch_data
else:
input, target = batch_data[0]['data'], batch_data[0]['label'].squeeze().long()

if opt.gpu is not None:
input = input.cuda(opt.gpu, non_blocking=True)
if torch.cuda.is_available():
Expand All @@ -235,24 +255,7 @@ def validate(val_loader, model, criterion, opt, meter_queue = None):
'Loss {loss.avg:.4f}\t'
'Acc@1 {top1.avg:.3f}\t'
'Acc@5 {top5.avg:.3f}'.format(
idx, len(val_loader), opt.gpu, batch_time=batch_time, loss=losses,
idx, n_batch, opt.gpu, batch_time=batch_time, loss=losses,
top1=top1, top5=top5))

if opt.multiprocessing_distributed:
if opt.rank % opt.ngpus_per_node == 0:
for i in range(opt.ngpus_per_node - 1):
start_time = time.time()
# In fact the other three is not used here.
top1_peer, top5_peer, batch_time_peer, losses_peer = meter_queue.get()
top1.merge(top1_peer)
top5.merge(top5_peer)
losses.merge(losses_peer)
meter_queue.task_done()
else:
meter_queue.put((top1, top5, batch_time, losses))
# TODO: This cost about 2 seconds idle on each processor.
# Without this may cause problem if
# next round putting begins before ending of this round.
meter_queue.join()


return top1.avg, top5.avg, losses.avg
6 changes: 6 additions & 0 deletions helper/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import torch
import numpy as np
import torch.distributed as dist


def adjust_learning_rate_new(epoch, optimizer, LUT):
Expand Down Expand Up @@ -85,6 +86,11 @@ def load_json_to_dict(json_path):
params = json.load(f)
return params

def reduce_tensor(tensor, world_size = 1):
rt = tensor.clone()
dist.all_reduce(rt, op=dist.ReduceOp.SUM)
rt /= world_size
return rt

if __name__ == '__main__':

Expand Down
4 changes: 4 additions & 0 deletions kd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
python train_student.py --path-t ./save/models/ResNet34_vanilla/resnet34_transformed.pth \
--batch_size 256 --epochs 90 --dataset imagenet --gpu_id 4,5,6,7 --dist-url tcp:https://127.0.0.1:23333 \
--print-freq 100 --num_workers 16 --distill kd --model_s ResNet18 -r 1 -a 1 -b 0 --trial 0 \
--multiprocessing-distributed --learning_rate 0.1 --lr_decay_epochs 30,60 --weight_decay 1e-4 --dali gpu
9 changes: 9 additions & 0 deletions scripts/model_transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import torch

if __name__ == '__main__':
state_dict = torch.load('save/models/ResNet34_vanilla/resnet34-333f7ec4.pth')
torch.save({
# 'epoch': model['epoch'],
'model': state_dict,
# 'best_acc': model['best_acc1']
}, 'save/models/ResNet34_vanilla/resnet34_transformed.pth')
Loading

0 comments on commit dc34553

Please sign in to comment.