'''
Copyright 2019 The Microsoft DeepSpeed Team
'''

import torch
from torch.utils.data import DataLoader, RandomSampler
from torch.utils.data.distributed import DistributedSampler


class RepeatingLoader:
    def __init__(self, loader):
        """Wraps an iterator to allow for infinite iteration. This is especially useful
        for DataLoader types that we wish to automatically restart upon completion.

        Args:
            loader (iterator): The data loader to repeat.
        """
        self.loader = loader
        self.data_iter = iter(self.loader)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            batch = next(self.data_iter)
        except StopIteration:
            # The wrapped loader is exhausted; restart it and fetch the first batch.
            self.data_iter = iter(self.loader)
            batch = next(self.data_iter)
        return batch
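

# Illustrative usage (an assumed example, not part of the original module):
# wrapping a finite DataLoader so a training loop can request more steps than
# a single epoch provides.
#
#     train_loader = RepeatingLoader(DataLoader(dataset, batch_size=8))
#     for step in range(total_steps):   # total_steps may exceed one epoch
#         batch = next(train_loader)    # restarts transparently at epoch end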


class DeepSpeedDataLoader(object):
    def __init__(self,
                 dataset,
                 batch_size,
                 pin_memory,
                 local_rank,
                 tput_timer,
                 collate_fn=None,
                 num_local_io_workers=None,
                 data_sampler=None,
                 data_parallel_world_size=None,
                 data_parallel_rank=None,
                 dataloader_drop_last=False):
        self.tput_timer = tput_timer
        self.batch_size = batch_size

        if local_rank >= 0:
            # Distributed training: each rank reads its own shard of the dataset.
            if data_sampler is None:
                data_sampler = DistributedSampler(dataset=dataset,
                                                  num_replicas=data_parallel_world_size,
                                                  rank=data_parallel_rank)
            device_count = 1
        else:
            # Single-process training: sample randomly and scale the batch size
            # by the number of visible devices.
            if data_sampler is None:
                data_sampler = RandomSampler(dataset)
            device_count = torch.cuda.device_count()
            batch_size *= device_count

        if num_local_io_workers is None:
            num_local_io_workers = 2 * device_count

        self.num_local_io_workers = num_local_io_workers
        self.data_sampler = data_sampler
        self.dataset = dataset
        self.collate_fn = collate_fn
        self.device_count = device_count
        self.batch_size = batch_size
        self.pin_memory = pin_memory
        self.len = len(self.data_sampler)
        self.data = None
        self.dataloader_drop_last = dataloader_drop_last
    def __iter__(self):
        self._create_dataloader()
        return self

    def __len__(self):
        return self.len

    def __next__(self):
        if self.tput_timer:
            self.tput_timer.start()
        return next(self.data)

    def _create_dataloader(self):
        if self.collate_fn is None:
            self.dataloader = DataLoader(self.dataset,
                                         batch_size=self.batch_size,
                                         pin_memory=self.pin_memory,
                                         sampler=self.data_sampler,
                                         num_workers=self.num_local_io_workers,
                                         drop_last=self.dataloader_drop_last)
        else:
            self.dataloader = DataLoader(self.dataset,
                                         batch_size=self.batch_size,
                                         pin_memory=self.pin_memory,
                                         sampler=self.data_sampler,
                                         collate_fn=self.collate_fn,
                                         num_workers=self.num_local_io_workers,
                                         drop_last=self.dataloader_drop_last)
        # Expose the underlying loader as a generator so __next__ can pull batches lazily.
        self.data = (x for x in self.dataloader)
        return self.dataloader


# DataLoader([(torch.randn(3, 3), torch.tensor(i % 2)) for i in range(10)], batch_size=2)
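

# Minimal runnable sketch (an illustrative addition, not part of the original
# module): builds a toy TensorDataset and cycles it with RepeatingLoader wrapped
# around DeepSpeedDataLoader. The argument values below (local_rank=0 with an
# explicit data-parallel world size of 1) are assumptions chosen so the example
# runs in a single process without initializing torch.distributed.
if __name__ == "__main__":
    from torch.utils.data import TensorDataset

    toy_dataset = TensorDataset(torch.randn(10, 3), torch.arange(10) % 2)
    loader = DeepSpeedDataLoader(toy_dataset,
                                 batch_size=2,
                                 pin_memory=False,
                                 local_rank=0,
                                 tput_timer=None,
                                 num_local_io_workers=0,
                                 data_parallel_world_size=1,
                                 data_parallel_rank=0)
    repeating = RepeatingLoader(loader)
    # Pull more batches than one epoch contains; the loader restarts automatically.
    for step in range(8):
        features, labels = next(repeating)
        print(step, features.shape, labels.shape)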