Petastorm sharding + Distributed PyTorch #508
Comments
I absolutely agree with the problem definition and your analysis of possible solutions. For the scenarios our users have encountered, they typically chose option 3, as it does not introduce unintended bias for some samples and is the easiest to configure. Perhaps one more control variable, relevant to option 2, is the size of your rowgroups: reducing it improves the granularity of the task unit and, for smaller datasets, may reduce the unintended bias some portions of samples would otherwise get during training. Implementing option 1 would be tricky, since it would introduce coordination between ranks that is not needed with the naive modulo-N sharding.
Thank you! Good to know I'm on the right track. I control the size of the rowgroups, but reducing it also takes away the benefits of Parquet as a format: more reads are required. Option 4 I found in PyTorch discussions on sampling `IterableDataset`: make each of …
@megaserg I am trying to do something similar with TPU Pods (multi-machine): PyTorch XLA reading an ImageNet petastorm dataset from GCS buckets. Did you find that one of your options worked best? If so, how did you implement it? Option 3? 4? Something else? My current solution reads JPEGs from GCS; I came to petastorm so I can reduce training time, as my goal is to make training in this configuration as performant as reading data from a local VM (but for use cases where storing training data locally is not ideal). More on that here if interested. @selitvin I'd appreciate any thoughts you may have too.
So we went with option 3 for a while. It works fine, but epoch counting is confusing.

Then we went with option 1, but we didn't use Petastorm. Given a Parquet dataset (i.e. a bunch of Parquet files), we build an "index" of Parquet rows, basically a list of triples …

So we thought about how we could keep larger rowgroups (benefitting from the storage format) and not throw away data. Currently, we're using a custom …

Hope this helps!
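For readers who want to try the same approach, here is a minimal sketch of what such a row index could look like, assuming `pyarrow` is available. The triple layout (file path, rowgroup index, row index within the rowgroup) and the helper names are illustrative assumptions, not the actual code from this comment:

```python
# Hypothetical sketch: build an index of (file, rowgroup, row-in-rowgroup)
# triples for a Parquet dataset, wrap it around to a size divisible by the
# number of ranks, and slice out this rank's even-sized shard.
import pyarrow.parquet as pq

def build_row_index(parquet_files):
    index = []
    for path in parquet_files:
        metadata = pq.ParquetFile(path).metadata
        for rg in range(metadata.num_row_groups):
            for row in range(metadata.row_group(rg).num_rows):
                index.append((path, rg, row))
    return index

def even_shard(index, rank, world_size, rng):
    rng.shuffle(index)                           # same seed on every rank
    total = len(index) + (-len(index)) % world_size
    index = index + index[: total - len(index)]  # wrap around to divisible
    return index[rank::world_size]               # even-sized shard
```

Every rank must shuffle with the same seed so the shards consistently partition the wrapped index.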
Just wondering: if I use option 3 (each rank loads the whole petastorm dataset), might reading the whole dataset have any performance bottlenecks? Is it better to set the number of row groups equal to 1 in that case?
There should not be performance bottlenecks, since the Parquet dataset is read rowgroup-by-rowgroup.
Just to confirm how option 3 is configured: in the `make_reader` API, do I simply set `cur_shard=None`, `shard_count=None`, `shard_seed=None`, and `num_epochs=` the real epoch number instead of `None`? Is that fine?
Yes (note that `cur_shard=None` and `shard_count=None` are the defaults, so you don't need to specify them explicitly).
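Putting that together, a minimal sketch of the option 3 configuration, assuming the reader is consumed through petastorm's PyTorch `DataLoader`; the dataset URL, epoch count, and batch size are placeholders:

```python
# Option 3 sketch: no sharding, every rank reads the full dataset for a
# fixed number of epochs. URL and batch size are hypothetical.
from petastorm import make_reader
from petastorm.pytorch import DataLoader

NUM_EPOCHS = 10  # a real epoch count instead of the default None

with make_reader('file:///tmp/my_dataset',   # hypothetical dataset URL
                 num_epochs=NUM_EPOCHS) as reader:
    with DataLoader(reader, batch_size=64) as loader:
        for batch in loader:
            pass  # training step goes here
```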
I've tested solution 3, and discovered that, in order to load the same global batch in each rank, I must also set … BTW, I noticed that there is an in-progress PR (#767) working on shuffle. Could you please tell me if you have any plan to fix it? @selitvin
You are right. The order is not deterministic when multiple workers race over reading row groups, and we do not have a reordering queue set up (not too hard to implement). #767 will land very soon (within days).
I'm trying to build a DLRM model containing both model-parallel layers and data-parallel layers. In the model-parallel part, I must make sure each rank consumes the same global batch. After that, an all-to-all collective distributes the global output activations to the data-parallel layers in each rank. In fact, this is a compromise choice. Potentially, I could also generate a different mini-batch in each rank, and use an all-to-all collective to recover the global batch before feeding it to the model-parallel layers. However, due to the communication constraints of our hardware, the former is better for my purposes.
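For illustration, this is roughly what that all-to-all step can look like with `torch.distributed`; the shapes and the function name are assumptions for the sketch, not the commenter's actual code:

```python
# Hypothetical sketch: each rank holds this model-parallel shard's features
# for the FULL global batch; after all_to_all it holds ALL features for its
# OWN slice of the batch.
import torch
import torch.distributed as dist

def redistribute(activations: torch.Tensor, world_size: int) -> torch.Tensor:
    # activations: (global_batch, features_per_rank), same batch on all ranks
    inputs = list(activations.chunk(world_size, dim=0))  # split along batch
    outputs = [torch.empty_like(t) for t in inputs]
    dist.all_to_all(outputs, inputs)                     # exchange slices
    # result: (global_batch / world_size, features_per_rank * world_size)
    return torch.cat(outputs, dim=1)
```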
I see. It should not be too hard to add a reordering buffer to the reader; I think the PyTorch DataLoader has similar functionality. I am not sure when I'll have time to do this. If you want to give it a shot and propose a PR, I'll be happy to work with you to get it in.
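To illustrate the idea (this is not petastorm's actual internals): a reordering buffer holds out-of-order results until the next expected sequence number arrives. Tagging each worker result with a sequence number is an assumption of the sketch:

```python
# Hypothetical reordering buffer: workers yield (seq_no, item) pairs out of
# order; items are emitted strictly in seq_no order.
def reorder(tagged_items):
    buffer = {}
    next_seq = 0
    for seq_no, item in tagged_items:
        buffer[seq_no] = item
        while next_seq in buffer:
            yield buffer.pop(next_seq)
            next_seq += 1
```

Placed in front of the worker output queue, this would let every rank see row groups in the same deterministic order, at the cost of some buffering memory.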
I'm too busy to work on the dataloader these days :dizzy_face: That's why I resorted to petastorm.
Problem:
I would like to train a PyTorch model on a Parquet dataset in a distributed (multi-GPU, multi-machine) setup, for a fixed number of epochs. For this, I need to shard the dataset, and I hoped that providing Petastorm's `cur_shard` and `shard_count` would be sufficient. I create the Petastorm reader with `num_epochs=1` each epoch (or could create it once and `reset()`).

But when training a `DistributedDataParallel` model, PyTorch expects shards to have the same number of examples, so that they have the same number of batches, so that all ranks make the same number of training steps, so that they participate in the same number of allreduces. E.g. `torch.utils.data.distributed.DistributedSampler` (used to implement sharding in stock PyTorch's `DataLoader`) wraps the dataset around to make it evenly divisible by the number of shards.

If shards are not even-sized, some ranks have less work to do, finish their shard early, and start the next epoch while the rest of the ranks are still processing the previous epoch. If we're training for a certain number of epochs, the "fast ranks" eventually finish first and terminate, leaving the rest of the ranks hanging because allreduce is now impossible.
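For concreteness, a simplified sketch of the `DistributedSampler` wrap-around described above (shuffling omitted; this follows the logic stated here rather than PyTorch's exact source):

```python
# Pad the index list by wrapping around so that every rank receives exactly
# the same number of samples, then take every num_replicas-th index.
import math

def shard_indices(dataset_len: int, num_replicas: int, rank: int):
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    indices = list(range(dataset_len))
    indices += indices[: total_size - len(indices)]   # wrap around
    return indices[rank:total_size:num_replicas]      # this rank's shard
```

E.g. with 10 rows and 4 ranks, indices 0 and 1 are repeated so that every rank gets exactly 3 samples.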
In Petastorm, even-sized shards are not guaranteed. The length of the dataset is unknown, and we don't have random access to rows. Rowgroups are assigned to shards in a round-robin fashion without wraparound, so one rank can get more rowgroups than another. Moreover, rowgroups might not have the same number of rows, and applying row predicates can change the balance further.
Possible solutions:
I thought about how to make it work.
Option 1 is to scan the dataset once at runtime before training, figure out which rows match the predicate, wrap around if needed to make the total size divisible, and then assign row indexes to shards so that each shard has the same number of rows. The unit of work for Petastorm then becomes `rowgroup index + indexes of rows within rowgroup`, so effectively a poor man's random-access-to-row implementation, which would be pretty slow. Alternatively, one can assign whole rowgroups to shards while keeping the shards even-sized, which is a sort of bin-packing problem.

Option 2 is to use Petastorm sharding, but set infinite epochs, so that "fast ranks" wrap around, and cut off the epoch in the train loop outside of Petastorm. But then the smaller shards effectively get oversampled, and it wouldn't help with empty shards.
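A minimal sketch of option 2, assuming the epoch is cut off after a fixed number of steps that is identical on every rank; `rank`, `world_size`, the URL, and the step counts are placeholders:

```python
# Option 2 sketch: Petastorm sharding with num_epochs=None (infinite);
# the epoch boundary is enforced in the training loop instead.
from petastorm import make_reader
from petastorm.pytorch import DataLoader

rank, world_size = 0, 4      # hypothetical; e.g. from torch.distributed
NUM_EPOCHS = 10              # hypothetical
STEPS_PER_EPOCH = 1000       # fixed and identical on every rank

with make_reader('file:///tmp/my_dataset',  # hypothetical dataset URL
                 num_epochs=None,           # "fast ranks" wrap around
                 cur_shard=rank,
                 shard_count=world_size) as reader:
    with DataLoader(reader, batch_size=64) as loader:
        batches = iter(loader)
        for epoch in range(NUM_EPOCHS):
            for step in range(STEPS_PER_EPOCH):
                batch = next(batches)
                # training step; all ranks take the same number of steps
```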
Option 3 is to not shard at all: make each rank read the whole dataset, but shuffle it in a unique way, independent of the other ranks. The resulting mega-epoch will be as long as `num_shards` normal epochs. If shuffling is good, each global batch will effectively be drawing a batch-sized random sample (without replacement) `num_shards` times (with replacement).

@selitvin is my assessment of the problem valid? Do you have some solutions in mind? Thank you!