
Petastorm sharding + Distributed PyTorch #508

megaserg opened this issue Mar 19, 2020 · 14 comments

@megaserg (Contributor)

Problem:
I would like to train a PyTorch model on a Parquet dataset in a distributed (multi-GPU, multi-machine) setup, for a fixed number of epochs. For this I need to shard the dataset, and I hoped that providing Petastorm's cur_shard and shard_count would be sufficient. I create the Petastorm reader with num_epochs=1 each epoch (or I could create it once and call reset()).

    for epoch in range(epochs):
        train_loader = petastorm.pytorch.DataLoader(petastorm.make_reader(
            ...
            num_epochs=1,
            cur_shard=ctx.global_rank,
            shard_count=ctx.world_size,
        ))

        for i, (inputs, targets) in enumerate(train_loader):
            inputs = inputs.to(ctx.device)
            targets = targets.to(ctx.device)
           
            predictions = model(inputs)
            loss = loss_fn(predictions, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

But when training a DistributedDataParallel model, PyTorch expects the shards to have the same number of examples, so that they produce the same number of batches, so that all ranks run the same number of training steps and participate in the same number of allreduces. For example, torch.utils.data.distributed.DistributedSampler (used to implement sharding with the stock PyTorch DataLoader) wraps the dataset around to make it evenly divisible by the number of shards.
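
For reference, a minimal sketch of the wraparound that DistributedSampler-style sharding applies (the helper name and the shuffle-free handling are simplifications, not the actual PyTorch source):

    import math

    # Sketch of DistributedSampler-style padding: wrap indices around so the
    # total is evenly divisible by world_size, then stride over them, so every
    # rank gets exactly ceil(N / world_size) samples per epoch.
    def padded_shard(indices, rank, world_size):
        indices = list(indices)
        per_rank = math.ceil(len(indices) / world_size)
        total = per_rank * world_size
        indices += indices[: total - len(indices)]  # wrap around
        return indices[rank:total:world_size]

    # e.g. 10 samples over 3 ranks -> every rank gets 4 indices, 2 are repeats
    shards = [padded_shard(range(10), r, world_size=3) for r in range(3)]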

If the shards are not even-sized, some ranks have less work to do: they finish their shard early and start the next epoch while the rest of the ranks are still processing the previous one. If we're training for a fixed number of epochs, the "fast" ranks eventually finish and terminate, leaving the remaining ranks hanging because an allreduce is now impossible.

In Petastorm, even-sized shards are not guaranteed. The length of the dataset is unknown and we don't have random access to rows. Rowgroups are assigned to shards in a round-robin fashion without wraparound, so one rank can get more rowgroups than another. Moreover, rowgroups might not all have the same number of rows, and applying row predicates can change the balance further.

Possible solutions:
I thought about how to make it work.

  • Option 1 is to scan the dataset once at runtime before training, figure out which rows match the predicate, wrap around if needed to make the total size divisible, and then assign row indexes to shards so that each shard has the same number of rows. The unit of work for Petastorm then becomes a rowgroup index plus the indexes of rows within the rowgroup, so effectively a poor man's random-access-to-row implementation, which would be pretty slow. Alternatively, whole rowgroups could be assigned to shards while keeping the shards even-sized, which is a sort of bin-packing problem.

  • Option 2 is to use Petastorm sharding but set infinite epochs, so that the "fast" ranks wrap around, and cut off the epoch in the train loop outside of Petastorm (see the sketch after this list). But then the smaller shards effectively get oversampled, and it wouldn't help with empty shards.

  • Option 3 is to not shard at all: make each rank read the whole dataset, but shuffle it in a unique way, independent of the other ranks. The resulting mega-epoch will be as long as num_shards normal epochs. If the shuffling is good, each global batch is effectively drawing a batch-sized random sample (without replacement) num_shards times (with replacement).
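
For illustration, here is a minimal sketch of the Option 2 cut-off (dataset_url, steps_per_epoch, and ctx are assumptions carried over from the snippet above; all ranks must agree on steps_per_epoch, e.g. dataset_size // (batch_size * world_size)):

    # Sketch of Option 2: shard with Petastorm, loop the reader forever
    # (num_epochs=None) so "fast" ranks wrap around, and cut each epoch off
    # after a fixed number of steps in the training loop.
    train_loader = petastorm.pytorch.DataLoader(petastorm.make_reader(
        dataset_url,                 # assumed variable
        num_epochs=None,             # infinite epochs
        cur_shard=ctx.global_rank,
        shard_count=ctx.world_size,
    ))
    data_iter = iter(train_loader)

    for epoch in range(epochs):
        for step in range(steps_per_epoch):
            inputs, targets = next(data_iter)  # never raises StopIteration
            # ... same forward/backward/optimizer step as in the snippet above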

@selitvin is my assessment of the problem valid? Do you have some solutions in mind? Thank you!

@selitvin (Collaborator)

I absolutely agree with the problem definition and your analysis of possible solutions. For the scenarios our users have encountered, they typically chose option 3, as it does not introduce unintended bias for some samples and is the easiest to configure.

Perhaps one more control variable, relevant to Option 2, is the size of your rowgroups. Reducing it improves the granularity of the task unit and, for smaller datasets, may reduce the unintended bias that some portions of the samples would otherwise get during training.

Implementing Option 1 would be tricky, since it would introduce coordination between ranks that is not needed with the naive modulo-N sharding.

@megaserg (Contributor, Author) commented Mar 20, 2020

Thank you! Good to know I'm on the right track.

I control the size of the rowgroups, but reducing it also takes away the benefits of Parquet as a format: more reads are required.

Option 4, which I found in PyTorch discussions on sampling from an IterableDataset: make each of the k ranks read the whole dataset of size n, but apply reservoir sampling to subsample n/k rows. This wastes some resources and doesn't guarantee that a particular example is included in every epoch. It also requires the dataset size to be known, which can be computed during the first epoch using Option 3, or with a separate scan.
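
A minimal sketch of that reservoir-sampling idea (plain Python, not Petastorm API; n, k, and the row iterator are assumptions):

    import random

    # Sketch of Option 4: every rank streams the full dataset and keeps a
    # uniform random sample of n // k rows (Algorithm R). The dataset size n
    # must be known up front; a per-rank seed keeps the samples distinct.
    def reservoir_sample(rows, sample_size, seed):
        rng = random.Random(seed)
        reservoir = []
        for i, row in enumerate(rows):
            if i < sample_size:
                reservoir.append(row)
            else:
                j = rng.randint(0, i)  # inclusive on both ends
                if j < sample_size:
                    reservoir[j] = row
        return reservoir

    # e.g. on rank r of k ranks: reservoir_sample(reader, n // k, seed=epoch * k + r)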

@selitvin (Collaborator)

IterableDataset is a very interesting direction. A more PyTorch-native way of parallel loading/processing could probably substitute for Petastorm's custom worker pools, give PyTorch users a more PyTorch-like look and feel, and improve performance thanks to the better, shared-memory-based IPC mechanism already in place in PyTorch.

@tottenjordan
@megaserg I am trying to do something similar with TPU Pods (multi-machine): PyTorch XLA reading an ImageNet Petastorm dataset from GCS buckets. Did you find that one of your options worked best? If so, how did you implement it? Option 3? 4? Something else?

My current solution reads JPEGs from GCS. I came to Petastorm to reduce training time; my goal is to make training in this configuration as performant as reading data from the local VM (but for use cases where storing training data locally is not ideal). More on that here if interested.

@selitvin appreciate any thoughts you may have too.

@megaserg (Contributor, Author)

So we went with Option 3 for a while. It works fine, but the epoch counting is confusing.

Then we went with Option 1, but we didn't use Petastorm. Given a Parquet dataset (i.e. a bunch of Parquet files), we build an "index" of Parquet rows: basically a list of triples (filename, rowgroup index within file, row index within rowgroup). The dataset then becomes a map-style Dataset, and we can use the stock PyTorch DataLoader, with its sharding, sampling, and shuffling logic. Each shard reads its own subset of these triples and fetches them via the Dataset's __getitem__() (which goes to GCS, reads that filename, gets that rowgroup by index, yields that row by index, and throws away the rest of the rowgroup). This works decently fast, especially with a large num_workers for the dataloader's helper processes. But the problem is that we have to throw away the data we don't use, so we have to keep the rowgroup size at something like 1, which kind of kills the performance. At least we can read Parquet, apply predicates, read only the needed columns, and enjoy at least some compression.
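
A rough sketch of that index/triples approach (local pyarrow only, hypothetical names; the GCS filesystem and predicate handling are omitted):

    import pyarrow.parquet as pq
    from torch.utils.data import Dataset

    # Build the "index": a list of (filename, rowgroup index, row index) triples.
    def build_index(parquet_files):
        index = []
        for path in parquet_files:
            meta = pq.ParquetFile(path).metadata
            for rg in range(meta.num_row_groups):
                for row in range(meta.row_group(rg).num_rows):
                    index.append((path, rg, row))
        return index

    # Map-style dataset over the index: reads one rowgroup per __getitem__ and
    # keeps only the requested row, discarding the rest of the rowgroup.
    class ParquetRowDataset(Dataset):
        def __init__(self, index, columns=None):
            self.index = index
            self.columns = columns

        def __len__(self):
            return len(self.index)

        def __getitem__(self, i):
            path, rg, row = self.index[i]
            table = pq.ParquetFile(path).read_row_group(rg, columns=self.columns)
            return {name: col[row].as_py()
                    for name, col in zip(table.column_names, table.columns)}

A dataset like this plugs into the stock DataLoader with DistributedSampler, which is what gives back even shards and the usual shuffling.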

So we thought about how we could have larger rowgroups (benefiting from the storage format) and not throw away data. Currently we're using a custom IterableDataset which is aware that it's being used in a multi-GPU, multi-dataloader-worker setting. We still use the stock PyTorch DataLoader, and we still build/read the index as above; but then we divide the whole sorted dataset into equal-sized consecutive pieces, one for each shard, and then divide each of those pieces into roughly equal-sized consecutive pieces, one for each dataloader worker. So each dataloader worker gets allocated its own subset of rows, which are likely all from the same rowgroup, or from a few consecutive rowgroups within a file, or (rarely) crossing a file boundary. Each dataloader worker then applies its own short-distance shuffle to its assigned subset: it only rearranges rows within a rowgroup, rowgroups within a file, or files. Therefore, when a worker reads a rowgroup, it yields its rows in a shuffled order and only then proceeds to the next rowgroup. There's still some chance of throwing some data out, because the rowgroup size might not be aligned with the shard size or num_workers, but that's a question of tuning and experimenting.
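
A simplified sketch of that sharded IterableDataset (same triples index as above; the even splitting and the short-distance shuffle are heavily simplified, and the actual row fetching is omitted):

    import random
    from torch.utils.data import IterableDataset, get_worker_info

    # Simplified sketch: split the sorted index into a contiguous piece per
    # shard, then a contiguous piece per dataloader worker, then apply a
    # short-distance shuffle (rowgroup order, and rows within a rowgroup).
    class ShardedRowDataset(IterableDataset):
        def __init__(self, index, rank, world_size, seed=0):
            self.index, self.rank = index, rank
            self.world_size, self.seed = world_size, seed

        def __iter__(self):
            # equal-sized consecutive piece per shard (remainder is dropped)
            per_rank = len(self.index) // self.world_size
            piece = self.index[self.rank * per_rank:(self.rank + 1) * per_rank]

            # roughly equal consecutive piece per dataloader worker
            info = get_worker_info()
            if info is not None:
                per_worker = -(-len(piece) // info.num_workers)  # ceil division
                piece = piece[info.id * per_worker:(info.id + 1) * per_worker]

            # short-distance shuffle: shuffle rowgroup order, then rows within each
            rng = random.Random(self.seed + self.rank)
            groups = {}
            for path, rg, row in piece:
                groups.setdefault((path, rg), []).append((path, rg, row))
            keys = list(groups)
            rng.shuffle(keys)
            for key in keys:
                rows = groups[key]
                rng.shuffle(rows)
                for item in rows:
                    yield item  # actual fetch/decode of the row is omitted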

Hope this helps!

@weidezhang
Just wondering: if I use Option 3 (each rank loads the whole Petastorm dataset), might reading the whole dataset have any performance bottlenecks? Is it better to set the number of row groups equal to 1 in that case?

@selitvin (Collaborator)

There should not be performance bottlenecks, since the Parquet dataset is read rowgroup-by-rowgroup.
What do you mean by "set number of row groups equal to 1"? Is it about having a single row-group in the entire Parquet store, or having one row in each row-group? (Neither is a good idea: the first is likely to blow up RAM usage for a decent-sized dataset, and the latter would be slow due to the large number of roundtrips when reading the data.)

@weidezhang
Just to confirm how Option 3 is configured:

In the make_reader API, simply set cur_shard=None, shard_count=None, shard_seed=None, and num_epochs=<real epoch number> instead of None. Is that fine?

@selitvin (Collaborator)

Yes (note that cur_shard=None and shard_count=None are the defaults, so you don't need to specify them explicitly).
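
Putting that together, a minimal sketch of an Option 3 reader (dataset_url and epochs are assumptions; each rank builds its own reader, so the row-group shuffle order differs across ranks):

    # Sketch of Option 3: no sharding, every rank reads the whole dataset for a
    # fixed number of epochs; shuffle_row_groups=True (the default) gives each
    # rank its own random row-group order since each rank has its own reader.
    with petastorm.make_reader(
        dataset_url,
        num_epochs=epochs,    # a real epoch count instead of None
        cur_shard=None,       # the defaults, shown here only for clarity
        shard_count=None,
        shuffle_row_groups=True,
    ) as reader:
        train_loader = petastorm.pytorch.DataLoader(reader)
        for batch in train_loader:
            pass  # training step goes here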

@zxgx commented Aug 22, 2022

I've tested solution 3 and discovered that, in order for each rank to load the same global batch, I must also set workers_count=1 when invoking make_batch_reader; otherwise the batch in each rank is still random.
Another potential solution might be setting reader_pool_type="dummy"; however, when launching multiple processes, the whole procedure gets stuck.
Anyway, I suppose decreasing the number of workers might incur some performance loss. Is this the right way to accomplish solution 3, or how could I mitigate this problem?

BTW, I noticed that there is an in-progress PR, #767, working on shuffle. Could you please tell me if you have any plan to fix it? @selitvin

@selitvin (Collaborator)

You are right. The order is not deterministic when multiple workers race over reading row groups and we do not have a reordering queue set up (not too hard to implement).
Curious, why would you want to get the same order of rows on multiple ranks? I would assume that you do want to shuffle as much as possible anyway?

#767 will land very soon (day(s) timespan)

@zxgx commented Aug 23, 2022

I'm trying to build a DLRM model containing both model-parallel layers and data-parallel layers. In the model-parallel part, I must make sure each rank consumes the same global batch. After that, an all-to-all collective distributes the global output activations to the data-parallel layers in each rank.
I don't know whether this explanation is clear enough, but I do need the same order within the global batch on each rank.

In fact, this is a compromise. Potentially, I could also generate a different mini-batch in each rank and use an all-to-all collective to recover the global batch before feeding it to the model-parallel layers. However, due to the communication problems in our hardware, the former is better for my purposes.

@selitvin (Collaborator)

I see. It should not be too hard to add a reordering buffer to the reader; I think the PyTorch DataLoader has similar functionality. I am not sure when I'll have time to do this. If you want to give it a shot and propose a PR, I'll be happy to work with you to get it in.
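
For what it's worth, a generic sketch of such a reordering buffer (not Petastorm code; it assumes each worker tags its output with a monotonically increasing sequence number):

    import heapq

    # Generic reordering buffer: workers emit (sequence_number, item) pairs in
    # arbitrary order; items are released strictly in sequence order, so every
    # consumer sees the same deterministic stream.
    def reorder(tagged_items):
        heap, expected = [], 0
        for seq, item in tagged_items:
            heapq.heappush(heap, (seq, item))
            while heap and heap[0][0] == expected:
                yield heapq.heappop(heap)[1]
                expected += 1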

@zxgx commented Aug 25, 2022

I'm too busy to work on the dataloader right now :dizzy_face: that's why I resorted to Petastorm.
I may have a look at the source code in the future, but I would recommend fixing this load-balancing issue for DDP first, as it is a very common need for large-scale datasets and distributed training.
