[RFC, Tracker] DataLoader improvements #41292
* new dataset format
* add basic test
* files for testing
* serialization using torch
* add diskcache
* adding deprecation warnings
* removing legacy
* warning about transforms
* detecting file format using reader
Some random comments:
Thanks @vadimkantorov for the detailed reply. I've updated the issue with it. See some comments inline.
Captured.
Do you have a URL of a good example?
Agree.
Missed the state of the Sampler. Captured as a requirement.
Captured
Haven't heard of it. Maybe someone else has info.
I haven't seen feature requests about pinning. How much of it must be a feature compared to …
Captured.
I guess (1) and (2) are the same thing :) I never finally tried doing atexit in pytorch/audio#271 (comment). torchaudio currently requires initialization to use sox: https://pytorch.org/audio/#torchaudio.initialize_sox. Until very recently, ffmpeg required initialization as well. I'm never sure whether we must do this initialization in every worker or whether initialization in the main process is enough.
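A minimal sketch of such per-worker initialization using DataLoader's worker_init_fn hook (the placeholder dataset is an assumption, and initialize_sox applies to torchaudio versions that still expose it):

import torch
from torch.utils.data import DataLoader, TensorDataset

def init_worker(worker_id):
    # Runs once in each worker process after it is forked/spawned,
    # so process-local library state is set up everywhere.
    import torchaudio
    torchaudio.initialize_sox()

dataset = TensorDataset(torch.arange(100))  # placeholder dataset
loader = DataLoader(dataset, num_workers=4, worker_init_fn=init_worker)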
If I'm not mistaken, I heard about this as a battle story from @szagoruyko, who shared this advice received from @soumith (again, if I'm not mixing things up), maybe 4 years ago.
Yeah, I saw people using taskset sometimes (and that's what I meant by pinning), but I never saw anyone discussing what the benefits are and when you have to use it.
Some great ideas and improvements listed here. Balancing IO and CPU utilization is a big challenge with the current DataLoader design. Chunking the data into shards (the WebDataset idea, tfrecords, riegeli, etc.) can help significantly with the IO, but there is huge room for improvement even with individual files if the IO is dispatched and coalesced efficiently.

I've spent time thinking about this in relation to past experience essentially building real-time video storage subsystems; moving the IO layer down to C++ could be a big win. It's really hard to dispatch enough IO requests simultaneously to fill SATA queues from synchronous Python calls like PIL.open(). From C/C++ it's fairly straightforward to flood controllers with IO calls using reactor-like I/O multiplexing (select()/poll()) or proactor-like approaches (Windows overlapped I/O). Maybe some of this could be done in Python, but Python threading and async interfaces have always struck me as quite limiting and generally 'not worth the effort' given the ever-present GIL and the near impossibility of writing low-context-switch code (no mutexes, no mallocs) in Python.

However, the above makes the boundary between Python and C++ potentially quite awkward. Once you have the data hot from the IO, it's best to do some decoding and possibly augmentation while you have it, but then that could result in a loss of the Python transform pipeline flexibility.
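As a rough illustration of what can be approximated from Python (function names here are hypothetical): blocking file reads release the GIL, so a thread pool can at least keep many requests in flight, even though it falls short of reactor-style C++ multiplexing.

from concurrent.futures import ThreadPoolExecutor

def read_file(path):
    with open(path, "rb") as f:
        return f.read()

def bulk_read(paths, max_in_flight=64):
    # Keep many reads outstanding so the storage controller's queues
    # stay full; each blocked read releases the GIL.
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        yield from pool.map(read_file, paths)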
I'd like to have an option where a batch is collected from multiple loader processes, rather than all from a single process. The general use case is: …
Currently, this situation results in batches whose samples come from a handful of files (the next batch from the next loader's handful), rather than from the entire set of files that can be kept in RAM, which reduces the variation inside a batch. The workaround is to extract and cache samples ahead of time, but that makes experimenting with the details of what a sample is expensive.
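A sketch of one way to approximate this with the current API, assuming a map-style dataset (the placeholder dataset and the batches helper are illustrative): disabling automatic batching with batch_size=None makes each worker return individual samples, so consecutive samples come from different workers and can be collated in the main process.

import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.dataloader import default_collate

dataset = TensorDataset(torch.arange(100))  # placeholder dataset
sample_loader = DataLoader(dataset, batch_size=None, num_workers=8, shuffle=True)

def batches(loader, batch_size):
    # Collate per-sample results from many workers into batches
    # in the main process.
    buf = []
    for sample in loader:
        buf.append(sample)
        if len(buf) == batch_size:
            yield default_collate(buf)
            buf = []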
@elistevens sounds equivalent to the fairly common case you run into where you chunk data with some sort of compression: parquet files, video codecs with IDR frame intervals, etc. Although sometimes you can at least start decoding with partial data (video), or skip some data because there is internal structure or external metadata about seek points, packets, etc. Multi-threading in the same memory space is usually much more effective there than trying to do it across process boundaries with IPC or shared memory.
In my actual use case it was CT scans converted into … Not impossible to work around, obviously, but the current API doesn't help.
@rwightman @elistevens thank you for the input; we realize that both multi-processing and multi-threading are equally important for the framework and will look for options to make them interchangeable.
One more related dataloader issue: #22924 about "Make DataLoader return usable traces in the case of Ctrl+C and similar OS signals."
#13246: this would be a huge problem if the processes are kept around and not recreated.
We've developed an open source framework called Tensorcom (based on the WebDataset I/O system, though it can also be used with existing Dataset implementations). It can be used instead of DataLoader. Worker processes are started up independently of the main training job; they can be started up before or after, as clients or servers, and batching can happen either in the workers or the main job. Worker processes don't have to run on the same machine as the deep learning job, and for hyperparameter searches, a single set of worker processes can broadcast training samples to a large number of training jobs.

Making worker processes explicit and separate from the main training job has a number of advantages: it simplifies testing and debugging, it allows performance tuning separate from the deep learning job, you can adjust the number of workers while the DL job is running, it makes I/O pipelines independent of the specific deep learning job, and it works really well with K8s (where worker processes and DL jobs can just be configured and scaled as separate ReplicaSets or StatefulSets).

Tensorcom can be used as a replacement for DataLoader (and the Tensorcom workers can be started up by the DL job), but more importantly, it can take advantage of RDMA and direct-to-GPU hardware where available. We're currently still working on WebDataset integration into PyTorch. Tensorcom works very well and efficiently, but it's still largely undocumented and the APIs may still change. I just wanted to give a heads up.
Some thoughts on batching: … These issues can be resolved to some extent today using …
@ppwwyyxx do you have an example or pseudocode of how you calculate dynamic batch size based on size/length?
In order to only put samples of similar sizes into a batch, we first created a dataloader that's not batched, and then grouped its samples into buckets:

from collections import defaultdict

def bucketed_batches(data_loader, batch_size, compute_bucket_id, collate_fn):
    # Group samples into buckets keyed by size; emit a batch whenever
    # a bucket fills up. Leftovers in partially filled buckets are
    # dropped when the loader is exhausted.
    buckets = defaultdict(list)
    for d in data_loader:
        bucket = buckets[compute_bucket_id(d)]  # determine the bucket from d's size
        bucket.append(d)
        if len(bucket) == batch_size:
            yield collate_fn(bucket)
            del bucket[:]  # reuse the emptied bucket
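For instance, a hypothetical compute_bucket_id for variable-length samples could group lengths with the same bit length, so every sample in a batch pads to a similar size:

def compute_bucket_id(d):
    # Hypothetical: lengths {1}, {2,3}, {4..7}, {8..15}, ... share a bucket.
    return len(d).bit_length()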
Batching serves two purposes: it's needed for deep learning, but it's also needed for efficient data transfer from workers to the main process.
I think a redesign should be based on a general pipeline abstraction; batching and unbatching just become pipeline stages. There are initial sources of data, and a primitive for running multiple data pipelines in parallel and combining the results. We have tried to provide something like that in WebDataset (while still staying somewhat close to the existing Dataset/DataLoader framework). So, a regular multi-worker loader with shuffling across batches would look like this (note the use of batching for efficient data transfers from workers to the main process):

dataset = wds.Dataset(url).decode().to_tuple("png", "cls").batched(64)
loader = wds.MultiDataset(dataset, workers=4).unbatched().shuffle(1000).batched(64)

For variable-sized batching, you would use something like:

def bucket_batching(samples):
    ... as above ...

dataset = wds.Dataset(url).decode().to_tuple("png", "cls").batched(64, combine_tensors=False)
loader = wds.MultiDataset(dataset, workers=4).unbatched().shuffle(1000).pipe(bucket_batching)

For multi-node distributed preprocessing with buckets, you can write:

# CPU node
dataset = wds.Dataset(url).decode().to_tuple("png", "cls").shuffle(1000).pipe(bucket_batching)
tensorcom.Connection("zpush:https://gpunode:5678").serve(dataset)

# GPU node
loader = tensorcom.Connection("zpull:https://*:5678")

All of this essentially works today in WebDataset and Tensorcom (arguments may be slightly different from these examples in the released versions).
Some feedback from the PyText use case: …
What is the current status of PyTorch support for NVIDIA DirectStorage / RTX IO direct data loading into the GPU?
Just adding my own use case for why I think this is a must-have feature. Imagine you are trying to scale up your batch size by some factor N, and you can only increase the number of your CPUs but not so much your RAM.

The default behaviour of DataLoader is to spin up num_workers processes, each of which samples a full batch before returning it. To benefit from the increased availability of CPUs and scale my batch size up efficiently, I basically need to go from batch_size=B, num_workers=W to batch_size=N×B, num_workers=N×W. This ensures I can properly utilize the extra CPUs I get access to. However, in doing so I am also effectively increasing the memory I need from B × W to (N × B) × (N × W), which is problematic since I have access to more CPUs but not additional RAM. In most cases, such as on cloud compute, CPU and RAM usually increase by the same factor, so an N² increase in memory could be very problematic.

I was wondering if there is a way to change the default behaviour of the DataLoader so that, instead of each worker loading a full batch, a pool of workers samples single samples each, which are then assembled into a batch once they reach a certain number. That way I could do something like what I need above without all that extra memory, and potentially save memory as well. Please let me know your thoughts.
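To make that scaling concrete (illustrative numbers, not from the comment above): with B=64, W=8, and N=4, the samples held in worker memory grow from 64 × 8 = 512 to (4 × 64) × (4 × 8) = 8192, a 16-fold (N²) increase against only a 4-fold increase in CPUs.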
This functionality is going to be supported using DataPipes and the new DataLoader; the 1.12 release will likely cover such a use case.
This issue was created to track all current problems of DataLoader (and related components such as DataSet, Sampler, Transforms). It is focused on what we want to achieve; implementation details will follow after prioritizing.

Known problems
Issues interpretability. DataLoader periodically ends up in a situation where the process hangs for reasons unknown to the user. Root-cause analysis of such cases is overcomplicated due to cryptic exception traces and/or the inability to reproduce quickly. Examples: Dataloader._shutdown_workers hangs #39570; torch.utils.data.DataLoader multiprocess hanging when matplotlib.pyplot imported after torch #36375; Training got stuck due to timeout from dataloader #33296; DataLoader: Segmentation Fault (core dumped) #31758; DataLoader fails to re-raise exceptions with required arguments #30147; [dataloader] Problem in exception reraise mechanism #25522. We plan to:
Issues with CPU utilization. Usage of DataLoader frequently ends in oversubscription of CPU threads or CPU underutilization (see the mitigation sketch after this list). We plan to:
GPU underutilization. Multiple users reported problems with the CUDA context and multiprocessing. Examples: Cannot re-initialize CUDA in forked subprocess #40403; how to do 3d data augmentation in parallel on the gpu? #35759. We plan to make it possible to use GPU Transforms easily.
Recreation of the processes. Spawning processes is a costly procedure, especially in the case of CUDA usage (see the mitigation sketch after this list). We plan to:
Memory usage issues. When loading and training speeds are unbalanced, DataLoader behavior can lead to OOM. Examples: CPU memory would not release at the beginning of the next iteration #31101; [dataloader] Hang because of too many open files (and probably some process dead) #25691. We plan to:
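On the CPU oversubscription and process recreation items above, a minimal mitigation sketch with today's API (the placeholder dataset is an assumption; the persistent_workers flag landed in a PyTorch release after this issue was filed):

import torch
from torch.utils.data import DataLoader, TensorDataset

def limit_threads(worker_id):
    # Give each worker a single intra-op thread so num_workers workers
    # do not oversubscribe the available cores.
    torch.set_num_threads(1)

dataset = TensorDataset(torch.arange(100))  # placeholder dataset
loader = DataLoader(dataset, num_workers=4, worker_init_fn=limit_threads,
                    persistent_workers=True)  # keep workers alive across epochs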
Documentation
Clear initialization process doc. Some users get confused about when DataSet / DataLoader get initialized. The order must be clear and understandable.
Clear multiprocessing / multithreading docs, with recipes (torchaudio is a good example), especially covering cases when threading meets forking and when CUDA meets forking.
Benchmarking
To make sure that performance improvements are trackable and we are not introducing slowdowns, we need to write and document benchmark scripts / methodology.
Add documentation on how to measure the performance of data loading, so users can identify bottlenecks.
Ability to benchmark CPU vs GPU Transforms.
Improvements
Controllable batch size at every step of the pipeline. Various features (such as WebData, batch loading in general, minibatches, batch-level Transforms) require various batch sizes (1-N) at every step of the process. We need to make it possible to specify it at every level of the loading graph.
Improved Sampling. Sampling is an important component of the data loading pipeline. The following requirements should be considered in case of architecture changes.
Introduce the ability to choose between multi-threading and multi-processing. Right now we provide only a multi-processing solution; making multi-threading possible will potentially unlock more performant pipelines.
Preprocessing/Networking. We currently support distributed training, but it is impossible to do distributed data preprocessing at this moment. We should consider this option.
Infinite DataSet/DataLoader. There is a significant number of requests to provide a DataSet/DataLoader that never stops producing batches; see the sketch after this list. Ex: https://discuss.pytorch.org/t/implementing-an-infinite-loop-dataset-dataloader-combo/35567 https://www.google.com/search?q=pytorch+Infinite+DataSet
Extend ConcatDataSet functionality. There is more than one way to combine two DataSets.
Caching support. In some cases caching is the easiest way to store an entire DataSet in memory. We can unify the caching API to avoid multiple suboptimal solutions. Caching Support for class Dataset #35642 Add caching support for Dataset class #39274
State saving / restoration for DataSet / DataLoader / Sampler. Restarting training from a specific checkpoint is problematic when the size of a single epoch is too large. We can introduce a method to save/restore the data pipeline state; see the sketch after this list. state_dict and load_state_dict methods for DataLoader and Sampler to continue training at specific epoch and batch #36650.
Improve collate_fn experience
Unify Transforms Interface.
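On the infinite DataSet/DataLoader item above, a minimal sketch using the existing IterableDataset (the wrapped dataset is assumed; a real implementation would also reshuffle and handle worker sharding):

from torch.utils.data import IterableDataset

class InfiniteDataset(IterableDataset):
    # Cycle over an underlying iterable of samples forever.
    def __init__(self, dataset):
        self.dataset = dataset

    def __iter__(self):
        while True:
            yield from iter(self.dataset)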
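And on the state saving / restoration item, a hypothetical sketch (not an existing API) of what state_dict / load_state_dict on a sequential Sampler, as requested in #36650, might look like:

from torch.utils.data import Sampler

class ResumableSequentialSampler(Sampler):
    def __init__(self, data_source):
        self.data_source = data_source
        self.start = 0  # index of the next sample to yield

    def __iter__(self):
        for i in range(self.start, len(self.data_source)):
            self.start = i + 1  # track progress so it can be checkpointed
            yield i
        self.start = 0  # reset for the next epoch

    def __len__(self):
        return len(self.data_source)

    def state_dict(self):
        return {"start": self.start}

    def load_state_dict(self, state):
        self.start = state["start"]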
Considering Improvements
cc @ssnl
cc @vadimkantorov